A flexible, idiomatic approach to a Go Disque client.
Disco attempts to provide two ways of using Disque: a low level API that allows more flexibility and control for users that need it and high level API for the most common usage patterns.
The high level API attempts to provide a common usage pattern in a idiomatic Go manner, ideally it should simplify Disque usage by not having to deal with the nuts and bolts of the low level API.
Funnels are an abstraction on top of a disco.Pool
: they provide Go channels that you can use to enqueue or receive jobs from Disque.
// See GoDoc for further details in connection Pool options.
pool, _ := NewPool(2, 5, 1, time.Second * 200)
funnel := pool.NewFunnel("disco-test-queue", "other-queue")
defer funnel.Close()
// Enqueue jobs simply by directing them to the Outgoing channel.
funnel.Outgoing <- Job{Queue: "disco-test-queue", Payload: []byte("this-is-the-payload")}:
// Receive jobs from disque simply by leveraging the Incoming channel, you can leverage
// common Go patterns such as a select statement to handle timeouts or other kinds of errors.
select {
case job, ok := <- funnel.Incoming:
string(job.Payload) //=> "this-is-the-payload" {
case <- time.Tick(time.Second):
// Handle timeout (or not)
}
A funnel will also manage the job's lifecycle for you: jobs received via the Incoming
channel will be acknowledged in Disque automatically (you'll still have the option to put it back in the queue if need be) and jobs fetched from Disque after the funnel is closed will be automatically NAcked so as not to lose data.
Connections represent a persistent connection to a Disque cluster, it's the most basic form of Disco usage there is. Disco is built on top of Redigo, and the Connection struct is a Disque-specific wrapper around a redigo Conn interface, which means you can send commands to Disque directly.
// Will connect to the Disque nodes specified in the $DISQUE_NODES env variable.
connection, err := disco.NewConnection(100)
connection, err := disco.NewConnectionToURLS(100, "localhost:7701,localhost:7702,localhost:7703")
connection.Do("PING")
In most cases it's better to have a global connection pool that your application uses instead of manually creating them each time.
// Will connect to the Disque nodes specified in the $DISQUE_NODES env variable.
// Args are: Max idle connections, max active, cycle and idle timeout.
// see GoDoc for further details
pool, err := disco.NewPool(2, 5, 1, time.Second * 200)
connection := pool.Get()
defer connection.Close()
connection.Do("PING")
The AADDJOB
command is one of the two most used one, it enqueues a payload in a given queue in Disque.
connection, _ := disco.NewConnection(100)
id, err := connection.AddJob("disco-test-queue", "this-is-the-payload", time.Second * 10)
GETJOB
is the other fundamental Disque command: fetches enqueued jobs from a list of specified queues.
Keep in mind that this is a blocking call.
id, _ := connection.AddJob("disco-test-queue", "this-is-the-payload", time.Second * 10)
job, err := connection.GetJob(1, time.Second * 10, "disco-test-queue")
string(job.Payload) //=> "this-is-the-payload"
Wrapper around the 'ACKJOB' command.
Acknowledges the execution of one or more jobs via job IDs
job, _ := connection.GetJob(1, time.Second * 10, "disco-test-queue")
connection.Ack(job.ID)
Wrapper around the 'NACK' command.
The NACK command tells Disque to put the job back in the queue ASAP
job, _ := connection.GetJob(1, time.Second * 10, "disco-test-queue")
connection.NAck(job.ID)
You'll need gpm installed in order to pull in the necessary dependencies.
$ git clone [email protected]:pote/disco.git && cd disco
$ source .env.sample # feel free to cp it to .env and make any config changes you deem necessary.
$ make # Will pull dependencies if necessary, build the project and run the test suite.