Class: Skynet::Worker

Inherits:
Object
Includes:
GuidGenerator, SkynetDebugger
Defined in:
lib/skynet/skynet_worker.rb

Defined Under Namespace

Classes: ConnectionFailure, Error, NoManagerError, RespawnWorker

Constant Summary

RETRY_TIME = 2
MEMORY_CHECK_DELAY = 30
MANAGER_PING_INTERVAL = 60
@@ok_to_mem_check = false
@@lastmem = nil
@@memct = 0

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from GuidGenerator

#get_unique_id

Methods included from SkynetDebugger

#args_pp, #debug, #debug_header, #error, #fatal, .included, #info, #log, #printlog, #stderr, #stdout, #warn

Constructor Details

#initialize(worker_type, options = {}) ⇒ Worker

Returns a new instance of Worker.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/skynet/skynet_worker.rb', line 27

# Build a worker that takes tasks of +worker_type+.
#
# @param worker_type [#to_sym] kind of task to take; stored as a Symbol in @worker_type
# @param options [Hash] extra settings. +:queue_id+ picks the queue (0 when absent).
#   Every entry is also merged into the status hash (@worker_info) reported to the manager.
#
# NOTE(review): options are merged last, so an +options[:worker_type]+ entry
# (Worker.start always sets one) replaces the +:worker_type => payload_type+
# value computed below — confirm that is intended.
def initialize(worker_type, options = {})
  @worker_id   = get_unique_id(1).to_i
  @worker_type = worker_type.to_sym
  @queue_id    = options[:queue_id] || 0
  @processed, @in_process = 0, false
  @mq          = Skynet::MessageQueue.new

  debug "THIS WORKER TAKES #{worker_type}"

  # Fields are filled in the same order the manager has always received them.
  status = {}
  status[:tasktype]    = worker_type
  status[:hostname]    = hostname
  status[:process_id]  = process_id
  status[:worker_type] = payload_type
  status[:worker_id]   = worker_id
  status[:version]     = mq.get_worker_version
  @worker_info = status.merge(options)
end

Instance Attribute Details

#message ⇒ Object

Returns the value of attribute message.



15
16
17
# File 'lib/skynet/skynet_worker.rb', line 15

# Reader for the +message+ attribute (the @message instance variable).
def message; @message; end

#mq ⇒ Object

Returns the value of attribute mq.



15
16
17
# File 'lib/skynet/skynet_worker.rb', line 15

# Message-queue connection: created in #initialize, dropped and re-created by
# #start after connection errors.
def mq; @mq; end

#processed ⇒ Object

Returns the value of attribute processed.



15
16
17
# File 'lib/skynet/skynet_worker.rb', line 15

# Number of tasks completed so far (incremented by #notify_task_complete).
def processed; @processed; end

#queue_id ⇒ Object (readonly)

Returns the value of attribute queue_id.



16
17
18
# File 'lib/skynet/skynet_worker.rb', line 16

# Queue this worker takes tasks from: +options[:queue_id]+ given to #initialize, or 0.
def queue_id; @queue_id; end

#task ⇒ Object

Returns the value of attribute task.



15
16
17
# File 'lib/skynet/skynet_worker.rb', line 15

# Reader for the +task+ attribute (the @task instance variable).
def task; @task; end

#worker_id ⇒ Object (readonly)

Returns the value of attribute worker_id.



16
17
18
# File 'lib/skynet/skynet_worker.rb', line 16

# Numeric id assigned in #initialize from +get_unique_id(1)+.
def worker_id; @worker_id; end

#worker_info ⇒ Object (readonly)

Returns the value of attribute worker_info.



16
17
18
# File 'lib/skynet/skynet_worker.rb', line 16

# Base status hash (built in #initialize) that every status report to the manager starts from.
def worker_info; @worker_info; end

#worker_type ⇒ Object (readonly)

Returns the value of attribute worker_type.



16
17
18
# File 'lib/skynet/skynet_worker.rb', line 16

# Symbol form of the worker type passed to #initialize.
def worker_type; @worker_type; end

