how to/possible register a slice of workers #335

Open
elee1766 opened this issue May 3, 2024 · 4 comments

Comments

@elee1766
Contributor

elee1766 commented May 3, 2024

i was wondering if it was possible to register a slice of workers.

currently the pattern is to create a "Workers" object, then use the generic function

func AddWorkerSafely[T JobArgs](workers *Workers, worker Worker[T]) error {
	var jobArgs T
	return workers.add(jobArgs, &workUnitFactoryWrapper[T]{worker: worker})
}

however, this method of registering services through a centralized service registry doesn't fit well with the application and dependency injection framework we use (https://github.com/uber-go/fx). We are working around it by providing a "Workers" object that is passed to components so they can register their own workers.

in fx, it is traditional to define what we call value groups, which are a slice of the same type, filled with any of your dependencies that match the type.

this is super useful because it would mean that I could provide "worker" components to my app, any workers provided would add themselves to the slice, and then i can initialize my client. as a result, i can have certain clients which do not process every single kind of job, properly sharding work across workers.

if i cast each worker into a Worker[JobArgs] in order to create a []Worker[JobArgs], i think i will lose the type information that var jobArgs T reads, which is important because that struct needs to be passed into the workers registry, so this won't work.
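
The concern can be reproduced in isolation. The sketch below is not River code: `JobArgs` is a one-method stand-in, and `emailArgs` and `kindOf` are hypothetical names used only for illustration. It shows that reading the zero value of `T` works for a concrete args type but yields a nil interface once `T` is erased to `JobArgs` itself. (Separately, Go generics are invariant, so a worker whose `Work` method takes a `*Job[emailArgs]` would not be assignable to a `Worker[JobArgs]` in the first place.)

```go
package main

import "fmt"

// JobArgs is a stand-in for River's JobArgs interface (assumption: only
// the Kind method matters for this illustration).
type JobArgs interface {
	Kind() string
}

// emailArgs is a hypothetical concrete args type.
type emailArgs struct{}

func (emailArgs) Kind() string { return "email" }

// kindOf mirrors the `var jobArgs T` trick: it reads Kind from the zero
// value of T. It reports ok=false when that zero value is a nil interface,
// which is what happens when T is an interface type such as JobArgs.
func kindOf[T JobArgs]() (kind string, ok bool) {
	var jobArgs T
	if any(jobArgs) == nil {
		return "", false
	}
	return jobArgs.Kind(), true
}

func main() {
	kind, ok := kindOf[emailArgs]() // concrete type: the zero value is usable
	fmt.Println(kind, ok)

	kind, ok = kindOf[JobArgs]() // erased to the interface: the zero value is nil
	fmt.Println(kind, ok)
}
```

Without the nil check, calling `Kind()` on the erased zero value would panic, which is why the concrete type parameter has to survive until registration.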

if i put each worker into something like wrapperWorker[T JobArgs] and then store those in a []any{}, the type information may be preserved. i believe i could then use reflection to extract the struct type and instantiate a JobArgs, which could be used to register the type in the Workers struct using add and to create a WorkUnitFactory with the proper behavior. but add isn't exported, and i haven't had the time to fork and try this yet.

i also attempted to see if there was a way i could provide a custom Workers provider, but it seems that the Workers struct is not replaceable. I could solve this problem an easier way myself for my use case if you guys just provided something like

type WorkerInfo interface {
	JobArgs() JobArgs
	WorkUnitFactory() workunit.WorkUnitFactory
}

type WorkersStore interface {
	Add(jobArgs JobArgs, workUnitFactory workunit.WorkUnitFactory) error
	Get(key string) (workunit.WorkUnitFactory, bool)
	Keys() []string
}

// maintain backwards compatibility by replacing the map but keeping the top level struct
// the client can access workersStore directly, like it is currently accessing the map
type Workers struct {
	workersStore WorkersStore
}

func NewWorkers() *Workers {
	return &Workers{
		workersStore: newMapWorkersStore(),
	}
}

func NewWorkersWithStore(workersStore WorkersStore) *Workers {
	return &Workers{
		workersStore: workersStore,
	}
}

func (w *Workers) add(jobArgs JobArgs, workUnitFactory workunit.WorkUnitFactory) error {
	return w.workersStore.Add(jobArgs, workUnitFactory)
}

// public functions stay the same-ish

then i could just create my own Workers and WorkUnitFactory, without making my own client.
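
For illustration, the `newMapWorkersStore` default that the proposal refers to could look roughly like the sketch below. This is an assumption about the proposal, not River code: `JobArgs` and `WorkUnitFactory` are placeholders for the real types, `kindArgs` is a hypothetical args type, and rejecting duplicate kinds is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"sort"
)

// JobArgs and WorkUnitFactory are placeholders for the River types
// (assumption: only Kind is needed by the store).
type JobArgs interface{ Kind() string }
type WorkUnitFactory interface{}

// WorkersStore is the interface proposed in the issue.
type WorkersStore interface {
	Add(jobArgs JobArgs, workUnitFactory WorkUnitFactory) error
	Get(key string) (WorkUnitFactory, bool)
	Keys() []string
}

// mapWorkersStore is a hypothetical in-memory implementation keyed by kind.
// It is not safe for concurrent use.
type mapWorkersStore struct {
	factories map[string]WorkUnitFactory
}

// Compile-time check that the sketch satisfies the proposed interface.
var _ WorkersStore = (*mapWorkersStore)(nil)

func newMapWorkersStore() *mapWorkersStore {
	return &mapWorkersStore{factories: make(map[string]WorkUnitFactory)}
}

// Add registers a factory under the args' kind and rejects duplicates.
func (s *mapWorkersStore) Add(jobArgs JobArgs, workUnitFactory WorkUnitFactory) error {
	kind := jobArgs.Kind()
	if _, exists := s.factories[kind]; exists {
		return fmt.Errorf("worker for kind %q is already registered", kind)
	}
	s.factories[kind] = workUnitFactory
	return nil
}

// Get looks up the factory registered for a kind.
func (s *mapWorkersStore) Get(key string) (WorkUnitFactory, bool) {
	factory, ok := s.factories[key]
	return factory, ok
}

// Keys returns the registered kinds in sorted order.
func (s *mapWorkersStore) Keys() []string {
	keys := make([]string, 0, len(s.factories))
	for kind := range s.factories {
		keys = append(keys, kind)
	}
	sort.Strings(keys)
	return keys
}

// kindArgs is a hypothetical args type whose value is its own kind.
type kindArgs string

func (k kindArgs) Kind() string { return string(k) }

func main() {
	store := newMapWorkersStore()
	fmt.Println(store.Add(kindArgs("email"), nil))
	fmt.Println(store.Add(kindArgs("email"), nil)) // duplicate kind is rejected
	fmt.Println(store.Keys())
}
```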

any thoughts/ideas here? i feel like i must not be the first person to run into this issue.

@brandur
Contributor

brandur commented May 7, 2024

Hi @elee1766, a couple options for you.

The first, and easier one, is to just use a Workers struct as a dependency that you inject through Fx into each of your services, and give each one the opportunity to add its own workers to it through the normal means:

AddWorker(workers, &myServiceHandler{})

What's not necessarily intuitive is that you can add workers to a Workers instance after workers has been sent into a River client. As long as a worker is in the bundle by the time the client tries to work a job of that kind, you're okay. So you'd inject the workers dependency, and every subservice receiving it would have the opportunity to register workers.

Regarding value groups, you won't be able to use River's built-in constructs for this, but you could layer your own on top. The key would be that you'd need a non-generic interface (WorkConfigurerInterface) which can encapsulate a generic struct that retains type information (WorkConfigurer) so that it can add itself to a workers bundle.

// Non-generic interface implemented by WorkConfigurer below.
type WorkConfigurerInterface interface {
	Configure(workers *Workers)
}

// Contains a generic job args param to allow it to call AddWorker with a type
// parameter.
type WorkConfigurer[T JobArgs] struct {
	worker Worker[T]
}

func (c *WorkConfigurer[T]) Configure(workers *Workers) {
	AddWorker(workers, c.worker)
}

//
// jobArgs1
//

type jobArgs1 struct{}

func (jobArgs1) Kind() string { return "job_args_1" }

type jobWorker1 struct{ WorkerDefaults[jobArgs1] }

func (w *jobWorker1) Work(ctx context.Context, job *Job[jobArgs1]) error { return nil }

//
// jobArgs2
//

type jobArgs2 struct{}

func (jobArgs2) Kind() string { return "job_args_2" }

type jobWorker2 struct{ WorkerDefaults[jobArgs2] }

func (w *jobWorker2) Work(ctx context.Context, job *Job[jobArgs2]) error { return nil }

func TestWorkConfigurer(t *testing.T) {
	// Use configurers as your value group.
	configurers := []WorkConfigurerInterface{
		&WorkConfigurer[jobArgs1]{worker: &jobWorker1{}},
		&WorkConfigurer[jobArgs2]{worker: &jobWorker2{}},
	}

	// And at some point iterate them to add to a workers bundle which will then
	// be fed into a River client.
	workers := NewWorkers()
	for _, configurer := range configurers {
		configurer.Configure(workers)
	}
}
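
The same pattern can be tried outside the River package with a small constructor that hides the type parameter from callers. The sketch below is self-contained and uses simplified stand-ins for `JobArgs`, `Job`, `Worker`, `Workers`, and `AddWorker` (assumptions, not River's real definitions); `NewWorkConfigurer`, `RegisterAll`, `emailArgs`, and `emailWorker` are hypothetical names. In an Fx application, the slice passed to `RegisterAll` is what a value group would supply.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for River types (assumptions for this sketch).
type JobArgs interface{ Kind() string }

type Job[T JobArgs] struct{ Args T }

type Worker[T JobArgs] interface {
	Work(ctx context.Context, job *Job[T]) error
}

type Workers struct{ kinds []string }

func NewWorkers() *Workers { return &Workers{} }

// AddWorker records the kind read from the zero value of T, standing in
// for the real registration.
func AddWorker[T JobArgs](workers *Workers, worker Worker[T]) {
	var jobArgs T
	workers.kinds = append(workers.kinds, jobArgs.Kind())
}

// WorkConfigurerInterface and WorkConfigurer follow the reply above.
type WorkConfigurerInterface interface {
	Configure(workers *Workers)
}

type WorkConfigurer[T JobArgs] struct {
	worker Worker[T]
}

func (c *WorkConfigurer[T]) Configure(workers *Workers) {
	AddWorker(workers, c.worker)
}

// NewWorkConfigurer is a hypothetical helper that returns the non-generic
// interface, so callers can collect configurers in one slice.
func NewWorkConfigurer[T JobArgs](worker Worker[T]) WorkConfigurerInterface {
	return &WorkConfigurer[T]{worker: worker}
}

// RegisterAll is a hypothetical consumer of the collected configurers.
func RegisterAll(workers *Workers, configurers []WorkConfigurerInterface) {
	for _, configurer := range configurers {
		configurer.Configure(workers)
	}
}

// emailArgs and emailWorker are hypothetical example types.
type emailArgs struct{}

func (emailArgs) Kind() string { return "email" }

type emailWorker struct{}

func (*emailWorker) Work(ctx context.Context, job *Job[emailArgs]) error { return nil }

func main() {
	workers := NewWorkers()
	RegisterAll(workers, []WorkConfigurerInterface{
		NewWorkConfigurer[emailArgs](&emailWorker{}),
	})
	fmt.Println(workers.kinds)
}
```

The type argument is spelled out at the call site so the sketch does not depend on type inference from method signatures.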

> any thoughts/ideas here? i feel like i must not be the first person to run into this issue.

As far as we know, you're the first person to have run into this issue.

I'd be careful about thinking that the concepts wrapped up in Fx like annotations and value groups are more "normal" than they actually are. I read through the Fx docs to help try and come up with the responses above, and although I'm sure Uber had good reason to come up with such a thing, I feel compelled to say that I found these concepts to be somewhat convoluted.

I hate to sound too much like a Go dogmatist, but sometimes maximizing DRY isn't the only objective worth considering, and there is such a thing as too much abstraction. I run a big Go app at work, and we use something akin to dependency injection, but do so without the framework and use of reflection.

@elee1766
Contributor Author

elee1766 commented May 7, 2024

first off - thank you for the time and effort that go into your answers. we really appreciate it.

> The first, and easier one, is to just use a Workers struct as a dependency that you inject through Fx into each of your services, and give each one the opportunity to add its own workers to it through the normal means:

this is what we are currently doing. this mostly works, with the caveat that you either have to "invoke" your workers instead of provide, or make something consume whatever output the worker providers provide. it's a little annoying but it's no big issue.

> type WorkConfigurerInterface interface {
> 	Configure(workers *Workers)
> }

ah. this was the missing step. I was trying to figure out a way to solve the problem without thinking to use the Workers struct in the interface, this just clicked. thanks again!

> What's not necessarily intuitive is that you can add workers to a Workers instance after workers has been sent into a River client. As long as a worker is in the bundle by the time the client tries to work a job of that kind, you're okay. So you'd inject the workers dependency, and every subservice receiving it would have the opportunity to register workers.

reading the code, i'm not sure this is true. the workersMap seems to be accessed in a thread-unsafe manner, which is okay if read only, but if i add a worker after Start() has been called, won't it race? from what we tested, it's okay to add workers to a worker instance as long as you are not concurrently scheduling a job on that client, or you have not yet called Start(). Maybe there should be an RWMutex, or workersMap should be a sync.Map?

> I feel compelled to say that I found these concepts to be somewhat convoluted.
> I hate to sound too much like a Go dogmatist, but sometimes maximizing DRY isn't the only objective worth considering, and there is such a thing as too much abstraction.

completely understand the sentiment. for us we have really enjoyed fx as a framework, and so as a result, we try to follow the conventional patterns when we have no strong opinion on which way to implement something; it has done us well, and we have found value groups to be quite useful.

@bgentry
Contributor

bgentry commented May 7, 2024

> the workersMap seems to be accessed in a thread-unsafe manner, which is okay if read only, but if i add a worker after Start() has been called, won't it race? from what we tested, it's okay to add workers to a worker instance as long as you are not concurrently scheduling a job on that client, or you have not yet called Start().

This is correct, workers can only be added before the client has been started. After that it is unsafe to do so.

We wouldn’t want to add a mutex on every read of the workers map, though we could potentially make it panic if you ever try to do this once it’s been used in a client. Not sure if that’s worth doing right now.
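
A rough sketch of that kind of guard, independent of River: the registry below keeps reads lock-free and only consults a flag on writes. All names (`registry`, `Seal`, `errSealed`) are hypothetical, and it returns an error where the comment above suggests a panic, but the check is the same. It detects misuse after the fact; it does not make an `Add` that overlaps with `Seal` safe.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errSealed is returned when a kind is added after the registry was sealed.
var errSealed = errors.New("registry: cannot add workers after the client has started")

// registry is a hypothetical stand-in for a workers bundle. Reads of the
// map take no lock; the sealed flag is only checked when adding.
type registry struct {
	sealed atomic.Bool
	kinds  map[string]struct{}
}

func newRegistry() *registry {
	return &registry{kinds: make(map[string]struct{})}
}

// Add registers a kind, or fails if the registry has been sealed.
func (r *registry) Add(kind string) error {
	if r.sealed.Load() {
		return errSealed
	}
	r.kinds[kind] = struct{}{}
	return nil
}

// Seal marks the registry read-only. A client would call this once when it
// starts (an assumption of this sketch).
func (r *registry) Seal() { r.sealed.Store(true) }

// Has reports whether a kind is registered. It only reads the map, which
// is safe once no more writes can happen.
func (r *registry) Has(kind string) bool {
	_, ok := r.kinds[kind]
	return ok
}

func main() {
	r := newRegistry()
	fmt.Println(r.Add("email")) // accepted before sealing
	r.Seal()
	fmt.Println(r.Add("report")) // rejected after sealing
}
```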

@brandur
Contributor

brandur commented May 8, 2024

> This is correct, workers can only be added before the client has been started. After that it is unsafe to do so.
>
> We wouldn’t want to add a mutex on every read of the workers map, though we could potentially make it panic if you ever try to do this once it’s been used in a client. Not sure if that’s worth doing right now.

Yeah, FWIW that's what I meant as well. It is safe to add new workers after a client's been initialized, but you need to make sure to do so before calling Client.Start.

> ah. this was the missing step. I was trying to figure out a way to solve the problem without thinking to use the Workers struct in the interface, this just clicked. thanks again!

@elee1766 No worries! Hope this gets you to something that's reasonably workable given your setup.

> completely understand the sentiment. for us we have really enjoyed fx as a framework, and so as a result, we try to follow the conventional patterns when we have no strong opinion on which way to implement something; it has done us well, and we have found value groups to be quite useful.

Cool. And yep, fair enough.
