-
Notifications
You must be signed in to change notification settings - Fork 173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow wrapping Sarama producer/consumer for OpenTelemetry support #356
Conversation
Hi @eko, Generally I'd say we're open to integrating new frameworks and OpenTelemetry seems promising.
So here are two alternatives: Alternative 1: leave goka out of the wrapping-concept.Here the idea is, that the wrapping-responsibility is shifted to the user. Alternative 2: create wrapping builderWe could add custom builders that wrap the things created by the underlying builder. They will replace the builder which find in the options when applied. // helper builder that takes a wrapper and a builder
type saramaConsumerWrappingBuilder struct {
childWrapper SaramaConsumerBuilder
wrapper ConsumerWrapper
}
// builder function that wraps the result of the child-wrapper
func (scw *saramaConsumerWrappingBuilder) build(brokers []string, clientID string) (sarama.Consumer, error) {
cons, err := scw.childWrapper(brokers, clientID)
if err != nil {
return nil, err
}
return scw.wrapper(cons), nil
}
func WithSaramaConsumerWrapper(consumerWrapper ConsumerWrapper) ProcessorOption {
return func(o *poptions, gg *GroupGraph) {
if o.builders.consumerSarama == nil {
panic("cannot wrap non existent builder")
}
// put the existing builder into the new builder
wrappingBuilder := &saramaConsumerWrappingBuilder{
childWrapper: o.builders.consumerSarama,
wrapper: consumerWrapper,
}
// replace the current builder by the wrapper
o.builders.consumerSarama = wrappingBuilder.build
}
}
// use it like this:
goka.NewProcessor(...
goka.WithSaramaConsumerWrapper(myWrapper()),
// allows to wrap multiple times, because goka's not aware that anything is wrapped.
goka.WithSaramaConsumerWrapper(anotherWrapper()),
) That however requires, that we change the option-appliers at options.go Although the second alternative looks more elegant, It still seems quite arbitrary to add such specific functionality to the very generic concept of those builders. As a compromise, we could also go with Alternative-1 (not changing any options), still providing a helper that allows to wrap Builders. goka.NewProcessor(...,
goka.WithSaramaConsumerBuilder(goka.WrappingBuilder(myWrapper(), goka.DefaultSaramaConsumerBuilder)),
// or
goka.WithSaramaConsumerBuilder(goka.WrappingBuilder(myWrapper(), MyCustomBuilder())),
), What's your opinion on that? I haven't checked the promise-map implementation yet, will do that the next days :) Thanks again! |
Thank you for this complete answer Franz! I understand your points and I think Alternative 2 you proposed could work and will also allow wrapping multiple times. I would preferred to go with Alternative 1 and directly use the However, I have some things I would like to highlight. I have 3 items I would need to "wrap" for this implementation and some have some specific needs: ConsumerBuilderIt will work well with the type SaramaConsumerBuilder func(brokers []string, clientID string) (sarama.Consumer, error) sarama.ConsumerGroupHandler (*Processor)This item is the processor, err := goka.NewProcessor(pg.bootstrapServers, g, opts...)
processor = func(processor *goka.Processor) *goka.Processor {
return otelsarama.WrapConsumerGroupHandler(processor, tracerProvider).(*goka.Processor)
}(processor) The I don't have any other idea for this one at the moment. ProducerBuilderI have a little issue with this one because the type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32, producerWrapper ProducerWrapper) (Producer, error) Do you think we could add another option system using variadics such as So people doesn't have to re-declare their own I will try to give a look on what we could do at the beginning of the week but I would really like to have your advices on these things. Thanks again! |
Here is a little update on current status:
Concerning multiple wrapping, users can already do it on their side with something like: goka.WithSaramaConsumerGroupHandlerWrapper(func(consumerGroupHandler sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandler {
return something_else.WrapConsumerGroupHandler(
any_other_wrapper.WrapConsumerGroupHandler(
yet_another_one.WrapConsumerGroupHandler(
consumerGroupHandler,
),
),
)
}), Anyway, the PR has been lightened thanks to your remarks but feel free to share with me other ideas to continue improving it. Thanks |
@eko , sorry for the long delay. The other error-handling PR really kept me busy for some time. So what if we do not create any generic wrappers, but provide "tools" that can be used to tweak the components. I tried it in this goka-tools-branch, which would allow you to create your components like this: // create options builder, optionally specifying different parent builders or sarama config,
// otherwise uses goka default.
otelbuilder := opentel.NewBuilder().WithConfig(...).WithConsumerBuilder(..)
goka.NewProcessor(...,
otelbuilder.BuildProcOptions()...
)
goka.NewView(...,
otelbuilder.BuildViewOptions()...
)
goka.NewEmitter(...,
otelbuilder.BuildEmitterOptions()...
) That way, goka would not be aware that it's being wrapped, so no major changes needed (except see notes). NotesIf we do it that way, we will have to refactor the producer-builder to create the sarama-interface, and processors and emitters build their own wrapping Since the opentel-Producer-Wrapper seems to back up the metadata, we should be able to continue using this approach and avoid the sync-map. Otherwise it's a bug in the opentel-support which we should fix. I know I said earlier to avoid touching the builder-interfaces. But seeing the producer-builder like this seems super inconsistent. So we'll have to live with the API change I guess :) What are your thoughts on that? |
Hi @frairon, Thank you for working on this P.o.C. I think you're right it could be better to provide a unified way to build the OpenTelemetry instrumentation for both producer and consumer / consumer group. In case of a custom builder implementation users can still implement their own Builder implementation if they have some special cases like multiple wrapping so I think it's okay.
Concerning this metadata implementation, I won't say it's a bug but a wrong way to implement it ;-) You (Goka) and the otelsarama instrumentation both use this field to maintain your state/promise but I think the issue is that you should not use it (if possible, and it is) because this field should be left to the end-user to use it. So I think both implementations have to be fixed. What are the next step for you to have a first version available? Thanks again for your time! |
Hello @frairon I've been using Goka for some time now and have found this PR would create observability benefit. I'm about to try to contribute this PR. I'm having a bit of difficulty understanding what you mentioned below.
If you don't mind sharing, I'd be glad to assist you in any way I can. Thanks, |
Hi @korrawit - wow that has been abandoned some time ago. To be honest I can't remember what the issues were exactly here, I think the main problem was how to get the instrumentation injected as painless as possible without messing with goka's API for too much. Let me check it out in more detail again and get back to you! |
@korrawit, I checked some things.
So.. that's the plan I guess. What do you think? |
Hi @frairon,
I would like to implement OpenTelemetry support with Goka.
I've seen that some discussions have already started on issue #160 some time ago.
In order to do this, I would like to use the opentelemetry-go-contrib/Shopify/sarama instrumentation library because Goka is using it 🙂
How it works
In order to implement it, I need to be able to wrap the Sarama producer / consumer so I've added some
ProcessorOptions
which works like this and does not modify the actual public signatures:Side notes
As the Sarama OpenTelemetry implementation is using the
msg.Metadata
field to store its tracing span information and Goka was using it too, I've updated the Goka usage to don't use it because it was creating some issues.Instead of using the
Metadata
field in Goka to store thePromise
object, I've created async.Map
and the promise is retrieved using the message pointer as key.Feedbacks
I would really appreciate your feedbacks about these changes and hope to see the OpenTelemetry available very soon in Goka ✌️
Thank you!