Module: BusinessFlow::ClusterLock::ClassMethods
- Defined in:
- lib/business_flow/cluster_lock.rb
Overview
DSL Methods
Constant Summary collapse
- RESULT_FINALIZE =
# Proc passed to define_method as Result#_business_flow_cluster_lock_finalize:
# stores the given lock info in @cluster_lock_info on the receiver and returns
# the receiver itself.
proc do |lock_info|
  @cluster_lock_info = lock_info
  self
end
Class Method Summary collapse
- .acquire_lock(flow, lock_info, payload) ⇒ Object
- .cleanup(lock, zk_connection) ⇒ Object
-
.exception_to_error_type(exc) ⇒ Object
:reek:ControlParameter I’m using a case statement instead of a hash in a constant to ensure that this doesn’t throw exceptions if this file is required before zookeeper is.
- .inner_acquire_lock(zk_connection, lock, payload) ⇒ Object
- .instrumented_acquire_lock(flow, lock_info) ⇒ Object
-
.lock_name(flow) ⇒ Object
:reek:NilCheck.
- .with_lock(flow, lock_info) ⇒ Object
-
.zookeeper_server_list(flow) ⇒ Object
:reek:NilCheck.
Instance Method Summary collapse
- #add_cluster_luck_info_to_result_class ⇒ Object
- #build(parameter_object) ⇒ Object
- #default_lock_name ⇒ Object
- #execute(flow) ⇒ Object
-
#with_cluster_lock(lock_name = nil, opts = {}, &blk) ⇒ Object
rubocop:disable Metrics/ModuleLength.
- #with_zookeeper_servers(servers = nil, opts = {}, &blk) ⇒ Object
Class Method Details
.acquire_lock(flow, lock_info, payload) ⇒ Object
170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/business_flow/cluster_lock.rb', line 170
#
# Opens a threaded ZooKeeper client for +lock_info.zookeeper_servers+, builds an
# exclusive locker named +lock_info.lock_name+, stores that locker on the flow
# in +@_business_flow_cluster_lock+, and hands off to inner_acquire_lock.
#
# @param flow      object that receives the locker as an instance variable
# @param lock_info responds to +zookeeper_servers+ and +lock_name+
# @param payload   forwarded untouched to inner_acquire_lock
# @return whatever inner_acquire_lock returns
# @raise [StandardError] any error is re-raised after closing the connection
def self.acquire_lock(flow, lock_info, payload)
  zk_connection = ZK::Client::Threaded.new(lock_info.zookeeper_servers)
  locker = ZK::Locker::ExclusiveLocker.new(zk_connection, lock_info.lock_name)
  flow.instance_variable_set(:@_business_flow_cluster_lock, locker)
  inner_acquire_lock(zk_connection, locker, payload)
rescue StandardError
  # zk_connection is still nil when the client constructor itself raised,
  # hence the safe navigation.
  zk_connection&.close!
  raise
end
.cleanup(lock, zk_connection) ⇒ Object
192 193 194 195 196 197 198 |
# File 'lib/business_flow/cluster_lock.rb', line 192
#
# Best-effort release of the lock followed by an unconditional close of the
# connection. Either argument may be nil.
#
# @param lock          object responding to +unlock+, or nil
# @param zk_connection object responding to +close!+, or nil
# @return the result of +lock.unlock+, or nil if there was no lock or the
#   unlock raised a StandardError
def self.cleanup(lock, zk_connection)
  begin
    lock&.unlock
  rescue StandardError
    # Just let the connection close handle this.
    nil
  end
ensure
  zk_connection&.close!
end
.exception_to_error_type(exc) ⇒ Object
:reek:ControlParameter I’m using a case statement instead of a hash in a constant to ensure that this doesn’t throw exceptions if this file is required before zookeeper is.
203 204 205 206 207 208 209 210 211 212 |
# File 'lib/business_flow/cluster_lock.rb', line 203
#
# Maps an exception to the symbol used as a LockFailure error type.
#
# The ZK constants are looked up inside the method body (a case statement, not
# a constant hash) so that loading this file before zookeeper is loaded does
# not raise.
#
# @param exc the exception to classify
# @return [Symbol] :assert_failed, :zookeeper_timeout or :unknown_failure
def self.exception_to_error_type(exc)
  case exc
  when ZK::Exceptions::LockAssertionFailedError then :assert_failed
  when ZK::Exceptions::OperationTimeOut then :zookeeper_timeout
  else :unknown_failure
  end
