Class: DripDrop::Node

Inherits:
Object
Defined in:
lib/dripdrop/node.rb,
lib/dripdrop/node/nodelet.rb

Defined Under Namespace

Classes: Nodelet

Constant Summary

ZCTX = ZMQ::Context.new 1

Constructor Details

#initialize(opts = {}, &block) ⇒ Node

Returns a new instance of Node.



# File 'lib/dripdrop/node.rb', line 22

def initialize(opts={},&block)
  @block      = block
  @thread     = nil # Thread containing the reactors
  @routing    = {}  # Routing table
  @debug      = opts[:debug]
  @recipients_for       = {}
  @handler_default_opts = {:debug => @debug}
  @nodelets   = {}  # Cache of registered nodelets
  @zctx = ZCTX
end

Instance Attribute Details

#debug ⇒ Object

Returns the value of attribute debug.



# File 'lib/dripdrop/node.rb', line 20

def debug
  @debug
end

#nodelets ⇒ Object (readonly)

Returns the value of attribute nodelets.



# File 'lib/dripdrop/node.rb', line 19

def nodelets
  @nodelets
end

#routing ⇒ Object (readonly)

Returns the value of attribute routing.



# File 'lib/dripdrop/node.rb', line 19

def routing
  @routing
end

#zm_reactor ⇒ Object (readonly)

Returns the value of attribute zm_reactor.



# File 'lib/dripdrop/node.rb', line 19

def zm_reactor
  @zm_reactor
end

Class Method Details

.error_handler(e) ⇒ Object

Catch-all error handler, global to all DripDrop nodes.



# File 'lib/dripdrop/node.rb', line 273

def self.error_handler(e)
  $stderr.write "#{e.class}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
end
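
As a rough illustration of the output format, the same interpolation can be run on a rescued exception. The helper name format_error and the sample error are assumptions made for this sketch; only the format string comes from the listing above.

```ruby
# Hypothetical helper: builds the same string that error_handler writes to $stderr.
def format_error(e)
  "#{e.class}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
end

formatted = nil
begin
  raise ArgumentError, "bad address"
rescue ArgumentError => e
  # A raised exception carries a backtrace, so join is safe here.
  formatted = format_error(e)
end

puts formatted
```

Note that an exception object that was never raised has a nil backtrace, so this formatting assumes the exception was actually raised.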

Instance Method Details

#action ⇒ Object

When subclassing DripDrop::Node you will probably want to define this method. Otherwise it will attempt to run the @block passed into DripDrop::Node.new.



# File 'lib/dripdrop/node.rb', line 55

def action
  if @block
    self.instance_eval(&@block)
  else
    raise "Could not start, no block or specified"
  end
end
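
The two ways of supplying behaviour (a block, or an overridden action) can be sketched without DripDrop or EventMachine. MiniNode and Greeter are hypothetical stand-ins, not library classes; the sketch only mirrors the instance_eval dispatch from the listing above.

```ruby
# Hypothetical stand-in for DripDrop::Node; only the action dispatch is mirrored.
class MiniNode
  attr_reader :log

  def initialize(&block)
    @block = block
    @log   = []
  end

  # Runs the block with self set to this instance, so the block can call
  # the node's own methods (here: log) without an explicit receiver.
  def action
    if @block
      instance_eval(&@block)
    else
      raise "Could not start, no block given and #action not overridden"
    end
  end
end

# Subclasses override #action instead of passing a block.
class Greeter < MiniNode
  def action
    log << :from_subclass
  end
end

block_node = MiniNode.new { log << :from_block }
block_node.action

sub_node = Greeter.new
sub_node.action
```

Because the block is evaluated with instance_eval, instance variables referenced inside it belong to the node, not to the scope where the block was written.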

#http_client(address, opts = {}) ⇒ Object

An EM HTTP client. Example:

client = http_client(addr)
client.send_message(:name => 'name', :body => 'hi') do |resp_msg|
  puts resp_msg.inspect
end


# File 'lib/dripdrop/node.rb', line 229

def http_client(address,opts={})
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::HTTPClientHandler.new(uri, h_opts)
end
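
The listing above hands the handler a parsed URI rather than the raw address string; the http_server and websocket listings do the same. A small sketch of what URI.parse yields, using an address whose port and path are chosen purely for illustration:

```ruby
require 'uri'

# The address is parsed once; the handler then works with the URI object.
uri = URI.parse('http://127.0.0.1:8080/messages')

scheme = uri.scheme # "http"
host   = uri.host   # "127.0.0.1"
port   = uri.port   # 8080
path   = uri.path   # "/messages"

puts [scheme, host, port, path].inspect
```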

#http_server(address, opts = {}, &block) ⇒ Object

Starts a new Thin HTTP server listening on address. Can have an on_recv handler that gets passed msg and response args.

http_server(addr) {|msg,response| response.send_message(msg)}


# File 'lib/dripdrop/node.rb', line 217

def http_server(address,opts={},&block)
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::HTTPServerHandler.new(uri, h_opts,&block)
end

#join ⇒ Object

If the reactor has started, this blocks until the thread running the reactor finishes. This should block forever unless stop is called.



# File 'lib/dripdrop/node.rb', line 66

def join
  if @thread
    @thread.join
  else
    raise "Can't join on a node that isn't yet started"
  end
end

#nodelet(name, klass = Nodelet, *configure_args, &block) ⇒ Object

Nodelets are a way of segmenting a DripDrop::Node. This can be used for both organization and deployment. For instance, one might want the production deployment of an app to be broken across multiple servers or processes:

nodelet :heartbeat do |nlet|
  nlet.route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
  EM::PeriodicalTimer.new(1) do
    nlet.ticker.send_message(:name => 'tick')
  end
end

Nodelets can also be subclassed, for instance:

class SpecialNodelet < DripDrop::Node::Nodelet
  def action
    # Inside the subclass, self is the nodelet, so no nlet receiver is needed
    route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
    EM::PeriodicalTimer.new(1) do
      ticker.send_message(:name => 'tick')
    end
  end
end

nodelet :heartbeat, SpecialNodelet

If you specify a block, Nodelet#action will be ignored and the block will be run instead.



# File 'lib/dripdrop/node.rb', line 141

def nodelet(name,klass=Nodelet,*configure_args,&block)
  nlet = @nodelets[name] ||= klass.new(self,name,*configure_args)
  if block
    block.call(nlet)
  else
    nlet.action
  end
  nlet
end
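
The caching and the block-versus-action choice in the listing above can be exercised in isolation. NodeletHost and MiniNodelet are hypothetical stand-ins that copy only that logic; they are not the library's classes.

```ruby
# Hypothetical stand-in for DripDrop::Node::Nodelet.
class MiniNodelet
  attr_reader :name, :calls

  def initialize(node, name, *configure_args)
    @node  = node
    @name  = name
    @calls = []
  end

  def action
    @calls << :action
  end
end

# Hypothetical stand-in for the nodelet registry kept by the node.
class NodeletHost
  def initialize
    @nodelets = {}
  end

  def nodelet(name, klass = MiniNodelet, *configure_args, &block)
    # ||= builds the nodelet once per name and reuses it afterwards.
    nlet = @nodelets[name] ||= klass.new(self, name, *configure_args)
    if block
      block.call(nlet) # a block takes precedence over klass#action
    else
      nlet.action
    end
    nlet
  end
end

host   = NodeletHost.new
first  = host.nodelet(:heartbeat) { |nlet| nlet.calls << :block }
second = host.nodelet(:heartbeat)
```

One consequence worth noticing: a second call with the same name does not build a new nodelet, but it does run the block or action again on the cached instance.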

#recv_internal(dest, identifier, &block) ⇒ Object

Defines a subscriber to the channel dest, to receive messages from send_internal. identifier is a unique identifier for this receiver; it can later be passed to remove_recv_internal to delete the subscriber.



# File 'lib/dripdrop/node.rb', line 256

def recv_internal(dest,identifier,&block)
  if @recipients_for[dest]
    @recipients_for[dest][identifier] =  block
  else
    @recipients_for[dest] = {identifier => block}
  end
end

#remove_recv_internal(dest, identifier) ⇒ Object

Deletes the subscriber to the channel dest that was registered under identifier with recv_internal.



# File 'lib/dripdrop/node.rb', line 266

def remove_recv_internal(dest,identifier)
  return false unless @recipients_for[dest]
  @recipients_for[dest].delete(identifier)
end

#route(name, handler_type, *handler_args) ⇒ Object

Defines a new route. Routes are the recommended way to instantiate handlers. For example:

route :stats_pub, :zmq_publish, 'tcp://127.0.0.1:2200', :bind
route :stats_sub, :zmq_subscribe, stats_pub.address, :connect

This makes the following methods available within the reactor block:

stats_pub  # A regular zmq_publish handler
stats_sub  # A regular zmq_subscribe handler

See the docs for nodelet, which replaces the deprecated routes_for, for more info on grouping routes and maintaining sanity in larger apps.



# File 'lib/dripdrop/node.rb', line 86

def route(name,handler_type,*handler_args)
  route_full(nil, name, handler_type, *handler_args)
end

#route_full(nodelet, name, handler_type, *handler_args) ⇒ Object

Probably not useful for most apps. This is used internally to create a route for a given nodelet.



# File 'lib/dripdrop/node.rb', line 92

def route_full(nodelet, name, handler_type, *handler_args)
  # If we're in a route_for block, prepend appropriately
  full_name = (nodelet && nodelet.name) ? "#{nodelet.name}_#{name}".to_sym : name
  
  handler = self.send(handler_type, *handler_args)
  @routing[full_name] = handler
  
  # Define the route name as a singleton method
  (class << self; self; end).class_eval do
    define_method(full_name) { handler }
  end
  
  handler
end
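
The naming and per-instance method definition in the listing above can be tried with plain objects in place of sockets. MiniRouter is a hypothetical stand-in, and it uses define_singleton_method as an equivalent of the (class << self; self; end).class_eval form.

```ruby
# Hypothetical stand-in; the handler is any object rather than a socket handler.
class MiniRouter
  attr_reader :routing

  def initialize
    @routing = {}
  end

  def route_full(prefix, name, handler)
    # Prefix the route name when a nodelet name is given.
    full_name = prefix ? "#{prefix}_#{name}".to_sym : name
    @routing[full_name] = handler

    # The accessor is added to this instance only, not to the class.
    define_singleton_method(full_name) { handler }
    handler
  end
end

router = MiniRouter.new
other  = MiniRouter.new

router.route_full(nil, :stats_pub, :pub_handler)
router.route_full(:heartbeat, :ticker, :ticker_handler)
```

Defining the accessor on the singleton class keeps routes from leaking between nodes: other has no stats_pub method even though it is the same class.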

#routes_for(nodelet_name, &block) ⇒ Object

DEPRECATED: use nodelet instead. Will be deleted in 0.8.



# File 'lib/dripdrop/node.rb', line 108

def routes_for(nodelet_name,&block)
  $stderr.write "routes_for is now deprecated, use nodelet instead"
  nlet = nodelet(nodelet_name,&block)
  block.call(nlet)
end

#send_internal(dest, data) ⇒ Object

An in-process pub/sub queue that works similarly to EM::Channel, but with manually specified identifiers for subscribers, which makes it easier to delete subscribers without tracking generated ids.

This is useful for situations where you want to broadcast messages across your app, but need a way to properly delete listeners.

dest is the name of the pub/sub channel. data is any Ruby object you’d like to send.



# File 'lib/dripdrop/node.rb', line 244

def send_internal(dest,data)
  return false unless @recipients_for[dest]
  blocks = @recipients_for[dest].values
  return false unless blocks
  blocks.each do |block|
    block.call(data)
  end
end
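
The interplay of send_internal, recv_internal and remove_recv_internal can be sketched without EventMachine. InternalBus is a hypothetical stand-in that copies the logic of the three listings; the channel and identifier names are made up for the example.

```ruby
# Hypothetical stand-in mirroring the internal pub/sub methods.
class InternalBus
  def initialize
    @recipients_for = {}
  end

  # Registers a block under an explicit identifier for the given channel.
  def recv_internal(dest, identifier, &block)
    (@recipients_for[dest] ||= {})[identifier] = block
  end

  # Removes one subscriber by the identifier it was registered with.
  def remove_recv_internal(dest, identifier)
    return false unless @recipients_for[dest]
    @recipients_for[dest].delete(identifier)
  end

  # Calls every block subscribed to the channel, in registration order.
  def send_internal(dest, data)
    return false unless @recipients_for[dest]
    @recipients_for[dest].values.each { |block| block.call(data) }
  end
end

bus      = InternalBus.new
received = []

bus.recv_internal(:stats, :logger)  { |data| received << [:logger, data] }
bus.recv_internal(:stats, :counter) { |data| received << [:counter, data] }

bus.send_internal(:stats, 1)              # both subscribers run
bus.remove_recv_internal(:stats, :logger)
bus.send_internal(:stats, 2)              # only :counter is left
unknown = bus.send_internal(:nobody, 3)   # channel with no subscribers
```

Sending to a channel that has never had a subscriber returns false rather than raising, which matches the guard at the top of send_internal.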

#start ⇒ Object

Starts the reactors and runs the block passed to initialize. This is non-blocking.



# File 'lib/dripdrop/node.rb', line 35

def start
  @thread = Thread.new do
    EM.error_handler {|e| self.error_handler e}
    EM.run { action }
  end
end

#start! ⇒ Object

Blocking version of start, equivalent to calling start and then join.



# File 'lib/dripdrop/node.rb', line 43

def start!
  self.start
  self.join
end

#stop ⇒ Object

Stops the reactors. If you were blocked on #join, that will unblock.



# File 'lib/dripdrop/node.rb', line 49

def stop
  EM.stop
end
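
The start, join and stop lifecycle can be illustrated with a plain thread. In this sketch a Queue-driven loop plays the role of the EM reactor; MiniLifecycle and its running? helper are hypothetical and exist only for the example.

```ruby
# Hypothetical stand-in: a blocking loop on a Queue replaces EM.run.
class MiniLifecycle
  def initialize
    @thread = nil
    @inbox  = Queue.new
  end

  # Non-blocking: the loop runs in its own thread.
  def start
    @thread = Thread.new do
      loop do
        break if @inbox.pop == :stop
      end
    end
  end

  # Blocks until the loop thread exits.
  def join
    raise "Can't join on a node that isn't yet started" unless @thread
    @thread.join
  end

  # Asks the loop to exit, which unblocks anyone waiting in #join.
  def stop
    @inbox << :stop
  end

  def running?
    !@thread.nil? && @thread.alive?
  end
end

node = MiniLifecycle.new
node.start
alive_after_start = node.running?
node.stop
node.join
alive_after_join = node.running?
```

As in the listings, join only makes sense after start, and a blocked join returns once stop has been called from another thread or callback.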

#websocket(address, opts = {}) ⇒ Object

Binds an EM websocket connection to address. Takes blocks for on_open, on_recv, on_close and on_error.

For example, on_recv could be used to echo incoming messages:

websocket(addr).on_open {|conn|
  conn.send_message(:name => 'ws_open_ack')
}.on_recv {|msg,conn|
  conn.send(msg)
}.on_close {|conn|
}.on_error {|reason,conn|
}

The conn object that’s passed into the handlers is not the DripDrop::WebSocketHandler object, but an em-websocket object.



# File 'lib/dripdrop/node.rb', line 208

def websocket(address,opts={})
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::WebSocketHandler.new(uri,h_opts)
end

#zmq_publish(address, socket_ctype, opts = {}) ⇒ Object

Creates a ZMQ::PUB type socket. Can only send messages, via send_message.



# File 'lib/dripdrop/node.rb', line 159

def zmq_publish(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQPubHandler,ZMQ::PUB,address,socket_ctype,opts)
end

#zmq_pull(address, socket_ctype, opts = {}, &block) ⇒ Object

Creates a ZMQ::PULL type socket. Can only receive messages, via on_recv.



# File 'lib/dripdrop/node.rb', line 164

def zmq_pull(address,socket_ctype,opts={},&block)
  zmq_handler(DripDrop::ZMQPullHandler,ZMQ::PULL,address,socket_ctype,opts)
end

#zmq_push(address, socket_ctype, opts = {}) ⇒ Object

Creates a ZMQ::PUSH type socket. Can only send messages, via send_message.



# File 'lib/dripdrop/node.rb', line 169

def zmq_push(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQPushHandler,ZMQ::PUSH,address,socket_ctype,opts)
end

#zmq_subscribe(address, socket_ctype, opts = {}, &block) ⇒ Object

Creates a ZMQ::SUB type socket. Can only receive messages, via on_recv. zmq_subscribe sockets accept a topic_filter option, which takes a regexp and restricts which messages the socket receives.



# File 'lib/dripdrop/node.rb', line 154

def zmq_subscribe(address,socket_ctype,opts={},&block)
  zmq_handler(DripDrop::ZMQSubHandler,ZMQ::SUB,address,socket_ctype,opts)
end

#zmq_xrep(address, socket_ctype, opts = {}) ⇒ Object

Creates a ZMQ::XREP type socket, which both sends and receives. XREP sockets are extremely powerful, so their functionality is currently limited. XREP sockets in DripDrop can reply to the original source of the message.

Receiving with XREP sockets in DripDrop differs from other socket types: on_recv passes two arguments to its callback, message and response. A minimal example is shown below:

zmq_xrep(z_addr, :bind).on_recv do |message,response|
  response.send_message(message)
end


# File 'lib/dripdrop/node.rb', line 185

def zmq_xrep(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQXRepHandler,ZMQ::XREP,address,socket_ctype,opts)
end

#zmq_xreq(address, socket_ctype, opts = {}) ⇒ Object

Creates a ZMQ::XREQ type socket. See the documentation for zmq_xrep for more info.



# File 'lib/dripdrop/node.rb', line 190

def zmq_xreq(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQXReqHandler,ZMQ::XREQ,address,socket_ctype,opts)
end