Module: DripDrop::ZMQWritableHandler

Included in:
ZMQPubHandler, ZMQPushHandler, ZMQXRepHandler, ZMQXReqHandler
Defined in:
lib/dripdrop/handlers/zeromq.rb

Instance Method Summary collapse

Instance Method Details

#initialize(*args) ⇒ Object



38
39
40
41
42
# File 'lib/dripdrop/handlers/zeromq.rb', line 38

def initialize(*args)
  super(*args)
  @send_queue = []
  @send_queue_enabled = false
end

#on_writable(socket) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/dripdrop/handlers/zeromq.rb', line 44

def on_writable(socket)
  unless @send_queue.empty?
    message = @send_queue.shift

    num_parts = message.length
    message.each_with_index do |part,i|
      # Set the multi-part flag unless this is the last message
      flags = (i + 1 < num_parts ? ZMQ::SNDMORE : 0) | ZMQ::NOBLOCK

      if part.class == ZMQ::Message
        socket.send(part, flags)
      else
        if part.class == String
          socket.send_string(part, flags)
        else
          $stderr.write "Can only send Strings, not #{part.class}: #{part}" if @debug
        end
      end
    end
  else
    @connection.deregister_writable if @send_queue_enabled
  end
end

#send_message(message) ⇒ Object

Sends a message, accepting either a DripDrop::Message, a hash that looks like a DripDrop::Message (has keys :name, :head, :body), or your own custom messages. Custom messages should either be a String, or for multipart messages, an Array of String objects.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/dripdrop/handlers/zeromq.rb', line 72

def send_message(message)
  dd_message = dd_messagify(message)
  if dd_message.is_a?(DripDrop::Message)
    @send_queue.push([dd_message.encoded])
  elsif message.class == Array
    @send_queue.push(message)
  else
    @send_queue.push([message])
  end
    
  
  if @send_queue_enabled
    @connection.register_writable
     
    # Not sure why this is necessary, this is likely a bug in em-zeromq
    on_writable(@connection.socket)
  else
    on_writable(@connection.socket)
  end
end