Class: IntellisenseRuby::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/intellisense-ruby/consumer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, secret, options = {}) ⇒ Consumer

public: Creates a new consumer

The consumer continuously takes messages off the queue and makes requests to the intellify api

queue - Queue synchronized between client and consumer secret - String of the project’s secret options - Hash of consumer options

batch_size - Fixnum of how many items to send in a batch
on_error   - Proc of what to do on an error


23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/intellisense-ruby/consumer.rb', line 23

def initialize(queue, secret, options = {})
  @options = options
  @queue = queue
  @secret = secret
  @batch_size = options[:batch_size] || IntellisenseRuby::Defaults::Queue::BATCH_SIZE
  @on_error = options[:on_error] || Proc.new { |status, error| }
  @transport = options[:transport] || IntellisenseRuby::Defaults::Request::TRANSPORT

  @current_batch = []

  @mutex = Mutex.new

  @logger = options[:logger]
  log("intellisense-ruby: SENSOR initialized! with transport #{@transport}")

end

Instance Attribute Details

#transportObject

Returns the value of attribute transport.



10
11
12
# File 'lib/intellisense-ruby/consumer.rb', line 10

def transport
  @transport
end

Instance Method Details

#flushObject

public: Flush some events from our queue



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/intellisense-ruby/consumer.rb', line 50

def flush
  # Block until we have something to send
  item = @queue.pop()

  # Synchronize on additions to the current batch
  @mutex.synchronize {
    @current_batch << item
    until @current_batch.length >= @batch_size || @queue.empty?
      @current_batch << @queue.pop()
    end
  }

  #log("about to send messages " + @current_batch)

  case @transport
  when 'http'
    log("intellisense-ruby: SENSOR using HTTP transport to send #{@current_batch.to_s}")
    req = IntellisenseRuby::Request.new
    res = req.post(@secret, @current_batch)
    @on_error.call(res.status, res.error) unless res.status == 200
    @mutex.synchronize {
      @current_batch = []
    }
  when 'sqs'
    log("intellisense-ruby: SENSOR using SQS transport to send #{@current_batch.to_s}")
    sqs = IntellisenseRuby::SQS_Publisher.new @options
    @current_batch.each {
      |msg_data|
      sent = sqs.send_message(msg_data)
      if !sent #put the message back on the queue
        @queue << msg_data
      end
    }
    # sqs.send_message(@current_batch)
    @mutex.synchronize {
      @current_batch = []
    }
  else
    # cannot do anything
    log_error('intellisense-ruby: SENSOR UNKNOWN transport - error...')
  end

end

#is_requesting?Boolean

public: Check whether we have outstanding requests.

Returns:

  • (Boolean)


96
97
98
99
100
101
102
# File 'lib/intellisense-ruby/consumer.rb', line 96

def is_requesting?
  requesting = nil
  @mutex.synchronize {
    requesting = !@current_batch.empty?
  }
  requesting
end

#runObject

public: Continuously runs the loop to check for new events



42
43
44
45
46
# File 'lib/intellisense-ruby/consumer.rb', line 42

def run
  while true
    flush
  end
end