Class: GemeraldBeanstalk::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/gemerald_beanstalk/connection.rb

Constant Summary collapse

BEGIN_REQUEST_STATES =
[:ready, :multi_part_request_in_progress]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(beanstalk, connection = nil) ⇒ Connection

Returns a new instance of Connection.



78
79
80
81
82
83
84
85
86
# File 'lib/gemerald_beanstalk/connection.rb', line 78

def initialize(beanstalk, connection = nil)
  @beanstalk = beanstalk
  @connection = connection
  @inbound_state = :ready
  @mutex = Mutex.new
  @outbound_state = :ready
  @tube_used = 'default'
  @tubes_watched = Set.new(%w[default])
end

Instance Attribute Details

#beanstalkObject (readonly)

Returns the value of attribute beanstalk.



5
6
7
# File 'lib/gemerald_beanstalk/connection.rb', line 5

def beanstalk
  @beanstalk
end

#mutexObject (readonly)

Returns the value of attribute mutex.



5
6
7
# File 'lib/gemerald_beanstalk/connection.rb', line 5

def mutex
  @mutex
end

#producer=(value) ⇒ Object (writeonly)

Sets the attribute producer

Parameters:

  • value

    the value to set the attribute producer to.



6
7
8
# File 'lib/gemerald_beanstalk/connection.rb', line 6

def producer=(value)
  @producer = value
end

#tube_usedObject (readonly)

Returns the value of attribute tube_used.



5
6
7
# File 'lib/gemerald_beanstalk/connection.rb', line 5

def tube_used
  @tube_used
end

#tubes_watchedObject (readonly)

Returns the value of attribute tubes_watched.



5
6
7
# File 'lib/gemerald_beanstalk/connection.rb', line 5

def tubes_watched
  @tubes_watched
end

#waiting=(value) ⇒ Object (writeonly)

Sets the attribute waiting

Parameters:

  • value

    the value to set the attribute waiting to.



6
7
8
# File 'lib/gemerald_beanstalk/connection.rb', line 6

def waiting=(value)
  @waiting = value
end

#worker=(value) ⇒ Object (writeonly)

Sets the attribute worker

Parameters:

  • value

    the value to set the attribute worker to.



6
7
8
# File 'lib/gemerald_beanstalk/connection.rb', line 6

def worker=(value)
  @worker = value
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


9
10
11
# File 'lib/gemerald_beanstalk/connection.rb', line 9

def alive?
  return @inbound_state != :closed && @oubound_state != :closed
end

#begin_multi_part_request(multi_part_request) ⇒ Object



14
15
16
17
18
19
# File 'lib/gemerald_beanstalk/connection.rb', line 14

def begin_multi_part_request(multi_part_request)
  return false unless outbound_ready?
  @multi_part_request = multi_part_request
  @outbound_state = :multi_part_request_in_progress
  return true
end

#begin_requestObject



22
23
24
25
26
# File 'lib/gemerald_beanstalk/connection.rb', line 22

def begin_request
  return false unless BEGIN_REQUEST_STATES.include?(@outbound_state)
  @outbound_state = :request_in_progress
  return true
end

#close_connectionObject



29
30
31
32
# File 'lib/gemerald_beanstalk/connection.rb', line 29

def close_connection
  @inbound_state = @outbound_state = :closed
  @connection.close_connection unless @connection.nil?
end

#complete_requestObject



35
36
37
38
39
# File 'lib/gemerald_beanstalk/connection.rb', line 35

def complete_request
  return false unless request_in_progress?
  @outbound_state = :ready
  return true
end

#execute(raw_command) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/gemerald_beanstalk/connection.rb', line 42

def execute(raw_command)
  command = response = nil
  @mutex.synchronize do
    return if waiting? || request_in_progress?
    puts "#{Time.now.to_f}: #{raw_command}" if ENV['VERBOSE']
    if multi_part_request_in_progress?
      (command = @multi_part_request).body = raw_command
    else
      command = GemeraldBeanstalk::Command.new(raw_command, self)
      if !command.valid?
        response = command.error
      elsif command.multi_part_request?
        return begin_multi_part_request(command)
      end
    end
    begin_request
  end
  puts "#{Time.now.to_f}: #{command.inspect}" if ENV['VERBOSE']
  # Execute command unless parsing already yielded a response
  response ||= beanstalk.execute(command)
  transmit(response) unless response.nil?
end

#ignore(tube, force = false) ⇒ Object



66
67
68
69
70
# File 'lib/gemerald_beanstalk/connection.rb', line 66

def ignore(tube, force = false)
  return nil unless @tubes_watched.length > 1 || force
  @tubes_watched.delete(tube)
  return @tubes_watched.length
end

#inbound_ready?Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/gemerald_beanstalk/connection.rb', line 73

def inbound_ready?
  return @inbound_state == :ready
end

#multi_part_request_in_progress?Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/gemerald_beanstalk/connection.rb', line 89

def multi_part_request_in_progress?
  return @outbound_state == :multi_part_request_in_progress
end

#outbound_ready?Boolean

Returns:

  • (Boolean)


94
95
96
# File 'lib/gemerald_beanstalk/connection.rb', line 94

def outbound_ready?
  return @outbound_state == :ready
end

#producer?Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/gemerald_beanstalk/connection.rb', line 99

def producer?
  return !!@producer
end

#request_in_progress?Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/gemerald_beanstalk/connection.rb', line 104

def request_in_progress?
  return @outbound_state == :request_in_progress
end

#response_receivedObject



109
110
111
112
113
# File 'lib/gemerald_beanstalk/connection.rb', line 109

def response_received
  return false unless waiting? || timed_out?
  @inbound_state = :ready
  return true
end

#timed_out?Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/gemerald_beanstalk/connection.rb', line 116

def timed_out?
  return @inbound_state == :timed_out
end

#transmit(message) ⇒ Object



121
122
123
124
125
126
127
128
# File 'lib/gemerald_beanstalk/connection.rb', line 121

def transmit(message)
  return if !alive? || @connection.nil?
  puts "#{Time.now.to_f}: #{message}" if ENV['VERBOSE']
  @connection.send_data(message)
  complete_request
  response_received
  return self
end

#use(tube_name) ⇒ Object



131
132
133
# File 'lib/gemerald_beanstalk/connection.rb', line 131

def use(tube_name)
  @tube_used = tube_name
end

#wait(timeout = nil) ⇒ Object



136
137
138
139
140
141
# File 'lib/gemerald_beanstalk/connection.rb', line 136

def wait(timeout = nil)
  return false unless inbound_ready?
  @wait_timeout = timeout
  @inbound_state = :waiting
  return true
end

#wait_timed_outObject



144
145
146
147
148
149
# File 'lib/gemerald_beanstalk/connection.rb', line 144

def wait_timed_out
  return false unless @inbound_state == :waiting
  @wait_timeout = nil
  @inbound_state = :timed_out
  return true
end

#waiting?Boolean

Returns:

  • (Boolean)


152
153
154
155
156
157
# File 'lib/gemerald_beanstalk/connection.rb', line 152

def waiting?
  return false unless @inbound_state == :waiting
  return true if @wait_timeout.nil? || @wait_timeout > Time.now.to_f
  wait_timed_out
  return false
end

#watch(tube) ⇒ Object



160
161
162
163
# File 'lib/gemerald_beanstalk/connection.rb', line 160

def watch(tube)
  @tubes_watched << tube
  return @tubes_watched.length
end

#worker?Boolean

Returns:

  • (Boolean)


166
167
168
# File 'lib/gemerald_beanstalk/connection.rb', line 166

def worker?
  return !!@worker
end