Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Idea] pause/resume/stop Stream and remove stopConsumption #941

Open
guizmaii opened this issue Jun 21, 2023 · 5 comments
Open

[Idea] pause/resume/stop Stream and remove stopConsumption #941

guizmaii opened this issue Jun 21, 2023 · 5 comments

Comments

@guizmaii
Copy link
Member

guizmaii commented Jun 21, 2023

We had some discussion yesterday on Discord about removing stopConsumption
See:

Found a new idea this morning while taking my shower 😄
I think we could "generalize" the "interruptor" idea.
We could return a "control" interface that'd have 3 methods:

  • pause: would pause the stream
  • resume: would resume the paused stream
  • stop: would stop the stream and the subscription
// pseudo-code just to give show the idea
(
  val (control, stream0) = 
    Consumer
      .plainStream(Sub.topic("topic-a"), ...)
      .mapZIO(processRecord0(_))
      .runDrain

  control.pause() // pauses stream0
  control.resume() // resume stream0
  control.stop() // stop/finalize stream0
).provide(ZLayer.scoped(Consumer.make(settings)))

The "Control" interface would be something like:

trait Control {
  def pause: IO[AlreadyStopped, Boolean] // Boolean as we probably want to give a feedback to the user
  def resume: IO[AlreadyStopped, Boolean] 
  def stop: UIO[AlreadyStopped, Boolean]
}
@erikvanoosten
Copy link
Collaborator

I have never used it, but I see that github offers Discussions. Perhaps we can move this there?

@erikvanoosten
Copy link
Collaborator

I guess it follows logically from our discussions because I had the almost the exact same idea. At least you put it in writing!
Important is that when the stream is paused, that commits are still processed. We need similar capacities (per partition instead of per subscription) to control rebalances.

@guizmaii
Copy link
Member Author

guizmaii commented Jun 21, 2023

when the stream is paused, that commits are still processed

Yes, that's the idea.
Also, the stop would probably need to finish committing everything scheduled (in the commit queue) before actually stopping, no?

We need similar capacities (per partition instead of per subscription) to control rebalances.

Not sure to see what you mean. Can you give an example maybe, please?

@erikvanoosten
Copy link
Collaborator

We need similar capacities (per partition instead of per subscription) to control rebalances.

Not sure to see what you mean. Can you give an example maybe, please?

This is the reason I started contributing to zio-kafka; during a rebalance we need to stop polling for a stream but let its commits complete (and wait for commit completion). That sounds pretty similar, right?
Right now, during a rebalance we stop polling, but we don't wait for the commits to complete.

@guizmaii
Copy link
Member Author

That sounds pretty similar, right?

It does indeed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants