Skip to content

Commit

Permalink
Support custom oauth bearer (#467)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Mar 26, 2024
1 parent 5069347 commit 74cb6bd
Show file tree
Hide file tree
Showing 14 changed files with 285 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

This release contains **BREAKING** changes. Make sure to read and apply upgrade notes.

- **[Feature]** Support custom OAuth providers.
- **[Breaking]** Drop Ruby `2.7` support.
- **[Breaking]** Change default timeouts so final delivery `message.timeout.ms` is less that `max_wait_time` so we do not end up with not final verdict.
- **[Breaking]** Update all the time related configuration settings to be in `ms` and not mixed.
Expand Down
10 changes: 5 additions & 5 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
PATH
remote: .
specs:
waterdrop (2.7.0.alpha3)
karafka-core (>= 2.4.0.alpha1, < 3.0.0)
waterdrop (2.7.0.beta1)
karafka-core (>= 2.4.0.beta2, < 3.0.0)
zeitwerk (~> 2.3)

GEM
Expand Down Expand Up @@ -31,9 +31,9 @@ GEM
ffi (1.16.3)
i18n (1.14.4)
concurrent-ruby (~> 1.0)
karafka-core (2.4.0.alpha1)
karafka-rdkafka (>= 0.15.0.alpha1, < 0.16.0)
karafka-rdkafka (0.15.0.alpha1)
karafka-core (2.4.0.beta2)
karafka-rdkafka (>= 0.15.0.beta3, < 0.16.0)
karafka-rdkafka (0.15.0.beta3)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
Expand Down
10 changes: 8 additions & 2 deletions config/locales/errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@ en:
missing: must be present
logger_format: must be present
deliver_format: must be boolean
instrument_on_wait_queue_full_format: must be boolean
id_format: must be a non-empty string
monitor_format: must be present
client_class_format: must be present
max_payload_size_format: must be an integer that is equal or bigger than 1
max_wait_timeout_format: must be an integer that is equal or bigger than 0
kafka_format: must be a hash with symbol based keys
kafka_key_must_be_a_symbol: All keys under the kafka settings scope need to be symbols
wait_on_queue_full_format: must be boolean
wait_backoff_on_queue_full_format: must be a numeric that is bigger or equal to 0
wait_timeout_on_queue_full_format: must be a numeric that is bigger or equal to 0
wait_backoff_on_queue_full_format: must be a numeric that is equal or bigger to 0
wait_timeout_on_queue_full_format: must be a numeric that is equal or bigger to 0
wait_backoff_on_transaction_command_format: must be a numeric that is equal or bigger to 0
max_attempts_on_transaction_command_format: must be an integer that is equal or bigger than 1
oauth.token_provider_listener_format: 'must be false or respond to #on_oauthbearer_token_refresh'

message:
missing: must be present
Expand Down
38 changes: 34 additions & 4 deletions lib/waterdrop/clients/rdkafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,50 @@ class << self
# @param producer [WaterDrop::Producer] producer instance with its config, etc
# @note We overwrite this that way, because we do not care
def new(producer)
config = producer.config.kafka.to_h
kafka_config = producer.config.kafka.to_h
monitor = producer.config.monitor

client = ::Rdkafka::Config.new(config).producer
client = ::Rdkafka::Config.new(kafka_config).producer(native_kafka_auto_start: false)

# Register statistics runner for this particular type of callbacks
::Karafka::Core::Instrumentation.statistics_callbacks.add(
producer.id,
Instrumentation::Callbacks::Statistics.new(producer.id, client.name, monitor)
)

# Register error tracking callback
::Karafka::Core::Instrumentation.error_callbacks.add(
producer.id,
Instrumentation::Callbacks::Error.new(producer.id, client.name, monitor)
)

# Register oauth bearer refresh for this particular type of callbacks
::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.add(
producer.id,
Instrumentation::Callbacks::OauthbearerTokenRefresh.new(client, monitor)
)

# This callback is not global and is per client, thus we do not have to wrap it with a
# callbacks manager to make it work
client.delivery_callback = Instrumentation::Callbacks::Delivery.new(
producer.id,
producer.transactional?,
producer.config.monitor
monitor
)

oauth_listener = producer.config.oauth.token_provider_listener
# We need to subscribe the oauth listener here because we want it to be ready before
# any producer callbacks run. In theory because WaterDrop rdkafka producer is lazy loaded
# we would have enough time to make user subscribe it himself, but then it would not
# coop with auto-configuration coming from Karafka. The way it is done below, if it is
# configured it will be subscribed and if not, user always can subscribe it himself as
# long as it is done prior to first usage
monitor.subscribe(oauth_listener) if oauth_listener

client.start

# Switch to the transactional mode if user provided the transactional id
client.init_transactions if config.key?(:'transactional.id')
client.init_transactions if kafka_config.key?(:'transactional.id')

client
end
Expand Down
8 changes: 8 additions & 0 deletions lib/waterdrop/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ class Config
constructor: ->(middleware) { middleware || WaterDrop::Middleware.new }
)