end
.inner_acquire_lock(zk_connection, lock, payload) ⇒ Object
182 183 184 185 186 187 188 189 190 |
# File 'lib/business_flow/cluster_lock.rb', line 182
#
# Makes a single non-blocking attempt (+wait: false+) to take the lock and
# records the outcome in the payload when one is given.
#
# @param zk_connection connection that is closed if the lock is not obtained
# @param lock          object responding to +lock(wait:)+
# @param payload       hash-like object that receives +:lock_acquired+, or nil
# @return [Array] +[zk_connection, lock]+ when the lock was obtained
# @raise [LockFailure] with type :lock_unavailable when it was not
def self.inner_acquire_lock(zk_connection, lock, payload)
  acquired = lock.lock(wait: false)
  payload[:lock_acquired] = acquired if payload
  return [zk_connection, lock] if acquired

  zk_connection.close!
  raise LockFailure.new(:lock_unavailable, 'the lock was not available')
end
.instrumented_acquire_lock(flow, lock_info) ⇒ Object
163 164 165 166 167 168 |
# File 'lib/business_flow/cluster_lock.rb', line 163
#
# Wraps acquire_lock in the flow class's +instrument+ call under the
# :cluster_lock_setup event, recording the lock name in the payload when the
# instrumentation yields one.
#
# @param flow      the flow being executed; its class must respond to +instrument+
# @param lock_info responds to +lock_name+ (and whatever acquire_lock reads)
# @return whatever +instrument+ returns for the block
def self.instrumented_acquire_lock(flow, lock_info)
  flow_class = flow.class
  flow_class.instrument(:cluster_lock_setup, flow) do |event_payload|
    # The payload may be absent, so only annotate it when present.
    if event_payload
      event_payload[:lock_name] = lock_info.lock_name
    end
    acquire_lock(flow, lock_info, event_payload)
  end
end
.lock_name(flow) ⇒ Object
:reek:NilCheck
153 154 155 156 157 158 159 160 161 |
# File 'lib/business_flow/cluster_lock.rb', line 153
#
# :reek:NilCheck
#
# Resolves the lock name for a flow by calling the step returned from the flow
# class's +with_cluster_lock+, merging its result into the flow and converting
# it to a string. A +throw :halt_step+ from inside the step is caught here.
#
# @param flow the flow whose class defines +with_cluster_lock+
# @return the resolved lock name
# @raise [LockFailure] with type :no_lock_name when the result is nil or empty
def self.lock_name(flow)
  resolved = catch(:halt_step) do
    step_result = flow.class.with_cluster_lock.call(flow)
    step_result&.merge_into(flow)&.to_s
  end
  return resolved unless resolved.nil? || resolved.empty?

  raise LockFailure.new(:no_lock_name, 'no lock name provided')
end
.with_lock(flow, lock_info) ⇒ Object
214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/business_flow/cluster_lock.rb', line 214
#
# Yields +lock_info+ to the given block, first acquiring the cluster lock
# unless BusinessFlow::ClusterLock.disabled? is true. The lock and connection
# (both nil when locking is disabled or acquisition failed) are always passed
# to cleanup afterwards.
#
# @param flow      the flow being executed
# @param lock_info passed to instrumented_acquire_lock and yielded to the block
# @return the value of the block
# @raise [LockFailure] when a ZK lock-assertion failure or operation timeout
#   is raised; the original exception is attached as +cause+ and its message
#   is reused as the failure message
def self.with_lock(flow, lock_info, &)
  unless BusinessFlow::ClusterLock.disabled?
    zk_connection, lock = instrumented_acquire_lock(flow, lock_info)
  end
  yield lock_info
rescue ZK::Exceptions::LockAssertionFailedError, ZK::Exceptions::OperationTimeOut => e
  # This would occur if we asserted a cluster lock while executing the flow.
  # This will have set an error on the flow, so we can carry on.
  raise LockFailure.new(exception_to_error_type(e), e.message), cause: e
ensure
  cleanup(lock, zk_connection)
end
.zookeeper_server_list(flow) ⇒ Object
:reek:NilCheck
142 143 144 145 146 147 148 149 150 |
# File 'lib/business_flow/cluster_lock.rb', line 142
#
# :reek:NilCheck
#
# Resolves the ZooKeeper server list for a flow by calling the step returned
# from the flow class's +with_zookeeper_servers+, merging its result into the
# flow and converting it to a string. A +throw :halt_step+ from inside the
# step is caught here.
#
# @param flow the flow whose class defines +with_zookeeper_servers+
# @return the resolved server list
# @raise [LockFailure] with type :no_servers when the result is nil or empty
def self.zookeeper_server_list(flow)
  resolved = catch(:halt_step) do
    step_result = flow.class.with_zookeeper_servers.call(flow)
    step_result&.merge_into(flow)&.to_s
  end
  return resolved unless resolved.nil? || resolved.empty?

  raise LockFailure.new(:no_servers, 'no zookeeper servers provided')
