Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,36 @@ def register_subscriber(subscriber_klass)
def register_publisher(subscriber_klass)
boot_registry.register_publisher(subscriber_klass)
end

# Indicates shutdown in progress to allow guards in publish/consume paths
def shutting_down!
@shutting_down = true
end

def shutting_down?
!!@shutting_down
end

def amqp_inflight_count
@amqp_inflight_mutex ||= Mutex.new
@amqp_inflight_mutex.synchronize { @amqp_inflight_count ||= 0 }
end

def increment_amqp_inflight
@amqp_inflight_mutex ||= Mutex.new
@amqp_inflight_mutex.synchronize do
@amqp_inflight_count ||= 0
@amqp_inflight_count += 1
end
end

def decrement_amqp_inflight
@amqp_inflight_mutex ||= Mutex.new
@amqp_inflight_mutex.synchronize do
@amqp_inflight_count ||= 0
@amqp_inflight_count = [@amqp_inflight_count - 1, 0].max
end
end
end

class EventSourceLogger
Expand Down
3 changes: 2 additions & 1 deletion lib/event_source/configure/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ class Config

def initialize
@log_level = :warn
@shutdown_timeouts = { amqp_drain: 15, http_drain: 10 }
end

# TODO: add default for pub_sub_root
attr_writer :pub_sub_root, :protocols, :server_configurations
attr_accessor :app_name, :log_level
attr_accessor :app_name, :log_level, :auto_shutdown, :shutdown_timeouts

def load_protocols
@protocols.each do |protocol|
Expand Down
82 changes: 82 additions & 0 deletions lib/event_source/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,88 @@ def drop_connections_for(protocol)
end
end

# Cancel AMQP consumers and wait briefly for drain
def cancel_amqp_consumers!(timeout: 5)
logger.info "Cancelling AMQP consumers prior to shutdown"
# logger.info "AMQP inflight handlers (start): #{EventSource.amqp_inflight_count}"
#
any_consumers_before_cancel = connections_for(:amqp).any? do |connection|
connection.channels.values.any? do |channel|
begin
channel.channel_proxy.subject.any_consumers?
rescue StandardError
false
end
end
end
logger.info "AMQP inflight handlers (before_cancel): #{EventSource.amqp_inflight_count}, any_consumers=#{any_consumers_before_cancel}"

connections_for(:amqp).each do |connection|
connection.channels.each_value do |channel|
channel.subscribe_operations.each_value do |sub_op|
subject = sub_op.subject
if subject.respond_to?(:cancel_consumers!)
subject.cancel_consumers!
end
end
end
end

start = Time.now
loop do
inflight = EventSource.amqp_inflight_count
any_consumers = connections_for(:amqp).any? do |connection|
connection.channels.values.any? do |channel|
begin
channel.channel_proxy.subject.any_consumers?
rescue StandardError
false
end
end
end
logger.info "AMQP inflight handlers (drain): #{inflight}, any_consumers=#{any_consumers}"
break if inflight == 0 && !any_consumers
break if (Time.now - start) >= timeout
sleep 0.1
end
logger.info "AMQP consumer cancellation complete"
logger.info "AMQP inflight handlers (end): #{EventSource.amqp_inflight_count}"
end

# Gracefully stop HTTP workers by draining queued actions
def drain_http_workers!(timeout: 5)
logger.info "Draining HTTP workers prior to shutdown"
total_start = 0
connections_for(:http).each do |connection|
connection.channels.each_value do |channel|
begin
cp = channel.channel_proxy
if cp.respond_to?(:drain_and_close)
queued = channel.subscribe_operations.values.sum do |sub_op|
subj = sub_op.subject
subj.respond_to?(:size) ? subj.size : 0
end
total_start += queued
cp.drain_and_close(timeout: timeout)
end
rescue StandardError => e
logger.info "HTTP worker drain error: #{e.message}"
end
end
end
total_end = connections_for(:http).sum do |connection|
connection.channels.values.sum do |channel|
channel.subscribe_operations.values.sum do |sub_op|
subj = sub_op.subject
subj.respond_to?(:size) ? subj.size : 0
end
end
end
logger.info "HTTP worker drain complete"
logger.info "HTTP queued actions (start): #{total_start}"
logger.info "HTTP queued actions (end): #{total_end}"
end

# Remove a network resource from the connection registry
# @param connection_uri [String] the unique key for the connection to
# remove
Expand Down
2 changes: 0 additions & 2 deletions lib/event_source/protocols/amqp/bunny_exchange_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ def publish(payload:, publish_bindings:, headers: {})
bunny_publish_bindings[:headers] = headers unless headers.empty?

logger.debug "BunnyExchange#publish publishing message with bindings: #{bunny_publish_bindings.inspect}"

@subject.publish(payload, bunny_publish_bindings)

logger.debug "BunnyExchange#publish published message: #{payload}"
logger.debug "BunnyExchange#publish published message to exchange: #{@subject.name}"
end
Expand Down
19 changes: 18 additions & 1 deletion lib/event_source/protocols/amqp/bunny_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,15 @@ def spawn_thread(options)
end

