Class: Fairy::PBarrierMemoryBuffer

Inherits:
PSingleExportFilter show all
Defined in:
lib/fairy/node/p-barrier.rb

Constant Summary collapse

ST_ALL_IMPORTED =
:ST_ALL_IMPORTED

Constants included from PSingleExportable

Fairy::PSingleExportable::END_OF_STREAM, Fairy::PSingleExportable::ST_EXPORT_FINISH, Fairy::PSingleExportable::ST_WAIT_EXPORT_FINISH

Constants inherited from PIOFilter

Fairy::PIOFilter::ST_WAIT_IMPORT

Constants inherited from PFilter

Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT

Instance Attribute Summary

Attributes included from PSingleExportable

#export

Attributes inherited from PFilter

#IGNORE_EXCEPTION, #id, #log_id, #ntask

Instance Method Summary collapse

Methods included from PSingleExportable

#start, #start_export, #terminate, #wait_export_finish

Methods inherited from PFilter

#abort_running, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc

Constructor Details

#initialize(id, ntask, bjob, opts = nil) ⇒ PBarrierMemoryBuffer

Returns a new instance of PBarrierMemoryBuffer.



15
16
17
18
19
20
21
# File 'lib/fairy/node/p-barrier.rb', line 15

def initialize(id, ntask, bjob, opts=nil)
#      @export = Export.new()
  super

  @queuing_policy = CONF.BARRIER_MEMORY_BUFFERING_POLICY
  @queue = eval("#{@queuing_policy[:queuing_class]}").new(@queuing_policy)
end

Instance Method Details

#basic_each(&block) ⇒ Object



85
86
87
88
89
90
91
# File 'lib/fairy/node/p-barrier.rb', line 85

def basic_each(&block)
  @bjob.wait_export
  
  while (e = @queue.pop) != END_OF_STREAM
  block.call e
  end
end

#basic_start(&block) ⇒ Object



73
74
75
76
77
78
79
80
81
82
# File 'lib/fairy/node/p-barrier.rb', line 73

def basic_start(&block)
  Log::debug(self, "START")

  begin
  @input.each{|e| @queue.push e}
  ensure
  @queue.push :END_OF_STREAM
  self.status = ST_ALL_IMPORTED
  end
end

#input=(input) ⇒ Object

start

  end
  self
end


37
38
39
40
# File 'lib/fairy/node/p-barrier.rb', line 37

def input=(input)
  super
  start_buffering
end

#start_bufferingObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fairy/node/p-barrier.rb', line 42

def start_buffering
  Log::info self, "START  BUFFERING: #{self.class}"

  start_watch_status

  @main_thread = Thread.start {
  begin
    self.status = ST_ACTIVATE
    if @begin_block_source
 bsource = BSource.new(@begin_block_source, @context, self)
 bsource.evaluate
    end
    begin
 basic_start{}
    ensure
 if @end_block_source
   bsource = BSource.new(@end_block_source, @context, self)
   bsource.evaluate
 end

 @main_thread = nil
 Log::info self, "FINISH BUFFERING: #{self.class}"
    end
  rescue Exception
    Log::error_exception(self)
    handle_exception($!)
    raise
  end
  }
end