Class Method Details

.debug_class_desc ⇒ Object



23
24
25
# File 'lib/skynet/skynet_worker.rb', line 23

# Label used by the debugger mixin: "WORKER-" followed by this process's pid.
def self.debug_class_desc
  format("WORKER-%d", Process.pid)
end

.start(options = {}) ⇒ Object



362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
# File 'lib/skynet/skynet_worker.rb', line 362

# Command-line entry point. Parses ARGV into +options+, loads the required
# libraries and the skynet config file, then builds and runs a Worker.
#
# Flags: -r/--required LIBRARY (repeatable), --worker_type=any|master|task,
# --config=CONFIG_FILE, --queue=QUEUE_NAME, --queue_id=ID.
#
# @param options [Hash] defaults that the command-line flags override or extend
# @raise [Skynet::Error] for an invalid --worker_type, or when both --queue and
#   --queue_id are given
#
# Outcomes:
# - On Skynet::Worker::RespawnWorker, a replacement worker process is launched
#   with the same settings and this process exits.
# - A missing library or config file, or NoManagerError, logs and exits.
# - Any other exception is logged and saved as an ExceptionReport.
def self.start(options={})
  options[:worker_type]    ||= :any
  options[:required_libs]  ||= []

  OptionParser.new do |opt|
    opt.banner = "Usage: worker [options]"
    opt.on('-r', '--required LIBRARY', 'Include the specified libraries') do |v|
      options[:required_libs] << v
    end
    opt.on('--worker_type=WORKERTYPE', "master, task or any") do |v|
      if ["any","master","task"].include?(v)
        options[:worker_type] = v
      else
        raise Skynet::Error.new("#{v} is not a valid worker_type")
      end
    end
    opt.on('--config=CONFIG_FILE', 'Where to find the skynet.rb config file') do |v|
      options[:config_file] = File.expand_path(v)
    end
    opt.on('--queue=QUEUE_NAME', 'Which queue should these workers use (default "default").') do |v|
      options[:queue] = v
    end
    opt.on('--queue_id=queue_id', 'Which queue should these workers use (default 0).') do |v|
      options[:queue_id] = v.to_i
    end
    opt.parse!(ARGV)
  end

  # Reject conflicting queue selectors before any loading work is done.
  if options[:queue] and options[:queue_id]
    raise Skynet::Error.new("You may either provide a queue_id or a queue, but not both.")
  end

  # LoadError (the parent of ActiveSupport's MissingSourceFile) also works when
  # ActiveSupport is not loaded.
  options[:required_libs].each do |adlib|
    begin
      require adlib
    rescue LoadError => e
      error "The included lib #{adlib} was not found: #{e.inspect}"
      exit
    end
  end

  options[:config_file] ||= Skynet::CONFIG[:CONFIG_FILE]
  if options[:config_file]
    begin
      require options[:config_file]
    rescue LoadError => e
      error "The config file at #{options[:config_file]} was not found: #{e.inspect}"
      exit
    end
  else
    error "Config file missing. Please add a config/skynet_config.rb before starting."
    exit
  end

  # Resolve --queue only after the config file is loaded, since it may define
  # the queue names. No `config` helper is visible on this class, so the lookup
  # goes through Skynet::Config, as the debug line below does.
  if options[:queue]
    options[:queue_id] = Skynet::Config.new.queue_id_by_name(options[:queue])
  end

  debug "WORKER STARTING WORKER_TYPE?:#{options[:worker_type]}. QUEUE: #{Skynet::Config.new.queue_name_by_id(options[:queue_id])}"

  begin
    worker = Skynet::Worker.new(options[:worker_type], options)
    worker.start
  rescue Skynet::Worker::NoManagerError => e
    fatal e.message
    exit
  rescue Skynet::Worker::RespawnWorker => e
    warn "WORKER #{$$} SCRIPT CAUGHT RESPAWN.  RESTARTING #{e.message}"
    # Relaunch with the same settings; this process then exits.
    cmd = "ruby #{Skynet::CONFIG[:LAUNCHER_PATH]} --worker_type=#{options[:worker_type]} --queue_id=#{options[:queue_id]} "
    cmd << "--config=#{options[:config_file]} "
    cmd << "-r #{options[:required_libs].join(' -r ')}" if options[:required_libs] and not options[:required_libs].empty?
    Skynet.fork_and_exec(cmd)
    exit
  rescue SystemExit
    info "WORKER #{$$} EXITING GRACEFULLY"
  rescue Exception => e
    # Last-resort handler for the whole worker process: log and persist the crash.
    fatal "WORKER #{$$} DYING #{e.class} #{e.message} #{Array(e.backtrace).join("\n")}"
    report = ExceptionReport.new(e)
    report.save
  end
