Class: Dplyr::RemoteTask

Inherits:
Object
  • Object
show all
Includes:
Dply::Logger
Defined in:
lib/dplyr/remote_task.rb

Constant Summary collapse

SSH_OPTS =
%w(-oBatchMode=yes -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null)

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Dply::Logger

#debug?, #logger, stderr, #stderr

Constructor Details

#initialize(host, task, id_size:) ⇒ RemoteTask

Returns a new instance of RemoteTask.



13
14
15
16
17
18
19
# File 'lib/dplyr/remote_task.rb', line 13

def initialize(host, task, id_size:)
  @host = host
  @task = task
  @id = host[:id]
  @messages = []
  @id_size = id_size
end

Instance Attribute Details

#exit_statusObject (readonly)

Returns the value of attribute exit_status.



9
10
11
# File 'lib/dplyr/remote_task.rb', line 9

def exit_status
  @exit_status
end

#messagesObject (readonly)

Returns the value of attribute messages.



9
10
11
# File 'lib/dplyr/remote_task.rb', line 9

def messages
  @messages
end

Instance Method Details

#addrObject



58
59
60
# File 'lib/dplyr/remote_task.rb', line 58

def addr
  @host.fetch :addr
end

#copy_application_ymlObject



29
30
31
32
33
34
35
36
# File 'lib/dplyr/remote_task.rb', line 29

def copy_application_yml
  src = "config/application.yml"
  dest = "#{user}@#{addr}:#{dir}/config/application.yml"
  return if not File.exist? src
  hprint "copying application.yml\n"
  command = ["scp", "-q", *SSH_OPTS, src, dest]
  system *command, exception: true
end

#dirObject



54
55
56
# File 'lib/dplyr/remote_task.rb', line 54

def dir
  @host.fetch :dir
end

#envObject



103
104
105
# File 'lib/dplyr/remote_task.rb', line 103

def env
  @env ||= "PATH=/usr/sbin:/usr/local/sbin:$PATH #{roles_env}"
end

#get_exit_status(pid) ⇒ Object



83
84
85
86
# File 'lib/dplyr/remote_task.rb', line 83

def get_exit_status(pid)
  pid, status = Process.waitpid2(pid)
  return status
end

#hprint(message) ⇒ Object



95
96
97
# File 'lib/dplyr/remote_task.rb', line 95

def hprint(message)
  printf output_template, @id, message
end

#output_templateObject



88
89
90
91
92
93
# File 'lib/dplyr/remote_task.rb', line 88

def output_template
  @output_template ||= begin
    id_template = "%-#{@id_size}s".bold.grey
    template = "#{id_template}  %s"
  end
end

#pty_read(file) ⇒ Object



66
67
68
69
70
71
72
73
74
75
# File 'lib/dplyr/remote_task.rb', line 66

def pty_read(file)
  file.each do |line|
    if line =~ /\Adply_msg\|/
      receive_message line
    else
      hprint line
    end
  end
rescue EOFError,Errno::ECONNRESET, Errno::EPIPE, Errno::EIO => e
end

#receive_message(msg_str) ⇒ Object



77
78
79
80
81
# File 'lib/dplyr/remote_task.rb', line 77

def receive_message(msg_str)
  msg = msg_str.partition("|")[2].strip
  return if msg.empty?
  @messages << msg
end

#reset!Object



99
100
101
# File 'lib/dplyr/remote_task.rb', line 99

def reset!
  @output_template = nil
end

#rolesObject



62
63
64
# File 'lib/dplyr/remote_task.rb', line 62

def roles
  @host.fetch :roles
end

#roles_envObject



107
108
109
110
111
# File 'lib/dplyr/remote_task.rb', line 107

def roles_env
  return "" if not roles.size > 0
  roles_str = roles.join(",")
  "DPLY_ROLES=#{roles_str} DPLY_PERSIST_ROLES=1"
end

#runObject



21
22
23
24
25
26
27
# File 'lib/dplyr/remote_task.rb', line 21

def run
  reset!
  copy_application_yml
  r, w, pid = spawn_remote_cmd
  pty_read r
  @exit_status = get_exit_status pid
end

#spawn_remote_cmdObject



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/dplyr/remote_task.rb', line 38

def spawn_remote_cmd
  if logger.debug?
    script = %(#{env} drake --remote --debug -d #{dir} #{@task} 2>&1)
    err = :out
  else
    script = %(#{env} drake --remote -d #{dir} #{@task} 2>&1)
    err = "/dev/null"
  end
  command = ["ssh", "-tt", *SSH_OPTS, "-l", user, addr, script]
  PTY.spawn(*command, err: err)
end

#userObject



50
51
52
# File 'lib/dplyr/remote_task.rb', line 50

def user
  @host.fetch :user
end