Class: Kaede::Recorder

Inherits:
Object
  • Object
show all
Defined in:
lib/kaede/recorder.rb

Defined Under Namespace

Classes: FFprobeError

Constant Summary collapse

BUFSIZ =
188 * 1024
ALLOWED_DURATION_ERROR =
20

Instance Method Summary collapse

Constructor Details

#initialize(notifier) ⇒ Recorder

Returns a new instance of Recorder.



8
9
10
# File 'lib/kaede/recorder.rb', line 8

def initialize(notifier)
  @notifier = notifier
end

Instance Method Details

#after_record(program) ⇒ Object



124
125
126
127
128
129
130
131
132
133
# File 'lib/kaede/recorder.rb', line 124

def after_record(program)
  @notifier.notify_after_record(program)
  unless verify_duration(program, cache_path(program))
    redo_ts_process(program)
  end
  move_ass_to_cabinet(program)
  clean_ts(program)
  enqueue_to_redis(program)
  FileUtils.rm(cache_path(program).to_s)
end

#before_record(program) ⇒ Object



120
121
122
# File 'lib/kaede/recorder.rb', line 120

def before_record(program)
  @notifier.notify_before_record(program)
end

#cabinet_ass_path(program) ⇒ Object



43
44
45
# File 'lib/kaede/recorder.rb', line 43

def cabinet_ass_path(program)
  Kaede.config.cabinet_dir.join("#{program.formatted_fname}.raw.ass")
end

#cabinet_path(program) ⇒ Object



39
40
41
# File 'lib/kaede/recorder.rb', line 39

def cabinet_path(program)
  Kaede.config.cabinet_dir.join("#{program.formatted_fname}.ts")
end

#cache_ass_path(program) ⇒ Object



35
36
37
# File 'lib/kaede/recorder.rb', line 35

def cache_ass_path(program)
  Kaede.config.cache_dir.join("#{program.tid}_#{program.pid}.raw.ass")
end

#cache_path(program) ⇒ Object



31
32
33
# File 'lib/kaede/recorder.rb', line 31

def cache_path(program)
  Kaede.config.cache_dir.join("#{program.tid}_#{program.pid}.cache.ts")
end

#calculate_duration(program) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/kaede/recorder.rb', line 63

def calculate_duration(program)
  duration = (program.end_time - program.start_time).to_i - 10
  end_datetime = program.end_time.to_datetime
  if end_datetime.sunday? && end_datetime.hour == 22 && end_datetime.min == 27
    # For MX
    duration += 3 * 60
  elsif program.channel_name =~ /NHK/
    # For NHK
    duration += 25
  end
  duration
end

#clean_ts(program) ⇒ Object



144
145
146
147
148
# File 'lib/kaede/recorder.rb', line 144

def clean_ts(program)
  unless system(Kaede.config.clean_ts.to_s, cache_path(program).to_s, cabinet_path(program).to_s)
    raise "clean-ts failure: #{program.formatted_fname}"
  end
end

#do_record(program) ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/kaede/recorder.rb', line 47

def do_record(program)
  spawn_recpt1(program)
  spawn_tail(program)
  spawn_b25(program)
  spawn_ass(program)
  spawn_repeater
  wait_recpt1
  finalize
end

#enqueue_to_redis(program) ⇒ Object



166
167
168
# File 'lib/kaede/recorder.rb', line 166

def enqueue_to_redis(program)
  Kaede.config.redis.rpush(Kaede.config.redis_queue, program.formatted_fname)
end

#ffprobe(path) ⇒ Object



187
188
189
190
191
192
193
194
# File 'lib/kaede/recorder.rb', line 187

def ffprobe(path)
  outbuf, errbuf, status = Open3.capture3('ffprobe', '-show_format', '-print_format', 'json', path.to_s)
  if status.success?
    JSON.parse(outbuf)['format']
  else
    raise FFprobeError.new("ffprobe exited with #{status.exitstatus}: #{errbuf}")
  end
end

#finalizeObject



112
113
114
115
116
117
118
# File 'lib/kaede/recorder.rb', line 112

def finalize
  Process.kill(:INT, @tail_pid)
  Process.waitpid(@tail_pid)
  @repeater_thread.join
  Process.waitpid(@b25_pid)
  Process.waitpid(@ass_pid)
end

