Module: EventMachine::Bucketer::Base

Included in:
InMemory, Redis
Defined in:
lib/em-bucketer/base.rb

Instance Method Details

#add_item(bucket_id, item_id, item, &blk) ⇒ Object

Adds an item to the specified bucket and calls the block when the item has been stored.

Parameters:

  • bucket_id (String)

    the bucket id of the bucket to put the item in

  • item_id (String)

    the item_id of the item (used to ensure uniqueness within a bucket)

  • item (Object)

    the item to be placed in the bucket



# File 'lib/em-bucketer/base.rb', line 21

def add_item(bucket_id, item_id, item, &blk)
  add_timer_if_first(bucket_id)
  EM::Completion.new.tap do |c|
    c.callback(&blk) if block_given?
    add_bucket_to_db(bucket_id, item_id, item).callback do
      c.succeed
    end.errback do |e|
      c.fail e
    end
    check_bucket_full(bucket_id)
  end
end
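
The wrapping pattern used here (an outer completion that succeeds or fails together with the inner database operation) can be sketched without EventMachine. In the sketch below, `TinyCompletion` is a hypothetical, synchronous stand-in for `EM::Completion`, and `fake_db_add` is an invented placeholder for `add_bucket_to_db`; neither is part of em-bucketer, and the real callbacks run asynchronously inside the reactor.

```ruby
# Hypothetical, synchronous stand-in for EM::Completion (illustration only).
class TinyCompletion
  def initialize
    @callbacks = []
    @errbacks = []
    @state = :pending
    @args = []
  end

  # Register a success handler; runs at once if already succeeded.
  def callback(&blk)
    if @state == :succeeded
      blk.call(*@args)
    elsif @state == :pending
      @callbacks << blk
    end
    self
  end

  # Register a failure handler; runs at once if already failed.
  def errback(&blk)
    if @state == :failed
      blk.call(*@args)
    elsif @state == :pending
      @errbacks << blk
    end
    self
  end

  def succeed(*args)
    settle(:succeeded, args, @callbacks)
  end

  def fail(*args)
    settle(:failed, args, @errbacks)
  end

  private

  # A completion settles only once; later calls are ignored.
  def settle(state, args, handlers)
    return unless @state == :pending
    @state = state
    @args = args
    handlers.each { |h| h.call(*args) }
  end
end

# Invented placeholder for add_bucket_to_db: fails when item_id is nil.
def fake_db_add(store, bucket_id, item_id, item)
  TinyCompletion.new.tap do |c|
    if item_id.nil?
      c.fail "item_id is required"
    else
      (store[bucket_id] ||= {})[item_id] = item
      c.succeed
    end
  end
end

# Same shape as add_item: the outer completion mirrors the inner result.
def add_item_sketch(store, bucket_id, item_id, item, &blk)
  TinyCompletion.new.tap do |c|
    c.callback(&blk) if block_given?
    fake_db_add(store, bucket_id, item_id, item).callback do
      c.succeed
    end.errback do |e|
      c.fail e
    end
  end
end

store = {}
log = []
# Success path: the block passed to the method runs once the item is stored.
add_item_sketch(store, "1", "a", { foo: :bar }) { log << :stored }
# Failure path: the error reaches an errback attached to the returned completion.
add_item_sketch(store, "1", nil, :oops).errback { |e| log << e }
```

Because the method returns the completion, a caller can attach an errback even though the block argument only covers the success case.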

#empty_bucket(bucket_id, &blk) ⇒ Object

Empties a bucket.

Parameters:

  • bucket_id (String)

    the bucket id of the bucket you want to empty



# File 'lib/em-bucketer/base.rb', line 140

def empty_bucket(bucket_id, &blk)
  EM::Completion.new.tap do |c|
    c.callback(&blk) if block_given?
    empty_bucket_in_db(bucket_id).callback do
      clear_timer(bucket_id)
      c.succeed
    end.errback do |e|
      c.fail e
    end
  end
end

#get_and_empty_bucket(bucket_id) {|Array| ... } ⇒ Object

Gets the contents of a bucket, then empties it.

Parameters:

  • bucket_id (String)

    the bucket id of the bucket you want to get

Yields:

  • (Array)

    the items you put into the bucket



# File 'lib/em-bucketer/base.rb', line 88

def get_and_empty_bucket(bucket_id, &blk)
  EM::Completion.new.tap do |c|
    c.callback(&blk) if block_given?
    get_bucket(bucket_id).callback do |contents|
      empty_bucket(bucket_id).callback do
        c.succeed contents
      end.errback do |e|
        c.fail e
      end
    end.errback do |e|
      c.fail e
    end
  end
end

#get_and_remove(bucket_id, count) {|Array| ... } ⇒ Object

Get at most count number of items from the bucket and remove them.

Examples:

Get up to 100 items from the bucket

bucketer.get_and_remove("1", 100) do |items|
  p "yay I got #{items.count} items"
  items.each do |i|
    p "got #{i}"
  end
end

Parameters:

  • bucket_id (String)

    the bucket id of the bucket you want to get

  • count (Integer)

    the number of items you want from the bucket

Yields:

  • (Array)

    the items you put into the bucket



# File 'lib/em-bucketer/base.rb', line 120

def get_and_remove(bucket_id, count, &blk)
  EM::Completion.new.tap do |c|
    c.callback(&blk) if block_given?
    get_bucket_from_db(bucket_id).callback do |bucket|
      empty_bucket(bucket_id).callback do
        values = []
        EM::Iterator.new(bucket).each(get_and_remove_iterator(bucket_id, count, values, c), -> { c.succeed(values) })
      end.errback do |e|
        c.fail e
      end
    end.errback do |e|
      c.fail e
    end
  end
end
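
The "at most count" contract can be illustrated on a plain Hash, leaving EventMachine and the storage backend aside. This is only a sketch of the documented behaviour: `take_at_most` is a hypothetical helper, and the assumption that items beyond `count` stay in the bucket is inferred from the method description, since `get_and_remove_iterator` is not shown here.

```ruby
# Plain-hash illustration of "take at most `count` items and remove them".
# Assumption: items beyond `count` remain in the bucket afterwards.
def take_at_most(bucket, count)
  taken_ids = bucket.keys.first(count)
  # Hash#delete returns the removed value, so this collects the items.
  taken_ids.map { |id| bucket.delete(id) }
end

bucket = { "a" => 1, "b" => 2, "c" => 3 }
first_batch  = take_at_most(bucket, 2)  # two items requested, two available
second_batch = take_at_most(bucket, 5)  # five requested, only one left
```

Asking for more items than the bucket holds returns whatever is there rather than raising.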

#get_bucket(bucket_id) {|Array| ... } ⇒ Object

Get the contents of a bucket.

Parameters:

  • bucket_id (String)

    the bucket id of the bucket you want to get

Yields:

  • (Array)

    the items you put into the bucket



# File 'lib/em-bucketer/base.rb', line 71

def get_bucket(bucket_id, &blk)
  EM::Completion.new.tap do |c|
    c.callback(&blk) if block_given?
    get_bucket_from_db(bucket_id).callback do |bucket|
      c.succeed bucket.values
    end.errback do |e|
      c.fail e
    end
  end
end

#on_bucket_full {|String| ... } ⇒ Object

Sets a callback hook for when a bucket reaches the threshold size. It is IMPORTANT to note that the bucket will not be emptied automatically; you must call empty_bucket if you want the bucket to be emptied. The callback will also be called every time an item is added until the bucket is emptied.

Yields:

  • (String)

    The bucket id of the full bucket



# File 'lib/em-bucketer/base.rb', line 43

def on_bucket_full(&blk)
  @on_bucket_full_callbacks << blk
end
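
The repeated-firing behaviour described for this hook can be modelled synchronously. `ToyBucketer` below is a hypothetical simplification, not the library's implementation, and it assumes that "full" means the bucket holds at least the threshold number of items (the body of `check_bucket_full` is not shown here).

```ruby
# Simplified synchronous model of the "bucket full" hook (not the library code).
class ToyBucketer
  def initialize(threshold)
    @threshold = threshold
    @buckets = Hash.new { |h, k| h[k] = {} }
    @on_full = []
  end

  def on_bucket_full(&blk)
    @on_full << blk
  end

  def add_item(bucket_id, item_id, item)
    @buckets[bucket_id][item_id] = item
    # Assumption: "full" means the bucket holds at least `threshold` items.
    if @buckets[bucket_id].size >= @threshold
      @on_full.each { |cb| cb.call(bucket_id) }
    end
  end

  def empty_bucket(bucket_id)
    @buckets.delete(bucket_id)
  end
end

fired = []
bucketer = ToyBucketer.new(2)
bucketer.on_bucket_full { |id| fired << id }
bucketer.add_item("1", "a", 1)  # below threshold, no callback
bucketer.add_item("1", "b", 2)  # threshold reached, callback fires
bucketer.add_item("1", "c", 3)  # bucket not emptied yet, callback fires again
bucketer.empty_bucket("1")
bucketer.add_item("1", "d", 4)  # bucket was emptied, below threshold again
```

A handler that does not want repeated notifications would typically call empty_bucket (or get_and_empty_bucket) from inside the callback.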

#on_bucket_timeout {|String| ... } ⇒ Object

Sets a callback hook for when a bucket reaches the time limit. It is IMPORTANT to note that the bucket will not be emptied automatically; you must call empty_bucket if you want the bucket to be emptied.

This timer is started once the bucket gets its first item and is cleared only when the bucket is emptied. The callback will only be called once at this time and then not again unless you empty the bucket and add something again.

Yields:

  • (String)

    The bucket id of the bucket that timed out



# File 'lib/em-bucketer/base.rb', line 61

def on_bucket_timeout(&blk)
  @on_bucket_timeout_callbacks << blk
end
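
The timer lifecycle described above (started by the first item, fired once, cleared on empty) can be modelled without real timers. `ToyTimeoutBucketer` is a hypothetical simplification: `fire_timeout` is called by hand to stand in for an EventMachine timer expiring, and the `@armed` set is an invented detail used to show the "fires only once" rule.

```ruby
require 'set'

# Simplified model of the timeout hook (not the library code).
class ToyTimeoutBucketer
  def initialize
    @buckets_with_timers = Set.new  # buckets whose timer has been started
    @armed = Set.new                # started timers that have not fired yet
    @on_timeout = []
  end

  def on_bucket_timeout(&blk)
    @on_timeout << blk
  end

  def add_item(bucket_id)
    # Set#add? returns nil when the id is already present, so only the
    # first item after an empty starts (arms) a timer.
    @armed << bucket_id if @buckets_with_timers.add?(bucket_id)
  end

  def empty_bucket(bucket_id)
    @buckets_with_timers.delete(bucket_id)
    @armed.delete(bucket_id)
  end

  # Stand-in for the timer expiring; ignored unless a timer is armed.
  def fire_timeout(bucket_id)
    return unless @armed.delete?(bucket_id)
    @on_timeout.each { |cb| cb.call(bucket_id) }
  end
end

timed_out = []
bucketer = ToyTimeoutBucketer.new
bucketer.on_bucket_timeout { |id| timed_out << id }
bucketer.add_item("1")      # first item starts the timer
bucketer.add_item("1")      # no second timer
bucketer.fire_timeout("1")  # callback fires once
bucketer.fire_timeout("1")  # already fired, ignored
bucketer.add_item("1")      # bucket not emptied, so no new timer
bucketer.fire_timeout("1")  # ignored
bucketer.empty_bucket("1")
bucketer.add_item("1")      # first item after emptying starts a new timer
bucketer.fire_timeout("1")  # callback fires again
```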

#setup(bucket_threshold_size, bucket_max_age) ⇒ Object
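
Stores the bucket threshold size and maximum bucket age, and initializes the callback lists and the set of buckets that currently have timers. Classes that include this module are expected to call it before using the other methods.
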
# File 'lib/em-bucketer/base.rb', line 3

def setup(bucket_threshold_size, bucket_max_age)
  @bucket_threshold_size = bucket_threshold_size
  @bucket_max_age = bucket_max_age
  @on_bucket_full_callbacks = []
  @on_bucket_timeout_callbacks = []
  @buckets_with_timers = Set.new
end
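
A class that includes the module would normally call `setup` from its constructor. The sketch below assumes this; `BaseSketch` and `SketchBucketer` are hypothetical names, the keyword arguments are invented for illustration, and the real InMemory and Redis constructors may differ. The unit of `bucket_max_age` is not stated in this listing, so the value used is arbitrary.

```ruby
require 'set'

# Hypothetical module mirroring the setup method shown above.
module BaseSketch
  def setup(bucket_threshold_size, bucket_max_age)
    @bucket_threshold_size = bucket_threshold_size
    @bucket_max_age = bucket_max_age
    @on_bucket_full_callbacks = []
    @on_bucket_timeout_callbacks = []
    @buckets_with_timers = Set.new
  end
end

# Hypothetical including class (illustration only).
class SketchBucketer
  include BaseSketch

  attr_reader :bucket_threshold_size, :bucket_max_age

  def initialize(bucket_threshold_size:, bucket_max_age:)
    # Initialize the shared state before any other method is used.
    setup(bucket_threshold_size, bucket_max_age)
  end
end

bucketer = SketchBucketer.new(bucket_threshold_size: 100, bucket_max_age: 60)
```

Note that `Set` is referenced by `setup`, so `require 'set'` must have been loaded somewhere on Ruby versions that do not autoload it.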