end

Instance Method Details

#find_pid_size(file, format = :notpretty) ⇒ Object



331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/skynet/skynet_worker.rb', line 331

# Read the "VmSize" entry from a /proc-style status file.
#
# @param file [String] path of the status file, e.g. "/proc/self/status"
# @param format [Symbol] :pretty returns a String truncated to 5 decimal
#   places; any other value returns a Float
# @return [Float, String] the VmSize number divided by 1000 (VmSize is listed
#   in kB, so roughly MB); the String '0' when the file cannot be read or has
#   no VmSize line
def find_pid_size(file, format=:notpretty)
  vm_size = nil
  # File.foreach closes the file when iteration ends (including via break).
  File.foreach(file) do |line|
    match = line.match(/VmSize:\s*(\d+(?:\.\d+)?)/)
    next unless match
    vm_size = match[1].to_f / 1000
    break
  end

  # No VmSize line: report the same '0' sentinel used for read errors.
  return '0' unless vm_size

  return BigDecimal(vm_size.to_s).truncate(5).to_s('F') if format == :pretty
  vm_size
rescue StandardError => e
  # StandardError, not Exception: a SystemExit/Interrupt arriving while the
  # file is read (e.g. from the TERM/INT traps) must not be swallowed here.
  warn "ERROR #{e.inspect}"
  '0'
end

#get_memory_size ⇒ Object



346
347
348
# File 'lib/skynet/skynet_worker.rb', line 346

# Virtual memory size of the current process, read by #find_pid_size from
# this process's /proc status file.
def get_memory_size
  status_path = "/proc/self/status"
  find_pid_size(status_path)
end

#hostname ⇒ Object



52
53
54
# File 'lib/skynet/skynet_worker.rb', line 52

# Name of this machine, looked up once via Socket.gethostname and cached in @machine_name.
def hostname
  return @machine_name if @machine_name
  @machine_name = Socket.gethostname
end

#interrupt ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
# File 'lib/skynet/skynet_worker.rb', line 163

# Handler for the TERM and INT signals (installed by #start).
#
# A second signal exits at once. The first one sets @die. If no task is
# running, the manager is told the worker is stopping and the process exits.
# Otherwise #start's loop exits when it next checks @die.
def interrupt
  exit if @die
  @die = true
  return if @in_process
  notify_worker_stop
  exit
end

#manager ⇒ Object



154
155
156
# File 'lib/skynet/skynet_worker.rb', line 154

# Handle to the worker manager, as returned by Skynet::Manager.get.
def manager
  manager_class = Skynet::Manager
  manager_class.get
end

#manager_send(method, *args) ⇒ Object



140
141
142
143
144
145
146
147
148
# File 'lib/skynet/skynet_worker.rb', line 140

# Call +method+ with +args+ on the manager returned by #manager. Failures are
# logged instead of raised.
#
# @param method [Symbol] name of the manager method to call
# @param args [Array] arguments forwarded to that method
# @return the manager's return value, or the value of the +error+ log call
#   when the call failed
#
# Only StandardError is swallowed. SystemExit, raised by #interrupt's `exit`
# from the TERM/INT traps, must propagate so the worker actually stops.
def manager_send(method,*args)
  manager.send(method,*args)
rescue DRb::DRbConnError, Errno::ECONNREFUSED => e
  error "Worker could not connect to manager to call #{method} on manager #{e.inspect}"