end
Instance Method Details
#add_cluster_luck_info_to_result_class ⇒ Object
131 132 133 134 135 136 137 138 139 |
# File 'lib/business_flow/cluster_lock.rb', line 131
#
# One-time setup of the receiver's Result class: adds a public
# +cluster_lock_info+ field and defines +_business_flow_cluster_lock_finalize+
# from RESULT_FINALIZE. Later calls are no-ops, guarded by
# +@cluster_lock_info_added+.
#
# @return [true, nil] true on the call that performs the setup, nil afterwards
def add_cluster_luck_info_to_result_class
  unless @cluster_lock_info_added
    const_get(:Result).tap do |result_class|
      DSL::PublicField.new(:cluster_lock_info).add_to(result_class)
      # define_method is invoked through send, as in the original code.
      result_class.send(:define_method, :_business_flow_cluster_lock_finalize,
                        RESULT_FINALIZE)
    end
    @cluster_lock_info_added = true
  end
end
#build(parameter_object) ⇒ Object
109 110 111 112 |
# File 'lib/business_flow/cluster_lock.rb', line 109
#
# Ensures the Result class carries the cluster lock additions, then defers to
# the next +build+ in the ancestor chain with the same argument.
#
# @param parameter_object forwarded unchanged to +super+
# @return whatever +super+ returns
def build(parameter_object)
  add_cluster_luck_info_to_result_class
  super(parameter_object)
end
#default_lock_name ⇒ Object
105 106 107 |
# File 'lib/business_flow/cluster_lock.rb', line 105
#
# Builds the fallback lock-name callable used when no name was configured.
#
# @return [Proc] a proc that evaluates +self.class.name+
#   NOTE(review): presumably the proc is evaluated with a flow instance as
#   self, so the name is the flow's class name; confirm in Callable.
def default_lock_name
  proc do
    self.class.name
  end
end
#execute(flow) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/business_flow/cluster_lock.rb', line 114
#
# Executes the flow under the cluster lock and attaches the lock info to the
# result through +_business_flow_cluster_lock_finalize+.
#
# A LockFailure is added to the flow and converted into a result with
# +result_from+. If it was raised while the lock name or server list was being
# resolved, +lock_info+ has not been assigned yet and the result is finalized
# with nil.
#
# @param flow the flow to execute
# @return the finalized result object
def execute(flow)
  name = ClassMethods.lock_name(flow)
  servers = ClassMethods.zookeeper_server_list(flow)
  lock_info = LockInfo.new(name, servers)
  ClassMethods.with_lock(flow, lock_info) do
    result = super(flow)
    result._business_flow_cluster_lock_finalize(lock_info)
  end
rescue LockFailure => e
  failure_result = result_from(e.add_to(flow))
  failure_result._business_flow_cluster_lock_finalize(lock_info)
end
#with_cluster_lock(lock_name = nil, opts = {}, &blk) ⇒ Object
rubocop:disable Metrics/ModuleLength
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/business_flow/cluster_lock.rb', line 83
#
# rubocop:disable Metrics/ModuleLength
#
# DSL reader/writer for the step that produces the lock name.
#
# * A String is wrapped in a proc returning that string; +opts+ is ignored.
# * Any other truthy +lock_name+, or a block, is wrapped in a Callable with
#   +default_output: :lock_name+ merged under +opts+.
# * With neither, the stored step is returned, first memoizing a step built
#   from default_lock_name if none is stored.
#
# @param lock_name [String, Object, nil] literal name or callable source
# @param opts      [Hash] step options
# @return the Step now stored in +@lock_name+
def with_cluster_lock(lock_name = nil, opts = {}, &blk)
  source = lock_name || blk
  if source
    @lock_name =
      if lock_name.is_a?(String)
        Step.new(Callable.new(proc { lock_name }), {})
      else
        Step.new(Callable.new(source), { default_output: :lock_name }.merge(opts))
      end
  else
    @lock_name ||= Step.new(Callable.new(default_lock_name), opts)
  end
end
#with_zookeeper_servers(servers = nil, opts = {}, &blk) ⇒ Object
94 95 96 97 98 99 100 101 102 103 |
# File 'lib/business_flow/cluster_lock.rb', line 94
#
# DSL reader/writer for the step that produces the ZooKeeper server list.
#
# * A String is wrapped in a proc returning that string; +opts+ is ignored.
# * Any other truthy +servers+, or a block, is wrapped in a Callable with
#   +default_output: :zookeeper_servers+ merged under +opts+.
# * With neither, the stored step is returned if there is one. Otherwise a
#   new step is built from BusinessFlow::ClusterLock.default_servers on every
#   call; this fallback is not memoized and is passed to Step.new without a
#   Callable wrapper.
#
# @param servers [String, Object, nil] literal server list or callable source
# @param opts    [Hash] step options
# @return the Step to use for resolving the server list
def with_zookeeper_servers(servers = nil, opts = {}, &blk)
  source = servers || blk
  if source
    @zookeeper_servers =
      if servers.is_a?(String)
        Step.new(Callable.new(proc { servers }), {})
      else
        Step.new(Callable.new(source), { default_output: :zookeeper_servers }.merge(opts))
      end
  else
    @zookeeper_servers || Step.new(BusinessFlow::ClusterLock.default_servers, opts)
  end
end