Class: Vertx::EventBus

Inherits:
Object
  • Object
show all
Defined in:
lib/vertx/event_bus.rb

Overview

For point to point messaging, messages can be sent to an address using the send method. The messages will be delivered to a single handler, if one is registered on that address. If more than one handler is registered on the same address, Vert.x will choose one and deliver the message to that. Vert.x will aim to fairly distribute messages in a round-robin way, but does not guarantee strict round-robin under all circumstances.

All messages sent over the bus are transient. On event of failure of all or part of the event bus messages may be lost. Applications should be coded to cope with lost messages, e.g. by resending them, and making application services idempotent.

The order of messages received by any specific handler from a specific sender should match the order of messages sent from that sender.

When sending a message, a reply handler can be provided. If so, it will be called when the reply from the receiver has been received. Reply messages can also be replied to, etc, ad infinitum.

Different event bus instances can be clustered together over a network, to give a single logical event bus.

When receiving a message in a handler the received object is an instance of EventBus::Message - this contains the actual message plus a reply method which can be used to reply to it.

Author:

Constant Summary collapse

@@handler_map =
{}
@@j_eventbus =
org.jruby.jubilee.vertx.JubileeVertx.vertx().eventBus()

Class Method Summary collapse

Class Method Details

.convert_msg(message) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/vertx/event_bus.rb', line 140

def EventBus.convert_msg(message)
  if message.is_a? Hash
    message = org.vertx.java.core.json.JsonObject.new(JSON.generate(message))
  elsif message.is_a? Buffer
    message = message._to_java_buffer
  elsif message.is_a? Fixnum
    message = java.lang.Long.new(message)
  elsif message.is_a? Float
    message = java.lang.Double.new(message)
  end
  message
end

.publish(address, message) ⇒ Object

Publish a message on the event bus

Parameters:

  • message (Hash)

    The message to publish



69
70
71
72
# File 'lib/vertx/event_bus.rb', line 69

def EventBus.publish(address, message)
  EventBus.send_or_pub(false, address, message)
  self
end

.register_handler(address, local_only = false, &message_hndlr) ⇒ FixNum

Register a handler. received by the handler. A single handler can be registered against many addresses.

Parameters:

  • address (String)

    The address to register for. Messages sent to that address will be

  • local_only (Boolean) (defaults to: false)

    If true then handler won’t be propagated across cluster

  • message_hndlr (Block)

    The handler

Returns:



97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/vertx/event_bus.rb', line 97

def EventBus.register_handler(address, local_only = false, &message_hndlr)
  raise "An address must be specified" if !address
  raise "A message handler must be specified" if !message_hndlr
  internal = InternalHandler.new(message_hndlr)
  if local_only
    @@j_eventbus.registerLocalHandler(address, internal)
  else
    @@j_eventbus.registerHandler(address, internal)
  end
  id = java.util.UUID.randomUUID.toString
  @@handler_map[id] = [address, internal]
  id
end

.register_simple_handler(local_only = false, &message_hndlr) ⇒ FixNum

Registers a handler against a uniquely generated address, the address is returned as the id received by the handler. A single handler can be registered against many addresses.

Parameters:

  • local_only (Boolean) (defaults to: false)

    If true then handler won’t be propagated across cluster

  • message_hndlr (Block)

    The handler

Returns:



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/vertx/event_bus.rb', line 116

def EventBus.register_simple_handler(local_only = false, &message_hndlr)
  raise "A message handler must be specified" if !message_hndlr
  internal = InternalHandler.new(message_hndlr)
  id = java.util.UUID.randomUUID.toString
  if local_only
    @@j_eventbus.registerLocalHandler(id, internal)
  else
    @@j_eventbus.registerHandler(id, internal)
  end
  @@handler_map[id] = [id, internal]
  id
end

.send(address, message, &reply_handler) ⇒ Object

Send a message on the event bus It will be called when the reply from a receiver is received.

Parameters:

  • message (Hash)

    The message to send

  • reply_handler (Block)

    An optional reply handler.



62
63
64
65
# File 'lib/vertx/event_bus.rb', line 62

def EventBus.send(address, message, &reply_handler)
  EventBus.send_or_pub(true, address, message, reply_handler)
  self
end

.send_or_pub(send, address, message, reply_handler = nil) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/vertx/event_bus.rb', line 75

def EventBus.send_or_pub(send, address, message, reply_handler = nil)
  raise "An address must be specified" if !address
  raise "A message must be specified" if message == nil
  message = convert_msg(message)
  if send
    if reply_handler != nil
      @@j_eventbus.send(address, message, InternalHandler.new(reply_handler))
    else
      @@j_eventbus.send(address, message)
    end
  else
    @@j_eventbus.publish(address, message)
  end
  self
end

.unregister_handler(handler_id) ⇒ Object

Unregisters a handler

Parameters:

  • handler_id (FixNum)

    The id of the handler to unregister. Returned from register_handler



131
132
133
134
135
136
137
# File 'lib/vertx/event_bus.rb', line 131

def EventBus.unregister_handler(handler_id)
  raise "A handler_id must be specified" if !handler_id
  tuple = @@handler_map.delete(handler_id)
  raise "Cannot find handler for id #{handler_id}" if !tuple
  @@j_eventbus.unregisterHandler(tuple.first, tuple.last)
  self
end