Class: HomeQ::SOBS::Connection
- Inherits:
-
EventMachine::Connection
- Object
- EventMachine::Connection
- HomeQ::SOBS::Connection
show all
- Includes:
- HomeQ, Base::Logging, Sender
- Defined in:
- lib/homeq/sobs/connection.rb
Constant Summary
collapse
- HEADER_REGEX =
/([^\r\n]+?)\r\n/m
- PUSH_TIMER_LENGTH =
1
- HELLO_TIMER_LENGTH =
5
- OUTBOUND_MAX =
1024 * 64
- REFUSE_SEND_THRESHOLD =
1024 * 1024
- MAX_INPUT_MSGS_PER_READ_CYCLE =
Shouldn’t make any difference to throughput, but a low(ish) number should smooth out latency.
2
- NO_ARG_MESSAGES =
[
'not_ignored',
'unknown_command',
'internal_error',
'out_of_memory',
'draining',
'bad_format',
'job_too_big',
'timed_out',
'reserve',
'subscribe',
'unsubscribe'
]
- ARG_MESSAGES =
[
'kicked',
'inserted',
'buried',
'hello',
'release',
'delete',
'bury',
'kick',
'deadline_soon',
'not_found',
'deleted',
'released',
]
- BODY_MESSAGES =
['ok']
- ARG_AND_BODY_MESSAGES =
['reserved']
- ARGS_AND_BODY_MESSAGES =
['put']
- ALL_MESSAGES =
NO_ARG_MESSAGES + ARG_MESSAGES +
BODY_MESSAGES + ARG_AND_BODY_MESSAGES + ARGS_AND_BODY_MESSAGES
Constants included
from HomeQ
VERSION
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from HomeQ
calculated_homeq_env, calculated_homeq_topology, included, queue_list_for_host_from_topology
Methods included from Sender
#book_it, #buried, #bury, #deadline_soon, #delete, #deleted, #hello, #inserted, #kick, #kicked, #not_found, #put, #release, #released, #reserve, #reserved, #stats, #stats_job, #subscribe, #unsubscribe
#logger
Instance Attribute Details
#outbound_max ⇒ Object
Returns the value of attribute outbound_max.
52
53
54
|
# File 'lib/homeq/sobs/connection.rb', line 52
# Reader for @outbound_max. post_init falls back to OUTBOUND_MAX when no
# value has been assigned before the connection is initialised.
def outbound_max
@outbound_max
end
|
#peer ⇒ Object
Returns the value of attribute peer.
50
51
52
|
# File 'lib/homeq/sobs/connection.rb', line 50
# Reader for @peer. post_init resets it to nil and setup_peer fills it with the
# result of Socket.unpack_sockaddr_in when a peer name is available.
def peer
@peer
end
|
#refuse_send_threshold ⇒ Object
Returns the value of attribute refuse_send_threshold.
53
54
55
|
# File 'lib/homeq/sobs/connection.rb', line 53
# Reader for @refuse_send_threshold. post_init falls back to
# REFUSE_SEND_THRESHOLD when no value has been assigned.
def refuse_send_threshold
@refuse_send_threshold
end
|
#server ⇒ Object
49
50
51
|
# File 'lib/homeq/sobs/connection.rb', line 49
# Reader for @server. When set, unbind removes this connection from
# server.connections. Where @server gets assigned is not visible here.
def server
@server
end
|
#state ⇒ Object
Returns the value of attribute state.
51
52
53
|
# File 'lib/homeq/sobs/connection.rb', line 51
# Reader for @state, the state machine built in post_init. The current state
# symbol is available as state.state.
def state
@state
end
|
Instance Method Details
#cancel_hello_timer ⇒ Object
530
531
532
533
|
# File 'lib/homeq/sobs/connection.rb', line 530
# Stops the pending hello timer, if there is one, and forgets it.
# Registered as the on_exit action of the :waiting_for_hello state.
# Always returns nil.
def cancel_hello_timer
  if @hello_timer
    @hello_timer.cancel
  end
  @hello_timer = nil
