Class: Thread::Channel

Inherits:
Object
Defined in:
lib/thread/channel.rb

Overview

A channel lets threads send and receive messages in a thread-safe way.

It also supports guards on both sending and retrieval, so that only messages that are safe to consume are accepted or picked up.
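As a rough illustration of the send/receive flow, here is a minimal sketch. `SketchChannel` is a hypothetical stand-in written for this example so that it runs without the library installed; it is not the library's class. It keeps a `ConditionVariable` in `@cond`, which is an assumption, since the listings on this page call a `cond` helper whose definition is not shown.

```ruby
# Hypothetical stand-in for Thread::Channel, reduced to an unguarded
# send/receive pair. Not the library's implementation.
class SketchChannel
  def initialize
    @messages = []
    @mutex    = Mutex.new
    @cond     = ConditionVariable.new # assumed; the page only shows `cond`
  end

  # Append a message and wake any waiting receivers.
  def send(what)
    @mutex.synchronize {
      @messages << what
      @cond.broadcast
    }
    self
  end

  # Block until a message is available, then remove and return the oldest.
  def receive
    @mutex.synchronize {
      # Re-check after every wakeup: another receiver may have taken it.
      @cond.wait(@mutex) while @messages.empty?
      @messages.shift
    }
  end
end

channel  = SketchChannel.new
consumer = Thread.new { 3.times.map { channel.receive } }

[1, 2, 3].each { |n| channel.send(n) }

received = consumer.value # joins the thread and returns the block's result
```

With a single producer and a single consumer, messages come out in the order they were sent.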

Constructor Details

#initialize(messages = [], &block) ⇒ Channel

Create a channel with optional initial messages and an optional channel guard. Initial messages are delivered through #send, so each one is checked against the guard.



# File 'lib/thread/channel.rb', line 19

def initialize(messages = [], &block)
  @messages = []
  @mutex    = Mutex.new
  @check    = block

  messages.each {|o|
    send o
  }
end
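Because the constructor feeds the initial messages through `send`, a seed value that fails the guard raises from `new`. The sketch below shows this with `GuardedSketch`, a hypothetical stand-in that mirrors the constructor and the guard check from the listings but leaves out locking and signalling. The `messages` reader is added only so the example can inspect the queue.

```ruby
# Hypothetical stand-in: constructor plus guard check only, no locking.
class GuardedSketch
  attr_reader :messages # exposed for inspection in this example only

  def initialize(messages = [], &block)
    @messages = []
    @check    = block
    messages.each { |o| send o }
  end

  def send(what)
    raise ArgumentError, 'guard mismatch' if @check && !@check.call(what)

    @messages << what
    self
  end
end

# Every seed value passes the guard, so all three are queued.
ok = GuardedSketch.new([1, 2, 3]) { |o| o.is_a?(Integer) }
ok.messages # [1, 2, 3]

# The second seed value fails the guard, so the constructor raises.
begin
  GuardedSketch.new([1, 'two', 3]) { |o| o.is_a?(Integer) }
rescue ArgumentError => e
  e.message # 'guard mismatch'
end
```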

Instance Method Details

#receive(&block) ⇒ Object

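Receive a message. If there are none, the call blocks until one arrives.

If a block is passed, it is used as a guard: the call returns the first queued message for which the block returns a truthy value, and blocks until such a message exists.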



# File 'lib/thread/channel.rb', line 50

def receive(&block)
  message = nil
  found   = false

  if block
    until found
      @mutex.synchronize {
        if index = @messages.find_index(&block)
          message = @messages.delete_at(index)
          found   = true
        else
          cond.wait @mutex
        end
      }
    end
  else
    until found
      @mutex.synchronize {
        if @messages.empty?
          cond.wait @mutex
        end

        unless @messages.empty?
          message = @messages.shift
          found   = true
        end
      }
    end
  end

  message
end
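A guarded receive can take a message out of order and leave the rest queued. The following is a minimal sketch of that behaviour, not the library's code: `SelectiveSketch` is a hypothetical stand-in, `@cond` is an assumed `ConditionVariable`, and the two branches of the listing are folded into one loop by treating "no guard" as a guard that accepts everything.

```ruby
# Hypothetical stand-in illustrating guarded (selective) receive.
class SelectiveSketch
  def initialize
    @messages = []
    @mutex    = Mutex.new
    @cond     = ConditionVariable.new # assumed helper state
  end

  def send(what)
    @mutex.synchronize {
      @messages << what
      @cond.broadcast
    }
    self
  end

  # With a block: take the first queued message the block accepts.
  # Without one: take the oldest message.
  def receive(&block)
    block ||= proc { true }

    @mutex.synchronize {
      loop do
        index = @messages.find_index(&block)
        break @messages.delete_at(index) if index

        @cond.wait(@mutex) # releases the mutex while waiting
      end
    }
  end
end

channel = SelectiveSketch.new
channel.send(1).send(:stop).send(2)

channel.receive { |m| m.is_a?(Symbol) } # :stop, taken out of order
channel.receive                          # 1, the oldest remaining message
channel.receive                          # 2
```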

#receive!(&block) ⇒ Object

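Receive a message without blocking. If there are none, the call returns nil.

If a block is passed, it is used as a guard to select the first matching message. Note that, as listed below, a guard that matches nothing makes find_index return nil, which delete_at rejects with a TypeError rather than returning nil. The listing also does not take the channel’s mutex.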



# File 'lib/thread/channel.rb', line 86

def receive!(&block)
  if block
    @messages.delete_at(@messages.find_index(&block))
  else
    @messages.shift
  end
end
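In the listing above, `find_index` returns nil when no message matches the guard, and `delete_at(nil)` raises a TypeError. One possible way to get a nil result instead is sketched below. `try_take` is a hypothetical helper, not part of the library; it takes the message array and a mutex as arguments so that it stays self-contained.

```ruby
# Hypothetical helper: non-blocking take that returns nil when the queue
# is empty or when no message matches the guard.
def try_take(messages, mutex, &block)
  mutex.synchronize {
    next messages.shift unless block # no guard: oldest message or nil

    index = messages.find_index(&block)
    index && messages.delete_at(index) # index 0 is truthy in Ruby
  }
end

queue = [1, 2, 3]
lock  = Mutex.new

try_take(queue, lock) { |m| m.even? } # 2
try_take(queue, lock) { |m| m > 10 }  # nil, queue is still [1, 3]
try_take(queue, lock)                 # 1
```

Holding the mutex here is a design choice for the sketch: it keeps the non-blocking path consistent with the blocking one when several threads use the same queue.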

#send(what) ⇒ Object

Send a message to the channel.

If there’s a guard, the value is passed to it. If the guard returns a falsy value, an ArgumentError is raised and the message is not sent. Otherwise the message is queued, waiting receivers are woken, and the channel itself is returned.



# File 'lib/thread/channel.rb', line 33

def send(what)
  if @check && !@check.call(what)
    raise ArgumentError, 'guard mismatch'
  end

  @mutex.synchronize {
    @messages << what

    cond.broadcast if cond?
  }

  self
end
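Two consequences of this definition are worth a short sketch. Returning `self` allows calls to be chained, and defining a method named `send` replaces Ruby's `Object#send` on instances, so dynamic dispatch has to go through `__send__`. The second point is general Ruby behaviour, not something this page states. `SendSketch` is a hypothetical stand-in with no guard or locking, and its `messages` reader exists only for the example.

```ruby
# Hypothetical stand-in: only the parts of #send relevant to chaining
# and to shadowing Object#send.
class SendSketch
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Same name as in the listing, so it replaces Object#send on instances.
  def send(what)
    @messages << what
    self
  end
end

channel = SendSketch.new
channel.send(:a).send(:b)   # chaining works because send returns self
channel.messages            # [:a, :b]

channel.send(:messages)     # queues the symbol :messages, no dispatch
channel.__send__(:messages) # [:a, :b, :messages], dynamic dispatch
```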