Class: Wick::Stream
- Inherits:
-
Object
- Object
- Wick::Stream
- Defined in:
- lib/wick/stream.rb
Direct Known Subclasses
Instance Method Summary collapse
- #<<(data) ⇒ Object
- #at_start ⇒ Object
- #combine(other, &combiner) ⇒ Object
- #compact ⇒ Object
- #each(&handler) ⇒ Object
- #flat_map(&transformer) ⇒ Object
-
#initialize ⇒ Stream
constructor
A new instance of Stream.
- #log!(prefix) ⇒ Object
- #map(&transformer) ⇒ Object
- #merge(other) ⇒ Object
- #pipe!(other) ⇒ Object
- #sampling(other, &combiner) ⇒ Object
- #scan(initial, &scanner) ⇒ Object
- #select(&predicate) ⇒ Object
- #start! ⇒ Object
Constructor Details
#initialize ⇒ Stream
Returns a new instance of Stream.
3 4 5 6 7 8 9 10 |
# File 'lib/wick/stream.rb', line 3 def initialize @handlers = [] @start_callbacks = [] Wick.on_next_tick do self.start! end end |
Instance Method Details
#<<(data) ⇒ Object
12 13 14 15 16 |
# File 'lib/wick/stream.rb', line 12 def <<(data) @handlers.each do |handler| handler.call(data) end end |
#at_start ⇒ Object
22 23 24 25 26 |
# File 'lib/wick/stream.rb', line 22 def at_start s = Stream.new @start_callbacks.push(proc { s << Wick::START }) s end |
#combine(other, &combiner) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/wick/stream.rb', line 67 def combine(other, &combiner) latest_self = nil latest_other = nil s = Stream.new self.each do |msg| latest_self = msg s << combiner.call(msg, latest_other) end other.each do |msg| latest_other = msg s << combiner.call(latest_self, msg) end s end |
#compact ⇒ Object
56 57 58 |
# File 'lib/wick/stream.rb', line 56 def compact select { |msg| not msg.nil? } end |
#each(&handler) ⇒ Object
18 19 20 |
# File 'lib/wick/stream.rb', line 18 def each(&handler) @handlers << handler end |
#flat_map(&transformer) ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/wick/stream.rb', line 40 def flat_map(&transformer) s = Stream.new self.each do |msg| transformer.call(msg).pipe!(s) end s end |
#log!(prefix) ⇒ Object
112 113 114 115 116 117 |
# File 'lib/wick/stream.rb', line 112 def log!(prefix) self.each do |msg| $stderr.puts("[#{prefix}] " + msg.inspect) end self end |
#map(&transformer) ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/wick/stream.rb', line 32 def map(&transformer) s = Stream.new self.each do |msg| s << transformer.call(msg) end s end |
#merge(other) ⇒ Object
60 61 62 63 64 65 |
# File 'lib/wick/stream.rb', line 60 def merge(other) s = Stream.new self.pipe!(s) other.pipe!(s) s end |
#pipe!(other) ⇒ Object
119 120 121 122 123 |
# File 'lib/wick/stream.rb', line 119 def pipe!(other) self.each do |msg| other << msg end end |
#sampling(other, &combiner) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/wick/stream.rb', line 86 def sampling(other, &combiner) latest_other = nil other.each do |msg| latest_other = msg end s = Stream.new self.each do |msg| s << combiner.call(msg, latest_other) end s end |
#scan(initial, &scanner) ⇒ Object
102 103 104 105 106 107 108 109 110 |
# File 'lib/wick/stream.rb', line 102 def scan(initial, &scanner) s = Wick.from_array([initial]) last = initial self.each do |msg| last = scanner.call(last, msg) s << last end s end |
#select(&predicate) ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/wick/stream.rb', line 48 def select(&predicate) s = Stream.new self.each do |msg| s << msg if predicate.call(msg) end s end |
#start! ⇒ Object
28 29 30 |
# File 'lib/wick/stream.rb', line 28 def start! @start_callbacks.each(&:call) end |