Class: Avro::IPC::Requestor
- Inherits:
-
Object
- Object
- Avro::IPC::Requestor
- Defined in:
- lib/avro/ipc.rb
Overview
Base class for the client side of a protocol interaction.
Instance Attribute Summary collapse
-
#local_protocol ⇒ Object
readonly
Returns the value of attribute local_protocol.
-
#remote_hash ⇒ Object
Returns the value of attribute remote_hash.
-
#remote_protocol ⇒ Object
Returns the value of attribute remote_protocol.
-
#send_protocol ⇒ Object
Returns the value of attribute send_protocol.
-
#transport ⇒ Object
readonly
Returns the value of attribute transport.
Instance Method Summary collapse
-
#initialize(local_protocol, transport) ⇒ Requestor
constructor
A new instance of Requestor.
- #read_call_response(message_name, decoder) ⇒ Object
- #read_error(writers_schema, readers_schema, decoder) ⇒ Object
- #read_handshake_response(decoder) ⇒ Object
- #read_response(writers_schema, readers_schema, decoder) ⇒ Object
- #request(message_name, request_datum) ⇒ Object
- #write_call_request(message_name, request_datum, encoder) ⇒ Object
- #write_handshake_request(encoder) ⇒ Object
- #write_request(request_schema, request_datum, encoder) ⇒ Object
Constructor Details
#initialize(local_protocol, transport) ⇒ Requestor
Returns a new instance of Requestor.
86 87 88 89 90 91 92 |
# File 'lib/avro/ipc.rb', line 86 def initialize(local_protocol, transport) @local_protocol = local_protocol @transport = transport @remote_protocol = nil @remote_hash = nil @send_protocol = nil end |
Instance Attribute Details
#local_protocol ⇒ Object (readonly)
Returns the value of attribute local_protocol.
83 84 85 |
# File 'lib/avro/ipc.rb', line 83 def local_protocol @local_protocol end |
#remote_hash ⇒ Object
Returns the value of attribute remote_hash.
83 84 85 |
# File 'lib/avro/ipc.rb', line 83 def remote_hash @remote_hash end |
#remote_protocol ⇒ Object
Returns the value of attribute remote_protocol.
83 84 85 |
# File 'lib/avro/ipc.rb', line 83 def remote_protocol @remote_protocol end |
#send_protocol ⇒ Object
Returns the value of attribute send_protocol.
84 85 86 |
# File 'lib/avro/ipc.rb', line 84 def send_protocol @send_protocol end |
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
83 84 85 |
# File 'lib/avro/ipc.rb', line 83 def transport @transport end |
Instance Method Details
#read_call_response(message_name, decoder) ⇒ Object
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/avro/ipc.rb', line 194 def read_call_response(, decoder) # The format of a call response is: # * response metadata, a map with values of type bytes # * a one-byte error flag boolean, followed by either: # * if the error flag is false, # the message response, serialized per the message's response schema. # * if the error flag is true, # the error, serialized per the message's error union schema. = META_READER.read(decoder) # remote response schema = remote_protocol.[] raise AvroError.new("Unknown remote message: #{}") unless # local response schema = local_protocol.[] unless raise AvroError.new("Unknown local message: #{}") end # error flag if !decoder.read_boolean writers_schema = .response readers_schema = .response read_response(writers_schema, readers_schema, decoder) else writers_schema = .errors || SYSTEM_ERROR_SCHEMA readers_schema = .errors || SYSTEM_ERROR_SCHEMA raise read_error(writers_schema, readers_schema, decoder) end end |
#read_error(writers_schema, readers_schema, decoder) ⇒ Object
231 232 233 234 |
# File 'lib/avro/ipc.rb', line 231 def read_error(writers_schema, readers_schema, decoder) datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema) AvroRemoteError.new(datum_reader.read(decoder)) end |
#read_handshake_response(decoder) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/avro/ipc.rb', line 168 def read_handshake_response(decoder) handshake_response = HANDSHAKE_REQUESTOR_READER.read(decoder) we_have_matching_schema = false case handshake_response['match'] when 'BOTH' self.send_protocol = false we_have_matching_schema = true when 'CLIENT' raise AvroError.new('Handshake failure. match == CLIENT') if send_protocol self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol']) self.remote_hash = handshake_response['serverHash'] self.send_protocol = false we_have_matching_schema = true when 'NONE' raise AvroError.new('Handshake failure. match == NONE') if send_protocol self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol']) self.remote_hash = handshake_response['serverHash'] self.send_protocol = true else raise AvroError.new("Unexpected match: #{match}") end return we_have_matching_schema end |
#read_response(writers_schema, readers_schema, decoder) ⇒ Object
226 227 228 229 |
# File 'lib/avro/ipc.rb', line 226 def read_response(writers_schema, readers_schema, decoder) datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema) datum_reader.read(decoder) end |
#request(message_name, request_datum) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/avro/ipc.rb', line 104 def request(, request_datum) # Writes a request message and reads a response or error message. # build handshake and call request buffer_writer = StringIO.new(String.new('', encoding: 'BINARY')) buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer) write_handshake_request(buffer_encoder) write_call_request(, request_datum, buffer_encoder) # send the handshake and call request; block until call response call_request = buffer_writer.string call_response = transport.transceive(call_request) # process the handshake and call response buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_response)) if read_handshake_response(buffer_decoder) read_call_response(, buffer_decoder) else request(, request_datum) end end |
#write_call_request(message_name, request_datum, encoder) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/avro/ipc.rb', line 143 def write_call_request(, request_datum, encoder) # The format of a call request is: # * request metadata, a map with values of type bytes # * the message name, an Avro string, followed by # * the message parameters. Parameters are serialized according to # the message's request declaration. # TODO request metadata (not yet implemented) = {} META_WRITER.write(, encoder) = local_protocol.[] unless raise AvroError, "Unknown message: #{}" end encoder.write_string(.name) write_request(.request, request_datum, encoder) end |
#write_handshake_request(encoder) ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/avro/ipc.rb', line 125 def write_handshake_request(encoder) local_hash = local_protocol.md5 remote_name = transport.remote_name remote_hash = REMOTE_HASHES[remote_name] unless remote_hash remote_hash = local_hash self.remote_protocol = local_protocol end request_datum = { 'clientHash' => local_hash, 'serverHash' => remote_hash } if send_protocol request_datum['clientProtocol'] = local_protocol.to_s end HANDSHAKE_REQUESTOR_WRITER.write(request_datum, encoder) end |
#write_request(request_schema, request_datum, encoder) ⇒ Object
163 164 165 166 |
# File 'lib/avro/ipc.rb', line 163 def write_request(request_schema, request_datum, encoder) datum_writer = Avro::IO::DatumWriter.new(request_schema) datum_writer.write(request_datum, encoder) end |