Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error rate circuit breaker #264

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
41 changes: 41 additions & 0 deletions lib/semian.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
require 'semian/platform'
require 'semian/resource'
require 'semian/circuit_breaker'
require 'semian/error_rate_circuit_breaker'
require 'semian/protected_resource'
require 'semian/unprotected_resource'
require 'semian/simple_sliding_window'
require 'semian/time_sliding_window'
require 'semian/simple_integer'
require 'semian/simple_state'
require 'semian/lru_hash'
Expand Down Expand Up @@ -126,6 +128,9 @@ def to_s
#
# +circuit_breaker+: The boolean if you want a circuit breaker acquired for your resource. Default true.
#
# +circuit_breaker_type+: The symbol selecting the type of circuit breaker, one of :normal or :error_rate.
# Default :normal (optional)
#
# +bulkhead+: The boolean if you want a bulkhead to be acquired for your resource. Default true.
#
# +tickets+: Number of tickets. If this value is 0, the ticket count will not be set,
Expand Down Expand Up @@ -153,6 +158,18 @@ def to_s
# +exceptions+: An array of exception classes that should be accounted as resource errors. Default [].
# (circuit breaker)
#
# +error_percent_threshold+: The share of time spent making calls that ultimately ended in error
# that will trigger the circuit opening, expressed as a fraction between 0.0 and 1.0 exclusive
# (e.g. 0.5 for 50%). (error_rate circuit breaker required)
#
# +request_volume_threshold+: The number of calls that must happen within the time_window before the circuit
damianthe marked this conversation as resolved.
Show resolved Hide resolved
# will consider opening based on error_percent_threshold. For example, if the value is 20, then if only 19 requests
# are received in the rolling window the circuit will not trip open even if all 19 failed.
# Without this the circuit would open if the first request was an error (100% failure rate).
# (error_rate circuit breaker required)
#
# +time_window+: The time window in seconds over which the error rate will be calculated
# (error_rate circuit breaker required)
#
# Returns the registered resource.
def register(name, **options)
circuit_breaker = create_circuit_breaker(name, **options)
Expand Down Expand Up @@ -245,9 +262,33 @@ def thread_safe=(thread_safe)

private

# Builds an ErrorRateCircuitBreaker for +name+ from the registration options.
#
# Checks (via require_keys!) that :success_threshold, :error_percent_threshold,
# :error_timeout, :request_volume_threshold and :time_window are present.
# :exceptions and :half_open_resource_timeout are optional.
# ::Semian::BaseError is always appended to the list of tracked exceptions.
#
# Returns the new ErrorRateCircuitBreaker.
def create_error_rate_circuit_breaker(name, **options)
  mandatory = [:success_threshold, :error_percent_threshold, :error_timeout,
               :request_volume_threshold, :time_window]
  require_keys!(mandatory, options)

  tracked = options[:exceptions] || []
  settings = {
    success_threshold: options[:success_threshold],
    error_percent_threshold: options[:error_percent_threshold],
    error_timeout: options[:error_timeout],
    exceptions: Array(tracked) + [::Semian::BaseError],
    half_open_resource_timeout: options[:half_open_resource_timeout],
    request_volume_threshold: options[:request_volume_threshold],
    time_window: options[:time_window],
    implementation: implementation(**options),
  }

  ErrorRateCircuitBreaker.new(name, **settings)
end

def create_circuit_breaker(name, **options)
circuit_breaker = options.fetch(:circuit_breaker, true)
return unless circuit_breaker

type = options.fetch(:circuit_breaker_type, :normal)
unless [:normal, :error_rate].include?(type)
raise ArgumentError, "Unknown 'circuit_breaker_type': #{type}, should be :normal or :error_rate"
end

return create_error_rate_circuit_breaker(name, **options) if type == :error_rate

require_keys!([:success_threshold, :error_threshold, :error_timeout], options)

