Skip to content

Commit

Permalink
v2.6.8 (#397)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Oct 20, 2023
1 parent 12fa831 commit 5bf51a1
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 10 deletions.
6 changes: 1 addition & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# WaterDrop changelog

## 2.7.0 (Unreleased)
## 2.6.8 (2023-10-20)
- **[Feature]** Introduce transactions support.
- [Improvement] Expand `LoggerListener` to inform about transactions (info level).
- [Improvement] Allow waterdrop to use topic as a symbol or a string.
Expand All @@ -9,10 +9,6 @@
- [Improvement] Provide `#purge` that will purge any outgoing data and data from the internal queues (both WaterDrop and librdkafka).
- [Fix] Fix the `librdkafka.dispatch_error` error dispatch for errors with negative code.

### Upgrade Notes

There are no breaking changes in this release. However, if you upgrade WaterDrop in Karafka **and** choose to use transactions, Karafka Web UI may not support it. Web UI will support transactional producers starting from `0.7.7`.

## 2.6.7 (2023-09-01)
- [Improvement] early flush data from `librdkafka` internal buffer before closing.
- [Maintenance] Update the signing cert as the old one expired.
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
waterdrop (2.7.0)
waterdrop (2.6.8)
karafka-core (>= 2.2.3, < 3.0.0)
zeitwerk (~> 2.3)

Expand Down
62 changes: 59 additions & 3 deletions lib/waterdrop/clients/buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,73 @@ def initialize(*args)
super
@messages = []
@topics = Hash.new { |k, v| k[v] = [] }

@transaction_mutex = Mutex.new
@transaction_active = false
@transaction_messages = []
@transaction_topics = Hash.new { |k, v| k[v] = [] }
@transaction_level = 0
end

# "Produces" message to Kafka: it acknowledges it locally, adds it to the internal buffer
# @param message [Hash] `WaterDrop::Producer#produce_sync` message hash
def produce(message)
# We pre-validate the message payload, so topic is ensured to be present
@topics[message.fetch(:topic)] << message
@messages << message
if @transaction_active
@transaction_topics[message.fetch(:topic)] << message
@transaction_messages << message
else
# We pre-validate the message payload, so topic is ensured to be present
@topics[message.fetch(:topic)] << message
@messages << message
end

SyncResponse.new
end

# Yields the code pretending it is in a transaction
# Supports our aborting transaction flow
# Moves messages the appropriate buffers only if transaction is successful
def transaction
@transaction_level += 1

return yield if @transaction_mutex.owned?

@transaction_mutex.lock
@transaction_active = true

result = nil
commit = false

catch(:abort) do
result = yield
commit = true
end

commit || raise(WaterDrop::Errors::AbortTransaction)

# Transfer transactional data on success
@transaction_topics.each do |topic, messages|
@topics[topic] += messages
end

@messages += @transaction_messages

result
rescue StandardError => e
return if e.is_a?(WaterDrop::Errors::AbortTransaction)

raise
ensure
@transaction_level -= 1

if @transaction_level.zero? && @transaction_mutex.owned?
@transaction_topics.clear
@transaction_messages.clear
@transaction_active = false
@transaction_mutex.unlock
end
end

# Returns messages produced to a given topic
# @param topic [String]
def messages_for(topic)
Expand Down
20 changes: 20 additions & 0 deletions lib/waterdrop/clients/dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@ def respond_to_missing?(*_args)
true
end

# Yields the code pretending it is in a transaction
# Supports our aborting transaction flow
def transaction
result = nil
commit = false

catch(:abort) do
result = yield
commit = true
end

commit || raise(WaterDrop::Errors::AbortTransaction)

result
rescue StandardError => e
return if e.is_a?(WaterDrop::Errors::AbortTransaction)

raise
end

# @param _args [Object] anything really, this dummy is suppose to support anything
# @return [self] returns self for chaining cases
def method_missing(*_args)
Expand Down
2 changes: 1 addition & 1 deletion lib/waterdrop/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# WaterDrop library
module WaterDrop
# Current WaterDrop version
VERSION = '2.7.0'
VERSION = '2.6.8'
end
116 changes: 116 additions & 0 deletions spec/lib/waterdrop/clients/buffered_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,120 @@
it { expect(client.messages).to be_empty }
it { expect(client.messages_for('foo')).to be_empty }
end

describe '#transaction' do
context 'when no error and no abort' do
it 'expect to return the block value' do
expect(client.transaction { 1 }).to eq(1)
end
end

context 'when running transaction with production of messages' do
it 'expect to add them to the buffers' do
client.transaction do
client.produce(topic: 'test', payload: 'test')
client.produce(topic: 'test', payload: 'test')
end

expect(client.messages.size).to eq(5)
expect(client.messages_for('test').size).to eq(2)
end
end

context 'when running nested transaction with production of messages' do
it 'expect to add them to the buffers' do
client.transaction do
client.produce(topic: 'test', payload: 'test')
client.produce(topic: 'test', payload: 'test')

client.transaction do
client.produce(topic: 'test', payload: 'test')
client.produce(topic: 'test', payload: 'test')
end
end

expect(client.messages.size).to eq(7)
expect(client.messages_for('test').size).to eq(4)
end
end

context 'when running nested transaction with production of messages on abort' do
it 'expect to add them to the buffers' do
client.transaction do
client.produce(topic: 'test', payload: 'test')
client.produce(topic: 'test', payload: 'test')

client.transaction do
client.produce(topic: 'test', payload: 'test')
client.produce(topic: 'test', payload: 'test')

throw(:abort)
end
end

expect(client.messages.size).to eq(3)
expect(client.messages_for('test').size).to eq(0)
end
end

context 'when abort occurs' do
it 'expect not to raise error' do
expect do
client.transaction { throw(:abort) }
end.not_to raise_error
end

it 'expect not to contain messages from the aborted transaction' do
client.transaction do
client.produce(topic: 'test', payload: 'test')
throw(:abort)
end

expect(client.messages.size).to eq(3)
expect(client.messages_for('test')).to be_empty
end
end

context 'when WaterDrop::Errors::AbortTransaction error occurs' do
it 'expect not to raise error' do
expect do
client.transaction { raise(WaterDrop::Errors::AbortTransaction) }
end.not_to raise_error
end
end

context 'when different error occurs' do
it 'expect to raise error' do
expect do
client.transaction { raise(StandardError) }
end.to raise_error(StandardError)
end

it 'expect not to contain messages from the aborted transaction' do
expect do
client.transaction do
client.produce(topic: 'test', payload: 'test')

raise StandardError
end
end.to raise_error(StandardError)

expect(client.messages.size).to eq(3)
expect(client.messages_for('test')).to be_empty
end
end

context 'when running a nested transaction' do
it 'expect to work ok' do
result = client.transaction do
client.transaction do
client.produce(topic: '1', payload: '2')
2
end
end

expect(result).to eq(2)
end
end
end
end
45 changes: 45 additions & 0 deletions spec/lib/waterdrop/clients/dummy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,49 @@
describe '#respond_to?' do
it { expect(client.respond_to?(:test)).to eq(true) }
end

describe '#transaction' do
context 'when no error and no abort' do
it 'expect to return the block value' do
expect(client.transaction { 1 }).to eq(1)
end
end

context 'when abort occurs' do
it 'expect not to raise error' do
expect do
client.transaction { throw(:abort) }
end.not_to raise_error
end
end

context 'when WaterDrop::Errors::AbortTransaction error occurs' do
it 'expect not to raise error' do
expect do
client.transaction { raise(WaterDrop::Errors::AbortTransaction) }
end.not_to raise_error
end
end

context 'when different error occurs' do
it 'expect to raise error' do
expect do
client.transaction { raise(StandardError) }
end.to raise_error(StandardError)
end
end

context 'when running a nested transaction' do
it 'expect to work ok' do
result = client.transaction do
client.transaction do
client.produce(topic: '1', payload: '2')
2
end
end

expect(result).to eq(2)
end
end
end
end

0 comments on commit 5bf51a1

Please sign in to comment.