end
|
#close ⇒ Object
516
517
518
|
# File 'lib/homeq/sobs/connection.rb', line 516
# Fires the :close event on the connection state machine. Per the transitions
# declared in post_init this moves :start, :waiting_for_hello and :open to
# :closing, and leaves :closing and :closed where they are.
def close
@state.close
end
|
#closed ⇒ Object
548
549
|
# File 'lib/homeq/sobs/connection.rb', line 548
# Hook run on entry to the :closed state (see post_init). Intentionally empty
# here; presumably meant to be overridden by subclasses.
def closed
end
|
#closing ⇒ Object
546
547
|
# File 'lib/homeq/sobs/connection.rb', line 546
# Hook called by start_closing once the connection has been asked to close
# after writing. Intentionally empty here; presumably meant to be overridden
# by subclasses.
def closing
end
|
#connection_completed ⇒ Object
NB: EM does NOT call this for passive (server) connections
128
129
130
|
# File 'lib/homeq/sobs/connection.rb', line 128
# EventMachine callback; records the time the connection was opened.
# NB: EM does NOT call this for passive (server) connections, so for those
# @opened_at is only set by setup_peer.
def connection_completed
@opened_at = Time.now
end
|
#enter_congestion ⇒ Object
454
455
456
457
458
459
460
461
462
|
# File 'lib/homeq/sobs/connection.rb', line 454
# Marks the connection as congested and remembers when that happened.
# Does nothing (and returns nil) when the connection is already congested;
# otherwise logs a warning and returns the congestion timestamp.
def enter_congestion
  return if @congested

  logger.warn { "Connection #{remote_endpoint} congested. Peer having problems?" }
  @congested = true
  @congested_at = Time.now
end
|
#exit_congestion ⇒ Object
464
465
466
467
468
469
470
|
# File 'lib/homeq/sobs/connection.rb', line 464
# Clears the congested flag and logs how long the connection was congested.
# The duration is computed lazily inside the logger block from @congested_at.
def exit_congestion
  @congested = false
  logger.warn do
    elapsed = Time.now - @congested_at
    "Congested #{remote_endpoint} cleared. Was congested for #{elapsed} seconds."
  end
end
|
#ok_to_receive? ⇒ Boolean
502
503
504
505
506
507
508
509
510
511
512
513
514
|
# File 'lib/homeq/sobs/connection.rb', line 502
# Tells whether the connection is in a state where peer messages are
# acceptable. In :start, :waiting_for_hello or :closed the violation is
# logged, the connection is closed and false is returned; in any other state
# the result is true.
def ok_to_receive?
  return true unless [:start, :waiting_for_hello, :closed].include?(@state.state)

  logger.error { "SOBS peer #{remote_endpoint} sent isn't open." }
  close
  false