# Namespace for oauth related configuration
setting :oauth do
# option [false, #call] Listener for using oauth bearer. This listener will be able to
# get the client name to decide whether to use a single multi-client token refreshing
# or have separate tokens per instance.
setting :token_provider_listener, default: false
end

# Configuration method
# @yield Runs a block of code providing a config singleton instance to it
# @yieldparam [WaterDrop::Config] WaterDrop config instance
Expand Down
11 changes: 11 additions & 0 deletions lib/waterdrop/contracts/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,24 @@ class Config < ::Karafka::Core::Contractable::Contract

required(:id) { |val| val.is_a?(String) && !val.empty? }
required(:logger) { |val| !val.nil? }
required(:monitor) { |val| !val.nil? }
required(:deliver) { |val| [true, false].include?(val) }
required(:max_payload_size) { |val| val.is_a?(Integer) && val >= 1 }
required(:max_wait_timeout) { |val| val.is_a?(Numeric) && val >= 0 }
required(:client_class) { |val| !val.nil? }
required(:kafka) { |val| val.is_a?(Hash) && !val.empty? }
required(:wait_on_queue_full) { |val| [true, false].include?(val) }
required(:instrument_on_wait_queue_full) { |val| [true, false].include?(val) }
required(:wait_backoff_on_queue_full) { |val| val.is_a?(Numeric) && val >= 0 }
required(:wait_timeout_on_queue_full) { |val| val.is_a?(Numeric) && val >= 0 }
required(:wait_backoff_on_transaction_command) { |val| val.is_a?(Numeric) && val >= 0 }
required(:max_attempts_on_transaction_command) { |val| val.is_a?(Integer) && val >= 1 }

nested(:oauth) do
required(:token_provider_listener) do |val|
val == false || val.respond_to?(:on_oauthbearer_token_refresh)
end
end

# rdkafka allows both symbols and strings as keys for config but then casts them to strings
# This can be confusing, so we expect all keys to be symbolized
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

module WaterDrop
module Instrumentation
module Callbacks
# Callback that is triggered when oauth token needs to be refreshed.
class OauthbearerTokenRefresh
# @param bearer [Rdkafka::Producer] given rdkafka instance. It is needed as
# we need to have a reference to call `#oauthbearer_set_token` or
# `#oauthbearer_set_token_failure` upon the event.
# @param monitor [WaterDrop::Instrumentation::Monitor] monitor we are using
def initialize(bearer, monitor)
@bearer = bearer
@monitor = monitor
end

# Upon receiving of this event, user is required to invoke either `#oauthbearer_set_token`
# or `#oauthbearer_set_token_failure` on the `event[:bearer]` depending whether token
# obtaining was successful or not.
#
# Please refer to WaterDrop and Karafka documentation or `Rdkafka::Helpers::OAuth`
# documentation directly for exact parameters of those methods.
#
# @param _rd_config [Rdkafka::Config]
# @param bearer_name [String] name of the bearer for which we refresh
def call(_rd_config, bearer_name)
return unless @bearer.name == bearer_name

@monitor.instrument(
'oauthbearer.token_refresh',
bearer: @bearer,
caller: self
)
end
end
end
end
end
2 changes: 2 additions & 0 deletions lib/waterdrop/instrumentation/notifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
messages.produced_sync
messages.buffered

oauthbearer.token_refresh

transaction.started
transaction.committed
transaction.aborted
Expand Down
13 changes: 1 addition & 12 deletions lib/waterdrop/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,6 @@ def client
@pid = Process.pid
@client = Builder.new.call(self, @config)

# Register statistics runner for this particular type of callbacks
::Karafka::Core::Instrumentation.statistics_callbacks.add(
@id,
Instrumentation::Callbacks::Statistics.new(@id, @client.name, @config.monitor)
)

# Register error tracking callback
::Karafka::Core::Instrumentation.error_callbacks.add(
@id,
Instrumentation::Callbacks::Error.new(@id, @client.name, @config.monitor)
)

@status.connected!
@monitor.instrument('producer.connected', producer_id: id)
end
Expand Down Expand Up @@ -209,6 +197,7 @@ def close(force: false)
# Remove callbacks runners that were registered
::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
::Karafka::Core::Instrumentation.error_callbacks.delete(@id)
::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(@id)

