Module: BusinessFlow::ClusterLock::ClassMethods

Defined in:
lib/business_flow/cluster_lock.rb

Overview

DSL Methods

Constant Summary collapse

# Method body installed on a flow's Result class as
# +_business_flow_cluster_lock_finalize+ (see
# add_cluster_luck_info_to_result_class). It records the given lock info in
# +@cluster_lock_info+ on the receiver and returns the receiver so that calls
# can be chained.
RESULT_FINALIZE =
  proc do |cluster_lock_info|
    tap { @cluster_lock_info = cluster_lock_info }
  end

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.acquire_lock(flow, lock_info, payload) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
# File 'lib/business_flow/cluster_lock.rb', line 170

# Opens a ZooKeeper connection for +lock_info+, builds an exclusive locker on
# that connection, stores the locker on the flow, and then tries to take it.
#
# @param flow [Object] flow instance; the locker is stored in its
#   +@_business_flow_cluster_lock+ instance variable
# @param lock_info [#zookeeper_servers, #lock_name] where to connect and
#   which lock to take
# @param payload [Hash, nil] instrumentation payload, passed through to
#   inner_acquire_lock
# @return [Object] whatever inner_acquire_lock returns
# @raise [StandardError] any failure is re-raised unchanged once the
#   connection (if one was opened) has been closed
def self.acquire_lock(flow, lock_info, payload)
  zk_connection = ZK::Client::Threaded.new(lock_info.zookeeper_servers)
  locker = ZK::Locker::ExclusiveLocker.new(zk_connection, lock_info.lock_name)
  flow.instance_variable_set(:@_business_flow_cluster_lock, locker)
  inner_acquire_lock(zk_connection, locker, payload)
rescue StandardError
  # zk_connection is still nil when the client constructor itself failed.
  zk_connection&.close!
  raise
end

.cleanup(lock, zk_connection) ⇒ Object



192
193
194
195
196
197
198
# File 'lib/business_flow/cluster_lock.rb', line 192

# Best-effort release of the lock followed by an unconditional close of the
# connection. Either argument may be nil, in which case that step is skipped.
#
# @param lock [#unlock, nil] locker to release
# @param zk_connection [#close!, nil] connection to close
# @return [Object, nil] the result of +unlock+, or nil when there was no lock
#   or unlocking raised
def self.cleanup(lock, zk_connection)
  lock.unlock unless lock.nil?
rescue StandardError
  # An unlock failure is deliberately ignored; closing the connection below
  # is relied on to deal with it.
  nil
ensure
  zk_connection.close! unless zk_connection.nil?
end

.exception_to_error_type(exc) ⇒ Object

:reek:ControlParameter I’m using a case statement instead of a hash in a constant to ensure that this doesn’t throw exceptions if this file is required before zookeeper is.



203
204
205
206
207
208
209
210
211
212
# File 'lib/business_flow/cluster_lock.rb', line 203

# Maps an exception to the symbolic error type reported for it.
#
# :reek:ControlParameter
# The ZK exception classes are referenced inside the method body rather than
# in a hash held in a constant, so that requiring this file before zookeeper
# does not raise.
#
# @param exc [Object] the exception to classify
# @return [Symbol] +:assert_failed+, +:zookeeper_timeout+ or +:unknown_failure+
def self.exception_to_error_type(exc)
  return :assert_failed if exc.is_a?(ZK::Exceptions::LockAssertionFailedError)
  return :zookeeper_timeout if exc.is_a?(ZK::Exceptions::OperationTimeOut)

  :unknown_failure
end

.inner_acquire_lock(zk_connection, lock, payload) ⇒ Object



182
183
184
185
186
187
188
189
190
# File 'lib/business_flow/cluster_lock.rb', line 182

# Makes a single non-blocking attempt to take +lock+ and records the outcome
# in the instrumentation payload.
#
# @param zk_connection [#close!] connection the locker was built on; closed
#   here when the lock could not be taken
# @param lock [#lock] locker to acquire
# @param payload [Hash, nil] when present, +:lock_acquired+ is set to the
#   result of the attempt
# @return [Array] +[zk_connection, lock]+ when the lock was acquired
# @raise [LockFailure] with type +:lock_unavailable+ when it was not
def self.inner_acquire_lock(zk_connection, lock, payload)
  acquired = lock.lock(wait: false)
  payload[:lock_acquired] = acquired if payload
  return [zk_connection, lock] if acquired

  zk_connection.close!
  raise LockFailure.new(:lock_unavailable, 'the lock was not available')
end

.instrumented_acquire_lock(flow, lock_info) ⇒ Object



163
164
165
166
167
168
# File 'lib/business_flow/cluster_lock.rb', line 163