end
|
#opened ⇒ Object
Higher-level interface for subclasses
544
545
|
# File 'lib/homeq/sobs/connection.rb', line 544
def opened
end
|
250
251
252
253
254
255
256
257
258
259
|
# File 'lib/homeq/sobs/connection.rb', line 250
# NOTE(review): the method name, its parameter and the receiver of `.split`
# are missing from this listing; they appear to have been lost when the page
# was extracted. Presumably this is the header-line parser that
# parse_incoming_data calls with each HEADER_REGEX match -- restore the
# identifiers from lib/homeq/sobs/connection.rb (line 250) before relying on
# this text.
#
# Splits the line on whitespace, takes the first element (downcased) as the
# command name and dispatches to the matching parse_<command> method with the
# remaining elements. Unknown commands are reported through unknown_message.
def ()
elements = .split(/\s/)
command_name = elements[0].downcase
if self.respond_to?("parse_#{command_name}")
self.send("parse_#{command_name}", elements[1..-1])
else
unknown_message(command_name)
# Drop everything the scanner has consumed so far from the input buffer.
@data.slice!(0, @scanner.pos)
end
end
|
#parse_incoming_data ⇒ Object
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
|
# File 'lib/homeq/sobs/connection.rb', line 193
# Parses as many buffered messages out of @data as allowed in one pass.
#
# Parsing is skipped entirely while outbound_maxed_out? is true, and is cut
# short after MAX_INPUT_MSGS_PER_READ_CYCLE messages when data is still
# buffered; in both cases @need_next_tick_parse is set so that process_data
# can schedule another pass.
#
# NOTE(review): the loop variable and the name of the method called with it
# are missing from this listing (`while ( = ...`, `()`, `#{.inspect}`); they
# appear to have been lost in extraction -- restore them from
# lib/homeq/sobs/connection.rb (line 193).
def parse_incoming_data
if outbound_maxed_out?
logger.debug3 {
"Short circuit deferring parsing incoming data until next tick. " +
"'#{@data[0..20]}' #{@data.length}"
}
@need_next_tick_parse = true
return
end
msgs_this_spin = 0
logger.debug4 {
'<-- ' + @data.inspect
}
@scanner.pos = 0
while ( = @scanner.scan(HEADER_REGEX)) do
# Remember the buffer size so we can tell whether the call below consumed
# anything.
len = @data.length
()
# Buffer unchanged: nothing was consumed (presumably an incomplete message),
# so stop and wait for more data.
if len == @data.length
break end
logger.debug3 {
"<<<< #{.inspect}"
}
@msgs_in += 1
msgs_this_spin += 1
logger.debug3 {
"Msgs in: #{@msgs_in}"
}
# NOTE(review): `state != :closed || state != :closing` is always true (the
# state cannot equal both symbols at once); `&&` was probably intended.
if (msgs_this_spin == MAX_INPUT_MSGS_PER_READ_CYCLE &&
@data.length > 0 &&
(@state.state != :closed || @state.state != :closing))
logger.debug3 {
"Max input msgs per read cycle. " +
"Deferring parsing incoming data until next tick. " +
"'#{@data[0..20]}' #{@data.class} #{@data.length}"
}
# Start the next pass with a fresh scanner over the remaining buffer.
@scanner = StringScanner.new(@data)
@need_next_tick_parse = true
return
end
logger.debug4 {
"BUF NOW: '#{@data}'"
}
# Rewind so the next header is scanned from the start of the buffer.
@scanner.pos = 0
end
end
|
#post_init ⇒ Object
EventMachine::Connection entry points
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/homeq/sobs/connection.rb', line 59
# EventMachine::Connection entry point.
#
# Builds the connection state machine (start -> waiting_for_hello -> open ->
# closing -> closed), resets the input buffer, scanner, counters and
# statistics, applies the default flow-control limits when none were set, and
# fires the :connect event when setup_peer was able to resolve the peer.
def post_init
  @state = Statemachine.build do
    state :start do
      # The third argument (:hello) is presumably the action run on this
      # transition -- confirm against the Statemachine DSL.
      event :connect, :waiting_for_hello, :hello
      event :close, :closing
      event :unbind, :closed
    end
    state :waiting_for_hello do
      on_entry :wait_for_hello
      on_exit :cancel_hello_timer
      event :close, :closing
      event :good_hello, :open
      event :bad_hello, :closing
      event :hello_timeout, :closing
      event :unbind, :closed
    end
    state :open do
      on_entry :opened
      event :close, :closing
      event :unbind, :closed
    end
    state :closing do
      on_entry :start_closing
      event :unbind, :closed
      # Late hello events and repeated closes are absorbed while closing.
      event :good_hello, :closing
      event :bad_hello, :closing
      event :hello_timeout, :closing
      event :close, :closing
    end
    state :closed do
      on_entry :closed
      event :close, :closed
    end
  end
  # State-machine actions (wait_for_hello, opened, ...) are methods on self.
  @state.context = self
  @hello_timer = nil
  @data = ""
  @scanner = StringScanner.new(@data)
  @peer = nil
  @outgoing = []
  @rcvs = 0
  @bytes_in = 0
  @bytes_out = 0
  @msgs_in = 0
  @msgs_out = 0
  @rcv_stats = {}
  @snd_stats = {}
  @opened_at = nil
  @closed_at = nil
  @outbound_bytes_highwater = 0
  @congested = false
  @refused_send = 0
  # Keep limits that were assigned before initialisation; otherwise use the
  # class defaults.
  @outbound_max ||= OUTBOUND_MAX
  @refuse_send_threshold ||= REFUSE_SEND_THRESHOLD
  setup_peer
  @state.connect if peer
end
|
#process_data ⇒ Object
Do some parsing, and then, if needed, schedule a block to parse some more the next spin of the reactor.
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
# File 'lib/homeq/sobs/connection.rb', line 170
# Do some parsing, and then, if needed, schedule a block to parse some more
# on the next spin of the reactor.
#
# parse_incoming_data sets @need_next_tick_parse when it stopped early.
# @process_ticker is true while a next-tick callback is pending, so that at
# most one callback is queued at any time.
def process_data
  parse_incoming_data
  logger.debug4 {
    "check if callback needed? " +
      "needed: #{@need_next_tick_parse.inspect} && " +
      "ticker: #{@process_ticker.inspect}"
  }
  # Guard on @process_ticker (the flag this method maintains); testing any
  # other variable lets every call queue another callback.
  if @need_next_tick_parse && !@process_ticker
    EventMachine::next_tick {
      logger.debug3 {
        "Next tick parse callback."
      }
      # Clear the flag first so the nested call may schedule a follow-up.
      @process_ticker = false
      process_data
    }
    @process_ticker = true
    logger.debug3 {
      "Scheduled tick parse callback."
    }
  end
  @need_next_tick_parse = false
