[question] Azure EventHub #430

Open
ghstahl opened this issue Aug 9, 2023 · 1 comment

Comments

@ghstahl
Contributor

ghstahl commented Aug 9, 2023

Has anyone gotten this to work with Azure Event Hubs?
I am using the simple example, modified to use SASL.

I created the two topics by hand on Azure and am able to emit an event, but the consumer isn't consuming.

Topics (each with 10 partitions):

- cloudevents
- cloudevents-group-table

azure setup

It just spins in the following call:

err := consumerGroup.Consume(ctx, topics, g)
{"level":"debug","caller":"C:/work/github/mapped/ccc/goka-play/cmd/1-simplest-sasl/main.go:91","time":"2023-08-09T08:48:21-07:00","message":"message emitted"}
{"level":"debug","caller":"C:/work/github/mapped/ccc/goka-play/internal/logger/goka-zerolog-logger.go:38","time":"2023-08-09T08:48:36-07:00","message":"[Processor cloudevents-group] setup generation 13, claims=map[string][]int32{\"cloudevents\":[]int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}}"}

I reset my Azure keys, so the ones in my test repo won't work. Maybe you can see an obvious processor setup problem.

example

@frairon
Contributor

frairon commented Aug 16, 2023

Hi Herb,
I have no experience using Azure or Event Hubs in general, but I checked your repo and it looks good, I'd say.
Except maybe one thing: getSASLConfig() sets the partitioner like this:

config.Producer.Partitioner = sarama.NewReferenceHashPartitioner

If the processor uses a different partitioner, the same key can end up on different partitions, which will mess up the state. But at least there should be a log stating that the message was received in the first place, so that won't be the issue here. It's also a good idea to start from goka.DefaultConfig() instead of creating a config from scratch with sarama.NewConfig(), to avoid missing some of the defaults that goka sets.
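To illustrate why mixing partitioners matters, here is a minimal, stdlib-only sketch. It assumes (from my reading of sarama's source, so please double-check against the version you use) that both sarama.NewHashPartitioner and sarama.NewReferenceHashPartitioner hash the key with 32-bit FNV-1a and differ only in how a negative hash value is mapped to a partition. The function names `legacyPartition`, `referencePartition` and `countMismatches` are made up for this example and are not part of sarama or goka.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnv32a hashes a key with 32-bit FNV-1a.
func fnv32a(key string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on an FNV hash never returns an error
	return h.Sum32()
}

// legacyPartition mimics the assumed behaviour of sarama.NewHashPartitioner:
// signed modulo first, then flip the sign if the result is negative.
func legacyPartition(key string, numPartitions int32) int32 {
	p := int32(fnv32a(key)) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

// referencePartition mimics the assumed behaviour of
// sarama.NewReferenceHashPartitioner: clear the sign bit, then take the modulo.
func referencePartition(key string, numPartitions int32) int32 {
	return (int32(fnv32a(key)) & 0x7fffffff) % numPartitions
}

// countMismatches counts how many of the keys "key-0" .. "key-(n-1)"
// are assigned to different partitions by the two variants.
func countMismatches(n int, numPartitions int32) int {
	mismatches := 0
	for i := 0; i < n; i++ {
		key := fmt.Sprintf("key-%d", i)
		if legacyPartition(key, numPartitions) != referencePartition(key, numPartitions) {
			mismatches++
		}
	}
	return mismatches
}

func main() {
	// 10 partitions, as in the topics described in this issue.
	fmt.Printf("%d of 1000 keys map to different partitions\n", countMismatches(1000, 10))
}
```

Under these assumptions the two variants agree whenever the hash is non-negative as an int32 and can disagree otherwise, so an emitter and a processor that use different partitioners would look for the same key on different partitions.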

The spin in consumerGroup.Consume is intended, because internally this executes the rebalance loop on g, so that shouldn't be the issue.
The "setup generation ..." log line actually indicates that everything is correct. It rather looks like there is no message in the stream for some reason.

Btw, the code says it's 8 partitions, but the logs say it's 10. That shouldn't be an issue, though. If input and table aren't co-partitioned, the processor will fail to restart anyway.
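As a rough illustration of what "co-partitioned" means here, the following stdlib-only sketch checks that a set of topics all have the same number of partitions. `checkCopartitioned` is a hypothetical helper written for this example, not goka's actual check, and the partition counts are hard-coded from the issue description rather than read from the broker.

```go
package main

import (
	"fmt"
	"sort"
)

// checkCopartitioned returns an error if the given topics do not all
// have the same partition count. The map is topic name -> partition count.
func checkCopartitioned(partitions map[string]int) error {
	names := make([]string, 0, len(partitions))
	for name := range partitions {
		names = append(names, name)
	}
	if len(names) == 0 {
		return nil // nothing to compare
	}
	sort.Strings(names) // deterministic order for stable error messages

	want := partitions[names[0]]
	for _, name := range names[1:] {
		if got := partitions[name]; got != want {
			return fmt.Errorf("topic %q has %d partitions, but %q has %d",
				name, got, names[0], want)
		}
	}
	return nil
}

func main() {
	// Counts taken from the issue: both topics were created with 10 partitions.
	counts := map[string]int{
		"cloudevents":             10,
		"cloudevents-group-table": 10,
	}
	if err := checkCopartitioned(counts); err != nil {
		fmt.Println("not co-partitioned:", err)
		return
	}
	fmt.Println("co-partitioned")
}
```

In a real setup the counts would come from the broker's topic metadata; if the input stream had 8 partitions and the group table 10, this check would return an error.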

Sorry that's all I could see at the moment. Let me know if you need further assistance or I can help in any way.