#move_ass_to_cabinet(program) ⇒ Object



135
136
137
138
139
140
141
142
# File 'lib/kaede/recorder.rb', line 135

def move_ass_to_cabinet(program)
  ass_path = cache_ass_path(program)
  if ass_path.size == 0
    ass_path.unlink
  else
    FileUtils.mv(ass_path.to_s, cabinet_ass_path(program).to_s)
  end
end

#record(db, pid) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/kaede/recorder.rb', line 12

def record(db, pid)
  program = db.get_program(pid)
  before_record(program)

  puts "Start #{pid} #{program.syoboi_url}"
  do_record(program)

  program = db.get_program(pid)
  puts "Done #{pid} #{program.syoboi_url}"
  after_record(program)
rescue Exception => e
  @notifier.notify_exception(e, program)
  raise e
end

#record_path(program) ⇒ Object



27
28
29
# File 'lib/kaede/recorder.rb', line 27

def record_path(program)
  Kaede.config.record_dir.join("#{program.tid}_#{program.pid}.ts")
end

#redo_ts_process(program) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/kaede/recorder.rb', line 150

def redo_ts_process(program)
  unless system(Kaede.config.b25.to_s, '-v0', '-s1', '-m1', record_path(program).to_s, cache_path(program).to_s)
    @notifier.notify_redo_error(program)
    return false
  end
  unless system(Kaede.config.assdumper.to_s, record_path(program).to_s, out: cache_ass_path(program).to_s)
    @notifier.notify_redo_error(program)
    return false
  end
  unless verify_duration(program, cache_path(program))
    @notifier.notify_redo_error(program)
    return false
  end
  true
end

#spawn_ass(program) ⇒ Object



88
89
90
91
92
# File 'lib/kaede/recorder.rb', line 88

def spawn_ass(program)
  @ass_pipe_r, @ass_pipe_w = IO.pipe
  @ass_pid = spawn(Kaede.config.assdumper.to_s, '/dev/stdin', in: @ass_pipe_r, out: cache_ass_path(program).to_s)
  @ass_pipe_r.close
end

#spawn_b25(program) ⇒ Object



82
83
84
85
86
# File 'lib/kaede/recorder.rb', line 82

def spawn_b25(program)
  @b25_pipe_r, @b25_pipe_w = IO.pipe
  @b25_pid = spawn(Kaede.config.b25.to_s, '-v0', '-s1', '-m1', '/dev/stdin', cache_path(program).to_s, in: @b25_pipe_r)
  @b25_pipe_r.close
end

#spawn_recpt1(program) ⇒ Object



57
58
59
60
61
# File 'lib/kaede/recorder.rb', line 57

def spawn_recpt1(program)
  path = record_path(program)
  path.open('w') {}
  @recpt1_pid = spawn(Kaede.config.recpt1.to_s, program.channel_for_recorder.to_s, calculate_duration(program).to_s, path.to_s)
end

#spawn_repeaterObject



96
97
98
99
100
101
102
103
104
105
106
# File 'lib/kaede/recorder.rb', line 96

def spawn_repeater
  @repeater_thread = Thread.start do
    while buf = @tail_pipe_r.read(BUFSIZ)
      @b25_pipe_w.write(buf)
      @ass_pipe_w.write(buf)
    end
    @tail_pipe_r.close
    @b25_pipe_w.close
    @ass_pipe_w.close
  end
end

#spawn_tail(program) ⇒ Object



76
77
78
79
80
# File 'lib/kaede/recorder.rb', line 76

def spawn_tail(program)
  @tail_pipe_r, @tail_pipe_w = IO.pipe
  @tail_pid = spawn('tail', '-f', record_path(program).to_s, out: @tail_pipe_w)
  @tail_pipe_w.close
end

#verify_duration(program, path) ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
# File 'lib/kaede/recorder.rb', line 172

def verify_duration(program, path)
  expected_duration = calculate_duration(program)
  json = ffprobe(path)
  got_duration = json['duration'].to_f
  if (got_duration - expected_duration).abs < ALLOWED_DURATION_ERROR
    true
  else
    @notifier.notify_duration_error(program, got_duration)
    false
  end
end

#wait_recpt1Object



108
109
110
# File 'lib/kaede/recorder.rb', line 108

def wait_recpt1
  Process.waitpid(@recpt1_pid)
end