-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Per queue config + ENV config #10
base: version-1.0
Are you sure you want to change the base?
Changes from all commits
691684b
41f34b1
3fa7137
a692829
20cea56
61e845c
1819cad
69cea68
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,8 @@ class Configuration | |
# Default configuration options | ||
# @api private | ||
DEFAULTS = { | ||
threads: 2 * Concurrent.processor_count, | ||
backpressure: 10, | ||
max_messages: 10, | ||
shutdown_timeout: 15, | ||
retry_standard_errors: true, # TODO: Remove in next MV | ||
|
@@ -17,20 +19,42 @@ class Configuration | |
excluded_deduplication_keys: ['job_id'] | ||
}.freeze | ||
|
||
# @api private | ||
attr_accessor :queues, :max_messages, :visibility_timeout, | ||
:shutdown_timeout, :client, :logger, | ||
:async_queue_error_handler, :message_group_id | ||
GLOBAL_ENV_CONFIGS = %i[ | ||
threads backpressure | ||
max_messages shutdown_timeout | ||
visibility_timeout message_group_id | ||
].freeze | ||
|
||
attr_reader :excluded_deduplication_keys | ||
QUEUE_ENV_CONFIGS = %i[ | ||
url max_messages | ||
visibility_timeout message_group_id | ||
].freeze | ||
|
||
# Don't use this method directly: Configuration is a singleton class, use | ||
# +Aws::ActiveJob::SQS.config+ to access the singleton config. | ||
# | ||
# This class provides a Configuration object for AWS ActiveJob | ||
# by pulling configuration options from Runtime, the ENV, a YAML file, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think ENV should take precedence over YAML, so that you can set ENV for the config file location. See how this was done in the sessionstore gem recently. |
||
# and default settings, in that order. Values set on queues are used | ||
# preferentially to global values. | ||
# | ||
# # Environment Variables | ||
# The Configuration loads global and queue specific values from your | ||
# environment. Global keys take the form of: | ||
# `AWS_ACTIVE_JOB_SQS_<KEY_NAME>` and queue specific keys take the | ||
# form of: `AWS_ACTIVE_JOB_SQS_<QUEUE_NAME>_<KEY_NAME>`. Example: | ||
# | ||
# export AWS_ACTIVE_JOB_SQS_MAX_MESSAGES = 5 | ||
# export AWS_ACTIVE_JOB_SQS_DEFAULT_URL = https://my-queue.aws | ||
# | ||
# @param [Hash] options | ||
# @option options [Hash[Symbol, String]] :queues A mapping between the | ||
# active job queue name and the SQS Queue URL. Note: multiple active | ||
# job queues can map to the same SQS Queue URL. | ||
# @option options [Hash[Symbol, Hash]] :queues A mapping between the | ||
# active job queue name and the queue properties. Valid properties | ||
# are: url [Required], max_messages, shutdown_timeout, | ||
# message_group_id, and :excluded_deduplication_keys. Values | ||
# configured on the queue are used preferentially to the global | ||
# values. | ||
# Note: multiple active job queues can map to the same SQS Queue URL. | ||
# | ||
# @option options [Integer] :max_messages | ||
# The max number of messages to poll for in a batch. | ||
|
@@ -50,7 +74,7 @@ class Configuration | |
# will not be deleted from the SQS queue and will be retryable after | ||
# the visibility timeout. | ||
# | ||
# @ option options [Boolean] :retry_standard_errors | ||
# @option options [Boolean] :retry_standard_errors | ||
# If `true`, StandardErrors raised by ActiveJobs are left on the queue | ||
# and will be retried (pending the SQS Queue's redrive/DLQ/maximum receive settings). | ||
# This behavior overrides the standard Rails ActiveJob | ||
|
@@ -73,7 +97,7 @@ class Configuration | |
# See the (SQS FIFO Documentation)[https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html] | ||
# | ||
# @option options [Callable] :async_queue_error_handler An error handler | ||
# to be called when the async active job adapter experiances an error | ||
# to be called when the async active job adapter experiences an error | ||
# queueing a job. Only applies when | ||
# +active_job.queue_adapter = :sqs_async+. Called with: | ||
# [error, job, job_options] | ||
|
@@ -87,12 +111,24 @@ class Configuration | |
|
||
def initialize(options = {}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This style/approach reads a lot cleaner IMO |
||
options[:config_file] ||= config_file if File.exist?(config_file) | ||
options = DEFAULTS | ||
resolved = DEFAULTS | ||
.merge(file_options(options)) | ||
|
||
resolved = resolved | ||
.merge(env_options(resolved)) | ||
.merge(options) | ||
set_attributes(options) | ||
set_attributes(resolved) | ||
end | ||
|
||
# @api private | ||
attr_accessor :queues, :threads, :backpressure, | ||
:max_messages, :visibility_timeout, | ||
:shutdown_timeout, :client, :logger, | ||
:async_queue_error_handler, :message_group_id, | ||
:retry_standard_errors | ||
|
||
attr_reader :excluded_deduplication_keys | ||
|
||
def excluded_deduplication_keys=(keys) | ||
@excluded_deduplication_keys = keys.map(&:to_s) | ['job_id'] | ||
end | ||
|
@@ -106,11 +142,24 @@ def client | |
end | ||
|
||
# Return the queue_url for a given job_queue name | ||
def queue_url_for(job_queue) | ||
job_queue = job_queue.to_sym | ||
raise ArgumentError, "No queue defined for #{job_queue}" unless queues.key? job_queue | ||
def url_for(job_queue) | ||
queue_attribute_for(:url, job_queue) | ||
end | ||
|
||
def max_messages_for(job_queue) | ||
queue_attribute_for(:max_messages, job_queue) | ||
end | ||
|
||
queues[job_queue] | ||
def visibility_timeout_for(job_queue) | ||
queue_attribute_for(:visibility_timeout, job_queue) | ||
end | ||
|
||
def message_group_id_for(job_queue) | ||
queue_attribute_for(:message_group_id, job_queue) | ||
end | ||
|
||
def excluded_deduplication_keys_for(job_queue) | ||
queue_attribute_for(:excluded_deduplication_keys, job_queue) | ||
end | ||
|
||
# @api private | ||
|
@@ -131,6 +180,13 @@ def to_h | |
|
||
private | ||
|
||
def queue_attribute_for(attribute, job_queue) | ||
job_queue = job_queue.to_sym | ||
raise ArgumentError, "No queue defined for #{job_queue}" unless queues.key? job_queue | ||
|
||
queues[job_queue][attribute] || instance_variable_get("@#{attribute}") | ||
end | ||
|
||
# Set accessible attributes after merged options. | ||
def set_attributes(options) | ||
options.each_key do |opt_name| | ||
|
@@ -139,12 +195,37 @@ def set_attributes(options) | |
end | ||
end | ||
|
||
# resolve ENV for global and queue specific options | ||
def env_options(options) | ||
resolved = {} | ||
GLOBAL_ENV_CONFIGS.each do |cfg| | ||
env_name = "AWS_ACTIVE_JOB_SQS_#{cfg.to_s.upcase}" | ||
resolved[cfg] = parse_env_value(env_name) if ENV.key? env_name | ||
end | ||
options[:queues]&.each_key do |queue| | ||
resolved[:queues] ||= {} | ||
resolved[:queues][queue] = options[:queues][queue].dup | ||
QUEUE_ENV_CONFIGS.each do |cfg| | ||
env_name = "AWS_ACTIVE_JOB_SQS_#{queue.upcase}_#{cfg.to_s.upcase}" | ||
resolved[:queues][queue][cfg] = parse_env_value(env_name) if ENV.key? env_name | ||
end | ||
end | ||
resolved | ||
end | ||
|
||
def parse_env_value(key) | ||
val = ENV.fetch(key, nil) | ||
Integer(val) | ||
rescue ArgumentError, TypeError | ||
%w[true false].include?(val) ? val == 'true' : val | ||
end | ||
|
||
def file_options(options = {}) | ||
file_path = config_file_path(options) | ||
if file_path | ||
load_from_file(file_path) | ||
else | ||
{} | ||
options | ||
end | ||
end | ||
|
||
|
@@ -157,12 +238,17 @@ def config_file | |
# Load options from YAML file | ||
def load_from_file(file_path) | ||
opts = load_yaml(file_path) || {} | ||
opts[:queues]&.each_key do |queue| | ||
if opts[:queues][queue].is_a?(String) | ||
opts[:queues][queue] = { url: opts[:queues][queue] } | ||
end | ||
end | ||
opts.deep_symbolize_keys | ||
end | ||
|
||
# @return [String] Configuration path found in environment or YAML file. | ||
def config_file_path(options) | ||
options[:config_file] || ENV.fetch('AWS_SQS_ACTIVE_JOB_CONFIG_FILE', nil) | ||
options[:config_file] || ENV.fetch('AWS_ACTIVE_JOB_SQS_CONFIG_FILE', nil) | ||
end | ||
|
||
def load_yaml(file_path) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - single line or multiline all