BindExceptions when testing multi-module projects using ZIO Kafka Testkit #987

Open
mattdavpir opened this issue Jul 19, 2023 · 9 comments

Comments

@mattdavpir

I've got a project using the ZIO Kafka Testkit that is sometimes failing with BindExceptions when starting the embedded Kafka from the testkit. I think that this is because the tests are running concurrently and each module's tests have their own refs that are being incremented independently, leading to them both trying to bind the same ports.

Would it be possible to add something to handle ports not being available when starting the embedded Kafka (maybe even just incrementing the ports and trying again)?

@guizmaii
Member

guizmaii commented Jul 19, 2023

> Would it be possible to add something to handle ports not being available when starting the embedded Kafka (maybe even just incrementing the ports and trying again)?

There's already such a mechanism implemented, but it might not work perfectly.
Are you using the ZIOSpecWithKafka trait? (https://zio.dev/zio-kafka/writing-tests#ziospecwithkafka-trait)

@mattdavpir
Author

Hey, thanks for the response!

I am using the ZIOSpecWithKafka trait. I had a brief poke around and saw that it increments a Ref, but I think multiple modules running tests concurrently could still conflict with each other. For example, they both create a Ref(Ports(6001, 7001)) and both do a getAndUpdate to set the next value to Ports(6002, 7002), so both get Ports(6001, 7001) back; when the second one tries to start up on 6001 and 7001, neither port will be free. Similarly, if I had a separate process constantly bound to 6001 and 7001, I think the first embedded startup would fail, whereas ideally (for me, at least) it would automatically try again with different ports.

Very possible I'm missing something obvious here, though!
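
The clash described above can be reproduced without Kafka. The following is a minimal sketch using only the Scala and Java standard libraries; `Ports`, `PortAllocator`, and `PortClashDemo` are hypothetical stand-ins (not the testkit's actual classes), and an `AtomicReference` plays the role of the ZIO `Ref`. It assumes each module ends up with its own copy of the counter, starting from the same initial value.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the testkit's port pair; the values follow the comment above.
final case class Ports(kafkaPort: Int, zookeeperPort: Int) {
  def next: Ports = Ports(kafkaPort + 1, zookeeperPort + 1)
}

// Assumption: every module gets its own allocator, initialised to the same ports.
final class PortAllocator {
  private val ref = new AtomicReference(Ports(6001, 7001))

  // Returns the current ports and advances the counter, like Ref#getAndUpdate.
  def nextPorts(): Ports = ref.getAndUpdate(p => p.next)
}

object PortClashDemo {
  def main(args: Array[String]): Unit = {
    val moduleA = new PortAllocator
    val moduleB = new PortAllocator

    val a = moduleA.nextPorts()
    val b = moduleB.nextPorts()

    // Both allocators hand out the same first pair, so the second broker cannot bind.
    println(s"module A: $a, module B: $b, clash: ${a == b}")
  }
}
```

Incrementing only protects tests that share one counter; two independent counters hand out identical ports on their first call.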

@guizmaii
Member

I don't remember the code (which I implemented several months ago 😅)
Are we catching the BindExceptions and retrying with the next possible ports?

@erikvanoosten
Collaborator

@mattdavpir Yes, you are absolutely right. This is definitely a limitation of how the ports are allocated.

Usually you run a single embedded kafka for all tests. Is that an option for you as well?

@erikvanoosten
Collaborator

> Are we catching the BindExceptions and retrying with the next possible ports?

No, we don't.
That would be a solution, though.

@guizmaii
Member

The fix is kinda trivial: You catch the BindException and retry with the next port.
@mattdavpir Do you feel like giving it a try? 🙂
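
The catch-and-retry idea can be illustrated independently of Kafka and ZIO. This is a minimal sketch using a plain `java.net.ServerSocket`; `BindRetryDemo` and `bindWithRetry` are hypothetical names, and the retry count of 3 is an arbitrary choice for the example.

```scala
import java.net.{BindException, ServerSocket}
import scala.annotation.tailrec

object BindRetryDemo {
  // Tries to bind `port`; on a BindException, moves on to the next port
  // until the retries are used up, then rethrows the last failure.
  @tailrec
  def bindWithRetry(port: Int, retriesLeft: Int): ServerSocket = {
    val attempt: Either[BindException, ServerSocket] =
      try Right(new ServerSocket(port))
      catch { case e: BindException => Left(e) }

    attempt match {
      case Right(socket)              => socket
      case Left(_) if retriesLeft > 0 => bindWithRetry(port + 1, retriesLeft - 1)
      case Left(e)                    => throw e
    }
  }

  def main(args: Array[String]): Unit = {
    val occupied = new ServerSocket(0) // let the OS pick a free port and hold it
    val second   = bindWithRetry(occupied.getLocalPort, retriesLeft = 3)
    try println(s"occupied: ${occupied.getLocalPort}, retried onto: ${second.getLocalPort}")
    finally { second.close(); occupied.close() }
  }
}
```

A bounded retry count keeps a genuinely broken environment from looping forever, while still covering the common case of one or two ports being taken.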

@mattdavpir
Author

> Usually you run a single embedded kafka for all tests. Is that an option for you as well?

I've got a couple of tests split across a couple of different sbt modules so I don't think so, but for now I can just explicitly disable the parallel execution of the tests to avoid the conflict.

> Do you feel like giving it a try? 🙂

Happy to do so when I get around to my next "personal dev time" slot next week, but I've only just started tinkering around with ZIO, so I don't know how idiomatic it would be (hopefully it's simple enough not to leave too much scope to go wrong).
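
For the interim workaround of disabling parallel test execution, a `build.sbt` fragment along these lines may be enough. This is a sketch assuming a standard multi-project sbt build with non-forked tests; the settings are regular sbt keys, but whether both are needed depends on the build.

```scala
// build.sbt (sketch)

// Allow at most one test task at a time across all projects in the build,
// so two modules never start their embedded Kafka concurrently.
Global / concurrentRestrictions += Tags.limit(Tags.Test, 1)

// Optionally also run the suites inside each project sequentially.
ThisBuild / Test / parallelExecution := false
```

Note that `Test / parallelExecution := false` on its own only serializes suites within a project; it is the `Tags.limit` restriction that stops different modules from testing at the same time.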

@erikvanoosten
Collaborator

erikvanoosten commented Jul 19, 2023

Something like this in zio-kafka-testkit/src/main/scala/zio/kafka/testkit/Kafka.scala maybe:

  import java.net.BindException

  private def embeddedWithBrokerProps(
    presetProps: Ports => Map[String, String],
    customProps: Ports => Map[String, String]
  ): ZLayer[Any, Throwable, Kafka] =
    ZLayer.scoped {
      // Each attempt takes fresh ports from `nextPorts`.
      // Assumes a port clash surfaces as a java.net.BindException.
      def makeEmbeddedKafka(retriesLeft: Int): ZIO[Scope, EmbeddedKafkaStartException, EmbeddedKafkaService] =
        for {
          ports <- nextPorts
          brokerProps = presetProps(ports) ++ customProps(ports) // custom is after to allow overriding
          embeddedKafkaConfig = EmbeddedKafkaConfig(
            ports.kafkaPort,
            ports.zookeeperPort,
            brokerProps
          )
          kafka <- ZIO
            .acquireRelease(
              ZIO.attemptBlocking(EmbeddedKafkaService(EmbeddedKafka.start()(embeddedKafkaConfig)))
            )(_.stop())
            // Retry outside acquireRelease so each started broker gets exactly one finalizer
            // and a failure from a nested attempt is not wrapped a second time.
            .catchAll {
              case _: BindException if retriesLeft > 0 => makeEmbeddedKafka(retriesLeft - 1)
              case e => ZIO.fail(EmbeddedKafkaStartException("Failed to start embedded Kafka", e))
            }
        } yield kafka

      makeEmbeddedKafka(3)
    }

Not tested!

@mishto

mishto commented Aug 26, 2024

@guizmaii I am encountering this issue as well. Do you mind if I give it a try?