exceptions = options[:exceptions] || []
Expand Down
188 changes: 188 additions & 0 deletions lib/semian/error_rate_circuit_breaker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
module Semian
class ErrorRateCircuitBreaker #:nodoc:
  # Circuit breaker that opens based on the share of *time* spent in failed
  # calls, measured over a time-based sliding window, rather than on a raw
  # error count.
  #
  # Each call is recorded in the window as [succeeded, time_spent]. While
  # closed, a failure opens the circuit once the window holds at least
  # request_volume_threshold entries and
  # error_time / (error_time + success_time) >= error_percent_threshold.
  # After error_timeout has elapsed since the last error the circuit goes
  # half-open; any failure there re-opens it and success_threshold successes
  # close it again.
  extend Forwardable

  def_delegators :@state, :closed?, :open?, :half_open?

  attr_reader :name, :half_open_resource_timeout, :error_timeout, :state, :last_error, :error_percent_threshold,
              :request_volume_threshold, :success_threshold, :time_window

  # name                       - resource name (stored as a Symbol).
  # exceptions                 - exception classes rescued and counted as failures.
  # error_percent_threshold    - fraction of time spent in failing calls that opens
  #                              the circuit; must lie in 0.0001...1.0.
  # error_timeout              - seconds after the last error before half-open is
  #                              attempted (stored internally in milliseconds).
  # time_window                - window length handed to the TimeSlidingWindow.
  # request_volume_threshold   - minimum number of window entries before the
  #                              error rate is evaluated.
  # success_threshold          - successes needed in half-open state to close.
  # implementation             - namespace providing TimeSlidingWindow and State.
  # half_open_resource_timeout - optional timeout applied to the resource while
  #                              half-open.
  # time_source                - optional callable returning the current time in
  #                              milliseconds; defaults to the monotonic clock.
  #
  # Raises RuntimeError when error_percent_threshold is out of range.
  def initialize(name, exceptions:, error_percent_threshold:, error_timeout:, time_window:,
                 request_volume_threshold:, success_threshold:, implementation:,
                 half_open_resource_timeout: nil, time_source: nil)
    unless (0.0001...1.0).cover?(error_percent_threshold)
      raise 'error_percent_threshold should be between 0.0 and 1.0 exclusive'
    end

    @name = name.to_sym
    @error_timeout = error_timeout * 1000 # seconds -> milliseconds, the unit of the time source
    @exceptions = exceptions
    @half_open_resource_timeout = half_open_resource_timeout
    @error_percent_threshold = error_percent_threshold
    @last_error_time = nil
    @request_volume_threshold = request_volume_threshold
    @success_threshold = success_threshold
    @time_window = time_window # backs the public time_window reader
    @time_source = time_source || -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) }
    @window = implementation::TimeSlidingWindow.new(time_window, @time_source)
    @state = implementation::State.new

    reset
  end

  # Runs the block under circuit protection and returns its result.
  #
  # Raises OpenCircuitError when the circuit is open and the error timeout
  # has not yet expired. Exceptions listed in @exceptions are recorded as
  # failures (unless the error responds to marks_semian_circuits? and
  # answers false) and are then re-raised.
  def acquire(resource = nil, &block)
    return yield if disabled?
    transition_to_half_open if transition_to_half_open?

    raise OpenCircuitError unless request_allowed?

    time_start = current_time
    begin
      result = maybe_with_half_open_resource_timeout(resource, &block)
    rescue *@exceptions => error
      if !error.respond_to?(:marks_semian_circuits?) || error.marks_semian_circuits?
        mark_failed(error, current_time - time_start)
      end
      raise error
    else
      mark_success(current_time - time_start)
    end
    result
  end

  # True when the circuit is open and the error timeout has elapsed.
  def transition_to_half_open?
    open? && error_timeout_expired? && !half_open?
  end

  # True when a call may go through in the current state.
  def request_allowed?
    closed? || half_open? || transition_to_half_open?
  end

  # Records a failed call that took +time_spent+ and opens the circuit when
  # warranted: on reaching the error threshold while closed, or on any
  # failure while half-open.
  def mark_failed(error, time_spent)
    push_error(error, time_spent)
    if closed?
      transition_to_open if error_threshold_reached?
    elsif half_open?
      transition_to_open
    end
  end

  # Records a successful call that took +time_spent+; while half-open, closes
  # the circuit once enough successes have accumulated.
  def mark_success(time_spent)
    @window << [true, time_spent]
    return unless half_open?
    transition_to_close if success_threshold_reached?
  end

  # Forgets the last error time, empties the window and closes the circuit.
  def reset
    @last_error_time = nil
    @window.clear
    transition_to_close
  end

  def destroy
    @state.destroy
  end

  # False once the error timeout has expired; otherwise true when the window
  # currently holds at least one failure.
  def in_use?
    return false if error_timeout_expired?
    error_count > 0
  end

  private

  def current_time
    @time_source.call
  end

  # The transitions notify and log *before* mutating the state so the log
  # line can report the state being left.
  def transition_to_close
    notify_state_transition(:closed)
    log_state_transition(:closed)
    @state.close!
  end

  def transition_to_open
    notify_state_transition(:open)
    log_state_transition(:open)
    @state.open!
    @window.clear
  end

  def transition_to_half_open
    notify_state_transition(:half_open)
    log_state_transition(:half_open)
    @state.half_open!
    @window.clear
  end

  def success_threshold_reached?
    success_count >= @success_threshold
  end

  # True when the window holds enough entries and the fraction of time spent
  # in failed calls is at or above the configured threshold.
  def error_threshold_reached?
    return false if @window.empty? || @window.length < @request_volume_threshold

    success_time_spent, error_time_spent = calculate_time_spent
    total_time = error_time_spent + success_time_spent
    # No measurable time spent at all: the ratio is undefined, so do not trip.
    return false unless total_time > 0

    error_time_spent / total_time >= @error_percent_threshold
  end

  # Returns [success_time, error_time] summed over the window entries.
  def calculate_time_spent
    @window.each_with_object([0.0, 0.0]) do |(succeeded, time_spent), sums|
      slot = succeeded == true ? 0 : 1
      sums[slot] += time_spent
    end
  end

  def error_count
    @window.count { |entry| entry[0] == false }.to_f
  end

  def success_count
    @window.count { |entry| entry[0] == true }.to_f
  end

  def error_timeout_expired?
    return false unless @last_error_time
    current_time - @last_error_time >= @error_timeout
  end

  def push_error(error, time_spent)
    @last_error = error
    @last_error_time = current_time
    @window << [false, time_spent]
  end

  def log_state_transition(new_state)
    return if @state.nil? || new_state == @state.value

    str = "[#{self.class.name}] State transition from #{@state.value} to #{new_state}."
    str << " success_count=#{success_count} error_count=#{error_count}"
    str << " success_count_threshold=#{@success_threshold} error_count_percent=#{@error_percent_threshold}"
    str << " error_timeout=#{@error_timeout} error_last_at=\"#{@last_error_time}\""
    str << " name=\"#{@name}\""
    Semian.logger.info(str)
  end

  def notify_state_transition(new_state)
    Semian.notify(:state_change, self, nil, nil, state: new_state)
  end

  # NOTE(review): a reviewer asked that ENV not be consulted inside business
  # logic, only during the configuration phase; this needs rethinking against
  # the changes on the main branch.
  def disabled?
    ENV['SEMIAN_CIRCUIT_BREAKER_DISABLED'] || ENV['SEMIAN_DISABLED']
  end

  # Calls the block, wrapping it in the resource's timeout override when the
  # circuit is half-open, a half-open timeout is configured and the resource
  # supports with_resource_timeout.
  def maybe_with_half_open_resource_timeout(resource, &block)
    if half_open? && @half_open_resource_timeout && resource.respond_to?(:with_resource_timeout)
      resource.with_resource_timeout(@half_open_resource_timeout) do
        block.call
      end
    else
      block.call
    end
  end
