Class: Concurrent::Promises::Channel

Inherits:
Synchronization::Object
  • Object
Defined in:
lib/concurrent-ruby-edge/concurrent/edge/channel.rb

Overview

A first-in, first-out channel that accepts messages with the push family of methods and returns them with the pop family of methods. Pop and push operations can be represented as futures, see #pop_op and #push_op. The capacity of the channel can be limited to support back pressure; use the capacity option in #initialize. When there is no message in the channel, #pop blocks and #pop_op returns a pending future. When the capacity is limited and the channel is full, #push blocks and #push_op returns a pending future.

Constant Summary

UNLIMITED_CAPACITY =

Default capacity of the Channel, makes it accept unlimited number of messages.

::Object.new
ANY =

An object which matches anything (with #===)

Object.new.tap do |any|
  def any.===(other)
    true
  end

  def any.to_s
    'ANY'
  end
end
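
The matcher arguments used throughout this class rely on Ruby's case equality operator (#===). The following sketch is plain Ruby and does not touch the channel itself; `any_sketch` and `first_match` are hypothetical names introduced here to show how an ANY-like object compares with other common matchers.

```ruby
# Hypothetical stand-in for ANY, built the same way as the constant above.
any_sketch = Object.new.tap do |any|
  def any.===(_other)
    true
  end
end

# Hypothetical helper: returns the first message accepted by the matcher.
first_match = lambda do |matcher, messages|
  messages.find { |message| matcher === message }
end

messages = [1, 'two', :three]

by_any   = first_match.call(any_sketch, messages)                 # accepts the first message
by_class = first_match.call(String, messages)                     # Class#=== checks the type
by_proc  = first_match.call(->(m) { m.is_a?(Symbol) }, messages)  # Proc#=== calls the proc
no_match = first_match.call(Float, messages)                      # no message is a Float
```

Anything that responds to #=== can serve as a matcher this way, including classes, ranges, regular expressions and lambdas.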

Constructor Details

#initialize(capacity = UNLIMITED_CAPACITY) ⇒ Channel

Create channel.

Parameters:

  • capacity (Integer, UNLIMITED_CAPACITY) (defaults to: UNLIMITED_CAPACITY)

    the maximum number of messages which can be stored in the channel.



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 61

def initialize(capacity = UNLIMITED_CAPACITY)
  super()
  @Capacity = capacity
  @Mutex    = Mutex.new
  # TODO (pitr-ch 28-Jan-2019): consider linked lists or other data structures for following attributes, things are being deleted from the middle
  @Probes      = []
  @Messages    = []
  @PendingPush = []
end

Class Method Details

.select(channels, timeout = nil) ⇒ ::Array(Channel, Object)?

Returns:

  • (::Array(Channel, Object), nil)

See Also:

  • #select



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 319

def select(channels, timeout = nil)
  channels.first.select(channels[1..-1], timeout)
end

.select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object)?

Returns:

  • (::Array(Channel, Object), nil)

See Also:

  • #select_matching



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 337

def select_matching(matcher, channels, timeout = nil)
  channels.first.select_matching(matcher, channels[1..-1], timeout)
end

.select_op(channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))

Returns:

  • (Future(::Array(Channel, Object)))

See Also:

  • #select_op



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 313

def select_op(channels, probe = Promises.resolvable_future)
  channels.first.select_op(channels[1..-1], probe)
end

.select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))

Returns:

  • (Future(::Array(Channel, Object)))

See Also:

  • #select_op_matching



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 331

def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
  channels.first.select_op_matching(matcher, channels[1..-1], probe)
end

.try_select(channels) ⇒ ::Array(Channel, Object)

Returns:

  • (::Array(Channel, Object))

See Also:

  • #try_select



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 307

def try_select(channels)
  channels.first.try_select(channels[1..-1])
end

.try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)

Returns:

  • (::Array(Channel, Object))

See Also:

  • #try_select_matching



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 325

def try_select_matching(matcher, channels)
  channels.first.try_select_matching(matcher, channels[1..-1])
end

Instance Method Details

#capacity ⇒ Integer

Returns the maximum capacity of the Channel.

Returns:

  • (Integer)

    Maximum capacity of the Channel.



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 292

def capacity
  @Capacity
end

#peek(no_value = nil) ⇒ Object, no_value

Behaves as #try_pop but it does not remove the message from the channel

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

Returns:

  • (Object, no_value)

    the message, or no_value when there is no message available



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 206

def peek(no_value = nil)
  peek_matching ANY, no_value
end

#peek_matching(matcher, no_value = nil) ⇒ Object, no_value

Behaves as #try_pop but it does not remove the message from the channel

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

  • matcher (#===)

    only consider messages which match `matcher === a_message`

Returns:

  • (Object, no_value)

    the message, or no_value when there is no message available



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 212

def peek_matching(matcher, no_value = nil)
  @Mutex.synchronize do
    message = ns_shift_message matcher, false
    return message if message != NOTHING
    message = ns_consume_pending_push matcher, false
    return message != NOTHING ? message : no_value
  end
end

#pop(timeout = nil, timeout_value = nil) ⇒ Object?

Note:

This method may block the current thread until it can continue. Be careful, it can deadlock.

Blocks the current thread until a message is available in the channel for popping.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

  • timeout_value (Object) (defaults to: nil)

    a value returned by the method when it times out

Returns:

  • (Object, nil)

    the message, or timeout_value when timed out



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 174

def pop(timeout = nil, timeout_value = nil)
  pop_matching ANY, timeout, timeout_value
end

#pop_matching(matcher, timeout = nil, timeout_value = nil) ⇒ Object?

Note:

This method may block the current thread until it can continue. Be careful, it can deadlock.

Blocks the current thread until a message is available in the channel for popping.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

  • timeout_value (Object) (defaults to: nil)

    a value returned by the method when it times out

  • matcher (#===)

    only consider messages which match `matcher === a_message`

Returns:

  • (Object, nil)

    the message, or timeout_value when timed out



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 180

def pop_matching(matcher, timeout = nil, timeout_value = nil)
  # TODO (pitr-ch 27-Jan-2019): should it try to match pending pushes if it fails to match in the buffer? Maybe only if the size is zero. It could be surprising if it's used as a throttle it might be expected that it will not pop if buffer is full of messages which di not match, it might it expected it will block until the message is added to the buffer
  # that it returns even if the buffer is full. User might expect that it has to be in the buffer first.
  probe = @Mutex.synchronize do
    message = ns_shift_message matcher
    if message == NOTHING
      message = ns_consume_pending_push matcher
      return message if message != NOTHING
    else
      new_message = ns_consume_pending_push ANY
      @Messages.push new_message unless new_message == NOTHING
      return message
    end

    probe = Promises.resolvable_future
    @Probes.push probe, false, matcher
    probe
  end

  probe.value!(timeout, timeout_value, [true, timeout_value, nil])
end

#pop_op(probe = Promises.resolvable_future) ⇒ Future(Object)

Returns a future which will become fulfilled with a value from the channel when one is available. If the operation is later waited on with a timeout, e.g. `channel.pop_op.wait(1)`, the timeout does not prevent the channel from fulfilling the operation afterwards. The operation either has to be processed later:

```ruby
pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end
```

or the operation can be prevented from completing after the timeout by using `channel.pop_op.wait(1, [true, nil, nil])`. This fulfills the operation on timeout, which prevents the channel from doing the operation, e.g. popping a message.

Parameters:

  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with a channel value

Returns:

  • (Future(Object))

    the probe, its value will be the message when available.



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 157

def pop_op(probe = Promises.resolvable_future)
  @Mutex.synchronize { ns_pop_op(ANY, probe, false) }
end

#pop_op_matching(matcher, probe = Promises.resolvable_future) ⇒ Future(Object)

Returns a future which will become fulfilled with a value from the channel when one is available. If the operation is later waited on with a timeout, e.g. `channel.pop_op.wait(1)`, the timeout does not prevent the channel from fulfilling the operation afterwards. The operation either has to be processed later:

```ruby
pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end
```

or the operation can be prevented from completing after the timeout by using `channel.pop_op.wait(1, [true, nil, nil])`. This fulfills the operation on timeout, which prevents the channel from doing the operation, e.g. popping a message.

Parameters:

  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with a channel value

  • matcher (#===)

    only consider messages which match `matcher === a_message`

Returns:

  • (Future(Object))

    the probe, its value will be the message when available.



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 163

def pop_op_matching(matcher, probe = Promises.resolvable_future)
  @Mutex.synchronize { ns_pop_op(matcher, probe, false) }
end

#push(message, timeout = nil) ⇒ self, ...

Note:

This method may block the current thread until it can continue. Be careful, it can deadlock.

Blocks the current thread until the message is pushed into the channel.

Parameters:

  • message (Object)
  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Returns:

  • (self, true, false)

    self when no timeout was given; true when a timeout was given and the message was pushed; false when the message was not pushed within the timeout.



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 117

def push(message, timeout = nil)
  pushed_op = @Mutex.synchronize do
    return timeout ? true : self if ns_try_push(message)

    pushed = Promises.resolvable_future
    # TODO (pitr-ch 06-Jan-2019): clear timed out pushes in @PendingPush, null messages
    @PendingPush.push message, pushed
    pushed
  end

  result = pushed_op.wait!(timeout, [true, self, nil])
  result == pushed_op ? self : result
end

#push_op(message) ⇒ ResolvableFuture(self)

Returns a future which will be fulfilled when the message is pushed to the channel. If the operation is later waited on with a timeout, e.g. `channel.pop_op.wait(1)`, the timeout does not prevent the channel from fulfilling the operation afterwards. The operation either has to be processed later:

```ruby
pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end
```

or the operation can be prevented from completing after the timeout by using `channel.pop_op.wait(1, [true, nil, nil])`. This fulfills the operation on timeout, which prevents the channel from doing the operation, e.g. popping a message.

Parameters:

  • message (Object)

Returns:

  • (ResolvableFuture(self))


# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 98

def push_op(message)
  @Mutex.synchronize do
    if ns_try_push(message)
      Promises.fulfilled_future self
    else
      pushed = Promises.resolvable_future
      @PendingPush.push message, pushed
      return pushed
    end
  end
end

#select(channels, timeout = nil) ⇒ ::Array(Channel, Object)?

Note:

This method may block the current thread until it can continue. Be careful, it can deadlock.

As #select_op, but it does not return a future; instead it blocks the current thread until there is a message available in the receiver or in any of the channels.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Returns:

  • (::Array(Channel, Object), nil)

    pair [channel, message], or nil when timed out

See Also:

  • #select_op



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 275

def select(channels, timeout = nil)
  select_matching ANY, channels, timeout
end

#select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object)?

Note:

This method may block the current thread until it can continue. Be careful, it can deadlock.

As #select_op, but it does not return a future; instead it blocks the current thread until there is a message available in the receiver or in any of the channels.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

  • matcher (#===)

    only consider messages which match `matcher === a_message`

Returns:

  • (::Array(Channel, Object), nil)

    pair [channel, message], or nil when timed out

See Also:

  • #select_op



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 281

def select_matching(matcher, channels, timeout = nil)
  probe = select_op_matching(matcher, channels)
  probe.value!(timeout, nil, [true, nil, nil])
end

#select_op(channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))

When a message is available in the receiver or in any of the provided channels, the future is fulfilled with a [channel, message] pair. The returned channel is the origin of the message. If the operation is later waited on with a timeout, e.g. `channel.pop_op.wait(1)`, the timeout does not prevent the channel from fulfilling the operation afterwards. The operation either has to be processed later:

```ruby
pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end
```

or the operation can be prevented from completing after the timeout by using `channel.pop_op.wait(1, [true, nil, nil])`. This fulfills the operation on timeout, which prevents the channel from doing the operation, e.g. popping a message.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with the message

Returns:

  • (ResolvableFuture(::Array(Channel, Object)))

    a future which is fulfilled with pair [channel, message] when one of the channels is available for reading



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 254

def select_op(channels, probe = Promises.resolvable_future)
  select_op_matching ANY, channels, probe
end

#select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))

When a message is available in the receiver or in any of the provided channels, the future is fulfilled with a [channel, message] pair. The returned channel is the origin of the message. If the operation is later waited on with a timeout, e.g. `channel.pop_op.wait(1)`, the timeout does not prevent the channel from fulfilling the operation afterwards. The operation either has to be processed later:

```ruby
pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end
```

or the operation can be prevented from completing after the timeout by using `channel.pop_op.wait(1, [true, nil, nil])`. This fulfills the operation on timeout, which prevents the channel from doing the operation, e.g. popping a message.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • probe (ResolvableFuture) (defaults to: Promises.resolvable_future)

    the future which will be fulfilled with the message

  • matcher (#===)

    only consider messages which match `matcher === a_message`

Returns:

  • (ResolvableFuture(::Array(Channel, Object)))

    a future which is fulfilled with pair [channel, message] when one of the channels is available for reading



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 260

def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
  [self, *channels].each { |ch| ch.partial_select_op matcher, probe }
  probe
end

#size ⇒ Integer

Returns the number of messages currently stored in the channel.

Returns:

  • (Integer)

    The number of messages currently stored in the channel.



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 287

def size
  @Mutex.synchronize { @Messages.size }
end

#to_s ⇒ String (also known as: inspect)

Returns a short string representation.

Returns:

  • (String)

    Short string representation.



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 297

def to_s
  format '%s capacity taken %s of %s>', super[0..-2], size, @Capacity
end

#try_pop(no_value = nil) ⇒ Object, no_value

Pop a message from the channel if there is one available.

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

Returns:

  • (Object, no_value)

    the message, or no_value when there is no message available



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 135

def try_pop(no_value = nil)
  try_pop_matching ANY, no_value
end

#try_pop_matching(matcher, no_value = nil) ⇒ Object, no_value

Pop a message from the channel if there is one available.

Parameters:

  • no_value (Object) (defaults to: nil)

    returned when there is no message available

  • matcher (#===)

    only consider messages which match `matcher === a_message`

Returns:

  • (Object, no_value)

    the message, or no_value when there is no message available



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 142

def try_pop_matching(matcher, no_value = nil)
  @Mutex.synchronize do
    message = ns_shift_message matcher
    return message if message != NOTHING
    message = ns_consume_pending_push matcher
    return message != NOTHING ? message : no_value
  end
end

#try_push(message) ⇒ true, false

Push the message into the channel if there is space available.

Parameters:

  • message (Object)

Returns:

  • (true, false)


# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 74

def try_push(message)
  @Mutex.synchronize { ns_try_push(message) }
end

#try_select(channels) ⇒ ::Array(Channel, Object)?

If a message is available in the receiver or in any of the provided channels, the [channel, message] pair is returned. If there is no message, nil is returned. The returned channel is the origin of the message.

Parameters:

  • channels (Channel, ::Array<Channel>)

Returns:

  • (::Array(Channel, Object), nil)

    pair [channel, message] if one of the channels is available for reading



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 229

def try_select(channels)
  try_select_matching ANY, channels
end

#try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)?

If a message is available in the receiver or in any of the provided channels, the [channel, message] pair is returned. If there is no message, nil is returned. The returned channel is the origin of the message.

Parameters:

  • channels (Channel, ::Array<Channel>)
  • matcher (#===)

    only consider messages which match `matcher === a_message`

Returns:

  • (::Array(Channel, Object), nil)

    pair [channel, message] if one of the channels is available for reading



# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 235

def try_select_matching(matcher, channels)
  message = nil
  channel = [self, *channels].find do |ch|
    message = ch.try_pop_matching(matcher, NOTHING)
    message != NOTHING
  end
  channel ? [channel, message] : nil
end