rescue StandardError => e
  error "Worker could not call #{method} on manager #{e.inspect} args:", args
end

#max_memory_reached? ⇒ Boolean

Returns:

  • (Boolean)


317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/skynet/skynet_worker.rb', line 317

# Check whether this process has grown past Skynet::CONFIG[:WORKER_MAX_MEMORY].
#
# Checks are throttled:
# - When #ok_to_mem_check? is false, this always returns false.
# - The first call only starts the timer.
# - After that, memory is measured at most once every MEMORY_CHECK_DELAY seconds.
#
# @return [Integer, false] the measured size (get_memory_size.to_i) when it
#   exceeds the limit; false otherwise, including when no limit is configured
def max_memory_reached?
  return false unless ok_to_mem_check?

  now = Time.now
  unless @memchecktime
    @memchecktime = now
    return false
  end
  return false unless now > (@memchecktime + MEMORY_CHECK_DELAY)

  @memchecktime = now
  max_mem = Skynet::CONFIG[:WORKER_MAX_MEMORY]
  # Without a configured limit there is nothing to compare against
  # (comparing with nil would raise).
  return false unless max_mem

  local_mem = get_memory_size.to_i
  local_mem > max_mem ? local_mem : false
end

#new_version_respawn? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/skynet/skynet_worker.rb', line 60

def new_version_respawn?
   if !@verchecktime
    @verchecktime = Time.now
    begin
      @curver = mq.get_worker_version
      debug "FINDING INITIAL VER #{@curver}"
    rescue  Skynet::RequestExpiredError => e
      warn "NO INITIAL VER IN MQ using 1"
      @curver = 1
    end
  else
    if Time.now < (@verchecktime + Skynet::CONFIG[:WORKER_VERSION_CHECK_DELAY])
      return false
    else
      @verchecktime = Time.now
      begin
        newver = mq.get_worker_version
        # debug "CURVER #{@curver} NEWVER: #{newver}"
        if newver != @curver and not mq.version_active?(@curver, queue_id)
          info "RESTARTING WORKER ON PID #{$$}"
          return true
        end
      rescue Skynet::RequestExpiredError => e
        warn "NO CURRENT WORKER REV IN MQ still using 1"
        mq.set_worker_version(1)
        return false
      end
    end
  end
  return false
end

#notify_task_begun(task) ⇒ Object



102
103
104
105
106
107
# File 'lib/skynet/skynet_worker.rb', line 102

# Report to the manager that +task+ has started.
#
# Steps:
# - Adds :processed (the current count) and :started_at (epoch seconds) to
#   +task+ in place.
# - Marks the worker busy.
# - Sends @worker_info merged with +task+ through #write_worker_status.
#
# @param task [Hash] status fields describing the task being started
def notify_task_begun(task)
  task.update({ :processed => @processed, :started_at => Time.now.to_i })
  @in_process = true
  write_worker_status(@worker_info.merge(task))
end

#notify_task_complete ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/skynet/skynet_worker.rb', line 109

# Record that the current task finished.
#
# Bumps the processed count, marks the worker idle, and reports an idle
# ("waiting for <worker_type>") status to the manager.
def notify_task_complete
  @processed = @processed.succ
  @in_process = false

  idle_status = @worker_info.merge({
    :task_id       => 0,
    :job_id        => 0,
    :name          => "waiting for #{@worker_type}",
    :processed     => @processed,
    :map_or_reduce => nil,
    :started_at    => Time.now.to_i
  })
  write_worker_status(idle_status)
end

#notify_worker_started ⇒ Object



92
93
94
95
96
97
98
99
100
# File 'lib/skynet/skynet_worker.rb', line 92

# Tell the manager this worker is up and waiting for work, with a processed count of 0.
def notify_worker_started
  startup_fields = {
    :name       => "waiting for #{@worker_type}",
    :processed  => 0,
    :started_at => Time.now.to_i
  }
  write_worker_status(@worker_info.merge(startup_fields))
end

#notify_worker_stop ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/skynet/skynet_worker.rb', line 125

