From 5bf51a127ece9fe5b2907ba856df885a9ae9d906 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Fri, 20 Oct 2023 16:46:01 +0200 Subject: [PATCH] v2.6.8 (#397) --- CHANGELOG.md | 6 +- Gemfile.lock | 2 +- lib/waterdrop/clients/buffered.rb | 62 ++++++++++- lib/waterdrop/clients/dummy.rb | 20 ++++ lib/waterdrop/version.rb | 2 +- spec/lib/waterdrop/clients/buffered_spec.rb | 116 ++++++++++++++++++++ spec/lib/waterdrop/clients/dummy_spec.rb | 45 ++++++++ 7 files changed, 243 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ca5b3b8..e028d44d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # WaterDrop changelog -## 2.7.0 (Unreleased) +## 2.6.8 (2023-10-20) - **[Feature]** Introduce transactions support. - [Improvement] Expand `LoggerListener` to inform about transactions (info level). - [Improvement] Allow waterdrop to use topic as a symbol or a string. @@ -9,10 +9,6 @@ - [Improvement] Provide `#purge` that will purge any outgoing data and data from the internal queues (both WaterDrop and librdkafka). - [Fix] Fix the `librdkafka.dispatch_error` error dispatch for errors with negative code. -### Upgrade Notes - -There are no breaking changes in this release. However, if you upgrade WaterDrop in Karafka **and** choose to use transactions, Karafka Web UI may not support it. Web UI will support transactional producers starting from `0.7.7`. - ## 2.6.7 (2023-09-01) - [Improvement] early flush data from `librdkafka` internal buffer before closing. - [Maintenance] Update the signing cert as the old one expired. diff --git a/Gemfile.lock b/Gemfile.lock index cddd93c7..c76f307a 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - waterdrop (2.7.0) + waterdrop (2.6.8) karafka-core (>= 2.2.3, < 3.0.0) zeitwerk (~> 2.3) diff --git a/lib/waterdrop/clients/buffered.rb b/lib/waterdrop/clients/buffered.rb index 9800ef8b..74faa95f 100644 --- a/lib/waterdrop/clients/buffered.rb +++ b/lib/waterdrop/clients/buffered.rb @@ -19,17 +19,73 @@ def initialize(*args) super @messages = [] @topics = Hash.new { |k, v| k[v] = [] } + + @transaction_mutex = Mutex.new + @transaction_active = false + @transaction_messages = [] + @transaction_topics = Hash.new { |k, v| k[v] = [] } + @transaction_level = 0 end # "Produces" message to Kafka: it acknowledges it locally, adds it to the internal buffer # @param message [Hash] `WaterDrop::Producer#produce_sync` message hash def produce(message) - # We pre-validate the message payload, so topic is ensured to be present - @topics[message.fetch(:topic)] << message - @messages << message + if @transaction_active + @transaction_topics[message.fetch(:topic)] << message + @transaction_messages << message + else + # We pre-validate the message payload, so topic is ensured to be present + @topics[message.fetch(:topic)] << message + @messages << message + end + SyncResponse.new end + # Yields the code pretending it is in a transaction + # Supports our aborting transaction flow + # Moves messages the appropriate buffers only if transaction is successful + def transaction + @transaction_level += 1 + + return yield if @transaction_mutex.owned? + + @transaction_mutex.lock + @transaction_active = true + + result = nil + commit = false + + catch(:abort) do + result = yield + commit = true + end + + commit || raise(WaterDrop::Errors::AbortTransaction) + + # Transfer transactional data on success + @transaction_topics.each do |topic, messages| + @topics[topic] += messages + end + + @messages += @transaction_messages + + result + rescue StandardError => e + return if e.is_a?(WaterDrop::Errors::AbortTransaction) + + raise + ensure + @transaction_level -= 1 + + if @transaction_level.zero? && @transaction_mutex.owned? + @transaction_topics.clear + @transaction_messages.clear + @transaction_active = false + @transaction_mutex.unlock + end + end + # Returns messages produced to a given topic # @param topic [String] def messages_for(topic) diff --git a/lib/waterdrop/clients/dummy.rb b/lib/waterdrop/clients/dummy.rb index ccee6564..984f456b 100644 --- a/lib/waterdrop/clients/dummy.rb +++ b/lib/waterdrop/clients/dummy.rb @@ -25,6 +25,26 @@ def respond_to_missing?(*_args) true end + # Yields the code pretending it is in a transaction + # Supports our aborting transaction flow + def transaction + result = nil + commit = false + + catch(:abort) do + result = yield + commit = true + end + + commit || raise(WaterDrop::Errors::AbortTransaction) + + result + rescue StandardError => e + return if e.is_a?(WaterDrop::Errors::AbortTransaction) + + raise + end + # @param _args [Object] anything really, this dummy is suppose to support anything # @return [self] returns self for chaining cases def method_missing(*_args) diff --git a/lib/waterdrop/version.rb b/lib/waterdrop/version.rb index 7803802a..23b9e449 100644 --- a/lib/waterdrop/version.rb +++ b/lib/waterdrop/version.rb @@ -3,5 +3,5 @@ # WaterDrop library module WaterDrop # Current WaterDrop version - VERSION = '2.7.0' + VERSION = '2.6.8' end diff --git a/spec/lib/waterdrop/clients/buffered_spec.rb b/spec/lib/waterdrop/clients/buffered_spec.rb index 8ab39cdc..2416f099 100644 --- a/spec/lib/waterdrop/clients/buffered_spec.rb +++ b/spec/lib/waterdrop/clients/buffered_spec.rb @@ -64,4 +64,120 @@ it { expect(client.messages).to be_empty } it { expect(client.messages_for('foo')).to be_empty } end + + describe '#transaction' do + context 'when no error and no abort' do + it 'expect to return the block value' do + expect(client.transaction { 1 }).to eq(1) + end + end + + context 'when running transaction with production of messages' do + it 'expect to add them to the buffers' do + client.transaction do + client.produce(topic: 'test', payload: 'test') + client.produce(topic: 'test', payload: 'test') + end + + expect(client.messages.size).to eq(5) + expect(client.messages_for('test').size).to eq(2) + end + end + + context 'when running nested transaction with production of messages' do + it 'expect to add them to the buffers' do + client.transaction do + client.produce(topic: 'test', payload: 'test') + client.produce(topic: 'test', payload: 'test') + + client.transaction do + client.produce(topic: 'test', payload: 'test') + client.produce(topic: 'test', payload: 'test') + end + end + + expect(client.messages.size).to eq(7) + expect(client.messages_for('test').size).to eq(4) + end + end + + context 'when running nested transaction with production of messages on abort' do + it 'expect to add them to the buffers' do + client.transaction do + client.produce(topic: 'test', payload: 'test') + client.produce(topic: 'test', payload: 'test') + + client.transaction do + client.produce(topic: 'test', payload: 'test') + client.produce(topic: 'test', payload: 'test') + + throw(:abort) + end + end + + expect(client.messages.size).to eq(3) + expect(client.messages_for('test').size).to eq(0) + end + end + + context 'when abort occurs' do + it 'expect not to raise error' do + expect do + client.transaction { throw(:abort) } + end.not_to raise_error + end + + it 'expect not to contain messages from the aborted transaction' do + client.transaction do + client.produce(topic: 'test', payload: 'test') + throw(:abort) + end + + expect(client.messages.size).to eq(3) + expect(client.messages_for('test')).to be_empty + end + end + + context 'when WaterDrop::Errors::AbortTransaction error occurs' do + it 'expect not to raise error' do + expect do + client.transaction { raise(WaterDrop::Errors::AbortTransaction) } + end.not_to raise_error + end + end + + context 'when different error occurs' do + it 'expect to raise error' do + expect do + client.transaction { raise(StandardError) } + end.to raise_error(StandardError) + end + + it 'expect not to contain messages from the aborted transaction' do + expect do + client.transaction do + client.produce(topic: 'test', payload: 'test') + + raise StandardError + end + end.to raise_error(StandardError) + + expect(client.messages.size).to eq(3) + expect(client.messages_for('test')).to be_empty + end + end + + context 'when running a nested transaction' do + it 'expect to work ok' do + result = client.transaction do + client.transaction do + client.produce(topic: '1', payload: '2') + 2 + end + end + + expect(result).to eq(2) + end + end + end end diff --git a/spec/lib/waterdrop/clients/dummy_spec.rb b/spec/lib/waterdrop/clients/dummy_spec.rb index 89562883..f79d7b66 100644 --- a/spec/lib/waterdrop/clients/dummy_spec.rb +++ b/spec/lib/waterdrop/clients/dummy_spec.rb @@ -24,4 +24,49 @@ describe '#respond_to?' do it { expect(client.respond_to?(:test)).to eq(true) } end + + describe '#transaction' do + context 'when no error and no abort' do + it 'expect to return the block value' do + expect(client.transaction { 1 }).to eq(1) + end + end + + context 'when abort occurs' do + it 'expect not to raise error' do + expect do + client.transaction { throw(:abort) } + end.not_to raise_error + end + end + + context 'when WaterDrop::Errors::AbortTransaction error occurs' do + it 'expect not to raise error' do + expect do + client.transaction { raise(WaterDrop::Errors::AbortTransaction) } + end.not_to raise_error + end + end + + context 'when different error occurs' do + it 'expect to raise error' do + expect do + client.transaction { raise(StandardError) } + end.to raise_error(StandardError) + end + end + + context 'when running a nested transaction' do + it 'expect to work ok' do + result = client.transaction do + client.transaction do + client.produce(topic: '1', payload: '2') + 2 + end + end + + expect(result).to eq(2) + end + end + end end