Class: Fairy::SizedMarshaledQueue

Inherits:
MarshaledQueue show all
Defined in:
lib/fairy/share/port-marshaled-queue.rb

Instance Attribute Summary

Attributes inherited from MarshaledQueue

#fib_cv

Instance Method Summary collapse

Constructor Details

#initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) ⇒ SizedMarshaledQueue

Returns a new instance of SizedMarshaledQueue.



128
129
130
131
132
133
134
135
# File 'lib/fairy/share/port-marshaled-queue.rb', line 128

def initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond)
  super
  @max_size = policy[:size]
  @max_size ||= CONF.SIZEDMARSHAL_QUEUE_MAX_CHUNK_NO

  @pop_cv = @queues_cv
  @push_cv = @queues_mon.new_cond
end

Instance Method Details

#popObject



158
159
160
161
162
163
164
# File 'lib/fairy/share/port-marshaled-queue.rb', line 158

def pop
  e = super
  @queues_mon.synchronize do
  @push_cv.broadcast if @queues.size <= @max_size
  end
  e
end

#pop_allObject



166
167
168
169
170
171
172
# File 'lib/fairy/share/port-marshaled-queue.rb', line 166

def pop_all
  buf = super
  @queues_mon.synchronize do
  @push_cv.broadcast if @queues.size <= @max_size
  end
  buf
end

#pop_rawObject



174
175
176
177
178
179
180
# File 'lib/fairy/share/port-marshaled-queue.rb', line 174

def pop_raw
  raw = super
  @queues_mon.synchronize do
  @push_cv.broadcast if @queues.size <= @max_size
  end
  raw
end

#push(e) ⇒ Object



137
138
139
140
141
142
# File 'lib/fairy/share/port-marshaled-queue.rb', line 137

def push(e)
  @queues_mon.synchronize do
  @push_cv.wait_while{@queues.size > @max_size}
  end
  super
end

#push_all(buf) ⇒ Object



144
145
146
147
148
149
# File 'lib/fairy/share/port-marshaled-queue.rb', line 144

def push_all(buf)
  @queues_mon.synchronize do
  @push_cv.wait_while{@queues.size > @max_size}
  end
  super
end

#push_raw(raw) ⇒ Object



151
152
153
154
155
156
# File 'lib/fairy/share/port-marshaled-queue.rb', line 151

def push_raw(raw)
  @queues_mon.synchronize do
  @push_cv.wait_while{@queues.size > @max_size}
  end
  super
end