Producer.produceAsync performance #531

Open
lukestephenson opened this issue Nov 7, 2022 · 25 comments

@lukestephenson
Contributor

I was making use of produceAsync in combination with ZIO.foreach to achieve batching / sending multiple records in parallel.

While I was expecting this to be slightly slower than the produceChunkAsync API, I was not expecting it to be slower by an order of magnitude.

For my experiment, I was trying to publish 1 million messages to Kafka (just a single broker / local docker). The timings I saw were:

| Experiment | Duration |
| --- | --- |
| ZIO publishing using a chunk-aware publisher (i.e. avoiding more ZIO invocations) | 1.3 seconds |
| ZIO publishing one message at a time, relying on "sequence" to publish batches | 23 seconds |
| Monix publishing one message at a time, relying on "sequence" to publish batches | 3.5 seconds |
| fs2-kafka publishing one message at a time, relying on "sequence" | 6 seconds |

For the Monix and fs2-kafka implementations, I tried to keep the code as close to zio-kafka as I could.

Note that the fs2-kafka batch API is implemented in terms of the single publish API (and a sequence), so its performance will be basically the same.

The monix implementation (our internal publisher lib, so I can't share, but I'll try to copy the relevant bits into the demo project soon) makes use of scala.concurrent.blocking to flag that calling the underlying KafkaProducer.send method may block.

One of my colleagues played with the zio-kafka implementation, and found that the majority of the performance hit is from ZIO.attemptBlocking.

For a properly configured KafkaProducer, the majority of calls to send shouldn't block, but the current implementation appears optimised for the case where every call blocks, always shifting execution onto another thread.

The code I used for the experiment is in https://github.com/lukestephenson/kafka-perf.

Now, you might be thinking: just use the chunking API and the problem goes away. To a certain extent, yes. But regardless, I have serious concerns about just how much overhead using ZIO imposes here.

Also, the current chunking API doesn't support what we need. Our project has the ability to drop messages if publishing to Kafka fails (e.g. message too large). The zio-kafka bulk publish API doesn't tell you which message failed (only that something in the batch failed), nor does it have the ability to continue with the other messages past that point. So we would first need to improve that. Regardless, I don't think the produceAsync method should have overheads as high as it currently does.
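To make the discussion concrete, here is a minimal sketch of a callback-based produce that avoids attemptBlocking entirely. The names and wiring here are illustrative assumptions, not zio-kafka's actual internals: the KafkaProducer callback completes a ZIO Promise, so the fiber only suspends on `await` instead of shifting to the blocking pool per call.

```scala
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import zio._

// Hypothetical sketch, not zio-kafka's implementation.
def produceAsyncSketch(
  producer: KafkaProducer[Array[Byte], Array[Byte]],
  record: ProducerRecord[Array[Byte], Array[Byte]]
): Task[Task[RecordMetadata]] =
  for {
    done    <- Promise.make[Throwable, RecordMetadata]
    runtime <- ZIO.runtime[Any]
    // send() itself is usually non-blocking for a well-configured producer;
    // the callback fires on a Kafka I/O thread and completes the promise.
    _ <- ZIO.attempt {
           producer.send(
             record,
             new Callback {
               override def onCompletion(metadata: RecordMetadata, err: Exception): Unit =
                 Unsafe.unsafe { implicit u =>
                   runtime.unsafe
                     .run(if (err eq null) done.succeed(metadata) else done.fail(err))
                     .getOrThrowFiberFailure()
                 }
             }
           )
         }
  } yield done.await // outer Task = enqueued, inner Task = acknowledged
```

The caveat, as discussed below, is that send() can still block occasionally (e.g. while fetching metadata), which this sketch does not account for.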

@svroonland
Collaborator

Thanks for your feedback @lukestephenson.

I'd say dropping the attemptBlocking should be safe, since send should not block, as you mention. The attemptBlocking around most of the Consumer/Producer calls was likely added as a 'better safe than sorry' measure, since it's hard to know whether the implementation will block. Should it happen to block in some specific situation anyway, we can still rely on ZIO 2's automatic blocking detection.

@guizmaii
Member

@svroonland FYI, I was asking questions on this blocking subject here: #492

@svroonland
Collaborator

@guizmaii Ah, that synchronized is a good example of why it's better to be safe than sorry when interacting with possibly blocking Java libraries. ZIO 2's auto blocking detection changes things a bit, though; I think we can rely on it a bit more for performance-critical pieces like the Producer here.

@svroonland
Collaborator

@adamgfraser Perhaps you could comment on the performance of attemptBlocking?

@svroonland
Collaborator

@lukestephenson Ran your benchmark, below are results before and after removing attemptBlocking

Before:

```
Took chunked 1879
Took chunked 1326
Took chunked 865
Took chunked 930
Took chunked 895
Took chunked 925
Took unchunked 20784
Took unchunked 17760
Took unchunked 20199
Took unchunked 18230
Took unchunked 18247
Took unchunked 18151
```

After:

```
Took chunked 1755
Took chunked 1284
Took chunked 1055
Took chunked 1070
Took chunked 1011
Took chunked 1029
Took unchunked 4332
Took unchunked 3800
Took unchunked 3877
Took unchunked 3858
Took unchunked 3863
Took unchunked 4030
```

@lukestephenson
Contributor Author

Thanks for looking into this. Those results are similar to what we saw when removing attemptBlocking

@lukestephenson
Contributor Author

I took a further look into performance after removing attemptBlocking, as I was still finding that the zio-kafka implementation took almost double the time of my Monix implementation.

It appears to be related to the performance of the Promise implementation. Both the zio-kafka and Monix implementations rely on a Promise-like contract (different implementations) to signal when the work has completed.
I've removed all the Kafka logic and just timed how long the promises take to create and complete.
For Monix it looks like:

```scala
import cats.effect.ExitCode
import monix.eval.{Task, TaskApp}

import scala.concurrent.Promise

object MonixPromisePerf extends TaskApp {
  override def run(args: List[String]): Task[ExitCode] = {
    // Create, complete and await 1 million promises.
    val promiseLoop = Task.traverse((1 to 1_000_000).toList) { counter =>
      for {
        done <- Task.delay(Promise[Int]())
        _     = done.success(counter)
        _    <- Task.deferFuture(done.future)
      } yield done
    }

    val timed = promiseLoop.timed.flatMap(result => Task(println(s"Monix promiseLoop ${result._1.toMillis}")))
    val jobs  = timed >> timed >> timed >> timed >> timed >> timed
    jobs.as(ExitCode.Success)
  }
}
```

For ZIO it looks like:

```scala
import zio._

object ZIOPromisePerf extends ZIOAppDefault {
  def run = {
    // Create, complete and await 1 million promises.
    val promiseLoop = ZIO.foreach((1 to 1_000_000).toList) { counter =>
      for {
        done <- Promise.make[Throwable, Int]
        _    <- done.succeed(counter)
        _    <- done.await
      } yield done
    }

    val timed = promiseLoop.timed.flatMap(result => zio.Console.printLine(s"promiseLoop ${result._1.toMillis}"))
    timed.repeatN(5)
  }
}
```

In terms of performance, the Monix implementation averages 656ms, while the ZIO implementation takes 3100ms. While the comparison is a bit naive / limited, it roughly matches the difference in my original Kafka publishing timings between Monix and ZIO.

@guizmaii
Member

guizmaii commented Nov 24, 2022

I think @adamgfraser would want to see this Promise performance comparison.

@guizmaii
Member

FYI zio/zio#7557

@guizmaii
Member

@lukestephenson Could you maybe re-run your perf tests with this version of ZIO: 2.0.4+19-064d1cd7-SNAPSHOT, please?

To use a snapshot release you need to do this, in your build.sbt:

```scala
ThisBuild / resolvers ++= Resolver.sonatypeOssRepos("snapshots")

libraryDependencies += "dev.zio" %% "zio" % "2.0.4+19-064d1cd7-SNAPSHOT"
```

This is the version released with the Promise optimisation fix I mentioned previously (#531 (comment))

@lukestephenson
Contributor Author

The promise performance on 2.0.4+19-064d1cd7-SNAPSHOT is looking a lot better. Locally it's down from 3100ms to 1100ms.

The kafka performance test has seen a similar reduction in time with the improvements to promise. So with the improvements to promise and removing the use of attemptBlocking, kafka publishing time is down from 23 seconds to about 4 seconds.

@domartynov

@svroonland I think the solution implemented in #555 is wrong. Is it safe to drop blocking, given that send blocks periodically until published messages are committed (which is more like ~100ms to 1000ms in our prod, for example)? Should we add a perf warning to the API and suggest using produceChunkAsync instead? With a volume causing the issue above, it probably does not make sense to use a promise per message anyway (compared to the cost of Chunk objects), which is confirmed by your test data where it's 4x slower even after dropping blocking.

@guizmaii
Member

guizmaii commented Jan 8, 2023

What do you think about using a dedicated thread pool?

- Using the Blocking one isn't appropriate, as it can be slow.
- Using the default one is inappropriate, as we might block its threads.
- If we use a dedicated one for zio-kafka, we can tune it and use a thread pool optimised for this workload.

WDYT? 🤔

@svroonland
Collaborator

Ah that's really annoying, that methods that are explicitly documented to be asynchronous can still block, but you're right, it looks like there's potential blocking while waiting for metadata.

I believe the actual shifting from the current ZIO thread to another thread (from the blocking pool in this case) is causing the slowdown, not using the blocking thread pool per se. So using a dedicated thread pool won't help, I think.

Perhaps we can build your earlier idea of having a ZIO buffer (Queue) of Chunk[ProducerRecord[Array[Byte], Array[Byte]]], combined with a dedicated fiber that is executed on the blocking pool which calls send() and completes a Promise for each Chunk of messages (which could be a Chunk of size 1 when using produce). That way we can be sure that calls to produce / produceChunk will never block and we don't have to shift to the blocking thread pool all the time.
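A rough sketch of that design, with all names hypothetical and error handling kept minimal: a bounded Queue carries each chunk paired with a Promise, and a single daemon fiber pinned to the blocking pool drains the queue, so callers never touch the blocking executor themselves.

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import zio._

object SendLoopSketch {
  type ByteRecord = ProducerRecord[Array[Byte], Array[Byte]]

  // Hypothetical sketch of the queue + dedicated-fiber design, not zio-kafka's code.
  def startSendLoop(
    producer: KafkaProducer[Array[Byte], Array[Byte]]
  ): UIO[Queue[(Chunk[ByteRecord], Promise[Throwable, Chunk[RecordMetadata]])]] =
    for {
      queue <- Queue.bounded[(Chunk[ByteRecord], Promise[Throwable, Chunk[RecordMetadata]])](128)
      _ <- ZIO.blocking {                 // the whole loop runs on the blocking pool
             queue.take.flatMap { case (chunk, done) =>
               ZIO
                 .attempt {
                   // send() may block here (e.g. waiting for metadata); that's fine
                   // on this pool. Waiting on the returned futures is also safe here.
                   val futures = chunk.map(r => producer.send(r))
                   futures.map(_.get())
                 }
                 .foldZIO(e => done.fail(e), metas => done.succeed(metas))
             }.forever
           }.forkDaemon
    } yield queue
}
```

Callers would then offer `(chunk, promise)` pairs (a chunk of size 1 for produce) and await the promise, never blocking a ZIO pool thread.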

@adamgfraser
Contributor

Why aren't we just running this whole thing on the blocking thread pool? Generally the biggest cost is in shifting between executors. Running a whole bunch of operations on the blocking executor and shifting there and back around the whole thing is dramatically faster than shifting there and back around every individual workflow.
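The difference can be sketched as follows (a toy comparison assuming a KafkaProducer in scope; not the actual zio-kafka code): the first version pays the executor hand-off once per record, the second pays it once for the whole batch.

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import zio._

object ShiftComparison {
  type ByteRecord = ProducerRecord[Array[Byte], Array[Byte]]

  // Shifts to the blocking executor and back once per record.
  def perCall(producer: KafkaProducer[Array[Byte], Array[Byte]], records: List[ByteRecord]): Task[Unit] =
    ZIO.foreachDiscard(records)(r => ZIO.attemptBlocking(producer.send(r)))

  // Shifts to the blocking executor once and runs the whole loop there.
  def shiftOnce(producer: KafkaProducer[Array[Byte], Array[Byte]], records: List[ByteRecord]): Task[Unit] =
    ZIO.blocking(ZIO.foreachDiscard(records)(r => ZIO.attempt(producer.send(r))))
}
```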

@svroonland
Collaborator

@adamgfraser If I understand you correctly, I think that's what I'm proposing: a send loop/stream running on the blocking pool and a queue to communicate with the ZIO thread pool side. I hope to have some time to implement that later today.

@svroonland
Collaborator

@guizmaii @adamgfraser #581

@guizmaii
Member

guizmaii commented Jan 11, 2023

@lukestephenson Would it be possible for you to remove the dependency on "com.zendesk" %% "gourmand" from your https://github.com/lukestephenson/kafka-perf project, so that we can compare Monix and ZIO, please?
It seems to be proprietary; I can't compile your code.

@lukestephenson
Contributor Author

@guizmaii I'll remove the internal dependencies now

@lukestephenson
Contributor Author

I've just pushed the changes to kafka-perf. Apologies for the delay.

@guizmaii
Member

Thanks Luke ❤️

@svroonland
Collaborator

Shall we close this issue? Improvements were implemented in zio/zio#7557, #555 and #581

@lukestephenson
Contributor Author

Firstly, thanks for all the fixes and effort invested ❤️

I've updated the https://github.com/lukestephenson/kafka-perf repo to use the latest zio / zio-kafka dependencies and performance is much improved. Note however, to see good performance in my case I had to bump the value of ProducerSettings.sendBufferSize (I think the default of 4096 is too low).

Here are the latest timings I collected (please don't compare them to the original timings from when I raised the issue; I'm running on a different laptop).

| Experiment | Duration |
| --- | --- |
| ZIO publishing using a chunk-aware publisher (i.e. avoiding more ZIO invocations) | 1.3 seconds |
| ZIO publishing one message at a time, relying on "sequence" to publish batches | 3.4 seconds |
| Monix publishing one message at a time, relying on "sequence" to publish batches | 1.4 seconds |
| fs2-kafka publishing one message at a time, relying on "sequence" | Did not rerun |

With regards to the unchunked comparisons, originally ZIO was 6.6 times slower than my Monix implementation (23 vs 3.5 seconds). With the latest run, ZIO is only 2.4 times slower (using a sendBufferSize of 1MB: 3.4 vs 1.4 seconds). Things have improved heaps, but it still appears to be a lot slower than the comparable Monix implementation. I'm happy for this issue to be closed; it's probably confusing to continue the discussion on this issue any longer, and if we want further improvements we can always raise another issue. I suspect the remaining slowness is because each message still needs to be pushed through a ZIO stream, which isn't great with small chunk sizes.

Thanks again!

@guizmaii
Member

guizmaii commented Jan 18, 2023

@lukestephenson Thanks again for all of this! 🙏

I know that the ZIO team is re-implementing Streams based on a new idea to improve performance (see Adam Fraser's talk at FunScala London 2022; I don't know if it's published on YT yet).

They told me that they want to deliver this new implementation in March/April for the next ZIO World event (https://www.eventbrite.com/e/zio-world-2023-lisbon-portugal-tickets-492267383997).
It'd be interesting to test the performance of their new implementation when they release the first alpha/beta.

Meanwhile, I think the next step for us regarding performance will first be to implement a few benchmarks.

I don't mind keeping this issue open. I think it contains a good conversation we can continue to iterate on with future improvements.

@svroonland svroonland changed the title Producer.produceAsync is extremely slow Producer.produceAsync performance Mar 1, 2023
@svroonland
Collaborator

Renamed the issue because the previous title no longer accurately reflects the latest performance comparison and could give a false impression to people not reading this entire thread.
