-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
how to/possible register a slice of workers #335
Comments
Hi @elee1766, a couple options for you. The first, and easier one, is to just use a AddWorker(workers, &myServiceHandler{}) What's not necessarily intuitive is that you can add workers to a Regarding value groups, you won't be able to use River's built-in constructs for this, but you could layer your own on top. The key would be that you'd need a non-generic interface ( // Non-generic interface implemented by WorkConfigurer below.
type WorkConfigurerInterface interface {
Configure(workers *Workers)
}
// Contains a generic job args param to allow it to call AddWorker with a type
// parameter.
type WorkConfigurer[T JobArgs] struct {
worker Worker[T]
}
func (c *WorkConfigurer[T]) Configure(workers *Workers) {
AddWorker(workers, c.worker)
}
//
// jobArgs1
//
type jobArgs1 struct{}
func (jobArgs1) Kind() string { return "job_args_1" }
type jobWorker1 struct{ WorkerDefaults[jobArgs1] }
func (w *jobWorker1) Work(ctx context.Context, job *Job[jobArgs1]) error { return nil }
//
// jobArgs2
//
type jobArgs2 struct{}
func (jobArgs2) Kind() string { return "job_args_2" }
type jobWorker2 struct{ WorkerDefaults[jobArgs2] }
func (w *jobWorker2) Work(ctx context.Context, job *Job[jobArgs2]) error { return nil }
func TestWorkConfigurer(t *testing.T) {
// Use configurers as your value group.
configurers := []WorkConfigurerInterface{
&WorkConfigurer[jobArgs1]{worker: &jobWorker1{}},
&WorkConfigurer[jobArgs2]{worker: &jobWorker2{}},
}
// And at some point iterate them to add to a workers bundle which will then
// be fed into a River client.
workers := NewWorkers()
for _, configurer := range configurers {
configurer.Configure(workers)
}
}
As far as we know, you're the first person to have run into this issue. I'd be careful about thinking that the concepts wrapped up in Fx like annotations and value groups are more "normal" than they actually are. I read through the Fx docs to help try and come up with the responses above, and although I'm sure Uber had good reason to come up with such a thing, I feel compelled to say that I found these concepts to be somewhat convoluted. I hate to sound too much like a Go dogmatist, but sometimes maximizing DRY isn't the only objective worth considering, and there is such a thing as too much abstraction. I run a big Go app at work, and we use something akin to dependency injection, but do so without the framework and use of reflection. |
first off - thank you for time and effort that go into your answers. we really appreciate it.
this is what we are currently doing. this mostly works, with the caveat that you either have to "invoke" your workers instead of provide, or make something consume whatever output the worker providers provide. it's a little annoying but it's no big issue.
ah. this was the missing step. I was trying to figure out a way to solve the problem without thinking to using the Workers struct in the interface, this just clicked. thanks again!
reading the code, i'm not sure this is true. the workersMap seems to be accessed in a thread-unsafe manner, which is okay if read only, but if i add a worker after Start() has been called, won't it race? from what we tested, it's okay to add workers to a worker instance as long as you are not concurrently scheduling a job on that client, or you have not yet called Start(). Maybe there should be a rwmutex/workersMap should be a sync.Map?
completely understand the sentiment. for us we have really enjoyed fx as a framework, and so as a result, we try to follow the conventional patterns when we have no strong opinion on which way to implement something; it has done us well, and we have found value groups to be quite useful. |
This is correct, workers can only be added before the client has been started. After that it is unsafe to do so. We wouldn’t want to add a mutex on every read of the workers map, though we could potentially make it panic if you ever try to do this once it’s been used in a client. Not sure if that’s worth doing right now. |
Yeah, FWIW that's what I meant as well. It is safe to add new workers after a client's been initialized, but you need to make sure to do so before calling
@elee1766 No worries! Hope this gets you to something that's reasonable workable given your setup.
Cool. And yep, fair enough. |
i was wondering if it was possible to register a slice of workers.
currently the pattern is to create a "Workers" object, then use generic function
however, this method of registering services through a centralized service registry doesn't really fit super well the application+dependency injection framework we use (https://github.com/uber-go/fx). We are working around it through provider a "Workers" object which is passed to components to register their own workers.
in fx, it is traditional to define what we call
value groups
, which are a slice of the same type, filled with any of your your dependencies that match the type.this is super useful because it would mean that I could provide "worker" components to my app, and any workers provided would provide themself into the slice, and then i can initialize my client. as a result, i can have certain clients which do not process every single kind of job, properly sharding work across workers.
if i cast each worker into a
Worker[JobArgs]
in order to create a[]Worker[JobArgs]
i think will lose the type information thatvar jobArgs T
is reading, which important as we need that struct passed into the workers registry, so this wont work.if i put the worker into something like
wrapperWorker[T JobArgs]
, and then put each worker into a[]any{}
, then the type information maybe is preserved, and I believe I could then use reflection to extract the struct type and properly instantiate aJobArgs
which could maybe be used to register the type in the Workers struct usingadd
, and also create a WorkUnitFactory with proper behavior, but of courseadd
isn't exposed and i haven't had the time to fork and try this yet.i also attempted to see if there was a way i could provide a custom Workers provider, but it seems that the Workers struct is not replaceable. I could solve this problem an easier way myself for my use case if you guys just provided something like
then i could just create my own Workers and WorkUnitFactory, without making my own client.
any thoughts/ideas here? i feel like i must not be the first person to run into this issue.
The text was updated successfully, but these errors were encountered: