Class: Chewy::Strategy::DelayedSidekiq::Scheduler
- Inherits:
- Object
- Chewy::Strategy::DelayedSidekiq::Scheduler
- Defined in:
- lib/chewy/strategy/delayed_sidekiq/scheduler.rb
Constant Summary
- DEFAULT_TTL =
60 * 60 * 24 # in seconds
- DEFAULT_LATENCY =
10
- DEFAULT_MARGIN =
2
- DEFAULT_QUEUE =
'chewy'
- KEY_PREFIX =
'chewy:delayed_sidekiq'
- ALL_SETS_KEY =
"#{KEY_PREFIX}:all_sets".freeze
- FALLBACK_FIELDS =
'all'
- FIELDS_IDS_SEPARATOR =
';'
- IDS_SEPARATOR =
','
Instance Method Summary
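- #initialize(type, ids, options = {}) ⇒ Scheduler (constructor)
A new instance of Scheduler.
- #postpone ⇒ Object
Stores the ids in the Redis set for the current time chunk and schedules a DelayedSidekiq::Worker unless that chunk is already registered.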
Constructor Details
#initialize(type, ids, options = {}) ⇒ Scheduler
Returns a new instance of Scheduler.
# File 'lib/chewy/strategy/delayed_sidekiq/scheduler.rb', line 26
def initialize(type, ids, options = {})
  @type = type
  @ids = ids
  @options = options
end
Instance Method Details
#postpone ⇒ Object
The diagram:

inputs:
  latency == 2
  reindex_time = Time.current

Parallel OR Sequential triggers of reindex:        | What is going on in reindex store (Redis):
                                                   |
process 1 (reindex_time):                          | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1]
  Scheduler.new(CitiesIndex, [1]).postpone         | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
                                                   | & schedule a DelayedSidekiq::Worker at 1679347869 (at + 3)
                                                   |   it will zpop chewy:delayed_sidekiq:timechunks up to 1679347866 score and reindex all ids with zpopped keys
                                                   |     chewy:delayed_sidekiq:CitiesIndex:1679347866
                                                   |
process 2 (reindex_time):                          | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2]
  Scheduler.new(CitiesIndex, [2]).postpone         | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
                                                   | & do not schedule a new worker
                                                   |
process 1 (reindex_time + (latency - 1).seconds):  | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3]
  Scheduler.new(CitiesIndex, [3]).postpone         | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
                                                   | & do not schedule a new worker
                                                   |
process 2 (reindex_time + (latency + 1).seconds):  | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3]
  Scheduler.new(CitiesIndex, [4]).postpone         | chewy:delayed_sidekiq:CitiesIndex:1679347868 = [4]
                                                   | chewy:delayed_sidekiq:timechunks = [
                                                   |   { score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}
                                                   |   { score: 1679347868, "chewy:delayed_sidekiq:CitiesIndex:1679347868"}
                                                   | ]
                                                   | & schedule a DelayedSidekiq::Worker at 1679347871 (at + 3)
                                                   |   it will zpop chewy:delayed_sidekiq:timechunks up to 1679347868 score and reindex all ids with zpopped keys
                                                   |     chewy:delayed_sidekiq:CitiesIndex:1679347866 (in case of failed previous reindex),
                                                   |     chewy:delayed_sidekiq:CitiesIndex:1679347868
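The diagram groups calls that fall within the same latency window under one timestamped key. The page does not show how the scheduler derives that timestamp, so the following is only a sketch under an assumption: the chunk time is "now + latency" rounded down to a multiple of latency. The helper name `timechunk_at` and the concrete `reindex_time` value are hypothetical, chosen so that the results line up with the keys in the diagram.

```ruby
# Hypothetical helper (not part of Chewy): round "now + latency" down to a
# multiple of latency, so that nearby calls share one chunk timestamp.
def timechunk_at(now, latency)
  schedule_at = now + latency
  schedule_at - (schedule_at % latency)
end

latency = 2
reindex_time = 1_679_347_864 # assumed epoch value that reproduces the diagram

puts timechunk_at(reindex_time, latency)                 # => 1679347866
puts timechunk_at(reindex_time + (latency - 1), latency) # => 1679347866
puts timechunk_at(reindex_time + (latency + 1), latency) # => 1679347868
```

Under this assumption the first three calls in the diagram share the key ending in 1679347866, and the fourth call opens a new chunk ending in 1679347868.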
# File 'lib/chewy/strategy/delayed_sidekiq/scheduler.rb', line 68
def postpone
  ::Sidekiq.redis do |redis|
    # warning: Redis#sadd will always return an Integer in Redis 5.0.0. Use Redis#sadd? instead
    if redis.respond_to?(:sadd?)
      redis.sadd?(ALL_SETS_KEY, timechunks_key)
      redis.sadd?(timechunk_key, serialize_data)
    else
      redis.sadd(ALL_SETS_KEY, timechunks_key)
      redis.sadd(timechunk_key, serialize_data)
    end

    redis.expire(timechunk_key, ttl)

    unless redis.zrank(timechunks_key, timechunk_key)
      redis.zadd(timechunks_key, at, timechunk_key)
      redis.expire(timechunks_key, ttl)

      ::Sidekiq::Client.push(
        'queue' => sidekiq_queue,
        'at' => at + margin,
        'class' => Chewy::Strategy::DelayedSidekiq::Worker,
        'args' => [type_name, at]
      )
    end
  end
end
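To make the control flow of postpone easier to follow without Redis or Sidekiq, here is a rough in-memory simulation. It is not Chewy code: `FakeStore`, the free-standing `postpone` method, and the chunk rounding are all hypothetical stand-ins. A Hash of Sets plays the role of the per-chunk Redis sets, a plain Hash plays the sorted set, and an Array plays the Sidekiq queue. The margin defaults to 2 to match DEFAULT_MARGIN, so the worker times differ from the "at + 3" shown in the diagram.

```ruby
require 'set'

# Hypothetical in-memory stand-in for the Redis keys and the Sidekiq queue.
class FakeStore
  attr_reader :sets, :timechunks, :jobs

  def initialize
    @sets = Hash.new { |hash, key| hash[key] = Set.new } # timechunk key => ids
    @timechunks = {}                                     # timechunk key => score
    @jobs = []                                           # scheduled worker runs
  end
end

# Sketch of the postpone flow; the rounding of `at` is an assumption.
def postpone(store, index_name, ids, now:, latency: 2, margin: 2)
  schedule_at = now + latency
  at = schedule_at - (schedule_at % latency)
  timechunk_key = "chewy:delayed_sidekiq:#{index_name}:#{at}"

  store.sets[timechunk_key].merge(ids)            # plays the role of SADD
  return if store.timechunks.key?(timechunk_key)  # plays the role of the ZRANK check

  store.timechunks[timechunk_key] = at            # plays the role of ZADD
  store.jobs << { 'at' => at + margin, 'args' => [index_name, at] }
end

store = FakeStore.new
reindex_time = 1_679_347_864 # assumed epoch value

postpone(store, 'CitiesIndex', [1], now: reindex_time)
postpone(store, 'CitiesIndex', [2], now: reindex_time)
postpone(store, 'CitiesIndex', [3], now: reindex_time + 1)
postpone(store, 'CitiesIndex', [4], now: reindex_time + 3)

puts store.jobs.length # => 2
```

Four calls produce only two scheduled jobs, because the second and third calls find their chunk already registered and only add their ids to the existing set.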