# Runs acquire_lock inside the flow class's +:cluster_lock_setup+
# instrumentation block, tagging the payload with the lock name.
#
# @param flow [Object] flow instance; its class must respond to +instrument+
# @param lock_info [#lock_name] lock details, forwarded to acquire_lock
# @return [Object] whatever the +instrument+ call returns
def self.instrumented_acquire_lock(flow, lock_info)
  instrumenter = flow.class
  instrumenter.instrument(:cluster_lock_setup, flow) do |event_payload|
    # The instrumenter may yield no payload, so only annotate when one exists.
    event_payload[:lock_name] = lock_info.lock_name if event_payload
    acquire_lock(flow, lock_info, event_payload)
  end
end

.lock_name(flow) ⇒ Object

:reek:NilCheck



153
154
155
156
157
158
159
160
161
# File 'lib/business_flow/cluster_lock.rb', line 153

# Resolves the cluster lock name for +flow+ by calling the step returned by
# the flow class's +with_cluster_lock+, merging its result into the flow and
# converting the merged value to a string.
#
# :reek:NilCheck
#
# @param flow [Object] flow instance whose class responds to +with_cluster_lock+
# @return [String] the non-empty lock name
# @raise [LockFailure] with type +:no_lock_name+ when the step yields nothing,
#   the name is empty, or +:halt_step+ is thrown without a value
def self.lock_name(flow)
  resolved = catch(:halt_step) do
    step_result = flow.class.with_cluster_lock.call(flow)
    step_result&.merge_into(flow)&.to_s
  end
  name_missing = resolved.nil? || resolved.empty?
  raise LockFailure.new(:no_lock_name, 'no lock name provided') if name_missing

  resolved
end

.with_lock(flow, lock_info) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/business_flow/cluster_lock.rb', line 214

# Acquires the cluster lock (unless cluster locking is disabled), yields
# +lock_info+ to the block, and always runs cleanup afterwards.
#
# @param flow [Object] flow instance the lock is taken for
# @param lock_info [Object] lock details; passed to the block unchanged
# @yieldparam lock_info [Object] the same +lock_info+ argument
# @return [Object] the value of the block
# @raise [LockFailure] when acquiring the lock or running the block raises
#   one of the rescued ZooKeeper exceptions
def self.with_lock(flow, lock_info, &)
  locking_enabled = !BusinessFlow::ClusterLock.disabled?
  # Both locals stay nil when locking is disabled or acquisition raises;
  # cleanup accepts nil for either.
  zk_connection, lock = instrumented_acquire_lock(flow, lock_info) if locking_enabled
  yield lock_info
rescue ZK::Exceptions::LockAssertionFailedError, ZK::Exceptions::OperationTimeOut => e
  # Translate the ZooKeeper failure into a LockFailure, keeping the original
  # exception as its cause.
  error_type = exception_to_error_type(e)
  raise LockFailure.new(error_type, e.message), cause: e
ensure
  cleanup(lock, zk_connection)
end

.zookeeper_server_list(flow) ⇒ Object

:reek:NilCheck



142
143
144
145
146
147
148
149
150
# File 'lib/business_flow/cluster_lock.rb', line 142

# Resolves the ZooKeeper server list for +flow+ by calling the step returned
# by the flow class's +with_zookeeper_servers+, merging its result into the
# flow and converting the merged value to a string.
#
# :reek:NilCheck
#
# @param flow [Object] flow instance whose class responds to
#   +with_zookeeper_servers+
# @return [String] the non-empty server list
# @raise [LockFailure] with type +:no_servers+ when the step yields nothing,
#   the list is empty, or +:halt_step+ is thrown without a value
def self.zookeeper_server_list(flow)
  resolved = catch(:halt_step) do
    step_result = flow.class.with_zookeeper_servers.call(flow)
    step_result&.merge_into(flow)&.to_s
  end
  servers_missing = resolved.nil? || resolved.empty?
  raise LockFailure.new(:no_servers, 'no zookeeper servers provided') if servers_missing

  resolved
end

Instance Method Details

#add_cluster_luck_info_to_result_class ⇒ Object



131
132
133
134
135
136
137
138
139
# File 'lib/business_flow/cluster_lock.rb', line 131

# One-time extension of this class's +Result+ constant: adds a public
# +cluster_lock_info+ field and defines +_business_flow_cluster_lock_finalize+
# from RESULT_FINALIZE. Later calls return immediately.
#
# @return [true, nil] true when the Result class was extended by this call,
#   nil when that had already been done
def add_cluster_luck_info_to_result_class
  return if @cluster_lock_info_added

  const_get(:Result).tap do |result_class|
    DSL::PublicField.new(:cluster_lock_info).add_to(result_class)
    result_class.send(:define_method, :_business_flow_cluster_lock_finalize, RESULT_FINALIZE)
  end
  @cluster_lock_info_added = true
