Class: Wick::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/wick/stream.rb

Direct Known Subclasses

Bus

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Stream

Returns a new instance of Stream.



3
4
5
6
7
8
9
10
# File 'lib/wick/stream.rb', line 3

# Sets up a stream with no subscribers and no start callbacks, then hands
# Wick.on_next_tick a block that invokes #start! on this stream.
#
# NOTE(review): judging by its name, Wick.on_next_tick presumably defers the
# block until a later scheduler tick, which would give callers time to call
# #at_start before #start! runs -- confirm against Wick.on_next_tick.
def initialize
  @start_callbacks = []
  @handlers = []

  Wick.on_next_tick { start! }
end

Instance Method Details

#<<(data) ⇒ Object



12
13
14
15
16
# File 'lib/wick/stream.rb', line 12

# Pushes +data+ into the stream by calling every registered handler with it,
# in registration order.
#
# Returns the stream itself so pushes can be chained (stream << a << b).
# Returning the result of Array#each instead would hand back the internal
# handler list, and a chained << would then append the payload to that list
# as if it were a handler.
#
# @param data [Object] the message to deliver to each handler
# @return [self]
def <<(data)
  @handlers.each { |handler| handler.call(data) }
  self
end

#at_start ⇒ Object



22
23
24
25
26
# File 'lib/wick/stream.rb', line 22

# Creates a new Stream and registers a start callback that pushes
# Wick::START into it. The callback is stored in @start_callbacks, which
# #start! iterates.
#
# @return [Stream] the newly created stream
def at_start
  Stream.new.tap do |start_stream|
    @start_callbacks << proc { start_stream << Wick::START }
  end
end

#combine(other, &combiner) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/wick/stream.rb', line 67

# Builds a stream that re-evaluates +combiner+ whenever this stream or
# +other+ emits.
#
# The block is called as combiner.call(latest_from_self, latest_from_other),
# and its result is pushed into the returned stream. A side that has not
# emitted yet is passed as nil.
#
# @param other [#each] the second source
# @return [Stream] the stream of combined values
def combine(other, &combiner)
  combined = Stream.new
  mine = theirs = nil
  publish = -> { combined << combiner.call(mine, theirs) }

  each do |msg|
    mine = msg
    publish.call
  end

  other.each do |msg|
    theirs = msg
    publish.call
  end

  combined
end

#compact ⇒ Object



56
57
58
# File 'lib/wick/stream.rb', line 56

# Delegates to #select with a predicate that rejects only nil messages;
# every other value, including false, is kept.
#
# @return [Object] whatever #select returns
def compact
  select { |msg| !msg.nil? }
end

#each(&handler) ⇒ Object



18
19
20
# File 'lib/wick/stream.rb', line 18

# Registers +handler+ to be called with every message pushed via #<<.
#
# A block is required. Without this guard a nil would be stored in the
# handler list and only fail later, inside #<<, with a NoMethodError far
# from the call that caused it.
#
# @yield [msg] each message pushed into the stream
# @return [Array] the handler list (the result of Array#<<)
# @raise [ArgumentError] if no block is given
def each(&handler)
  raise ArgumentError, 'no block given' unless handler

  @handlers << handler
end

#flat_map(&transformer) ⇒ Object



40
41
42
43
44
45
46
# File 'lib/wick/stream.rb', line 40

# For every message, calls +transformer+ and pipes the object it returns
# (via that object's #pipe!) into a single output stream.
#
# @yield [msg] must return an object responding to #pipe!
# @return [Stream] the stream receiving everything piped from the results
def flat_map(&transformer)
  Stream.new.tap do |flattened|
    each { |msg| transformer.call(msg).pipe!(flattened) }
  end
end

#log!(prefix) ⇒ Object



112
113
114
115
116
117
# File 'lib/wick/stream.rb', line 112

# Subscribes a handler that writes each message to $stderr as
# "[prefix] " followed by the message's #inspect output.
#
# @param prefix [#to_s] label placed between the square brackets
# @return [self] so the call can sit in the middle of a chain
def log!(prefix)
  each { |msg| $stderr.puts("[#{prefix}] #{msg.inspect}") }
  self
end

#map(&transformer) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/wick/stream.rb', line 32

# Produces a stream carrying the result of +transformer+ for every message
# received by this stream.
#
# @yield [msg] each incoming message
# @return [Stream] the stream of transformed messages
def map(&transformer)
  Stream.new.tap do |mapped|
    each { |msg| mapped << transformer.call(msg) }
  end
end

#merge(other) ⇒ Object



60
61
62
63
64
65
# File 'lib/wick/stream.rb', line 60

# Creates a stream fed by both this stream and +other+: each source is
# piped into it with #pipe!, this stream first.
#
# @param other [#pipe!] the second source
# @return [Stream] the merged stream
def merge(other)
  merged = Stream.new
  [self, other].each { |source| source.pipe!(merged) }
  merged
end

#pipe!(other) ⇒ Object



119
120
121
122
123
# File 'lib/wick/stream.rb', line 119

# Forwards every message from this stream to +other+ using other's #<<.
#
# @param other [#<<] the receiver of the forwarded messages
# @return [Object] whatever #each returns
def pipe!(other)
  each { |msg| other << msg }
end

#sampling(other, &combiner) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/wick/stream.rb', line 86

# Builds a stream that emits only when this stream emits. Each message is
# paired with the most recent message seen on +other+, and the result of
# combiner.call(msg, latest_other) is pushed downstream.
#
# Messages on +other+ only refresh the remembered value and never emit on
# their own. Until +other+ has emitted, nil is passed in its place.
#
# @param other [#each] the stream being sampled
# @return [Stream] the stream of combined values
def sampling(other, &combiner)
  snapshot = nil
  other.each { |msg| snapshot = msg }

  Stream.new.tap do |sampled|
    each { |msg| sampled << combiner.call(msg, snapshot) }
  end
end

#scan(initial, &scanner) ⇒ Object



102
103
104
105
106
107
108
109
110
# File 'lib/wick/stream.rb', line 102

# Keeps a running accumulator, starting at +initial+. For every message the
# accumulator becomes scanner.call(accumulator, msg), and the new value is
# pushed into the output stream.
#
# The output stream is whatever Wick.from_array([initial]) returns.
# NOTE(review): presumably that is a stream which emits +initial+ first --
# confirm against Wick.from_array.
#
# @param initial [Object] the starting accumulator value
# @yield [accumulator, msg] must return the next accumulator value
# @return [Object] the stream obtained from Wick.from_array
def scan(initial, &scanner)
  accumulator = initial

  Wick.from_array([initial]).tap do |scanned|
    each do |msg|
      accumulator = scanner.call(accumulator, msg)
      scanned << accumulator
    end
  end
end

#select(&predicate) ⇒ Object



48
49
50
51
52
53
54
# File 'lib/wick/stream.rb', line 48

# Produces a stream that carries only the messages for which +predicate+
# returns a truthy value.
#
# @yield [msg] each incoming message
# @return [Stream] the filtered stream
def select(&predicate)
  filtered = Stream.new

  each do |msg|
    next unless predicate.call(msg)

    filtered << msg
  end

  filtered
end

#start! ⇒ Object



28
29
30
# File 'lib/wick/stream.rb', line 28

# Invokes every registered start callback, in registration order.
#
# @return [Array] the list of start callbacks (the result of Array#each)
def start!
  @start_callbacks.each { |callback| callback.call }
end