Class: Thrift::EventMachineConnection

Inherits:
EventMachine::Connection
  • Object
show all
Includes:
EventMachine::Deferrable
Defined in:
lib/thrift_client/thrift/transport.rb

Constant Summary collapse

BUFFER_SIZE =

4kB

4096

Instance Method Summary collapse

Instance Method Details

#closeObject



112
113
114
115
116
117
118
119
# File 'lib/thrift_client/thrift/transport.rb', line 112

def close
  trap do
    if @connected
      @connected = false
      close_connection(true)
    end
  end
end

#connected?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/thrift_client/thrift/transport.rb', line 72

def connected?
  @connected
end

#connection_completedObject



76
77
78
79
# File 'lib/thrift_client/thrift/transport.rb', line 76

def connection_completed
  @connected = true
  succeed
end

#on_read_timeoutObject



99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/thrift_client/thrift/transport.rb', line 99

def on_read_timeout
  trap do
    deferrable = @deferrable
    if deferrable
      @deferrable = nil
      deferrable.fail(:read)
    else
      puts "read timeout, but no deferrable was found."
      fail
    end
  end
end

#post_initObject



65
66
67
68
69
70
# File 'lib/thrift_client/thrift/transport.rb', line 65

def post_init
  @connected = false
  @deferrable = nil
  @rbuf = ''
  @index = 0
end

#read(size, timeout) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/thrift_client/thrift/transport.rb', line 127

def read(size,timeout)
  timer = nil
  if timeout and timeout > 0
    timer = EventMachine.add_timer(timeout){
      on_read_timeout
    }
  end
  begin
    data = nil
    if can_read?(size)
      data = yank(size)
    else
      @size = size
      @deferrable ||= EventMachine::DefaultDeferrable.new
      data = EventMachine::Synchrony.sync @deferrable
      if data == :unbind
        raise TransportException.new(TransportException::ALREADY_CLOSE, "connection already closed.")
      elsif data == :read
        raise TransportException.new(TransportException::TIMED_OUT, "read timeout")
      elsif data == :readerror
        raise TransportException.new(TransportException::UNKNOWN, "read unknown error")
      end
    end
  ensure
    if timer
      EventMachine.cancel_timer(timer)
    end
  end
  data
end

#receive_data(data) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/thrift_client/thrift/transport.rb', line 158

def receive_data(data)
  trap do
    puts data.length
    @rbuf << data
    if @deferrable and can_read?(@size)
      data = yank(@size)
      @size = nil
      deferrable = @deferrable
      @deferrable = nil
      deferrable.succeed(data)
    end
  end
end

#unbindObject



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/thrift_client/thrift/transport.rb', line 81

def unbind
  connected = @connected
  @connected = false
  deferrable = @deferrable
  if connected
    if deferrable
      @deferrable = nil
      deferrable.fail(:unbind)
    else
      puts "connection closed by server, but no deferrable was found."
      fail
    end
  else
    puts "connection closed"
    fail
  end
end

#write(buf, timeout) ⇒ Object



121
122
123
124
125
# File 'lib/thrift_client/thrift/transport.rb', line 121

def write(buf,timeout)
  callback {
    send_data(buf)
  }
end