end

#build(parameter_object) ⇒ Object



109
110
111
112
# File 'lib/business_flow/cluster_lock.rb', line 109

# Makes sure the Result class carries the cluster-lock field and finalizer
# before handing over to the inherited +build+.
#
# @param parameter_object [Object] passed through to the inherited +build+
# @return [Object] whatever the inherited +build+ returns
def build(parameter_object)
  add_cluster_luck_info_to_result_class
  super(parameter_object)
end

#default_lock_name ⇒ Object



105
106
107
# File 'lib/business_flow/cluster_lock.rb', line 105

# Builds the fallback lock-name source: a proc that returns the class name of
# whatever object it is evaluated against.
# NOTE(review): presumably Callable evaluates this proc in the context of the
# flow instance, making the default name the flow's class name — confirm.
#
# @return [Proc] a non-lambda proc returning +self.class.name+
def default_lock_name
  proc do
    owner_class = self.class
    owner_class.name
  end
end

#execute(flow) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
# File 'lib/business_flow/cluster_lock.rb', line 114

# Runs the inherited +execute+ for +flow+ while holding the cluster lock, and
# finalizes the result with the lock info that was used.
#
# @param flow [Object] the flow instance to execute
# @return [Object] the finalized result; on LockFailure, a result built by
#   +result_from+ after the failure has been added to the flow
def execute(flow)
  lock_info = LockInfo.new(
    ClassMethods.lock_name(flow),
    ClassMethods.zookeeper_server_list(flow)
  )
  ClassMethods.with_lock(flow, lock_info) do
    flow_result = super(flow)
    flow_result._business_flow_cluster_lock_finalize(lock_info)
  end
rescue LockFailure => e
  # lock_info is still nil here when the failure was raised while resolving
  # the lock name or the server list.
  failed_result = result_from(e.add_to(flow))
  failed_result._business_flow_cluster_lock_finalize(lock_info)
end

#with_cluster_lock(lock_name = nil, opts = {}, &blk) ⇒ Object

rubocop:disable Metrics/ModuleLength



83
84
85
86
87
88
89
90
91
92
# File 'lib/business_flow/cluster_lock.rb', line 83

# DSL accessor for the step that produces the lock name.
#
# * Given a String, stores a step that always returns that string; +opts+ is
#   not used in this case.
# * Given any other truthy value or a block, stores a step wrapping it, with
#   +default_output: :lock_name+ unless +opts+ overrides it.
# * Given nothing, returns the stored step, creating and memoizing one from
#   default_lock_name on first use.
#
# @param lock_name [String, Object, nil] fixed name or callable source
# @param opts [Hash] step options
# @param blk [Proc] callable source used when +lock_name+ is not given
# @return [Step] the configured step
def with_cluster_lock(lock_name = nil, opts = {}, &blk)
  source = lock_name || blk
  return @lock_name ||= Step.new(Callable.new(default_lock_name), opts) unless source

  @lock_name =
    if lock_name.is_a?(String)
      Step.new(Callable.new(proc { lock_name }), {})
    else
      Step.new(Callable.new(source), { default_output: :lock_name }.merge(opts))
    end
end

#with_zookeeper_servers(servers = nil, opts = {}, &blk) ⇒ Object



94
95
96
97
98
99
100
101
102
103
# File 'lib/business_flow/cluster_lock.rb', line 94

# DSL accessor for the step that produces the ZooKeeper server list.
#
# * Given a String, stores a step that always returns that string; +opts+ is
#   not used in this case.
# * Given any other truthy value or a block, stores a step wrapping it, with
#   +default_output: :zookeeper_servers+ unless +opts+ overrides it.
# * Given nothing, returns the stored step if there is one; otherwise builds
#   a new step from BusinessFlow::ClusterLock.default_servers on every call
#   (this fallback is not stored).
#
# @param servers [String, Object, nil] fixed server list or callable source
# @param opts [Hash] step options
# @param blk [Proc] callable source used when +servers+ is not given
# @return [Step] the configured or default step
def with_zookeeper_servers(servers = nil, opts = {}, &blk)
  source = servers || blk
  unless source
    return @zookeeper_servers || Step.new(BusinessFlow::ClusterLock.default_servers, opts)
  end

  @zookeeper_servers =
    if servers.is_a?(String)
      Step.new(Callable.new(proc { servers }), {})
    else
      Step.new(Callable.new(source), { default_output: :zookeeper_servers }.merge(opts))
    end
end