diff --git a/CHANGELOG.md b/CHANGELOG.md index c7b351b..3517ed0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ Unreleased Changes ------------------ +* Feature - Support per queue configuration. +* Feature - Support loading global and queue specific configuration from ENV. + 0.1.0 (2024-11-16) ------------------ diff --git a/README.md b/README.md index 997f178..18d9d62 100644 --- a/README.md +++ b/README.md @@ -63,15 +63,19 @@ You also need to configure a mapping of ActiveJob queue names to SQS Queue URLs: ```yaml # config/aws_sqs_active_job.yml +backpressure: 5 # configure global options for poller +max_messages: 3 queues: - default: 'https://my-queue-url.amazon.aws' + default: + url: 'https://my-queue-url.amazon.aws' + max_messages: 2 # queue specific values override global values ``` For a complete list of configuration options see the [Aws::ActiveJob::SQS::Configuration](https://docs.aws.amazon.com/sdk-for-ruby/aws-activejob-sqs/api/Aws/ActiveJob/SQS/Configuration.html) documentation. -You can configure SQS Active Job either through the yaml file or +You can configure SQS Active Job either through the environment, yaml file or through code in your `config/.rb` or initializers. For file based configuration, you can use either @@ -88,6 +92,16 @@ Aws::ActiveJob::SQS.configure do |config| end ``` +SQS Active Job loads global and queue specific values from your +environment. Global keys take the form of: +`AWS_ACTIVE_JOB_SQS_` and queue specific keys take the +form of: `AWS_ACTIVE_JOB_SQS__`. Example: + +```shell +export AWS_ACTIVE_JOB_SQS_MAX_MESSAGES = 5 +export AWS_ACTIVE_JOB_SQS_DEFAULT_URL = https://my-queue.aws +``` + ## Usage To queue a job, you can just use standard ActiveJob methods: diff --git a/lib/active_job/queue_adapters/sqs_adapter.rb b/lib/active_job/queue_adapters/sqs_adapter.rb index 1cda41d..a05f93a 100644 --- a/lib/active_job/queue_adapters/sqs_adapter.rb +++ b/lib/active_job/queue_adapters/sqs_adapter.rb @@ -27,7 +27,7 @@ def enqueue_at(job, timestamp) def enqueue_all(jobs) enqueued_count = 0 jobs.group_by(&:queue_name).each do |queue_name, same_queue_jobs| - queue_url = Aws::ActiveJob::SQS.config.queue_url_for(queue_name) + queue_url = Aws::ActiveJob::SQS.config.url_for(queue_name) base_send_message_opts = { queue_url: queue_url } same_queue_jobs.each_slice(10) do |chunk| diff --git a/lib/active_job/queue_adapters/sqs_adapter/params.rb b/lib/active_job/queue_adapters/sqs_adapter/params.rb index 190652c..eb8aac4 100644 --- a/lib/active_job/queue_adapters/sqs_adapter/params.rb +++ b/lib/active_job/queue_adapters/sqs_adapter/params.rb @@ -22,7 +22,7 @@ def initialize(job, body) end def queue_url - @queue_url ||= Aws::ActiveJob::SQS.config.queue_url_for(@job.queue_name) + @queue_url ||= Aws::ActiveJob::SQS.config.url_for(@job.queue_name) end def entry @@ -61,7 +61,7 @@ def options_for_fifo Digest::SHA256.hexdigest(ActiveSupport::JSON.dump(deduplication_body)) message_group_id = @job.message_group_id if @job.respond_to?(:message_group_id) - message_group_id ||= Aws::ActiveJob::SQS.config.message_group_id + message_group_id ||= Aws::ActiveJob::SQS.config.message_group_id_for(@job.queue_name) options[:message_group_id] = message_group_id options @@ -69,7 +69,7 @@ def options_for_fifo def deduplication_body ex_dedup_keys = @job.excluded_deduplication_keys if @job.respond_to?(:excluded_deduplication_keys) - ex_dedup_keys ||= Aws::ActiveJob::SQS.config.excluded_deduplication_keys + ex_dedup_keys ||= Aws::ActiveJob::SQS.config.excluded_deduplication_keys_for(@job.queue_name) @body.except(*ex_dedup_keys) end diff --git a/lib/active_job/queue_adapters/sqs_async_adapter.rb b/lib/active_job/queue_adapters/sqs_async_adapter.rb index 3971be9..16fc4b7 100644 --- a/lib/active_job/queue_adapters/sqs_async_adapter.rb +++ b/lib/active_job/queue_adapters/sqs_async_adapter.rb @@ -19,7 +19,7 @@ class SqsAsyncAdapter < SqsAdapter def _enqueue(job, body = nil, send_message_opts = {}) # FIFO jobs must be queued in order, so do not queue async - queue_url = Aws::ActiveJob::SQS.config.queue_url_for(job.queue_name) + queue_url = Aws::ActiveJob::SQS.config.url_for(job.queue_name) if Aws::ActiveJob::SQS.fifo?(queue_url) super else diff --git a/lib/aws/active_job/sqs/configuration.rb b/lib/aws/active_job/sqs/configuration.rb index cbdc60e..eae8b55 100644 --- a/lib/aws/active_job/sqs/configuration.rb +++ b/lib/aws/active_job/sqs/configuration.rb @@ -8,6 +8,8 @@ class Configuration # Default configuration options # @api private DEFAULTS = { + threads: 2 * Concurrent.processor_count, + backpressure: 10, max_messages: 10, shutdown_timeout: 15, retry_standard_errors: true, # TODO: Remove in next MV @@ -17,20 +19,42 @@ class Configuration excluded_deduplication_keys: ['job_id'] }.freeze - # @api private - attr_accessor :queues, :max_messages, :visibility_timeout, - :shutdown_timeout, :client, :logger, - :async_queue_error_handler, :message_group_id + GLOBAL_ENV_CONFIGS = %i[ + threads backpressure + max_messages shutdown_timeout + visibility_timeout message_group_id + ].freeze - attr_reader :excluded_deduplication_keys + QUEUE_ENV_CONFIGS = %i[ + url max_messages + visibility_timeout message_group_id + ].freeze # Don't use this method directly: Configuration is a singleton class, use # +Aws::ActiveJob::SQS.config+ to access the singleton config. # + # This class provides a Configuration object for AWS ActiveJob + # by pulling configuration options from Runtime, the ENV, a YAML file, + # and default settings, in that order. Values set on queues are used + # preferentially to global values. + # + # # Environment Variables + # The Configuration loads global and queue specific values from your + # environment. Global keys take the form of: + # `AWS_ACTIVE_JOB_SQS_` and queue specific keys take the + # form of: `AWS_ACTIVE_JOB_SQS__`. Example: + # + # export AWS_ACTIVE_JOB_SQS_MAX_MESSAGES = 5 + # export AWS_ACTIVE_JOB_SQS_DEFAULT_URL = https://my-queue.aws + # # @param [Hash] options - # @option options [Hash[Symbol, String]] :queues A mapping between the - # active job queue name and the SQS Queue URL. Note: multiple active - # job queues can map to the same SQS Queue URL. + # @option options [Hash[Symbol, Hash]] :queues A mapping between the + # active job queue name and the queue properties. Valid properties + # are: url [Required], max_messages, shutdown_timeout, + # message_group_id, and :excluded_deduplication_keys. Values + # configured on the queue are used preferentially to the global + # values. + # Note: multiple active job queues can map to the same SQS Queue URL. # # @option options [Integer] :max_messages # The max number of messages to poll for in a batch. @@ -50,7 +74,7 @@ class Configuration # will not be deleted from the SQS queue and will be retryable after # the visibility timeout. # - # @ option options [Boolean] :retry_standard_errors + # @option options [Boolean] :retry_standard_errors # If `true`, StandardErrors raised by ActiveJobs are left on the queue # and will be retried (pending the SQS Queue's redrive/DLQ/maximum receive settings). # This behavior overrides the standard Rails ActiveJob @@ -73,7 +97,7 @@ class Configuration # See the (SQS FIFO Documentation)[https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html] # # @option options [Callable] :async_queue_error_handler An error handler - # to be called when the async active job adapter experiances an error + # to be called when the async active job adapter experiences an error # queueing a job. Only applies when # +active_job.queue_adapter = :sqs_async+. Called with: # [error, job, job_options] @@ -87,12 +111,24 @@ class Configuration def initialize(options = {}) options[:config_file] ||= config_file if File.exist?(config_file) - options = DEFAULTS + resolved = DEFAULTS .merge(file_options(options)) + + resolved = resolved + .merge(env_options(resolved)) .merge(options) - set_attributes(options) + set_attributes(resolved) end + # @api private + attr_accessor :queues, :threads, :backpressure, + :max_messages, :visibility_timeout, + :shutdown_timeout, :client, :logger, + :async_queue_error_handler, :message_group_id, + :retry_standard_errors + + attr_reader :excluded_deduplication_keys + def excluded_deduplication_keys=(keys) @excluded_deduplication_keys = keys.map(&:to_s) | ['job_id'] end @@ -106,11 +142,24 @@ def client end # Return the queue_url for a given job_queue name - def queue_url_for(job_queue) - job_queue = job_queue.to_sym - raise ArgumentError, "No queue defined for #{job_queue}" unless queues.key? job_queue + def url_for(job_queue) + queue_attribute_for(:url, job_queue) + end + + def max_messages_for(job_queue) + queue_attribute_for(:max_messages, job_queue) + end - queues[job_queue] + def visibility_timeout_for(job_queue) + queue_attribute_for(:visibility_timeout, job_queue) + end + + def message_group_id_for(job_queue) + queue_attribute_for(:message_group_id, job_queue) + end + + def excluded_deduplication_keys_for(job_queue) + queue_attribute_for(:excluded_deduplication_keys, job_queue) end # @api private @@ -131,6 +180,13 @@ def to_h private + def queue_attribute_for(attribute, job_queue) + job_queue = job_queue.to_sym + raise ArgumentError, "No queue defined for #{job_queue}" unless queues.key? job_queue + + queues[job_queue][attribute] || instance_variable_get("@#{attribute}") + end + # Set accessible attributes after merged options. def set_attributes(options) options.each_key do |opt_name| @@ -139,12 +195,37 @@ def set_attributes(options) end end + # resolve ENV for global and queue specific options + def env_options(options) + resolved = {} + GLOBAL_ENV_CONFIGS.each do |cfg| + env_name = "AWS_ACTIVE_JOB_SQS_#{cfg.to_s.upcase}" + resolved[cfg] = parse_env_value(env_name) if ENV.key? env_name + end + options[:queues]&.each_key do |queue| + resolved[:queues] ||= {} + resolved[:queues][queue] = options[:queues][queue].dup + QUEUE_ENV_CONFIGS.each do |cfg| + env_name = "AWS_ACTIVE_JOB_SQS_#{queue.upcase}_#{cfg.to_s.upcase}" + resolved[:queues][queue][cfg] = parse_env_value(env_name) if ENV.key? env_name + end + end + resolved + end + + def parse_env_value(key) + val = ENV.fetch(key, nil) + Integer(val) + rescue ArgumentError, TypeError + %w[true false].include?(val) ? val == 'true' : val + end + def file_options(options = {}) file_path = config_file_path(options) if file_path load_from_file(file_path) else - {} + options end end @@ -157,12 +238,17 @@ def config_file # Load options from YAML file def load_from_file(file_path) opts = load_yaml(file_path) || {} + opts[:queues]&.each_key do |queue| + if opts[:queues][queue].is_a?(String) + opts[:queues][queue] = { url: opts[:queues][queue] } + end + end opts.deep_symbolize_keys end # @return [String] Configuration path found in environment or YAML file. def config_file_path(options) - options[:config_file] || ENV.fetch('AWS_SQS_ACTIVE_JOB_CONFIG_FILE', nil) + options[:config_file] || ENV.fetch('AWS_ACTIVE_JOB_SQS_CONFIG_FILE', nil) end def load_yaml(file_path) diff --git a/lib/aws/active_job/sqs/poller.rb b/lib/aws/active_job/sqs/poller.rb index c23ba93..44e729d 100644 --- a/lib/aws/active_job/sqs/poller.rb +++ b/lib/aws/active_job/sqs/poller.rb @@ -12,14 +12,6 @@ module SQS class Poller class Interrupt < StandardError; end - DEFAULT_OPTS = { - threads: 2 * Concurrent.processor_count, - max_messages: 10, - shutdown_timeout: 15, - backpressure: 10, - retry_standard_errors: true - }.freeze - def initialize(args = ARGV) @options = parse_args(args) # Set_environment must be run before we boot_rails @@ -35,22 +27,28 @@ def run boot_rails # cannot load config (from file or initializers) until after - # rails has been booted. - @options = DEFAULT_OPTS - .merge(Aws::ActiveJob::SQS.config.to_h) - .merge(@options.to_h) + # rails has been booted.\ + Aws::ActiveJob::SQS.configure do |cfg| + @options.each_pair do |key, value| + cfg.send(:"#{key}=", value) if cfg.respond_to?(:"#{key}=") + end + end + validate_config + + config = Aws::ActiveJob::SQS.config + # ensure we have a logger configured - @logger = @options[:logger] || ActiveSupport::Logger.new($stdout) - @logger.info("Starting Poller with options=#{@options}") + @logger = config.logger || ActiveSupport::Logger.new($stdout) + @logger.info("Starting Poller with config=#{config.to_h}") Signal.trap('INT') { raise Interrupt } Signal.trap('TERM') { raise Interrupt } @executor = Executor.new( - max_threads: @options[:threads], + max_threads: config.threads, logger: @logger, - max_queue: @options[:backpressure], - retry_standard_errors: @options[:retry_standard_errors] + max_queue: config.backpressure, + retry_standard_errors: config.retry_standard_errors ) poll @@ -63,18 +61,19 @@ def run private def shutdown - @executor.shutdown(@options[:shutdown_timeout]) + @executor.shutdown(Aws::ActiveJob::SQS.config.shutdown_timeout) end def poll - queue_url = Aws::ActiveJob::SQS.config.queue_url_for(@options[:queue]) - @logger.info "Polling on: #{@options[:queue]} => #{queue_url}" - client = Aws::ActiveJob::SQS.config.client + config = Aws::ActiveJob::SQS.config + queue = @options[:queue] + queue_url = config.url_for(queue) + client = config.client @poller = Aws::SQS::QueuePoller.new(queue_url, client: client) poller_options = { skip_delete: true, - max_number_of_messages: @options[:max_messages], - visibility_timeout: @options[:visibility_timeout] + max_number_of_messages: config.max_messages_for(queue), + visibility_timeout: config.visibility_timeout_for(queue) } # Limit max_number_of_messages for FIFO queues to 1 # this ensures jobs with the same message_group_id are processed @@ -85,6 +84,8 @@ def poll single_message = poller_options[:max_number_of_messages] == 1 + @logger.info "Polling on: #{queue} => #{queue_url} with options=#{poller_options}" + @poller.poll(poller_options) do |msgs| msgs = [msgs] if single_message @logger.info "Processing batch of #{msgs.length} messages" diff --git a/spec/active_job/queue_adapters/sqs_adapter/params_spec.rb b/spec/active_job/queue_adapters/sqs_adapter/params_spec.rb index 78804ff..36349a4 100644 --- a/spec/active_job/queue_adapters/sqs_adapter/params_spec.rb +++ b/spec/active_job/queue_adapters/sqs_adapter/params_spec.rb @@ -49,7 +49,7 @@ class SqsAdapter describe 'fifo queue' do before do - allow(Aws::ActiveJob::SQS.config).to receive(:queue_url_for).and_return('https://queue-url.fifo') + allow(Aws::ActiveJob::SQS.config).to receive(:url_for).and_return('https://queue-url.fifo') end it 'includes message_group_id and message_deduplication_id' do diff --git a/spec/active_job/queue_adapters/sqs_adapter_spec.rb b/spec/active_job/queue_adapters/sqs_adapter_spec.rb index 4f116a6..055eac9 100644 --- a/spec/active_job/queue_adapters/sqs_adapter_spec.rb +++ b/spec/active_job/queue_adapters/sqs_adapter_spec.rb @@ -23,7 +23,7 @@ module QueueAdapters describe 'fifo queues' do before do - allow(Aws::ActiveJob::SQS.config).to receive(:queue_url_for).and_return('https://queue-url.fifo') + allow(Aws::ActiveJob::SQS.config).to receive(:url_for).and_return('https://queue-url.fifo') end it 'adds message_deduplication_id and default message_group_id if job does not override it' do diff --git a/spec/active_job/queue_adapters/sqs_async_adapter_spec.rb b/spec/active_job/queue_adapters/sqs_async_adapter_spec.rb index e941f75..9ca5154 100644 --- a/spec/active_job/queue_adapters/sqs_async_adapter_spec.rb +++ b/spec/active_job/queue_adapters/sqs_async_adapter_spec.rb @@ -58,7 +58,7 @@ def mock_async end it 'queues jobs to fifo queues synchronously' do - allow(Aws::ActiveJob::SQS.config).to receive(:queue_url_for) + allow(Aws::ActiveJob::SQS.config).to receive(:url_for) .and_return('https://queue-url.fifo') expect(Concurrent::Promises).not_to receive(:future) expect(client).to receive(:send_message) diff --git a/spec/aws/active_job/sqs/configuration_spec.rb b/spec/aws/active_job/sqs/configuration_spec.rb index 157369e..a69c254 100644 --- a/spec/aws/active_job/sqs/configuration_spec.rb +++ b/spec/aws/active_job/sqs/configuration_spec.rb @@ -7,7 +7,7 @@ module SQS let(:expected_file_opts) do { max_messages: 5, - queues: { default: 'https://queue-url' } + queues: { default: { url: 'https://queue-url', max_messages: 2 } } } end @@ -41,13 +41,80 @@ module SQS it 'accepts YAML config with alias' do allow_any_instance_of(ERB).to receive(:result).and_return(<<~YAML) common: &common - default: 'https://queue-url' + default: + url: 'https://queue-url' queues: <<: *common YAML expect { Aws::ActiveJob::SQS::Configuration.new }.to_not raise_error end + context 'ENV set' do + Configuration::GLOBAL_ENV_CONFIGS.each do |config_name| + describe "ENV #{config_name}" do + let(:env_name) { "AWS_ACTIVE_JOB_SQS_#{config_name.to_s.upcase}" } + + let(:cfg) { Configuration.new } + + before(:each) do + ENV[env_name] = 'env_value' + + file_options = {} + file_options[config_name] = 'file_value' + allow_any_instance_of(Configuration) + .to receive(:file_options).and_return(file_options) + end + + after(:each) do + ENV.delete(env_name) + end + + it 'uses values from ENV over default and file' do + expect(cfg.send(config_name)).to eq('env_value') + end + + it 'uses runtime configured values over ENV' do + options = {} + options[config_name] = 'runtime_value' + cfg = Configuration.new(options) + expect(cfg.send(config_name)).to eq('runtime_value') + end + end + end + + Configuration::QUEUE_ENV_CONFIGS.each do |config_name| + describe "ENV queue #{config_name}" do + let(:env_name) { "AWS_ACTIVE_JOB_SQS_DEFAULT_#{config_name.to_s.upcase}" } + + let(:cfg) { Configuration.new } + + before(:each) do + ENV[env_name] = 'env_value' + + file_options = {queues: {default: {}}} + file_options[:queues][:default][config_name] = 'file_value' + allow_any_instance_of(Configuration) + .to receive(:file_options).and_return(file_options) + end + + after(:each) do + ENV.delete(env_name) + end + + it 'uses values from ENV over default and file' do + expect(cfg.send(:"#{config_name}_for", :default)).to eq('env_value') + end + + it 'uses runtime configured values over ENV' do + options = {queues: {default: {}}} + options[:queues][:default][config_name] = 'runtime_value' + cfg = Configuration.new(options) + expect(cfg.send(:"#{config_name}_for", :default)).to eq('runtime_value') + end + end + end + end + describe '#client' do it 'does not create client on initialize' do expect(Aws::SQS::Client).not_to receive(:new) @@ -62,21 +129,30 @@ module SQS end end - describe '#queue_url_for' do - let(:queue_url) { 'https://queue_url' } + Configuration::QUEUE_ENV_CONFIGS.each do |config_name| + describe "##{config_name}" do + let(:cfg) do + queues = { + default: {}, + override: {} + } + queues[:override][config_name] = 'queue_value' + options = {queues: queues} + options[config_name] = 'global_value' + Aws::ActiveJob::SQS::Configuration.new(**options) + end - let(:cfg) do - Aws::ActiveJob::SQS::Configuration.new( - queues: { default: queue_url } - ) - end + it 'returns the queue value when set' do + expect(cfg.send(:"#{config_name}_for", :override)).to eq('queue_value') + end - it 'returns the queue url' do - expect(cfg.queue_url_for(:default)).to eq queue_url - end + it 'returns the global value when unset' do + expect(cfg.send(:"#{config_name}_for", :default)).to eq('global_value') + end - it 'raises an ArgumentError when the queue is not mapped' do - expect { cfg.queue_url_for(:not_mapped) }.to raise_error(ArgumentError) + it 'raises an ArgumentError when the queue is not mapped' do + expect { cfg.send(:"#{config_name}_for", :not_mapped) }.to raise_error(ArgumentError) + end end end end diff --git a/spec/aws/active_job/sqs/poller_spec.rb b/spec/aws/active_job/sqs/poller_spec.rb index 1415eaa..979cfd1 100644 --- a/spec/aws/active_job/sqs/poller_spec.rb +++ b/spec/aws/active_job/sqs/poller_spec.rb @@ -47,10 +47,10 @@ module SQS allow(poller).to receive(:poll) # no-op the poll poller.run - options = poller.instance_variable_get(:@options) - expect(options[:max_messages]).to eq 5 # from test app config file - expect(options[:visibility_timeout]).to eq 360 # from argv - expect(options[:shutdown_timeout]).to eq 15 # from defaults + config = Aws::ActiveJob::SQS.config + expect(config.max_messages).to eq 5 # from test app config file + expect(config.visibility_timeout).to eq 360 # from argv + expect(config.shutdown_timeout).to eq 15 # from defaults end it 'polls the configured queue' do @@ -71,7 +71,7 @@ module SQS expect(queue_poller).to receive(:poll).with( { skip_delete: true, - max_number_of_messages: 5, + max_number_of_messages: 2, # from queue config in app config file visibility_timeout: 360 } ) @@ -82,7 +82,7 @@ module SQS it 'sets max_number_of_messages to 1 for fifo queues' do allow(poller).to receive(:boot_rails) # no-op the boot - allow(Aws::ActiveJob::SQS.config).to receive(:queue_url_for).and_return('https://queue-url.fifo') + allow(Aws::ActiveJob::SQS.config).to receive(:url_for).and_return('https://queue-url.fifo') expect(Aws::SQS::QueuePoller).to receive(:new).and_return(queue_poller) expect(queue_poller).to receive(:poll).with( diff --git a/spec/dummy/config/aws_sqs_active_job.yml b/spec/dummy/config/aws_sqs_active_job.yml index 69f4154..dc8c630 100644 --- a/spec/dummy/config/aws_sqs_active_job.yml +++ b/spec/dummy/config/aws_sqs_active_job.yml @@ -1,3 +1,5 @@ max_messages: 5 queues: - default: 'https://queue-url' + default: + url: 'https://queue-url' + max_messages: 2