end
|
#protocol_version_ok?(their_version) ⇒ Boolean
498
499
500
|
# File 'lib/homeq/sobs/connection.rb', line 498
# Returns true when the version announced by the peer equals
# HomeQ::SOBS::PROTOCOL_VERSION. receive_hello passes the first hello
# argument converted with to_i.
def protocol_version_ok?(their_version)
their_version == HomeQ::SOBS::PROTOCOL_VERSION
end
|
#receive_data(data) ⇒ Object
Called by EM when data is received from the peer.
133
134
135
136
137
138
139
140
141
142
|
# File 'lib/homeq/sobs/connection.rb', line 133
# EventMachine callback for bytes arriving from the peer.
# Input is dropped once the connection is :closing or :closed; otherwise the
# receive counters are updated, the bytes are appended to the input buffer and
# process_data is run. Returns whatever process_data returns.
def receive_data(data)
  return if [:closed, :closing].include?(@state.state)

  logger.debug3 { "Receive data callback." }
  @rcvs = @rcvs + 1
  @bytes_in = @bytes_in + data.length
  @data << data
  process_data
end
|
#receive_hello(message) ⇒ Object
399
400
401
402
403
404
405
406
407
408
409
|
# File 'lib/homeq/sobs/connection.rb', line 399
# Handles the peer's hello message. The first argument is read as the peer's
# protocol version (via to_i); a compatible version fires :good_hello, an
# incompatible one is logged and fires :bad_hello.
def receive_hello(message)
  their_version = message.args[0].to_i
  if protocol_version_ok?(their_version)
    @state.good_hello
  else
    logger.info do
      "SOBS Peer #{remote_endpoint} sent incompatible protocol version #{their_version}"
    end
    @state.bad_hello
  end
end
|
#receive_message(message) ⇒ Object
370
371
372
373
374
375
376
377
378
379
380
|
# File 'lib/homeq/sobs/connection.rb', line 370
# Counts an incoming message in @rcv_stats (keyed by message type) and hands
# it to the public receive_<type> method when one exists. Types without a
# handler are logged and ignored.
def receive_message(message)
  kind = message.type
  @rcv_stats[kind] = (@rcv_stats[kind] || 0) + 1
  handler = "receive_#{kind}"
  return send(handler, message) if respond_to?(handler)

  logger.error { "Ignoring message: #{kind}" }
end
|
#remote_endpoint ⇒ Object
441
442
443
|
# File 'lib/homeq/sobs/connection.rb', line 441
# Human-readable peer address for log messages: element 1 of peer, a colon,
# then element 0 of peer, or '?:?' while the peer is unknown.
def remote_endpoint
  return '?:?' unless peer

  "#{peer[1]}:#{peer[0]}"
end
|
#send_to_peer(data) ⇒ Object
472
473
474
475
476
477
478
479
480
481
|
# File 'lib/homeq/sobs/connection.rb', line 472
# Queues data for the peer and triggers a push.
# Returns true when the data was queued. When should_refuse_send? says the
# outbound backlog is too large, the data is dropped, the refusal is counted,
# congestion is entered (once) and false is returned.
def send_to_peer(data)
  unless should_refuse_send?
    @outgoing.push(data)
    push_to_peer
    return true
  end

  @refused_send += 1
  enter_congestion unless @congested
  false
end
|
#setup_peer ⇒ Object
488
489
490
491
492
493
494
495
496
|
# File 'lib/homeq/sobs/connection.rb', line 488
# Resolves the remote address of the connection, when it is available.
# On success stores the unpacked socket address in @peer, stamps @opened_at
# and logs the connection; when get_peername yields nothing it does nothing
# and returns nil.
def setup_peer
  peername = get_peername
  return unless peername

  @peer = Socket.unpack_sockaddr_in(peername)
  @opened_at = Time.now
  logger.info { "SOBS connection to #{remote_endpoint} complete." }
