Class: DripDrop::Node
- Inherits: Object
  - Object
  - DripDrop::Node
- Defined in: lib/dripdrop/node.rb, lib/dripdrop/node/nodelet.rb
Defined Under Namespace
Classes: Nodelet
Constant Summary
- ZCTX = ZMQ::Context.new 1
Instance Attribute Summary
-
#debug ⇒ Object
Returns the value of attribute debug.
-
#nodelets ⇒ Object
readonly
Returns the value of attribute nodelets.
-
#routing ⇒ Object
readonly
Returns the value of attribute routing.
-
#zm_reactor ⇒ Object
readonly
Returns the value of attribute zm_reactor.
Class Method Summary
-
.error_handler(e) ⇒ Object
Catch-all error handler, global to all DripDrop nodes.
Instance Method Summary
-
#action ⇒ Object
When subclassing DripDrop::Node you probably want to define this method. Otherwise it will attempt to run the @block passed into DripDrop::Node.new.
-
#http_client(address, opts = {}) ⇒ Object
An EM HTTP client.
-
#http_server(address, opts = {}, &block) ⇒ Object
Starts a new Thin HTTP server listening on address.
-
#initialize(opts = {}, &block) ⇒ Node
constructor
A new instance of Node.
-
#join ⇒ Object
If the reactor has started, this blocks until the thread running the reactor joins.
-
#nodelet(name, klass = Nodelet, *configure_args, &block) ⇒ Object
Nodelets are a way of segmenting a DripDrop::Node.
-
#recv_internal(dest, identifier, &block) ⇒ Object
Defines a subscriber to the channel dest, to receive messages from send_internal.
-
#remove_recv_internal(dest, identifier) ⇒ Object
Deletes a subscriber to the channel dest previously identified by a receiver created with recv_internal.
-
#route(name, handler_type, *handler_args) ⇒ Object
Defines a new route.
-
#route_full(nodelet, name, handler_type, *handler_args) ⇒ Object
Probably not useful for most apps.
-
#routes_for(nodelet_name, &block) ⇒ Object
DEPRECATED, will be deleted in 0.8.
-
#send_internal(dest, data) ⇒ Object
An in-process pub/sub queue that works similarly to EM::Channel, but with manually specified identifiers for subscribers, letting you delete subscribers more easily without complicated id tracking.
-
#start ⇒ Object
Starts the reactors and runs the block passed to initialize.
-
#start! ⇒ Object
Blocking version of start, equivalent to start then join.
-
#stop ⇒ Object
Stops the reactors.
-
#websocket(address, opts = {}) ⇒ Object
Binds an EM websocket connection to address.
-
#zmq_publish(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::PUB type socket. Can only send messages via send_message.
-
#zmq_pull(address, socket_ctype, opts = {}, &block) ⇒ Object
Creates a ZMQ::PULL type socket.
-
#zmq_push(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::PUSH type socket. Can only send messages via send_message.
-
#zmq_subscribe(address, socket_ctype, opts = {}, &block) ⇒ Object
Creates a ZMQ::SUB type socket.
-
#zmq_xrep(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::XREP type socket, which both sends and receives. XREP sockets are extremely powerful, so their functionality is currently limited.
-
#zmq_xreq(address, socket_ctype, opts = {}) ⇒ Object
See the documentation for zmq_xrep for more info.
Constructor Details
#initialize(opts = {}, &block) ⇒ Node
Returns a new instance of Node.
# File 'lib/dripdrop/node.rb', line 22

def initialize(opts={},&block)
  @block   = block
  @thread  = nil # Thread containing the reactors
  @routing = {}  # Routing table
  @debug   = opts[:debug]
  @recipients_for       = {}
  @handler_default_opts = {:debug => @debug}
  @nodelets = {} # Cache of registered nodelets
  @zctx = ZCTX
end
Instance Attribute Details
#debug ⇒ Object
Returns the value of attribute debug.
# File 'lib/dripdrop/node.rb', line 20

def debug
  @debug
end
#nodelets ⇒ Object (readonly)
Returns the value of attribute nodelets.
# File 'lib/dripdrop/node.rb', line 19

def nodelets
  @nodelets
end
#routing ⇒ Object (readonly)
Returns the value of attribute routing.
# File 'lib/dripdrop/node.rb', line 19

def routing
  @routing
end
#zm_reactor ⇒ Object (readonly)
Returns the value of attribute zm_reactor.
# File 'lib/dripdrop/node.rb', line 19

def zm_reactor
  @zm_reactor
end
Class Method Details
.error_handler(e) ⇒ Object
Catch-all error handler, global to all DripDrop nodes.
# File 'lib/dripdrop/node.rb', line 273

def self.error_handler(e)
  $stderr.write "#{e.class}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
end
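A minimal sketch of the same report format as a pure function, assuming the report is built from the exception's class, message and backtrace. The name format_error is hypothetical and not part of DripDrop; returning a string instead of writing to $stderr just makes the format easy to inspect.

```ruby
# Hypothetical helper (not part of DripDrop): builds the report string
# "<class>: <message>" followed by one tab-indented backtrace frame per line.
def format_error(e)
  "#{e.class}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
end

begin
  raise ArgumentError, "bad input"
rescue => e
  report = format_error(e)
end

$stderr.puts report
```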
Instance Method Details
#action ⇒ Object
When subclassing DripDrop::Node you probably want to define this method. Otherwise it will attempt to run the @block passed into DripDrop::Node.new.
# File 'lib/dripdrop/node.rb', line 55

def action
  if @block
    self.instance_eval(&@block)
  else
    raise "Could not start, no block or specified"
  end
end
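The two ways of supplying behaviour described above (a block passed to the constructor, or an overridden action in a subclass) can be sketched without DripDrop itself. BlockNode and CustomNode are hypothetical stand-ins, and greeting is an illustrative helper that shows instance_eval running the block with the node as self.

```ruby
# Hypothetical stand-in for a node: runs the constructor block in the
# context of the instance unless a subclass overrides #action.
class BlockNode
  def initialize(&block)
    @block = block
  end

  def action
    raise "Could not start, no block given" unless @block
    instance_eval(&@block) # the block sees the node's own methods
  end

  def greeting
    "hello from #{self.class}"
  end
end

# Subclass style: #action is defined directly, so no block is needed.
class CustomNode < BlockNode
  def action
    "custom: #{greeting}"
  end
end

from_block    = BlockNode.new { greeting }.action
from_subclass = CustomNode.new.action
```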
#http_client(address, opts = {}) ⇒ Object
An EM HTTP client. Example:
client = http_client(addr)
client.send_message(:name => 'name', :body => 'hi') do |resp_msg|
  puts resp_msg.inspect
end
# File 'lib/dripdrop/node.rb', line 229

def http_client(address,opts={})
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::HTTPClientHandler.new(uri, h_opts)
end
#http_server(address, opts = {}, &block) ⇒ Object
Starts a new Thin HTTP server listening on address. Can have an on_recv handler that gets passed msg and response args.
http_server(addr) {|msg,response| response.send_message(msg)}
# File 'lib/dripdrop/node.rb', line 217

def http_server(address,opts={},&block)
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::HTTPServerHandler.new(uri, h_opts,&block)
end
#join ⇒ Object
If the reactor has started, this blocks until the thread running the reactor joins. This should block forever unless stop is called.
# File 'lib/dripdrop/node.rb', line 66

def join
  if @thread
    @thread.join
  else
    raise "Can't join on a node that isn't yet started"
  end
end
#nodelet(name, klass = Nodelet, *configure_args, &block) ⇒ Object
Nodelets are a way of segmenting a DripDrop::Node. This can be used for both organization and deployment. One might want the production deployment of an app to be broken across multiple servers or processes, for instance:
nodelet :heartbeat do |nlet|
  nlet.route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
  EM::PeriodicalTimer.new(1) do
    nlet.ticker.send_message(:name => 'tick')
  end
end
Nodelets can also be subclassed, for instance:
class SpecialNodelet < DripDrop::Node::Nodelet
  def action
    route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
    EM::PeriodicalTimer.new(1) do
      ticker.send_message(:name => 'tick')
    end
  end
end
nodelet :heartbeat, SpecialNodelet
If you specify a block, Nodelet#action will be ignored and the block will be run.
# File 'lib/dripdrop/node.rb', line 141

def nodelet(name,klass=Nodelet,*configure_args,&block)
  nlet = @nodelets[name] ||= klass.new(self,name,*configure_args)
  if block
    block.call(nlet)
  else
    nlet.action
  end
  nlet
end
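The listing above caches nodelets by name and runs either the given block or the nodelet's own action. A standalone sketch of that dispatch, using the hypothetical classes SketchNodelet and SketchNodeletHost (neither is part of DripDrop), might look like this:

```ruby
# Hypothetical nodelet that records which code path ran.
class SketchNodelet
  attr_reader :name, :log

  def initialize(name)
    @name = name
    @log  = []
  end

  def action
    @log << :action
  end
end

# Hypothetical host mirroring the cache-then-dispatch logic.
class SketchNodeletHost
  def initialize
    @nodelets = {}
  end

  def nodelet(name, klass = SketchNodelet, &block)
    nlet = @nodelets[name] ||= klass.new(name) # reuse an existing nodelet
    if block
      block.call(nlet) # a block takes precedence over #action
    else
      nlet.action
    end
    nlet
  end
end

host   = SketchNodeletHost.new
first  = host.nodelet(:heartbeat)
second = host.nodelet(:heartbeat) { |nlet| nlet.log << :block }
```

Calling nodelet twice with the same name returns the same object, so in this sketch the second call only runs the block against the cached instance.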
#recv_internal(dest, identifier, &block) ⇒ Object
Defines a subscriber to the channel dest, to receive messages from send_internal. identifier is a unique identifier for this receiver. The identifier can be used by remove_recv_internal.
# File 'lib/dripdrop/node.rb', line 256

def recv_internal(dest,identifier,&block)
  if @recipients_for[dest]
    @recipients_for[dest][identifier] = block
  else
    @recipients_for[dest] = {identifier => block}
  end
end
#remove_recv_internal(dest, identifier) ⇒ Object
Deletes a subscriber to the channel dest previously identified by a receiver created with recv_internal.
# File 'lib/dripdrop/node.rb', line 266

def remove_recv_internal(dest,identifier)
  return false unless @recipients_for[dest]
  @recipients_for[dest].delete(identifier)
end
#route(name, handler_type, *handler_args) ⇒ Object
Defines a new route. Routes are the recommended way to instantiate handlers. For example:
route :stats_pub, :zmq_publish, 'tcp://127.0.0.1:2200', :bind
route :stats_sub, :zmq_subscribe, stats_pub.address, :connect
Will make the following methods available within the reactor block:
stats_pub # A regular zmq_publish handler
stats_sub # A regular zmq_subscribe handler
See the docs for routes_for for more info on grouping routes for nodelets and maintaining sanity in larger apps.
# File 'lib/dripdrop/node.rb', line 86

def route(name,handler_type,*handler_args)
  route_full(nil, name, handler_type, *handler_args)
end
#route_full(nodelet, name, handler_type, *handler_args) ⇒ Object
Probably not useful for most apps. This is used internally to create a route for a given nodelet.
# File 'lib/dripdrop/node.rb', line 92

def route_full(nodelet, name, handler_type, *handler_args)
  # If we're in a route_for block, prepend appropriately
  full_name = (nodelet && nodelet.name) ? "#{nodelet.name}_#{name}".to_sym : name

  handler = self.send(handler_type, *handler_args)
  @routing[full_name] = handler

  # Define the route name as a singleton method
  (class << self; self; end).class_eval do
    define_method(full_name) { handler }
  end

  handler
end
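The naming and singleton-method steps in the listing can be sketched in isolation. RouteTable and register are hypothetical names, the handlers are plain strings rather than real socket handlers, and define_singleton_method is swapped in here as a shorter equivalent of the `class << self` idiom used above.

```ruby
# Hypothetical stand-in for a node's routing table.
class RouteTable
  attr_reader :routing

  def initialize
    @routing = {}
  end

  # Stores +handler+ under a (possibly prefixed) name and exposes it as a
  # reader method on this one object only.
  def register(prefix, name, handler)
    full_name = prefix ? "#{prefix}_#{name}".to_sym : name
    @routing[full_name] = handler
    define_singleton_method(full_name) { handler }
    handler
  end
end

table = RouteTable.new
table.register(nil, :stats_pub, "publisher")
table.register(:heartbeat, :ticker, "ticker")
other = RouteTable.new # receives none of the readers defined on table
```

Because the readers are singleton methods, routes defined on one object do not leak onto other instances of the same class.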
#routes_for(nodelet_name, &block) ⇒ Object
DEPRECATED, will be deleted in 0.8
# File 'lib/dripdrop/node.rb', line 108

def routes_for(nodelet_name,&block)
  $stderr.write "routes_for is now deprecated, use nodelet instead"
  nlet = nodelet(nodelet_name,&block)
  block.call(nlet)
end
#send_internal(dest, data) ⇒ Object
An in-process pub/sub queue that works similarly to EM::Channel, but with manually specified identifiers for subscribers, letting you delete subscribers more easily without complicated id tracking.
This is useful for situations where you want to broadcast messages across your app, but need a way to properly delete listeners.
dest is the name of the pub/sub channel. data is any type of Ruby var you'd like to send.
# File 'lib/dripdrop/node.rb', line 244

def send_internal(dest,data)
  return false unless @recipients_for[dest]
  blocks = @recipients_for[dest].values
  return false unless blocks
  blocks.each do |block|
    block.call(data)
  end
end
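The interplay of recv_internal, send_internal and remove_recv_internal can be tried outside a reactor. The sketch below copies the logic of the three listings into a hypothetical InternalBus class (not part of DripDrop) so that it runs with plain Ruby.

```ruby
# Hypothetical standalone version of the internal pub/sub methods.
class InternalBus
  def initialize
    @recipients_for = {} # channel name => {identifier => block}
  end

  # Registers (or replaces) the subscriber known as +identifier+ on +dest+.
  def recv_internal(dest, identifier, &block)
    (@recipients_for[dest] ||= {})[identifier] = block
  end

  # Calls every subscriber on +dest+ with +data+.
  # Returns false if the channel has never had a subscriber.
  def send_internal(dest, data)
    return false unless @recipients_for[dest]
    @recipients_for[dest].values.each { |block| block.call(data) }
  end

  # Removes one subscriber by its identifier.
  def remove_recv_internal(dest, identifier)
    return false unless @recipients_for[dest]
    @recipients_for[dest].delete(identifier)
  end
end

bus      = InternalBus.new
received = []
bus.recv_internal(:stats, :logger)  { |data| received << [:logger, data] }
bus.recv_internal(:stats, :counter) { |data| received << [:counter, data] }
bus.send_internal(:stats, 1)              # reaches both subscribers
bus.remove_recv_internal(:stats, :logger) # no handle needed, just the identifier
bus.send_internal(:stats, 2)              # reaches :counter only
```

Keying subscribers by a caller-chosen identifier is what removes the need to hold on to a subscription handle, which is the difference from EM::Channel that the description points out.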
#start ⇒ Object
Starts the reactors and runs the block passed to initialize. This is non-blocking.
# File 'lib/dripdrop/node.rb', line 35

def start
  @thread = Thread.new do
    EM.error_handler {|e| self.error_handler e}
    EM.run { action }
  end
end
#start! ⇒ Object
Blocking version of start, equivalent to start then join.
# File 'lib/dripdrop/node.rb', line 43

def start!
  self.start
  self.join
end
#stop ⇒ Object
Stops the reactors. If you were blocked on #join, that will unblock.
# File 'lib/dripdrop/node.rb', line 49

def stop
  EM.stop
end
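How start, join and stop relate can be sketched without EventMachine. In the hypothetical SketchRunner below, a Queue that blocks on pop stands in for the running reactor; this is an assumption made for illustration, not how DripDrop drives EM.

```ruby
require 'thread'

# Hypothetical runner: start is non-blocking, join blocks until stop.
class SketchRunner
  def initialize(&block)
    @block       = block
    @thread      = nil
    @stop_signal = Queue.new
  end

  def start
    @thread = Thread.new do
      @block.call
      @stop_signal.pop # stands in for the reactor loop; blocks until stop
    end
  end

  def join
    raise "Can't join on a node that isn't yet started" unless @thread
    @thread.join
  end

  def stop
    @stop_signal << :stop
  end

  def start!
    start
    join
  end
end

events = Queue.new
runner = SketchRunner.new { events << :started }
runner.start  # returns immediately
events.pop    # wait until the block has run on the background thread
runner.stop   # releases the background thread
runner.join   # returns now that the thread has finished
```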
#websocket(address, opts = {}) ⇒ Object
Binds an EM websocket connection to address. Takes blocks for on_open, on_recv, on_close and on_error.
For example, on_recv could be used to echo incoming messages thusly:
websocket(addr).on_open {|conn|
  ws.send_message(:name => 'ws_open_ack')
}.on_recv {|msg,conn|
  conn.send(msg)
}.on_close {|conn|
}.on_error {|reason,conn|
}
The ws object that's passed into the handlers is not the DripDrop::WebSocketHandler object, but an em-websocket object.
# File 'lib/dripdrop/node.rb', line 208

def websocket(address,opts={})
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::WebSocketHandler.new(uri,h_opts)
end
#zmq_publish(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::PUB type socket. Can only send messages via send_message.
# File 'lib/dripdrop/node.rb', line 159

def zmq_publish(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQPubHandler,ZMQ::PUB,address,socket_ctype,opts)
end
#zmq_pull(address, socket_ctype, opts = {}, &block) ⇒ Object
Creates a ZMQ::PULL type socket. Can only receive messages via on_recv.
# File 'lib/dripdrop/node.rb', line 164

def zmq_pull(address,socket_ctype,opts={},&block)
  zmq_handler(DripDrop::ZMQPullHandler,ZMQ::PULL,address,socket_ctype,opts)
end
#zmq_push(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::PUSH type socket. Can only send messages via send_message.
# File 'lib/dripdrop/node.rb', line 169

def zmq_push(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQPushHandler,ZMQ::PUSH,address,socket_ctype,opts)
end
#zmq_subscribe(address, socket_ctype, opts = {}, &block) ⇒ Object
Creates a ZMQ::SUB type socket. Can only receive messages via on_recv. zmq_subscribe sockets have a topic_filter option, which restricts which messages they can receive. It takes a regexp as an option.
# File 'lib/dripdrop/node.rb', line 154

def zmq_subscribe(address,socket_ctype,opts={},&block)
  zmq_handler(DripDrop::ZMQSubHandler,ZMQ::SUB,address,socket_ctype,opts)
end
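The effect of a regexp topic filter can be illustrated with plain Ruby. This sketch assumes the filter is matched against each message's name; that detail, the Message struct and the filter_by_topic helper are all assumptions for illustration and are not taken from DripDrop's implementation.

```ruby
# Hypothetical message type with just a name and a body.
Message = Struct.new(:name, :body)

# Keeps only messages whose name matches the filter; nil accepts everything.
def filter_by_topic(messages, topic_filter = nil)
  return messages if topic_filter.nil?
  messages.select { |msg| topic_filter.match(msg.name) }
end

inbox = [
  Message.new('stats.cpu', 0.42),
  Message.new('stats.mem', 0.87),
  Message.new('log.info', 'started')
]

stats_only = filter_by_topic(inbox, /\Astats\./)
```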
#zmq_xrep(address, socket_ctype, opts = {}) ⇒ Object
Creates a ZMQ::XREP type socket, which both sends and receives. XREP sockets are extremely powerful, so their functionality is currently limited. XREP sockets in DripDrop can reply to the original source of the message.
Receiving with XREP sockets in DripDrop is different from other types of sockets: on_recv passes 2 arguments to its callback, message and response. A minimal example is shown below:
zmq_xrep(z_addr, :bind).on_recv do |message,response|
  response.send_message(message)
end
# File 'lib/dripdrop/node.rb', line 185

def zmq_xrep(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQXRepHandler,ZMQ::XREP,address,socket_ctype,opts)
end
#zmq_xreq(address, socket_ctype, opts = {}) ⇒ Object
See the documentation for zmq_xrep for more info.
# File 'lib/dripdrop/node.rb', line 190

def zmq_xreq(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQXReqHandler,ZMQ::XREQ,address,socket_ctype,opts)
end