Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per queue config + ENV config #10

Open
wants to merge 8 commits into
base: version-1.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
Unreleased Changes
------------------

* Feature - Support per queue configuration.
* Feature - Support loading global and queue specific configuration from ENV.

0.1.0 (2024-11-16)
------------------

Expand Down
18 changes: 16 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,19 @@ You also need to configure a mapping of ActiveJob queue names to SQS Queue URLs:

```yaml
# config/aws_sqs_active_job.yml
backpressure: 5 # configure global options for poller
max_messages: 3
queues:
default: 'https://my-queue-url.amazon.aws'
default:
url: 'https://my-queue-url.amazon.aws'
max_messages: 2 # queue specific values override global values
```

For a complete list of configuration options see the
[Aws::ActiveJob::SQS::Configuration](https://docs.aws.amazon.com/sdk-for-ruby/aws-activejob-sqs/api/Aws/ActiveJob/SQS/Configuration.html)
documentation.

You can configure SQS Active Job either through the yaml file or
You can configure SQS Active Job either through the environment, yaml file or
through code in your `config/<env>.rb` or initializers.

For file based configuration, you can use either
Expand All @@ -88,6 +92,16 @@ Aws::ActiveJob::SQS.configure do |config|
end
```

SQS Active Job loads global and queue specific values from your
environment. Global keys take the form of:
`AWS_ACTIVE_JOB_SQS_<KEY_NAME>` and queue specific keys take the
form of: `AWS_ACTIVE_JOB_SQS_<QUEUE_NAME>_<KEY_NAME>`. Example:

```shell
export AWS_ACTIVE_JOB_SQS_MAX_MESSAGES = 5
export AWS_ACTIVE_JOB_SQS_DEFAULT_URL = https://my-queue.aws
```

## Usage

To queue a job, you can just use standard ActiveJob methods:
Expand Down
2 changes: 1 addition & 1 deletion lib/active_job/queue_adapters/sqs_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def enqueue_at(job, timestamp)
def enqueue_all(jobs)
enqueued_count = 0
jobs.group_by(&:queue_name).each do |queue_name, same_queue_jobs|
queue_url = Aws::ActiveJob::SQS.config.queue_url_for(queue_name)
queue_url = Aws::ActiveJob::SQS.config.url_for(queue_name)
base_send_message_opts = { queue_url: queue_url }

same_queue_jobs.each_slice(10) do |chunk|
Expand Down
6 changes: 3 additions & 3 deletions lib/active_job/queue_adapters/sqs_adapter/params.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def initialize(job, body)
end

def queue_url
@queue_url ||= Aws::ActiveJob::SQS.config.queue_url_for(@job.queue_name)
@queue_url ||= Aws::ActiveJob::SQS.config.url_for(@job.queue_name)
end

def entry
Expand Down Expand Up @@ -61,15 +61,15 @@ def options_for_fifo
Digest::SHA256.hexdigest(ActiveSupport::JSON.dump(deduplication_body))

message_group_id = @job.message_group_id if @job.respond_to?(:message_group_id)
message_group_id ||= Aws::ActiveJob::SQS.config.message_group_id
message_group_id ||= Aws::ActiveJob::SQS.config.message_group_id_for(@job.queue_name)

options[:message_group_id] = message_group_id
options
end

def deduplication_body
ex_dedup_keys = @job.excluded_deduplication_keys if @job.respond_to?(:excluded_deduplication_keys)
ex_dedup_keys ||= Aws::ActiveJob::SQS.config.excluded_deduplication_keys
ex_dedup_keys ||= Aws::ActiveJob::SQS.config.excluded_deduplication_keys_for(@job.queue_name)

@body.except(*ex_dedup_keys)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/active_job/queue_adapters/sqs_async_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class SqsAsyncAdapter < SqsAdapter

def _enqueue(job, body = nil, send_message_opts = {})
# FIFO jobs must be queued in order, so do not queue async
queue_url = Aws::ActiveJob::SQS.config.queue_url_for(job.queue_name)
queue_url = Aws::ActiveJob::SQS.config.url_for(job.queue_name)
if Aws::ActiveJob::SQS.fifo?(queue_url)
super
else
Expand Down
122 changes: 104 additions & 18 deletions lib/aws/active_job/sqs/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ class Configuration
# Default configuration options
# @api private
DEFAULTS = {
threads: 2 * Concurrent.processor_count,
backpressure: 10,
max_messages: 10,
shutdown_timeout: 15,
retry_standard_errors: true, # TODO: Remove in next MV
Expand All @@ -17,20 +19,42 @@ class Configuration
excluded_deduplication_keys: ['job_id']
}.freeze

# @api private
attr_accessor :queues, :max_messages, :visibility_timeout,
:shutdown_timeout, :client, :logger,
:async_queue_error_handler, :message_group_id
GLOBAL_ENV_CONFIGS = %i[
threads backpressure
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - single line or multiline all

max_messages shutdown_timeout
visibility_timeout message_group_id
].freeze

attr_reader :excluded_deduplication_keys
QUEUE_ENV_CONFIGS = %i[
url max_messages
visibility_timeout message_group_id
].freeze

# Don't use this method directly: Configuration is a singleton class, use
# +Aws::ActiveJob::SQS.config+ to access the singleton config.
#
# This class provides a Configuration object for AWS ActiveJob
# by pulling configuration options from Runtime, the ENV, a YAML file,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ENV should take precedence over YAML, so that you can set ENV for the config file location. See how this was done in the sessionstore gem recently.

# and default settings, in that order. Values set on queues are used
# preferentially to global values.
#
# # Environment Variables
# The Configuration loads global and queue specific values from your
# environment. Global keys take the form of:
# `AWS_ACTIVE_JOB_SQS_<KEY_NAME>` and queue specific keys take the
# form of: `AWS_ACTIVE_JOB_SQS_<QUEUE_NAME>_<KEY_NAME>`. Example:
#
# export AWS_ACTIVE_JOB_SQS_MAX_MESSAGES = 5
# export AWS_ACTIVE_JOB_SQS_DEFAULT_URL = https://my-queue.aws
#
# @param [Hash] options
# @option options [Hash[Symbol, String]] :queues A mapping between the
# active job queue name and the SQS Queue URL. Note: multiple active
# job queues can map to the same SQS Queue URL.
# @option options [Hash[Symbol, Hash]] :queues A mapping between the
# active job queue name and the queue properties. Valid properties
# are: url [Required], max_messages, shutdown_timeout,
# message_group_id, and :excluded_deduplication_keys. Values
# configured on the queue are used preferentially to the global
# values.
# Note: multiple active job queues can map to the same SQS Queue URL.
#
# @option options [Integer] :max_messages
# The max number of messages to poll for in a batch.
Expand All @@ -50,7 +74,7 @@ class Configuration
# will not be deleted from the SQS queue and will be retryable after
# the visibility timeout.
#
# @ option options [Boolean] :retry_standard_errors
# @option options [Boolean] :retry_standard_errors
# If `true`, StandardErrors raised by ActiveJobs are left on the queue
# and will be retried (pending the SQS Queue's redrive/DLQ/maximum receive settings).
# This behavior overrides the standard Rails ActiveJob
Expand All @@ -73,7 +97,7 @@ class Configuration
# See the (SQS FIFO Documentation)[https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html]
#
# @option options [Callable] :async_queue_error_handler An error handler
# to be called when the async active job adapter experiances an error
# to be called when the async active job adapter experiences an error
# queueing a job. Only applies when
# +active_job.queue_adapter = :sqs_async+. Called with:
# [error, job, job_options]
Expand All @@ -87,12 +111,24 @@ class Configuration

def initialize(options = {})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

options[:config_file] ||= config_file if File.exist?(config_file)
options = DEFAULTS
resolved = DEFAULTS
.merge(file_options(options))

resolved = resolved
.merge(env_options(resolved))
.merge(options)
set_attributes(options)
set_attributes(resolved)
end

# @api private
attr_accessor :queues, :threads, :backpressure,
:max_messages, :visibility_timeout,
:shutdown_timeout, :client, :logger,
:async_queue_error_handler, :message_group_id,
:retry_standard_errors

attr_reader :excluded_deduplication_keys

def excluded_deduplication_keys=(keys)
@excluded_deduplication_keys = keys.map(&:to_s) | ['job_id']
end
Expand All @@ -106,11 +142,24 @@ def client
end

# Return the queue_url for a given job_queue name
def queue_url_for(job_queue)
job_queue = job_queue.to_sym
raise ArgumentError, "No queue defined for #{job_queue}" unless queues.key? job_queue
def url_for(job_queue)
queue_attribute_for(:url, job_queue)
end

def max_messages_for(job_queue)
queue_attribute_for(:max_messages, job_queue)
end

queues[job_queue]
def visibility_timeout_for(job_queue)
queue_attribute_for(:visibility_timeout, job_queue)
end

def message_group_id_for(job_queue)
queue_attribute_for(:message_group_id, job_queue)
end

def excluded_deduplication_keys_for(job_queue)
queue_attribute_for(:excluded_deduplication_keys, job_queue)
end

# @api private
Expand All @@ -131,6 +180,13 @@ def to_h

private

def queue_attribute_for(attribute, job_queue)
job_queue = job_queue.to_sym
raise ArgumentError, "No queue defined for #{job_queue}" unless queues.key? job_queue

queues[job_queue][attribute] || instance_variable_get("@#{attribute}")
end

# Set accessible attributes after merged options.
def set_attributes(options)
options.each_key do |opt_name|
Expand All @@ -139,12 +195,37 @@ def set_attributes(options)
end
end

# resolve ENV for global and queue specific options
def env_options(options)
resolved = {}
GLOBAL_ENV_CONFIGS.each do |cfg|
env_name = "AWS_ACTIVE_JOB_SQS_#{cfg.to_s.upcase}"
resolved[cfg] = parse_env_value(env_name) if ENV.key? env_name
end
options[:queues]&.each_key do |queue|
resolved[:queues] ||= {}
resolved[:queues][queue] = options[:queues][queue].dup
QUEUE_ENV_CONFIGS.each do |cfg|
env_name = "AWS_ACTIVE_JOB_SQS_#{queue.upcase}_#{cfg.to_s.upcase}"
resolved[:queues][queue][cfg] = parse_env_value(env_name) if ENV.key? env_name
end
end
resolved
end

def parse_env_value(key)
val = ENV.fetch(key, nil)
Integer(val)
rescue ArgumentError, TypeError
%w[true false].include?(val) ? val == 'true' : val
end

def file_options(options = {})
file_path = config_file_path(options)
if file_path
load_from_file(file_path)
else
{}
options
end
end

Expand All @@ -157,12 +238,17 @@ def config_file
# Load options from YAML file
def load_from_file(file_path)
opts = load_yaml(file_path) || {}
opts[:queues]&.each_key do |queue|
if opts[:queues][queue].is_a?(String)
opts[:queues][queue] = { url: opts[:queues][queue] }
end
end
opts.deep_symbolize_keys
end

# @return [String] Configuration path found in environment or YAML file.
def config_file_path(options)
options[:config_file] || ENV.fetch('AWS_SQS_ACTIVE_JOB_CONFIG_FILE', nil)
options[:config_file] || ENV.fetch('AWS_ACTIVE_JOB_SQS_CONFIG_FILE', nil)
end

def load_yaml(file_path)
Expand Down
47 changes: 24 additions & 23 deletions lib/aws/active_job/sqs/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ module SQS
class Poller
class Interrupt < StandardError; end

DEFAULT_OPTS = {
threads: 2 * Concurrent.processor_count,
max_messages: 10,
shutdown_timeout: 15,
backpressure: 10,
retry_standard_errors: true
}.freeze

def initialize(args = ARGV)
@options = parse_args(args)
# Set_environment must be run before we boot_rails
Expand All @@ -35,22 +27,28 @@ def run
boot_rails

# cannot load config (from file or initializers) until after
# rails has been booted.
@options = DEFAULT_OPTS
.merge(Aws::ActiveJob::SQS.config.to_h)
.merge(@options.to_h)
# rails has been booted.\
Aws::ActiveJob::SQS.configure do |cfg|
@options.each_pair do |key, value|
cfg.send(:"#{key}=", value) if cfg.respond_to?(:"#{key}=")
end
end

validate_config

config = Aws::ActiveJob::SQS.config

# ensure we have a logger configured
@logger = @options[:logger] || ActiveSupport::Logger.new($stdout)
@logger.info("Starting Poller with options=#{@options}")
@logger = config.logger || ActiveSupport::Logger.new($stdout)
@logger.info("Starting Poller with config=#{config.to_h}")

Signal.trap('INT') { raise Interrupt }
Signal.trap('TERM') { raise Interrupt }
@executor = Executor.new(
max_threads: @options[:threads],
max_threads: config.threads,
logger: @logger,
max_queue: @options[:backpressure],
retry_standard_errors: @options[:retry_standard_errors]
max_queue: config.backpressure,
retry_standard_errors: config.retry_standard_errors
)

poll
Expand All @@ -63,18 +61,19 @@ def run
private

def shutdown
@executor.shutdown(@options[:shutdown_timeout])
@executor.shutdown(Aws::ActiveJob::SQS.config.shutdown_timeout)
end

def poll
queue_url = Aws::ActiveJob::SQS.config.queue_url_for(@options[:queue])
@logger.info "Polling on: #{@options[:queue]} => #{queue_url}"
client = Aws::ActiveJob::SQS.config.client
config = Aws::ActiveJob::SQS.config
queue = @options[:queue]
queue_url = config.url_for(queue)
client = config.client
@poller = Aws::SQS::QueuePoller.new(queue_url, client: client)
poller_options = {
skip_delete: true,
max_number_of_messages: @options[:max_messages],
visibility_timeout: @options[:visibility_timeout]
max_number_of_messages: config.max_messages_for(queue),
visibility_timeout: config.visibility_timeout_for(queue)
}
# Limit max_number_of_messages for FIFO queues to 1
# this ensures jobs with the same message_group_id are processed
Expand All @@ -85,6 +84,8 @@ def poll

single_message = poller_options[:max_number_of_messages] == 1

@logger.info "Polling on: #{queue} => #{queue_url} with options=#{poller_options}"

@poller.poll(poller_options) do |msgs|
msgs = [msgs] if single_message
@logger.info "Processing batch of #{msgs.length} messages"
Expand Down
2 changes: 1 addition & 1 deletion spec/active_job/queue_adapters/sqs_adapter/params_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SqsAdapter

describe 'fifo queue' do
before do
allow(Aws::ActiveJob::SQS.config).to receive(:queue_url_for).and_return('https://queue-url.fifo')
allow(Aws::ActiveJob::SQS.config).to receive(:url_for).and_return('https://queue-url.fifo')
end

it 'includes message_group_id and message_deduplication_id' do
Expand Down
Loading