end
|
#should_refuse_send? ⇒ Boolean
445
446
447
448
449
450
451
452
|
# File 'lib/homeq/sobs/connection.rb', line 445
# Decides whether new outbound data must be refused: true when the queued
# outgoing byte count exceeds the refuse-send threshold (the instance value,
# or REFUSE_SEND_THRESHOLD when unset). Also keeps
# @outbound_bytes_highwater at the largest backlog seen so far.
def should_refuse_send?
  backlog = queued_outgoing_data_size
  @outbound_bytes_highwater = backlog if backlog > @outbound_bytes_highwater
  limit = @refuse_send_threshold || REFUSE_SEND_THRESHOLD
  backlog > limit
end
|
#start_closing ⇒ Object
535
536
537
538
|
# File 'lib/homeq/sobs/connection.rb', line 535
# Entry action of the :closing state (see post_init): asks EventMachine to
# close the connection after writing (close_connection_after_writing), then
# invokes the closing hook.
def start_closing
close_connection_after_writing
closing
end
|
#to_s ⇒ Object
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
|
# File 'lib/homeq/sobs/connection.rb', line 415
# Multi-line, human-readable status report for this connection: peer, state,
# open time, message and byte counters, outbound queue sizes, flow-control
# settings, congestion status and per-message-type receive/send statistics.
#
# @opened_at stays nil until setup_peer or connection_completed has run, so
# the age is only computed when it is set; otherwise "opened: n/a" is
# reported instead of raising a TypeError.
def to_s
  opened =
    if @opened_at
      "opened: #{@opened_at} (#{Time.now - @opened_at}s)\n"
    else
      "opened: n/a\n"
    end
  str = ''
  str << "EM signature: #{signature} peer: #{remote_endpoint} \n"
  str << "state: #{state.state} "
  str << opened
  str << "rcvs: #{@rcvs} in: #{@msgs_in} out: #{@msgs_out}\n"
  str << "bytes in: #{@bytes_in} bytes_out: #{@bytes_out}\n"
  str << "outgoing buffers: #{@outgoing.length}, "
  str << "#{queued_outgoing_data_size} total bytes\n"
  str << "#{get_outbound_data_size} total outbound bytes\n"
  str << "send ticker is #{@send_ticker ? '' : 'not '}running\n"
  str << "outbound_max: #{@outbound_max || OUTBOUND_MAX} "
  str << "refuse_send_threshold: "
  str << "#{@refuse_send_threshold || REFUSE_SEND_THRESHOLD}\n"
  str << "congested?: #{@congested ? 'Y' : 'N'}, "
  str << "refused sends: #{@refused_send}\n"
  str << "outbound_bytes_highwater: #{@outbound_bytes_highwater}\n"
  @rcv_stats.each { |msg_name, count|
    str << "#{msg_name} in: #{count}\n"
  }
  @snd_stats.each { |msg_name, count|
    str << "#{msg_name} out: #{count}\n"
  }
  str
end
|
#unbind ⇒ Object
Called by EM when the connection goes away.
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
# File 'lib/homeq/sobs/connection.rb', line 145
# Called by EM when the connection goes away.
# Stamps @closed_at, logs the termination and (when the connection was ever
# opened) its lifetime statistics, discards any buffered input, fires the
# :unbind event on the state machine and removes this connection from the
# owning server's connection list, if there is a server.
def unbind
  @closed_at = Time.now
  if peer
    logger.info {
      "Connection has terminated to sobs peer #{remote_endpoint}."
    }
  end
  if @opened_at
    logger.info {
      "Open for #{'%6.4fs' % (@closed_at - @opened_at)}. " +
        "In: #{@msgs_in}. Out: #{@msgs_out}. " +
        "IO/sec: #{io_per_sec}"
    }
  end
  # Unparsed input is useless once the socket is gone.
  @data = ''
  @state.unbind
  server.connections.delete(self) if server
end
|
#unknown_message(command_name) ⇒ Object
483
484
485
486
|
# File 'lib/homeq/sobs/connection.rb', line 483
# Reports a command this connection does not understand by passing an
# explanatory message to sys.die.
def unknown_message(command_name)
  sys.die("Unknown/unsupported command: #{command_name}")
end
|
#wait_for_hello ⇒ Object
520
521
522
523
524
525
526
527
528
|
# File 'lib/homeq/sobs/connection.rb', line 520
def wait_for_hello
@hello_timer = EventMachine::Timer.new(HELLO_TIMER_LENGTH) do
logger.info {
"No hello received from #{remote_endpoint}. Bye."
}
@hello_timer = nil
@state.hello_timeout
end
end
|