# Log that this worker is stopping and send the manager an idle status whose
# :process_id is cleared to nil.
def notify_worker_stop
  info "Worker #{process_id} stopping..."
  stopped_status = @worker_info.merge({
    :task_id       => 0,
    :job_id        => 0,
    :name          => "waiting for #{@worker_type}",
    :processed     => @processed,
    :process_id    => nil,
    :map_or_reduce => nil,
    :started_at    => Time.now.to_i
  })
  write_worker_status(stopped_status)
end

#ok_to_mem_check? ⇒ Boolean

Returns:

  • (Boolean)


350
351
352
353
354
355
356
357
358
359
360
# File 'lib/skynet/skynet_worker.rb', line 350

# Whether memory checks can run on this host.
#
# The answer is cached in @@ok_to_mem_check:
# - true when /proc/self/status was found. On that first successful check,
#   @@lastmem is also set from #get_memory_size, unless it is already set.
# - :notok when the file was missing.
#
# Uses File.exist?, since the deprecated File.exists? alias was removed in
# Ruby 3.2.
def ok_to_mem_check?
  return true if @@ok_to_mem_check == true
  return false if @@ok_to_mem_check == :notok

  if File.exist?('/proc/self/status')
    @@lastmem ||= get_memory_size.to_i
    @@ok_to_mem_check = true
  else
    @@ok_to_mem_check = :notok
    false
  end
end

#payload_type ⇒ Object



158
159
160
161
# File 'lib/skynet/skynet_worker.rb', line 158

# Task type requested from the queue: nil for a worker that takes :any,
# otherwise #worker_type.
def payload_type
  worker_type == :any ? nil : worker_type
end

#process_id ⇒ Object



48
49
50
# File 'lib/skynet/skynet_worker.rb', line 48

# Operating-system pid of the current worker process.
def process_id
  Process.pid
end

#start ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/skynet/skynet_worker.rb', line 175

