Class: Spark::Accumulator::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/spark/accumulator.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Server

Returns a new instance of Server.



213
214
215
216
217
218
219
220
# File 'lib/spark/accumulator.rb', line 213

# Opens a TCP server on an OS-assigned port (port 0), records the host and
# port it ended up bound to, and starts the background accept loop.
#
# NOTE(review): `hostname` and `port` are not core TCPServer methods; they are
# presumably provided by a project extension -- confirm.
def initialize
  # Thread bookkeeping has to exist before handle_accept appends to it.
  @threads = []

  @server = TCPServer.new(0)
  @host = @server.hostname
  @port = @server.port

  handle_accept
end

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



193
194
195
# File 'lib/spark/accumulator.rb', line 193

# Reader for @host, which #initialize assigns from @server.hostname.
#
# @return [Object] the value stored in @host
def host
  @host
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



193
194
195
# File 'lib/spark/accumulator.rb', line 193

# Reader for @port, which #initialize assigns from @server.port.
#
# @return [Object] the value stored in @port
def port
  @port
end

#server ⇒ Object (readonly)

Returns the value of attribute server.



193
194
195
# File 'lib/spark/accumulator.rb', line 193

# Reader for @server, the TCPServer created in #initialize.
#
# @return [TCPServer] the value stored in @server
def server
  @server
end

Class Method Details

.host ⇒ Object



203
204
205
206
# File 'lib/spark/accumulator.rb', line 203

# Host of the shared server instance. Calls .start first, so the instance is
# created on demand if it does not exist yet.
#
# @return [Object] whatever the shared instance's #host returns
def self.host
  # .start evaluates to the memoized @instance, so it can be chained directly.
  start.host
end

.port ⇒ Object



208
209
210
211
# File 'lib/spark/accumulator.rb', line 208

# Port of the shared server instance. Calls .start first, so the instance is
# created on demand if it does not exist yet.
#
# @return [Object] whatever the shared instance's #port returns
def self.port
  # .start evaluates to the memoized @instance, so it can be chained directly.
  start.port
end

.start ⇒ Object



195
196
197
# File 'lib/spark/accumulator.rb', line 195

# Returns the shared server instance, creating it on first use.
#
# @return [Spark::Accumulator::Server] the memoized instance
def self.start
  return @instance if @instance

  @instance = Spark::Accumulator::Server.new
end

.stop ⇒ Object



199
200
201
# File 'lib/spark/accumulator.rb', line 199

# Stops the shared server instance, if one exists, and forgets it.
#
# The memoized instance is cleared so that a later .start (or .host / .port,
# which call .start) builds a fresh server instead of handing back one whose
# threads have already been killed.
#
# @return [Object, nil] the result of the instance's #stop, or nil when no
#   instance had been started
def self.stop
  instance = @instance
  @instance = nil
  instance && instance.stop
end

Instance Method Details

#handle_accept ⇒ Object



228
229
230
231
232
233
234
235
# File 'lib/spark/accumulator.rb', line 228

# Starts a background thread that accepts connections on @server forever and
# passes each accepted socket to #handle_connection. The thread is recorded in
# @threads so #stop can kill it.
#
# @return [Array<Thread>] @threads, with the acceptor thread appended
def handle_accept
  acceptor = Thread.new do
    loop do
      client = @server.accept
      handle_connection(client)
    end
  end

  @threads.push(acceptor)
end

#handle_connection(socket) ⇒ Object



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/spark/accumulator.rb', line 237

# Serves one client connection on its own thread. The thread is recorded in
# @threads so #stop can kill it.
#
# Each round reads an integer count, then that many data items. For each item,
# data[0] is looked up in Spark::Accumulator.instances; if an accumulator is
# found, data[1] is added to it, otherwise a warning is logged.
#
# The socket is closed when the thread finishes for any reason (peer
# disconnect surfacing as a read error, any other exception, or Thread#kill),
# so the descriptor is not leaked. `closed?` alone only reflects a local
# close, so the loop would otherwise never release the socket.
#
# @param socket [#read_int, #read_data, #closed?, #close] accepted client
#   socket; read_int / read_data are presumably project IO extensions -- confirm
# @return [Array<Thread>] @threads, with the connection thread appended
def handle_connection(socket)
  @threads << Thread.new do
    begin
      until socket.closed?
        count = socket.read_int
        count.times do
          data = socket.read_data
          accum = Spark::Accumulator.instances[data[0]]
          if accum
            accum.add(data[1])
          else
            Spark.logger.warn("Accumulator with id #{data[0]} does not exist.")
          end
        end

        # http://stackoverflow.com/questions/28560133/ruby-server-java-scala-client-deadlock
        # socket.write_int(Spark::Constant::ACCUMULATOR_ACK)
      end
    ensure
      # Runs on normal exit, on exceptions and on Thread#kill.
      socket.close unless socket.closed?
    end
  end
end

#stop ⇒ Object



222
223
224
225
226
# File 'lib/spark/accumulator.rb', line 222

# Shuts the server down: kills every thread recorded in @threads and closes
# the listening socket so its descriptor and port are released.
#
# Shutdown is best-effort: any StandardError raised along the way is swallowed
# and nil is returned instead.
#
# @return [Array<Thread>, nil] @threads on success, nil if an error was rescued
def stop
  killed = @threads.each(&:kill)
  # Without this the listening socket stays open after the accept thread is
  # gone, so clients could still connect to a port nobody services.
  @server.close unless @server.closed?
  killed
rescue
  nil
end