Class: IntellisenseRuby::Consumer

Inherits:
Object
Defined in:
lib/intellisense-ruby/consumer.rb

Constructor Details

#initialize(queue, secret, options = {}) ⇒ Consumer

public: Creates a new consumer.

The consumer continuously takes messages off the queue and sends them to the intellify API.

queue   - Queue synchronized between client and consumer
secret  - String of the project’s secret
options - Hash of consumer options

  batch_size - Fixnum of how many items to send in a batch
  on_error   - Proc of what to do on an error; called with the response status and error
  transport  - String naming the transport to use ('http' or 'sqs')
  logger     - object used for log output


# File 'lib/intellisense-ruby/consumer.rb', line 21

def initialize(queue, secret, options = {})
  @options = options
  @queue = queue
  @secret = secret
  @batch_size = options[:batch_size] || IntellisenseRuby::Defaults::Queue::BATCH_SIZE
  @on_error = options[:on_error] || Proc.new { |status, error| }
  @transport = options[:transport] || IntellisenseRuby::Defaults::Request::TRANSPORT

  @current_batch = []

  @mutex = Mutex.new

  @logger = options[:logger]
  # Interpolation avoids a TypeError when @transport is not a String
  log("intellisense-ruby: SENSOR initialized! with transport #{@transport}")
end
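
The constructor resolves each option with the `options[:key] || default` pattern. The following is a minimal sketch of that pattern in isolation. `resolve_options`, `DEFAULT_BATCH_SIZE`, and `DEFAULT_TRANSPORT` are hypothetical names, and the default values are placeholders, since the real ones live in IntellisenseRuby::Defaults and are not shown on this page.

```ruby
# Placeholder defaults: the real values are defined in
# IntellisenseRuby::Defaults, so these are assumptions for illustration.
DEFAULT_BATCH_SIZE = 100
DEFAULT_TRANSPORT  = 'http'

# Hypothetical helper mirroring the `options[:key] || default` pattern
def resolve_options(options = {})
  {
    batch_size: options[:batch_size] || DEFAULT_BATCH_SIZE,
    on_error:   options[:on_error]   || Proc.new { |status, error| },
    transport:  options[:transport]  || DEFAULT_TRANSPORT
  }
end

# A caller-supplied handler that records failures
errors = []
handler = Proc.new { |status, error| errors << [status, error] }

custom = resolve_options({ batch_size: 10, on_error: handler })
custom[:on_error].call(500, 'server error')

# With no options, every key falls back to its default;
# the default on_error handler is a no-op
defaults = resolve_options
defaults[:on_error].call(500, 'ignored')
```

Because the fallback uses `||`, an option explicitly set to `nil` or `false` is treated the same as a missing one.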

Instance Method Details

#flush ⇒ Object

public: Flushes up to batch_size events from the queue, blocking until at least one event is available



# File 'lib/intellisense-ruby/consumer.rb', line 48

def flush
  # Block until we have something to send
  item = @queue.pop()

  # Synchronize on additions to the current batch
  @mutex.synchronize {
    @current_batch << item
    until @current_batch.length >= @batch_size || @queue.empty?
      @current_batch << @queue.pop()
    end
  }

  #log("about to send messages " + @current_batch)

  case @transport
  when 'http'
    log("intellisense-ruby: SENSOR using HTTP transport to send #{@current_batch.to_s}")
    req = IntellisenseRuby::Request.new
    res = req.post(@secret, @current_batch)
    @on_error.call(res.status, res.error) unless res.status == 200
    @mutex.synchronize {
      @current_batch = []
    }
  when 'sqs'
    log("intellisense-ruby: SENSOR using SQS transport to send #{@current_batch.to_s}")
    sqs = IntellisenseRuby::SQS_Publisher.new @options
    sqs.send_message(@current_batch)
    @mutex.synchronize {
      @current_batch = []
    }
  else
    # cannot do anything; the batch is not cleared here, so it keeps
    # growing and is_requesting? stays true
    log_error('intellisense-ruby: SENSOR UNKNOWN transport - error...')
  end

end
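
The batching step at the top of flush can be tried on its own. This is a minimal sketch using only Ruby's standard Queue; `drain_batch` is a hypothetical helper name, not part of intellisense-ruby, and it leaves out the mutex and the transport handling.

```ruby
require 'thread'

# Hypothetical stand-in for the batching step in Consumer#flush
def drain_batch(queue, batch_size)
  # Block until at least one item is available
  batch = [queue.pop]
  # Then take whatever else is already queued, up to batch_size
  batch << queue.pop until batch.length >= batch_size || queue.empty?
  batch
end

queue = Queue.new
(1..5).each { |i| queue << i }

first  = drain_batch(queue, 3) # fills a whole batch
second = drain_batch(queue, 3) # takes the remainder, then stops
```

With five queued items and a batch size of three, the first call fills a complete batch and the second returns the remaining two items without waiting for more.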

#is_requesting? ⇒ Boolean

public: Check whether we have outstanding requests, i.e. whether the current batch is non-empty.

Returns:

  • (Boolean)


# File 'lib/intellisense-ruby/consumer.rb', line 87

def is_requesting?
  # Mutex#synchronize returns the value of its block
  @mutex.synchronize { !@current_batch.empty? }
end

#run ⇒ Object

public: Continuously runs the loop to check for new events. The loop never returns on its own.



# File 'lib/intellisense-ruby/consumer.rb', line 40

def run
  while true
    flush
  end
end
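
Since run never returns, a caller would typically start it on a separate thread. The sketch below assumes that usage. `SketchConsumer` is a hypothetical stand-in with the same run/flush shape as Consumer; it records items instead of sending them, so the example runs without the gem.

```ruby
require 'thread'

# Hypothetical stand-in for Consumer: same run/flush shape, no network calls
class SketchConsumer
  def initialize(queue)
    @queue = queue
    @sent = []
    @mutex = Mutex.new
  end

  def flush
    item = @queue.pop # blocks until an item is available
    @mutex.synchronize { @sent << item }
  end

  def run
    loop { flush }
  end

  def sent_count
    @mutex.synchronize { @sent.length }
  end
end

queue = Queue.new
consumer = SketchConsumer.new(queue)
worker = Thread.new { consumer.run }

# The client side only pushes onto the shared queue
3.times { |i| queue << { event: "e#{i}" } }

# Wait (with a deadline) for the worker to drain the queue, then stop it
deadline = Time.now + 5
sleep 0.01 until consumer.sent_count == 3 || Time.now > deadline
worker.kill
worker.join
```

Killing the thread is the bluntest way to stop the loop; it is used here only to keep the sketch short.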