# Main worker loop.
#
# Setup: installs signal handlers, records the current worker version, and
# reports the worker to the manager.
#
# Each pass then:
# - takes the next task from the message queue,
# - runs it,
# - writes the result back.
#
# Signals:
# - HUP requests a respawn: immediately when idle, otherwise at the next
#   loop pass.
# - TERM and INT go through #interrupt.
#
# @raise [Skynet::Worker::RespawnWorker] after HUP, after
#   Skynet::CONFIG[:WORKER_MAX_PROCESSED] tasks, or when #max_memory_reached?
# @raise the connection error after more than 20 consecutive connection failures
#
# Returns on NoManagerError, on Interrupt/SystemExit, or after more than 1000
# other exceptions. Each such exception is reported back to the queue with
# mq.write_error when a message was in hand.
def start
  exceptions = 0
  conerror   = 0
  @curver    = nil

  # setup signal handlers for manager
  Signal.trap("HUP")  do
    @respawn = true
    # Idle workers respawn right away; busy ones see @respawn at the next pass.
    raise Skynet::Worker::RespawnWorker.new if not @in_process
  end
  Signal.trap("TERM") { interrupt       }
  Signal.trap("INT")  { interrupt       }

  # The first call only records the current worker version in @curver.
  raise Skynet::Worker::RespawnWorker.new if new_version_respawn?

  printlog "STARTING WORKER @ VER:#{@curver} type:#{@worker_type} QUEUE_ID:#{queue_id}"

  notify_worker_started

  message = nil
  task    = nil

  loop do
    # Reset both per pass so the error handler never reports against a message
    # or task (e.g. its result_timeout) left over from an earlier pass.
    message = nil
    task    = nil
    begin
      max_processed = Skynet::CONFIG[:WORKER_MAX_PROCESSED]
      if max_processed and max_processed > 0 and @processed >= max_processed
        raise Skynet::Worker::RespawnWorker.new("WORKER PROCESSED #{@processed} TASKS, MAX: #{max_processed}")
      end
      if @die
        exit
      elsif @respawn
        raise Skynet::Worker::RespawnWorker.new()
      end

      if local_mem = max_memory_reached?
        raise Skynet::Worker::RespawnWorker.new("WORKER OVER MAX MEM AT: #{local_mem} MAX: #{Skynet::CONFIG[:WORKER_MAX_MEMORY]}")
      end

      # A connection error dropped @mq; reconnect before taking more work.
      if conerror > 0
        @mq = Skynet::MessageQueue.new
        warn "WORKER RECONNECTED AFTER #{conerror} tries"
        conerror = 0
      end

      message = mq.take_next_task(@curver, 0.00001, payload_type, queue_id)

      next unless message.respond_to?(:payload)

      task = message.payload
      # NOTE: a payload without map_or_reduce is only logged here; the next
      # line then raises NoMethodError, which the generic handler below
      # reports back to the queue via mq.write_error.
      error "BAD MESSAGE", task unless task.respond_to?(:map_or_reduce)

      info "STEP 2 GOT MESSAGE #{message.name} type:#{task.map_or_reduce}, jobid: #{message.job_id}, taskid:#{message.task_id} it: #{message.iteration}"
      debug "STEP 2.1 message=", message.to_a

      info "STEP 4 RUNNING TASK #{message.name} jobid: #{message.job_id} taskid: #{task.task_id}"
      notify_task_begun({
        :job_id        => message.job_id,
        :task_id       => message.task_id,
        :iteration     => message.iteration,
        :name          => message.name,
        :map_or_reduce => task.map_or_reduce
      })
      result = task.run(message.iteration)

      info "STEP 5 GOT RESULT FROM RUN TASK #{message.name} jobid: #{message.job_id} taskid: #{task.task_id}"
      debug "STEP 5.1 RESULT DATA:", result

      mq.write_result(message,result,task.result_timeout)
      info "STEP 6 WROTE RESULT MESSAGE #{message.name} jobid: #{message.job_id} taskid: #{task.task_id}"
      notify_task_complete

    rescue Skynet::Task::TimeoutError => e
      error "Task timed out while executing #{e.inspect} #{e.backtrace.join("\n")}"
      @in_process = false
      next

    rescue Skynet::Worker::RespawnWorker => e
      info "Respawning and taking worker status #{e.message}"
      notify_worker_stop
      raise e

    rescue Skynet::RequestExpiredError => e
      # No task was available; use the idle moment to check for a new version.
      if new_version_respawn?
        notify_worker_stop
        manager_send(:restart_worker,$$)
      end
      sleep 1
      next

    rescue Skynet::ConnectionError, DRb::DRbConnError => e
      conerror += 1
      retry_time = conerror > 6 ? RETRY_TIME * 3 : RETRY_TIME
      error "#{e.message}, RETRY #{conerror} in #{retry_time} seconds !!"
      @mq = nil
      sleep retry_time
      if conerror > 20
        fatal "TOO MANY RECONNECTION EXCEPTIONS #{e.message}"
        notify_worker_stop
        raise e
      end
      next

    rescue NoManagerError => e
      fatal e.message
      break
    rescue Interrupt, SystemExit => e
      info "Exiting..."
      notify_worker_stop
      break
    rescue Exception => e
      error "skynet_worker.rb:#{__LINE__} #{e.inspect} #{e.backtrace.join("\n")}"
      exceptions += 1
      break if exceptions > 1000
      # Without a message there is nowhere to report the failure.
      if message
        mq.write_error(message,"#{e.inspect} #{e.backtrace.join("\n")}",(task.respond_to?(:result_timeout) ? task.result_timeout : 200))
      end
      @in_process = false
      next
    end
  end
end

#version ⇒ Object



56
57
58
# File 'lib/skynet/skynet_worker.rb', line 56

# Worker version recorded by #new_version_respawn? (@curver); #start resets it to nil first.
def version; @curver; end

#write_worker_status(status) ⇒ Object



150
151
152
# File 'lib/skynet/skynet_worker.rb', line 150

# Forward a status hash to the manager's +worker_notify+ via #manager_send.
def write_worker_status(status)
  manager_send :worker_notify, status
end