@status.closed!
end
Expand Down
2 changes: 1 addition & 1 deletion lib/waterdrop/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# WaterDrop library
module WaterDrop
# Current WaterDrop version
VERSION = '2.7.0.alpha3'
VERSION = '2.7.0.beta1'
end
116 changes: 116 additions & 0 deletions spec/lib/waterdrop/contracts/config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@
{
id: SecureRandom.uuid,
logger: Logger.new('/dev/null'),
monitor: WaterDrop::Instrumentation::Monitor.new,
deliver: false,
client_class: WaterDrop::Clients::Rdkafka,
max_payload_size: 1024 * 1024,
max_wait_timeout: 1,
wait_on_queue_full: true,
wait_backoff_on_queue_full: 1,
wait_timeout_on_queue_full: 10,
wait_backoff_on_transaction_command: 15,
max_attempts_on_transaction_command_format: 5,
instrument_on_wait_queue_full: true,
max_attempts_on_transaction_command: 1,
oauth: {
token_provider_listener: false
},
kafka: {
'bootstrap.servers': 'localhost:9092,localhots:9092'
}
Expand Down Expand Up @@ -45,6 +54,34 @@
it { expect(contract_errors[:id]).not_to be_empty }
end

context 'when monitor is missing' do
before { config.delete(:monitor) }

it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:monitor]).not_to be_empty }
end

context 'when monitor is nil' do
before { config[:monitor] = nil }

it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:monitor]).not_to be_empty }
end

context 'when client_class is missing' do
before { config.delete(:client_class) }

it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:client_class]).not_to be_empty }
end

context 'when client_class is nil' do
before { config[:client_class] = nil }

it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:client_class]).not_to be_empty }
end

context 'when logger is missing' do
before { config.delete(:logger) }

Expand Down Expand Up @@ -141,6 +178,45 @@
it { expect(contract_result).not_to be_success }
end

context 'when max_attempts_on_transaction_command is nil' do
before { config[:max_attempts_on_transaction_command] = nil }

it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:max_attempts_on_transaction_command]).not_to be_empty }
end

context 'when max_attempts_on_transaction_command is a negative int' do
before { config[:max_attempts_on_transaction_command] = -1 }

it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:max_attempts_on_transaction_command]).not_to be_empty }
end

context 'when max_attempts_on_transaction_command is a negative float' do
before { config[:max_attempts_on_transaction_command] = -0.1 }

it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:max_attempts_on_transaction_command]).not_to be_empty }
end

context 'when max_attempts_on_transaction_command is 0' do
before { config[:max_attempts_on_transaction_command] = 0 }

it { expect(contract_result).not_to be_success }
end

context 'when max_attempts_on_transaction_command is positive int' do
before { config[:max_attempts_on_transaction_command] = 1 }

it { expect(contract_result).to be_success }
end

context 'when max_attempts_on_transaction_command is positive float' do
before { config[:max_attempts_on_transaction_command] = 1.1 }

it { expect(contract_result).not_to be_success }
end

context 'when max_wait_timeout is missing' do
before { config.delete(:max_wait_timeout) }

Expand Down Expand Up @@ -194,6 +270,13 @@
it { expect(contract_errors[:wait_on_queue_full]).not_to be_empty }
end

context 'when instrument_on_wait_queue_full is not a boolean' do
before { config[:instrument_on_wait_queue_full] = 0 }

it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:instrument_on_wait_queue_full]).not_to be_empty }
end

context 'when wait_backoff_on_queue_full is not a numeric' do
before { config[:wait_backoff_on_queue_full] = 'na' }

Expand All @@ -208,6 +291,20 @@
it { expect(contract_errors[:wait_backoff_on_queue_full]).not_to be_empty }
end

context 'when wait_backoff_on_transaction_command is not a numeric' do
before { config[:wait_backoff_on_transaction_command] = 'na' }

it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:wait_backoff_on_transaction_command]).not_to be_empty }
end

context 'when wait_backoff_on_transaction_command is less than 0' do
before { config[:wait_backoff_on_transaction_command] = -1 }

it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:wait_backoff_on_transaction_command]).not_to be_empty }
end

context 'when wait_timeout_on_queue_full is not a numeric' do
before { config[:wait_timeout_on_queue_full] = 'na' }

Expand All @@ -221,4 +318,23 @@
it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:wait_timeout_on_queue_full]).not_to be_empty }
end

context 'when oauth token_provider_listener does not respond to on_oauthbearer_token_refresh' do
before { config[:oauth][:token_provider_listener] = true }

it { expect(contract_result).not_to be_success }
it { expect(contract_errors[:'oauth.token_provider_listener']).not_to be_empty }
end

context 'when oauth token_provider_listener responds to on_oauthbearer_token_refresh' do
let(:listener) do
Class.new do
def on_oauthbearer_token_refresh(_); end
end
end

before { config[:oauth][:token_provider_listener] = listener.new }

it { expect(contract_result).to be_success }
end
end
Loading

0 comments on commit 74cb6bd

Please sign in to comment.