Reconciliation driven resource queue. The main primitives of rescue are the
Engine and Task types. A Task describes some kind of job, a piece of work that
has to be done. The job description is defined by a set of labels, simple
key-value pairs.
type Task struct {
// Core contains systemically relevant information fundamental for task
// distribution. Shown below is example metadata managed internally.
//
// task.rescue.io/cycles 4
// task.rescue.io/expiry 2023-09-28T14:23:24.161982Z
// task.rescue.io/object 1611318984211839461
// task.rescue.io/worker 90dc68ba-4820-42ac-a924-2450388c15a6
//
// It is possible to define an execution limit using the circuit breaker label
// Cancel. A task defining a maximum execution count will be executed at most
// Cancel times. Once the execution limit is hit, the task at hand will be
// kept on hold until its Cycles number is reset by some external process.
// This allows tasks to stay in the queue until a resolution for the failing
// root cause can be found.
//
// task.rescue.io/cancel 5
//
Core *Core `json:"core,omitempty"`
// Cron contains optional scheduling information. A task may declare that it
// is to be scheduled at an interval on the clock. Below is an example of a
// task that is emitted at an interval of every 6 hours, that is every day at
// 00:00, 06:00, 12:00 and 18:00, measured in UTC.
//
// time.rescue.io/@every 6 hours
//
// Upon task creation, tasks defining an optional schedule will reflect the
// previous and the next tick according to their configured interval.
//
// time.rescue.io/tick-1 2023-09-28T12:00:00.000000Z
// time.rescue.io/tick+1 2023-09-28T18:00:00.000000Z
//
// The interval definition supports an opinionated set of duration units
// expressed in more or less natural language. Note that the third column of
// second and third order definitions for detailed schedules is not
// implemented at the moment. Only quantity=1 and quantity=x are supported
// right now. What is important to show here is that the DSL allows for
// certain extensions, if desired.
//
//    quantity=1    quantity=x    at / on (at)
//
//    minute        15 minutes
//    hour          4 hours       at **:30
//    day           5 days        at 08:00
//    week          2 weeks       on Wednesday (at 06:00)
//    month         3 months      on the 15th (at 17:00)
//
// Note that scheduled tasks are emitted according to their specified
// interval, never earlier, but arguably later to a negligible extent.
// Scheduling will always depend on the current conditions of the underlying
// system. If hardware is overloaded or no worker process is running, then
// scheduling might be affected considerably. If workers search for tasks
// every 5 seconds, then scheduled tasks are likely to be executed with a
// delay of a couple of seconds.
Cron *Cron `json:"cron,omitempty"`
// Gate allows tasks to be triggered after a set of dependencies has finished
// processing. Consider task template x and many tasks y, where x is waiting
// for all y tasks to be finished in order to be triggered. Task template x
// would define the following label keys in Task.Gate, all provided with the
// reserved value "waiting".
//
// y.api.io/leaf-0 waiting
// y.api.io/leaf-1 waiting
// y.api.io/leaf-2 waiting
//
// Below are then all the many tasks y, each defining their own unique label
// key in Task.Gate with the reserved value "trigger".
//
// y.api.io/leaf-0 trigger
// y.api.io/leaf-1 trigger
// y.api.io/leaf-2 trigger
//
// Inside the Task.Gate of task template x, the reserved value "waiting" will
// be set to the reserved value "deleted" as soon as any of the respective
// dependency tasks y is deleted after successful task execution. As soon as
// all tracked labels inside Task.Gate have flipped from "waiting" to
// "deleted", a new task will be emitted containing the task template's
// Task.Meta and Task.Sync. Consequently, the reserved values inside Task.Gate
// of the task template will all be reset back to "waiting" for the next cycle
// to begin.
Gate *Gate `json:"gate,omitempty"`
// Meta contains task specific information defined by the user. Any worker
// should be able to identify whether it can execute on a task successfully,
// given the task metadata. Upon task creation, certain metadata can be set by
// producers in order to inform consumers about the task's intention.
//
// x.api.io/action delete
// x.api.io/object 1234
//
Meta *Meta `json:"meta,omitempty"`
// Node contains task delivery information for targeting any addressable
// worker within the network. The default delivery method is "any". Tasks may
// be processed by "all" workers within the network without acknowledgement of
// completion. Any particular worker may be addressed as shown below. Tasks
// that are not addressed within a configured retention period are deleted.
//
// addr.rescue.io/method uni
// task.rescue.io/worker 90dc68ba-4820-42ac-a924-2450388c15a6
//
Node *Node `json:"node,omitempty"`
// Root allows a tree of dependencies to be managed. Consider task x and y,
// where
// x is the root of y.
//
// x.api.io/object 1234
//
// └ y.api.io/object 2345
// └ y.api.io/object 3456
// └ y.api.io/object 4567
//
// If task x is present, it makes y obsolete. Scheduling and processing y
// while x is present may cause conflicts that are hard to resolve. So y may
// define x as root, causing Engine.Create and Engine.Search to neither
// schedule nor process y if x happens to exist. In the described example,
// task y may define x as root as shown below, so that y is discarded by the
// system if it happens to exist alongside x. A minimal sketch of such a task
// follows the type definition.
//
// x.api.io/object 1234
//
Root *Root `json:"root,omitempty"`
// Sync allows task specific state to be managed across multiple scheduling
// cycles in combination with Task.Root. Any scheduled task may provide
// pointers to past state in order to inform task execution of future
// schedules.
// Internally the synced state will be persisted in the task templates
// defining Task.Cron upon deletion of scheduled tasks. The synced data will
// then be propagated to tasks scheduled on the next tick.
//
// x.api.io/latest 1234
//
Sync *Sync `json:"sync,omitempty"`
}
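To make the Task.Root mechanics concrete, the sketch below constructs a task y
that defines x as its root, so that y is discarded whenever x exists. This is
only an illustration: the api.io label keys are the example keys from the
field documentation above, and Root is assumed to be a plain label map just
like Meta.

// Task y, pointing at task x via Task.Root. If a task matching the root
// labels exists, y is neither scheduled nor processed.
tas := &task.Task{
	Meta: &task.Meta{
		"y.api.io/object": "2345",
	},
	Root: &task.Root{
		"x.api.io/object": "1234",
	},
}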
Engine.Create submits a new task to the system. Anyone can create any task at
any time. The task producer just needs to have an understanding of what
consumers within the system are capable of. Task.Meta and Task.Root of a
queued task must always match with a consumer in order to be processed.
Scheduled tasks may be created using Task.Cron, as sketched further below.
tas := &task.Task{
Meta: &task.Meta{
"test.api.io/action": "delete",
"test.api.io/object": "1234",
},
}
err := eng.Create(tas)
if err != nil {
panic(err)
}
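For illustration, a scheduled task template might be created along the
following lines. This is only a sketch: the interval label is the one from the
Task.Cron documentation above, the test.api.io metadata is made up, and Cron
is assumed to be a plain label map just like Meta.

tem := &task.Task{
	Cron: &task.Cron{
		"time.rescue.io/@every": "6 hours",
	},
	Meta: &task.Meta{
		"test.api.io/action": "cleanup",
		"test.api.io/object": "1234",
	},
}

err := eng.Create(tem)
if err != nil {
	panic(err)
}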
Engine.Delete removes an existing task from the system. Tasks can only be
deleted by the workers that own the task they have been assigned to. Task
ownership cannot be cherry-picked. Deleting an expired task causes an error on
the consumer side, because the worker, falsely believing to still be the
task's assigned worker, is operating on an outdated copy of a task that has
meanwhile changed within the underlying system. Note that task templates
defining Task.Cron or Task.Gate may be deleted by anyone using the bypass
label, since those templates are never owned by any worker.
err := eng.Delete(tas)
if err != nil {
panic(err)
}
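To connect the deletion semantics with Task.Gate, here is a sketch of the
gated setup described in the field documentation above: a task template x
waiting for three tasks y, and one of the trigger tasks y. The action labels
are illustrative, and Gate is assumed to be a plain label map just like Meta.
Deleting the y tasks after successful execution is what eventually flips the
template's gate and emits a new task.

// Task template x, emitted once all y dependencies have been deleted after
// successful execution.
tem := &task.Task{
	Gate: &task.Gate{
		"y.api.io/leaf-0": "waiting",
		"y.api.io/leaf-1": "waiting",
		"y.api.io/leaf-2": "waiting",
	},
	Meta: &task.Meta{
		"x.api.io/action": "aggregate",
	},
}

// One of the dependency tasks y, announcing its own gate label with the
// reserved value "trigger".
tas := &task.Task{
	Gate: &task.Gate{
		"y.api.io/leaf-0": "trigger",
	},
	Meta: &task.Meta{
		"y.api.io/action": "process",
	},
}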
Engine.Exists expresses whether a task with the given label set exists within
the underlying queue. Given a task was created with metadata a, b and c,
Exists will return true if called with metadata b and c. If workers want to
verify whether they still own a task, they can make the following call.
Calling tas.Core.All returns a label set that matches all the given label
keys. That selective label set is then used by Exists to find any task that
matches the given query.
// Build a separate query task from the owned task's internal labels.
qry := &task.Task{
	Core: tas.Core.All(task.Object, task.Worker),
}

exi, err := eng.Exists(qry)
if err != nil {
panic(err)
}
Engine.Expire is a background process that every worker should continuously
execute in order to revoke ownership from tasks that have not been completed
within the specified time of expiry. Expire goes through the full list of
available tasks and revokes ownership from every task that was found to be
expired. That means that in a cluster of multiple workers, it takes only a
single functioning worker to call Expire in order to keep existing tasks
available to be worked on.
err := eng.Expire()
if err != nil {
panic(err)
}
Engine.Ticker is an optional background process that every worker can
continuously execute in order to emit scheduled tasks based on any task
template defining Task.Cron. Ticker goes through the full list of available
tasks and creates new tasks for any task template that is found to be due for
scheduling based on its next tick. That means that in a cluster of multiple
workers, it takes only a single functioning worker to call Ticker in order to
keep scheduling recurring tasks for anyone to work on.
err := eng.Ticker()
if err != nil {
panic(err)
}
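Since both Expire and Ticker are meant to run continuously, a worker process
might drive them on a fixed interval. The following is only a sketch: the 5
second interval is arbitrary, and the standard library's time and log packages
stand in for real scheduling and error handling.

go func() {
	for range time.Tick(5 * time.Second) {
		// Revoke ownership from expired tasks so others can pick them up.
		if err := eng.Expire(); err != nil {
			log.Printf("expire failed: %v", err)
		}

		// Emit scheduled tasks for any task template that is due.
		if err := eng.Ticker(); err != nil {
			log.Printf("ticker failed: %v", err)
		}
	}
}()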
Engine.Search provides the calling worker with an available task.
tas, err := eng.Search()
if err != nil {
panic(err)
}
A common pattern to select the right task to work on would be some kind of
worker interface as shown below. Since any worker may be assigned to any task
at any time, the correct business logic must be invoked for the current task
at hand. That means a task must be identified using Filter, and then be
processed using Ensure. The asterisk may be used as a wildcard for matching
any key or value.
func (w *Worker) Ensure(tas *task.Task) error {
// business logic to cleanup properly
return nil
}
func (w *Worker) Filter(tas *task.Task) bool {
return tas.Meta.Has(map[string]string{
"test.api.io/action": "delete",
"test.api.io/object": "*",
})
}
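Putting the pieces together, a worker's main loop could search for a task,
filter it, execute the business logic via Ensure and delete the task on
success. The loop below is only a sketch: the Worker value and the polling
pause are illustrative, logging stands in for real error handling, and a
production worker would distinguish between "no task available" and genuine
search failures.

w := &Worker{}

for {
	tas, err := eng.Search()
	if err != nil {
		// There may simply be no task available right now, or the search
		// itself may have failed.
		log.Printf("search failed: %v", err)
		time.Sleep(5 * time.Second)
		continue
	}

	if !w.Filter(tas) {
		// Not a task for this worker implementation. Ownership will expire
		// eventually so that another worker can pick the task up.
		continue
	}

	if err := w.Ensure(tas); err != nil {
		// Leave the task in the queue so that it expires and gets retried.
		log.Printf("ensure failed: %v", err)
		continue
	}

	if err := eng.Delete(tas); err != nil {
		log.Printf("delete failed: %v", err)
	}
}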
If you have nothing else blocking the standard Redis port on your machine, then you can simply run the Redis Docker image and execute the conformance tests labelled with the redis build tag.
docker run --rm --name redis-stack-rescue -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
go test ./... -race -tags redis
If you have multiple Redis instances running on your machine, you should use a different port dedicated to the Rescue conformance tests.
docker run --rm --name redis-stack-rescue -p 6380:6379 -p 8002:8001 redis/redis-stack:latest
export REDIS_PORT=6380
go test ./... -race -tags redis