Handlers

Adriano Caloiaro edited this page Sep 16, 2023 · 1 revision

Handlers are application code that is run when jobs are available on a queue.

Handler concurrency is configurable, allowing job processing to be parallelized over multiple CPUs, or serialized over just one.

Handlers are always associated with a single queue.

The simplest Handler consists of the queue name that it processes, and the function to run when jobs are available on that queue.

For example, a handler that listens on the my_jobs queue and prints "got a job" to stdout:

h := handler.New("my_jobs", func(ctx context.Context) (err error) {
  fmt.Println("got a job")
  return
})

Don't forget to start listening on this queue by calling Start():

  err = nq.Start(ctx, h)
  if err != nil {
  	log.Println("error listening to queue", err)
  }

Of course, this example is very simple, and the handler doesn't even inspect the Job that triggered it. See below for more detailed examples.

Usage

Fetching jobs from the context

Typically, a handler needs to know more about the Job that triggered it. The Job is carried on the context.Context passed to the handler and can be retrieved with jobs.FromContext:

  h := handler.New("my_jobs", func(ctx context.Context) (err error) {
  	var j *jobs.Job
  	j, err = jobs.FromContext(ctx)
  	if err != nil {
  		return err
  	}

  	log.Println("got job id:", j.ID, "message:", j.Payload["message"])
  	return
  })

Configuration Options

Concurrency

Handlers can process jobs serially (concurrency 1) or in parallel (concurrency >= 2).

Example

Concurrently fetch up to 4 jobs off the my_jobs queue.

h := handler.New("my_jobs", func(ctx context.Context) (err error) {
	var j *jobs.Job
	j, err = jobs.FromContext(ctx)
	if err != nil {
		return err // do not dereference j when no job was found
	}
	log.Println("got job id:", j.ID, "message:", j.Payload["message"])
	return
}, handler.Concurrency(4))

JobTimeout

JobTimeout sets the maximum duration a handler may spend on a single job. Jobs that run longer than JobTimeout are retried with exponential backoff.

Example

Jobs on my_jobs never succeed: the handler is given only one millisecond per job, but the handler function sleeps for one second. Each job is retried, fails again, and is eventually moved to the dead queue.

h := handler.New("my_jobs", func(ctx context.Context) (err error) {
  time.Sleep(1 * time.Second)
  return
}, handler.JobTimeout(1*time.Millisecond))