def add_consumer(subscriber_klass, options)
@subject.subscribe(options) do |delivery_info, metadata, payload|
consumer = @subject.subscribe(options) do |delivery_info, metadata, payload|
on_receive_message(
subscriber_klass,
delivery_info,
metadata,
payload
)
end
@consumers << consumer if consumer
end

def convert_to_subscribe_options(options)
Expand All @@ -124,6 +125,7 @@ def on_receive_message(
metadata,
payload
)
EventSource.increment_amqp_inflight
logger.debug '**************************'
logger.debug subscriber_klass.inspect
logger.debug delivery_info.inspect
Expand Down Expand Up @@ -158,6 +160,7 @@ def on_receive_message(
logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}"
ensure
subscriber = nil
EventSource.decrement_amqp_inflight
end

# Decodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_subscribe.yml message bindings.
Expand Down Expand Up @@ -247,6 +250,20 @@ def channel_item_queue_bindings_for(bindings)
def exchange_name_from_queue(queue_name)
queue_name.match(/^\w+\.(.+)/)[1]
end

public

# Cancel all registered consumers for this queue
def cancel_consumers!
@consumers.each do |consumer|
begin
consumer.cancel
rescue StandardError => e
logger.info "Consumer cancellation error: #{e.message}"
end
end
@consumers.clear
end
end
end
end
Expand Down
6 changes: 6 additions & 0 deletions lib/event_source/protocols/http/faraday_channel_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ def close
@worker.stop if defined?(@worker)
end

# Gracefully stop the worker after draining queued actions
def drain_and_close(timeout: 5)
return unless defined?(@worker) && @worker
@worker.stop_and_drain(timeout: timeout)
end

def active?
@worker.active?
end
Expand Down
5 changes: 5 additions & 0 deletions lib/event_source/protocols/http/faraday_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ def actions
@subject.actions
end

# Current number of queued HTTP actions
def size
@subject.size
end

# Construct and subscribe a consumer_proxy with the queue
# @param [Object] subscriber_klass Subscriber class
# @return [Queue] Queue instance
Expand Down
16 changes: 16 additions & 0 deletions lib/event_source/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ def closed?
@subject.closed?
end

# Are there any items left to process in the queue?
# @return [Boolean]
def empty?
@subject.empty?
end

# Current number of items waiting in the queue
# @return [Integer]
def size
if @subject.respond_to?(:length)
@subject.length
else
@subject.size
end
end

# Register an action to be performed, with a resolver class and key.
def register_action(resolver, key)
@registered_actions << [resolver, key]
Expand Down
35 changes: 35 additions & 0 deletions lib/event_source/railtie.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,44 @@
# frozen_string_literal: true

require 'logger'

module EventSource
# :nodoc:
module Railtie
Rails::Application::Finisher.initializer "event_source.boot", after: :finisher_hook do
logger = Logger.new($stdout)
logger.progname = 'EventSource graceful shutdown'
timeouts = EventSource.config.shutdown_timeouts || {}
amqp_timeout = timeouts[:amqp_drain] || 5
http_timeout = timeouts[:http_drain] || 5

# Perform shutdown work outside of trap/at_exit context to avoid
# ThreadError from mutex operations within Bunny (AMQP client).
shutdown = lambda do |reason|
Thread.new do
begin
EventSource.shutting_down!
logger.info "#{reason}, starting graceful shutdown"
logger.info "AMQP inflight handlers at shutdown start: #{EventSource.amqp_inflight_count}"
cm = EventSource::ConnectionManager.instance
# Stop consuming and allow in-flight handlers to drain briefly
cm.cancel_amqp_consumers!(timeout: amqp_timeout)
cm.drain_http_workers!(timeout: http_timeout)
cm.drop_connections_for(:amqp)
cm.drop_connections_for(:http)
rescue => e
logger.error "graceful shutdown error: #{e.class}: #{e.message}"
end
end.join
end

if EventSource.config.auto_shutdown
at_exit { shutdown.call('at_exit received') }

%w[TERM INT].each do |sig|
Signal.trap(sig) { shutdown.call("signal=#{sig} received") }
end
end
EventSource.initialize!
end
end
Expand Down
13 changes: 13 additions & 0 deletions lib/event_source/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,19 @@ def stop
true
end

# Stop accepting new work, wait for queued actions to drain, then stop
# @param [Integer] timeout seconds to wait for drain
def stop_and_drain(timeout: 5)
logger.debug("Stop+Drain Worker for Queue: #{queue_proxy.name}, timeout=#{timeout}s")
start = Time.now
# Wait until queue empties or timeout
loop do
break if !actions_left? || (Time.now - start) >= timeout
sleep 0.1
end
stop
end

# Flag indicating whether this worker is accepting new actions
def active?
!queue_proxy.closed?
Expand Down
Loading