end
end
103 changes: 103 additions & 0 deletions lib/semian/time_sliding_window.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
require 'thread'

module Semian
module Simple
class TimeSlidingWindow #:nodoc:
  # A sliding window that keeps only the entries pushed within the last
  # +time_window+ seconds. Entries are stored as Pair(head: timestamp in
  # milliseconds, tail: value) in push order.
  #
  # size, length and empty? are delegated straight to the backing array and
  # do not prune expired entries first; count, each_with_object, push and
  # remove_old do.
  extend Forwardable

  def_delegators :@window, :size, :empty?, :length
  attr_reader :time_window_millis

  Pair = Struct.new(:head, :tail)

  # time_window - window length in seconds (stored in milliseconds).
  # time_source - optional callable returning the current time in
  #               milliseconds; defaults to the monotonic clock.
  def initialize(time_window, time_source = nil)
    @time_window_millis = time_window * 1000
    @time_source = time_source || -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) }
    @window = []
  end

  # Counts the live values, optionally only those matching the block.
  def count(&block)
    _remove_old
    @window.map(&:tail).count(&block)
  end

  # Iterates the live values with +memo+ and returns +memo+.
  def each_with_object(memo, &block)
    _remove_old
    @window.map(&:tail).each_with_object(memo, &block)
  end

  # Appends +value+ stamped with the current time. Returns self.
  def push(value)
    _remove_old # make room
    @window << Pair.new(current_time, value)
    self
  end

  alias_method :<<, :push

  def clear
    @window.clear
    self
  end

  # Most recently pushed value, or nil when the window is empty.
  def last
    @window.last&.tail
  end

  # Drops every entry older than the time window.
  def remove_old
    _remove_old
  end

  alias_method :destroy, :clear

  private

  def _remove_old
    midtime = current_time - time_window_millis
    # special case, everything is too old
    @window.clear if !@window.empty? && @window.last.head < midtime
    # otherwise we find the index position where the cutoff is; entries are
    # in timestamp order, so a binary search for the first live one suffices
    idx = (0...@window.size).bsearch { |n| @window[n].head >= midtime }
    @window.slice!(0, idx) if idx
  end

  def current_time
    @time_source.call
  end
end
end

module ThreadSafe
class TimeSlidingWindow < Simple::TimeSlidingWindow
  # Mutex-guarded variant of Simple::TimeSlidingWindow.
  def initialize(*)
    super
    @lock = Mutex.new
  end

  # #size and #last are not wrapped in a mutex: the worst case is a
  # thread-switch at a timing where they'd receive an out-of-date
  # value--which could happen with a mutex as well.
  #
  # #clear IS wrapped. The pruning done by the superclass checks the backing
  # array and then indexes into it; an unsynchronized clear landing between
  # those steps would leave it dereferencing an entry that no longer exists.
  # None of the synchronized methods call #clear on self, so taking the
  # (non-reentrant) lock here cannot deadlock.

  def count(*)
    @lock.synchronize { super }
  end

  def each_with_object(*)
    @lock.synchronize { super }
  end

  def remove_old
    @lock.synchronize { super }
  end

  def push(*)
    @lock.synchronize { super }
  end

  def clear(*)
    @lock.synchronize { super }
  end
end
end
end
Loading