Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support - Consumer.NegativeAcknowledge #45

Open
dionjansen opened this issue Jun 24, 2020 · 6 comments · May be fixed by #83
Open

Support - Consumer.NegativeAcknowledge #45

dionjansen opened this issue Jun 24, 2020 · 6 comments · May be fixed by #83
Labels
enhancement New feature or request

Comments

@dionjansen
Copy link
Contributor

NegativeAcknowledge provides a nice way to shape error handling when messages cannot be processed and you want to do so at a later time with a given timeout:

Prefer negative acknowledgements over acknowledgement timeout. Negative acknowledgement controls the re-delivery of individual messages with more precision, and avoids invalid redeliveries when the message processing time exceeds the acknowledgement timeout.

@dionjansen
Copy link
Contributor Author

From what I can tell both the Java and C++ clients use a client sided tracker to keep track of negatively acknowledged messages add by the Consumer. negativeAcknowledge in a NegativeAcksTracker. The tracker is responsible for calling the RedeliverUnacknowledgedMessages consumer command, with those message ids that exceed their respective timeout.

I see there is some interaction with another mechanism required to track unacked messages but this only exists when the client actually sets an AckTimeout (request in #46)

@blankensteiner
Copy link
Contributor

This sounds related to the "reconsumeLater" functionality. Not sure how they differ.
If you are up for creating a PR, then I will be happy to review it.

@blankensteiner blankensteiner added the enhancement New feature or request label Dec 12, 2020
dionjansen added a commit to dionjansen/pulsar-dotpulsar that referenced this issue Jul 23, 2021
* feat: apache#45 `Consumer` now supports `NegativeAcknowledge`
* add `AcknowledgementTimeout` to `ConsumerOptions`
* add `NegativeAcknowledgementRedeliveryDelay` to `ConsumerOptions`
* add `NegativeackedMessageState` to manage nacked messages
* add `UnackedMessageState` to manage unacked messages
* add `MessageTracker` to periodically check unacked and nacked messages, on a fixed polling timeout of 10ms
* add `AwaitingAck` to track both unacked and nacked messages
* add `InactiveMessageTracker` to reduce overhead when no `AcknowledgementTimeout` or `NegativeAcknowledgementRedeliveryDelay` is configured
* add `InactiveNegativeackedMessageState` to reduce overhead when no `NegativeAcknowledgementRedeliveryDelay` is configured
* add `InactiveUnackedMessageState` to reduce overhead when no `AcknowledgementTimeout` is configured
* update `ConsumerBuilder` to allow setting `AcknowledgementTimeout`
* update `ConsumerBuilder` to allow setting `NegativeAcknowledgementRedeliveryDelay`
* refactor `ConsumerChannel` to support `NegativeAcknowledge`
* refactor `AsyncQueue<T>` to implement missing interface `IAsyncQueue<T>`
* refactor `BatchHandler<TMessage>` to implement missing interface `IBatchHandler<TMessage>`
* add `AutoFixture` and `AutoFixture.AutoNSubstitute` dependencies to unit test project
* add missing `ConsumerBuilderTests` unit tests
* add missing `ConsumerChannelFactoryTests` unit tests
* add missing `ConsumerChannelTests` unit tests
* add missing `ConsumerTests` unit tests
* add IntegrationTests for consumer ack timout and nack delays
* skipped integration test `SinglePartition_WhenSendMessages_ThenGetMessagesFromSinglePartition` to avoid CI failures
* skipped integration test `RoundRobinPartition_WhenSendMessages_ThenGetMessagesFromPartitionsInOrder` to avoid CI failures

Closes: apache#46
Closes: apache#45
@dionjansen dionjansen linked a pull request Jul 23, 2021 that will close this issue
dionjansen added a commit to dionjansen/pulsar-dotpulsar that referenced this issue Jul 25, 2021
* feat: apache#45 `Consumer` now supports `NegativeAcknowledge`
* add `AcknowledgementTimeout` to `ConsumerOptions`
* add `NegativeAcknowledgementRedeliveryDelay` to `ConsumerOptions`
* add `NegativeackedMessageState` to manage nacked messages
* add `UnackedMessageState` to manage unacked messages
* add `MessageTracker` to periodically check unacked and nacked messages, on a fixed polling timeout of 10ms
* add `AwaitingAck` to track both unacked and nacked messages
* add `InactiveMessageTracker` to reduce overhead when no `AcknowledgementTimeout` or `NegativeAcknowledgementRedeliveryDelay` is configured
* add `InactiveNegativeackedMessageState` to reduce overhead when no `NegativeAcknowledgementRedeliveryDelay` is configured
* add `InactiveUnackedMessageState` to reduce overhead when no `AcknowledgementTimeout` is configured
* update `ConsumerBuilder` to allow setting `AcknowledgementTimeout`
* update `ConsumerBuilder` to allow setting `NegativeAcknowledgementRedeliveryDelay`
* refactor `ConsumerChannel` to support `NegativeAcknowledge`
* refactor `AsyncQueue<T>` to implement missing interface `IAsyncQueue<T>`
* refactor `BatchHandler<TMessage>` to implement missing interface `IBatchHandler<TMessage>`
* add `AutoFixture` and `AutoFixture.AutoNSubstitute` dependencies to unit test project
* add missing `ConsumerBuilderTests` unit tests
* add missing `ConsumerChannelFactoryTests` unit tests
* add missing `ConsumerChannelTests` unit tests
* add missing `ConsumerTests` unit tests
* add IntegrationTests for consumer ack timout and nack delays
* skipped integration test `SinglePartition_WhenSendMessages_ThenGetMessagesFromSinglePartition` to avoid CI failures
* skipped integration test `RoundRobinPartition_WhenSendMessages_ThenGetMessagesFromPartitionsInOrder` to avoid CI failures

Closes: apache#46
Closes: apache#45
@alexcherka
Copy link

@blankensteiner Is this something we are considering in the future? Would love to utilize this functionality, it's already in F#'s version, but just wondering if we can expect this in the near future, it's just nice for automated retry or reconsuming upon an exception occurring or other.

@blankensteiner
Copy link
Contributor

Hi @alexcherka
@dionjansen has created a PR for it and @RobertIndie is reviewing it. Once they are finished I can review and merge it :-)

@alexcherka
Copy link

Hey @blankensteiner Just checking if we have an update for this? Been more than a year lol...

@blankensteiner
Copy link
Contributor

Hi @alexcherka
Doesn't seem like the PR is ready for review. I can't merge and release what I can't review :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants