Module: Semian
- Extended by:
- Semian, Instrumentable
- Included in:
- Semian
- Defined in:
- lib/semian.rb,
lib/semian/rails.rb,
lib/semian/adapter.rb,
lib/semian/version.rb,
lib/semian/platform.rb,
lib/semian/resource.rb,
lib/semian/simple_state.rb,
lib/semian/instrumentable.rb,
lib/semian/pid_controller.rb,
lib/semian/simple_integer.rb,
lib/semian/circuit_breaker.rb,
lib/semian/protected_resource.rb,
lib/semian/activerecord_adapter.rb,
lib/semian/dual_circuit_breaker.rb,
lib/semian/unprotected_resource.rb,
lib/semian/pid_controller_thread.rb,
lib/semian/simple_sliding_window.rb,
lib/semian/configuration_validator.rb,
lib/semian/adaptive_circuit_breaker.rb,
lib/semian/circuit_breaker_behaviour.rb,
lib/semian/simple_exponential_smoother.rb,
lib/semian/activerecord_postgresql_adapter.rb,
lib/semian/activerecord_trilogy_adapter.rb,
lib/semian/redis_client.rb,
lib/semian/redis/v5.rb,
lib/semian/net_http.rb,
lib/semian/mysql2.rb,
lib/semian/redis.rb,
lib/semian/grpc.rb
Overview
Overview
Semian is a library that can be used to control access to external services.
It’s desirable to control access to external services so that in the case that one is slow or not responding, the performance of an entire system is not compromised.
Semian uses the concept of a “resource” as an identifier that controls access to some external service. So for example, “mysql” or “redis” would be considered resources. If a system is sharded, like a database, you would typically create a resource for every shard.
Resources are visible across an IPC namespace. This means that you can register a resource in one process and access it from another. This is useful in application servers like Unicorn that are multi-process. A resource is persistent. It will continue to exist even after the application exits, and will only be destroyed by manually removing it with the ipcrm command, calling Resource.destroy, or rebooting the machine.
Each resource has a configurable number of tickets. Tickets are what controls access to the external service. If a client does not have a ticket, it cannot access a service. If there are no tickets available, the client will block for a configurable amount of time until a ticket is available. If there are no tickets available after the timeout period has elapsed, the client will be unable to access the service and an error will be raised.
Resources also integrate a circuit breaker in order to fail faster and to let the resource the time to recover. If error_threshold errors happen in the span of error_timeout then the circuit will be opened and every attempt to acquire the resource will immediately fail.
Once in open state, after error_timeout is elapsed, the circuit will transition in the half-open state. In that state a single error will fully re-open the circuit, and the circuit will transition back to the closed state only after the resource is acquired success_threshold consecutive times.
A resource is registered by using the Semian.register method.
Examples
Registering a resource
Semian.register(
:mysql_shard0,
tickets: 10,
timeout: 0.5,
error_threshold: 3,
error_timeout: 10,
success_threshold: 2,
)
This registers a new resource called :mysql_shard0 that has 10 tickets and a default timeout of 500 milliseconds.
After 3 failures in the span of 10 seconds the circuit will be open. After an additional 10 seconds it will transition to half-open. And finally after 2 successful acquisitions of the resource it will transition back to the closed state.
Using a resource
Semian[:mysql_shard0].acquire do
# Perform a MySQL query here
end
This acquires a ticket for the :mysql_shard0 resource. If we use the example above, the ticket count would be lowered to 9 when block is executed, then raised to 10 when the block completes.
Overriding the default timeout
Semian[:mysql_shard0].acquire(timeout: 1) do
# Perform a MySQL query here
end
This is the same as the previous example, but overrides the timeout from the default value of 500 milliseconds to 1 second.
Defined Under Namespace
Modules: ActiveRecordAdapter, ActiveRecordPostgreSQLAdapter, ActiveRecordTrilogyAdapter, Adapter, AdapterError, CircuitBreakerBehaviour, GRPC, Instrumentable, Mysql2, NetHTTP, Rails, RedisClient, RedisClientCommon, RedisClientPool, RedisV4, RedisV5, RedisV5Client, Simple, SlidingWindowBehavior, ThreadSafe Classes: AdaptiveCircuitBreaker, CircuitBreaker, ConfigurationValidator, DualCircuitBreaker, InternalError, PIDControllerThread, ProtectedResource, Resource, SemaphoreMissingError, SimpleExponentialSmoother, SyscallError, TimeoutError, UnprotectedResource
Constant Summary collapse
- BaseError =
Class.new(StandardError)
- OpenCircuitError =
Class.new(BaseError)
- MAX_TICKETS =
Maximum number of tickets available on this system.
INT2FIX(system_max_semaphore_count)
- VERSION =
"0.28.1"
Constants included from Instrumentable
Instance Attribute Summary collapse
-
#default_force_config_validation ⇒ Object
Returns the value of attribute default_force_config_validation.
-
#default_permissions ⇒ Object
Returns the value of attribute default_permissions.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#maximum_lru_size ⇒ Object
Returns the value of attribute maximum_lru_size.
-
#minimum_lru_time ⇒ Object
Returns the value of attribute minimum_lru_time.
-
#namespace ⇒ Object
Returns the value of attribute namespace.
Instance Method Summary collapse
-
#[](name) ⇒ Object
Retrieves a resource by name.
- #bulkheads_disabled_in_thread?(thread) ⇒ Boolean
- #consumers ⇒ Object
- #destroy(name) ⇒ Object
- #destroy_all_resources ⇒ Object
- #disable_bulkheads_for_thread(thread) ⇒ Object
- #disabled? ⇒ Boolean
- #issue_disabled_semaphores_warning ⇒ Object
-
#register(name, **options) ⇒ Object
Registers a resource.
- #reset! ⇒ Object
- #resources ⇒ Object
- #retrieve_or_register(name, **args) ⇒ Object
- #semaphores_enabled? ⇒ Boolean
-
#sysv_semaphores_supported? ⇒ Boolean
Determines if Semian supported on the current platform.
- #thread_safe=(thread_safe) ⇒ Object
-
#thread_safe? ⇒ Boolean
We only allow disabling thread-safety for parts of the code that are on the hot path.
-
#unregister(name) ⇒ Object
Unregister will not destroy the semian resource, but it will remove it from the hash of registered resources, and decrease the number of registered workers.
-
#unregister_all_resources ⇒ Object
Unregisters all resources.
Methods included from Instrumentable
notify, subscribe, subscribers, unsubscribe
Instance Attribute Details
#default_force_config_validation ⇒ Object
Returns the value of attribute default_force_config_validation.
109 110 111 |
# File 'lib/semian.rb', line 109 def default_force_config_validation @default_force_config_validation end |
#default_permissions ⇒ Object
Returns the value of attribute default_permissions.
109 110 111 |
# File 'lib/semian.rb', line 109 def end |
#logger ⇒ Object
Returns the value of attribute logger.
158 159 160 |
# File 'lib/semian.rb', line 158 def logger @logger end |
#maximum_lru_size ⇒ Object
Returns the value of attribute maximum_lru_size.
109 110 111 |
# File 'lib/semian.rb', line 109 def maximum_lru_size @maximum_lru_size end |
#minimum_lru_time ⇒ Object
Returns the value of attribute minimum_lru_time.
109 110 111 |
# File 'lib/semian.rb', line 109 def minimum_lru_time @minimum_lru_time end |
#namespace ⇒ Object
Returns the value of attribute namespace.
109 110 111 |
# File 'lib/semian.rb', line 109 def namespace @namespace end |
Instance Method Details
#[](name) ⇒ Object
Retrieves a resource by name.
260 261 262 |
# File 'lib/semian.rb', line 260 def [](name) resources[name] end |
#bulkheads_disabled_in_thread?(thread) ⇒ Boolean
308 309 310 |
# File 'lib/semian.rb', line 308 def bulkheads_disabled_in_thread?(thread) thread.thread_variable_get(THREAD_BULKHEAD_DISABLED_VAR) end |
#consumers ⇒ Object
328 329 330 331 332 333 334 |
# File 'lib/semian.rb', line 328 def consumers return @consumers if defined?(@consumers) && @consumers @reset_mutex.synchronize do @consumers ||= Concurrent::Map.new end end |
#destroy(name) ⇒ Object
264 265 266 267 |
# File 'lib/semian.rb', line 264 def destroy(name) resource = resources.delete(name) resource&.destroy end |
#destroy_all_resources ⇒ Object
269 270 271 272 |
# File 'lib/semian.rb', line 269 def destroy_all_resources resources.values.each(&:destroy) resources.clear end |
#disable_bulkheads_for_thread(thread) ⇒ Object
312 313 314 315 316 317 318 |
# File 'lib/semian.rb', line 312 def disable_bulkheads_for_thread(thread) old_value = thread.thread_variable_get(THREAD_BULKHEAD_DISABLED_VAR) thread.thread_variable_set(THREAD_BULKHEAD_DISABLED_VAR, true) yield ensure thread.thread_variable_set(THREAD_BULKHEAD_DISABLED_VAR, old_value) end |
#disabled? ⇒ Boolean
15 16 17 |
# File 'lib/semian/platform.rb', line 15 def disabled? ENV.key?("SEMIAN_SEMAPHORES_DISABLED") || ENV.key?("SEMIAN_DISABLED") end |
#issue_disabled_semaphores_warning ⇒ Object
130 131 132 133 134 135 136 137 138 139 |
# File 'lib/semian.rb', line 130 def issue_disabled_semaphores_warning return if defined?(@warning_issued) @warning_issued = true if !sysv_semaphores_supported? logger.info("Semian sysv semaphores are not supported on #{RUBY_PLATFORM} - all operations will no-op") elsif disabled? logger.info("Semian semaphores are disabled, is this what you really want? - all operations will no-op") end end |
#register(name, **options) ⇒ Object
Registers a resource.
name: Name of the resource - this can be either a string or symbol. (required)
circuit_breaker: The boolean if you want a circuit breaker acquired for your resource. Default true.
bulkhead: The boolean if you want a bulkhead to be acquired for your resource. Default true.
tickets: Number of tickets. If this value is 0, the ticket count will not be set, but the resource must have been previously registered otherwise an error will be raised. Mutually exclusive with the ‘quota’ argument.
quota: Calculate tickets as a ratio of the number of registered workers. Must be greater than 0, less than or equal to 1. There will always be at least 1 ticket, as it is calculated as (workers * quota).ceil Mutually exclusive with the ‘ticket’ argument. but the resource must have been previously registered otherwise an error will be raised. (bulkhead)
permissions: Octal permissions of the resource. Default to Semian.default_permissions (0660). (bulkhead)
timeout: Default timeout in seconds. Default 0. (bulkhead)
error_timeout: The duration in seconds since the last error after which the error count is reset to 0. (circuit breaker required)
error_threshold: The amount of errors that must happen within error_timeout amount of time to open the circuit. (circuit breaker required)
error_threshold_timeout: The duration in seconds to examine number of errors to compare with error_threshold. Default same as error_timeout. (circuit breaker)
error_threshold_timeout_enabled: flag to enable/disable filter time window based error eviction (error_threshold_timeout). Default true. (circuit breaker)
success_threshold: The number of consecutive success after which an half-open circuit will be fully closed. (circuit breaker required)
exceptions: An array of exception classes that should be accounted as resource errors. Default []. (circuit breaker)
# +exponential_backoff_error_timeout+: When set to true, instead of opening the circuit for the full
error_timeout duration, it starts with a smaller timeout and increases exponentially on each subsequent opening up to error_timeout. This helps avoid over-opening the circuit for temporary issues. Default false. (circuit breaker)
exponential_backoff_initial_timeout: The initial timeout in seconds when exponential backoff is enabled. Only valid when exponential_backoff_error_timeout is true. Default 1. (circuit breaker)
exponential_backoff_multiplier: The factor by which to multiply the timeout on each subsequent opening when exponential backoff is enabled. Only valid when exponential_backoff_error_timeout is true. Default 2. (circuit breaker)
adaptive_circuit_breaker: Enable adaptive circuit breaker using PID controller. Default false. When enabled, this replaces the traditional circuit breaker with an adaptive version that dynamically adjusts rejection rates based on service health. (adaptive circuit breaker)
dual_circuit_breaker: Enable dual circuit breaker mode where both legacy and adaptive circuit breakers are initialized. Default false. When enabled, both circuit breakers track requests, but only one is used for decision-making based on use_adaptive. (dual circuit breaker)
use_adaptive: A callable (Proc/lambda) that returns true to use adaptive circuit breaker or false to use legacy. Only used when dual_circuit_breaker is enabled. Default: ->() { false }. Example: ->() { MyFeatureFlag.enabled?(:adaptive_circuit_breaker) } (dual circuit breaker)
Returns the registered resource.
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/semian.rb', line 229 def register(name, **) return UnprotectedResource.new(name) if ENV.key?("SEMIAN_DISABLED") # Validate configuration before proceeding ConfigurationValidator.new(name, ).validate! circuit_breaker = if [:dual_circuit_breaker] create_dual_circuit_breaker(name, **) elsif [:adaptive_circuit_breaker] create_adaptive_circuit_breaker(name, **) else create_circuit_breaker(name, **) end bulkhead = create_bulkhead(name, **) resources[name] = ProtectedResource.new(name, bulkhead, circuit_breaker) end |
#reset! ⇒ Object
298 299 300 301 302 303 |
# File 'lib/semian.rb', line 298 def reset! @reset_mutex.synchronize do @consumers = Concurrent::Map.new @resources = LRUHash.new end end |
#resources ⇒ Object
320 321 322 323 324 325 326 |
# File 'lib/semian.rb', line 320 def resources return @resources if defined?(@resources) && @resources @reset_mutex.synchronize do @resources ||= LRUHash.new end end |
#retrieve_or_register(name, **args) ⇒ Object
248 249 250 251 252 253 254 255 256 257 |
# File 'lib/semian.rb', line 248 def retrieve_or_register(name, **args) # If consumer who retrieved / registered by a Semian::Adapter, keep track # of who the consumer was so that we can clear the resource reference if needed. consumer = args.delete(:consumer) if consumer&.class&.include?(Semian::Adapter) && !args[:dynamic] consumer_set = consumers.compute_if_absent(name) { ObjectSpace::WeakMap.new } consumer_set[consumer] = true end self[name] || register(name, **args) end |
#semaphores_enabled? ⇒ Boolean
11 12 13 |
# File 'lib/semian/platform.rb', line 11 def semaphores_enabled? !disabled? && sysv_semaphores_supported? end |
#sysv_semaphores_supported? ⇒ Boolean
Determines if Semian supported on the current platform.
7 8 9 |
# File 'lib/semian/platform.rb', line 7 def sysv_semaphores_supported? /linux/.match(RUBY_PLATFORM) end |
#thread_safe=(thread_safe) ⇒ Object
124 125 126 |
# File 'lib/semian.rb', line 124 def thread_safe=(thread_safe) @thread_safe = thread_safe end |
#thread_safe? ⇒ Boolean
We only allow disabling thread-safety for parts of the code that are on the hot path. Since locking there could have a significant impact. Everything else is enforced thread safety
118 119 120 121 122 |
# File 'lib/semian.rb', line 118 def thread_safe? return @thread_safe if defined?(@thread_safe) @thread_safe = true end |
#unregister(name) ⇒ Object
Unregister will not destroy the semian resource, but it will remove it from the hash of registered resources, and decrease the number of registered workers. Semian.destroy removes the underlying resource, but Semian.unregister will remove all references, while preserving the underlying semian resource (and sysV semaphore). Also clears any semian_resources in use by any semian adapters if the weak reference is still alive.
282 283 284 285 286 287 288 289 |
# File 'lib/semian.rb', line 282 def unregister(name) resource = resources.delete(name) if resource resource.bulkhead&.unregister_worker consumers_for_resource = consumers.delete(name) || ObjectSpace::WeakMap.new consumers_for_resource.each_key(&:clear_semian_resource) end end |
#unregister_all_resources ⇒ Object
Unregisters all resources
292 293 294 295 296 |
# File 'lib/semian.rb', line 292 def unregister_all_resources resources.keys.each do |resource| unregister(resource) end end |