Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job Queues #114

Open
wants to merge 54 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
473b330
:arrow_up: deps: bincode=1.0
barzamin Jul 31, 2018
03ced35
libs/: :sparkles: new lib time! turnstile, a job scheduling library
barzamin Jul 31, 2018
334182b
jobs: start hacking serialization in
barzamin Jul 31, 2018
9bd965a
:heavy_minus_sign: deps: bincode
barzamin Jul 31, 2018
22dfdbf
Merge branch 'master' into 57-jobqueue-poc
barzamin Aug 27, 2018
c12f91b
:heavy_plus_sign: dep on diesel in main crate
barzamin Sep 14, 2018
01e4fd0
db: :heavy_plus_sign: dep on serde(_json), diesel-derive-enum, turnstile
barzamin Sep 14, 2018
028970d
db: start working on jobs table :construction_worker:
barzamin Sep 14, 2018
f328e16
docs: :memo: write a bit down about how workers should work
barzamin Sep 14, 2018
3899179
:fire: get rid of some stale code in ::jobs
barzamin Sep 14, 2018
5376d8f
workers: start building out a job collector
barzamin Sep 14, 2018
f138228
turnstile: some dtypes related to job execution contracts
barzamin Sep 14, 2018
21b0701
Merge branch 'master' into 57-jobqueue-poc
barzamin Sep 14, 2018
4285bc6
turnstile: split out mod job, start working on worker
barzamin Sep 21, 2018
13e6417
db(jobs): add queue column
barzamin Sep 21, 2018
aa66f9e
get rid of jobs mod, move workers mod to a folder
barzamin Sep 21, 2018
d0906c0
deps(turnstile): ➕ quick-error, serde, serde_json
barzamin Oct 1, 2018
b661a01
db: rename job_data column to data
barzamin Oct 1, 2018
d0deb0a
posticle: improve Job, Perform traits, fix visibility
barzamin Oct 1, 2018
b888c43
deps(turnstile): ➕ threadpool
barzamin Oct 4, 2018
38dfe97
db(jobs): add NewJobRecord struct
barzamin Oct 4, 2018
cb389e0
turnstile: implement basic worker ticking, thread pooling
barzamin Oct 4, 2018
41b7790
workers: implement job collector, add hardcoded test job
barzamin Oct 4, 2018
1cb9218
Merge branch 'master' into 57-jobqueue-poc
barzamin Oct 4, 2018
fb17145
ci: use postgres 9.6 (we need ≥9.4 for JSONB)
barzamin Oct 4, 2018
a16ee40
fmt: succ whitespace
barzamin Oct 5, 2018
8897d20
turnstile: update README.md
barzamin Oct 12, 2018
2974dab
Merge branch 'master' into 57-jobqueue-poc
barzamin Oct 12, 2018
3052ffd
workers: some cleanup
barzamin Oct 12, 2018
0d24397
docs: move workers to an internals/ subdir
barzamin Oct 16, 2018
e239f43
Merge branch 'master' into 57-jobqueue-poc
barzamin Nov 4, 2018
0b49e70
workers: switch to log macros
barzamin Nov 7, 2018
2bf2cb0
Merge branch 'master' into 57-jobqueue-poc
barzamin Jan 13, 2019
3875b78
activitypub: log serialization errors in AS2 newtype
barzamin Jan 13, 2019
350d6e8
turnstile::ExecutionContract: add immediate_fail ctor
barzamin Jan 13, 2019
0a64e5b
some additional debug logging
barzamin Jan 13, 2019
42c63fe
workers: more logging and fix a silly logic bug (sleeping must be last)
barzamin Jan 13, 2019
0603109
rustfmt: BIG CHUNGUS
barzamin Jan 13, 2019
7a50a81
Merge branch 'master' into 57-jobqueue-poc
barzamin Mar 18, 2019
4a02840
rustodonctl: add list-jobs command
barzamin Mar 28, 2019
d7120dc
workers: remove test job
barzamin Mar 29, 2019
48ee931
jobs: records and types should derive Clone (and Copy where possible)
barzamin Mar 29, 2019
abb9d0e
mark failed jobs as dead
barzamin Mar 29, 2019
5b90586
Revert "workers: remove test job"
barzamin Mar 29, 2019
9107485
turnstile: rewrite to use failure (remove quick_error)
barzamin Mar 29, 2019
722e672
workers::init: comment
barzamin Mar 29, 2019
6f3a87b
worker: fix bug deleting all jobs each tick
barzamin Mar 29, 2019
2099fec
rustfmt
barzamin Mar 29, 2019
08c60d3
rustodonctl: colorize state in list-jobs
barzamin Mar 30, 2019
f2e70f2
turnstile: use Result<(), turnstile::Error>, not Fallible<()>
barzamin Mar 30, 2019
ef8cba9
worker: start properly handling job failure
barzamin Apr 10, 2019
50b5339
implement RetryQueued state and overhaul execution contract for retries
barzamin May 23, 2019
edf5be6
worker: check should_run before run
barzamin May 23, 2019
e137224
worker: drop successful jobs, kill unsuccessfully deserialized jobs
barzamin May 23, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ rust:
addons:
config:
retries: true
postgresql: 9.6

cache:
cargo: true
directories:
- /home/travis/.rvm

services:
- postgresql

before_install:
- sudo apt-get update -yqq
Expand Down
141 changes: 113 additions & 28 deletions Cargo.lock

Large diffs are not rendered by default.

47 changes: 25 additions & 22 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,50 @@ default-run = "rustodon"
members = ['lib/resopt/', 'lib/posticle/']

[dependencies]
dotenv = "0.13"
maplit = "1.0"
dotenv = "0.13.0"
maplit = "1.0.1"
failure = "0.1.5"
failure_derive = "0.1.5"
lazy_static = "1.3"
lazy_static = "1.3.0"
itertools = "0.8.0"
resopt = { path = "lib/resopt/" }
structopt = "0.2.14"
structopt = "0.2.15"
prettytable-rs = { version = "0.8", default-features = false, features = ["win_crlf"] }

regex = "1.1.0"
regex = "1.1.2"
posticle = { path = "lib/posticle/" }

slog = "2.4"
slog-term = "2.4"
slog-async = "2.3"
slog-scope = "4.0"
rocket-slog = "0.4"
slog = "2.4.1"
slog-term = "2.4.0"
slog-async = "2.3.0"
slog-scope = "4.1.1"
rocket-slog = "0.4.0"

rocket = "0.4"
rocket = "0.4.0"

serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
validator = "0.8"
validator_derive = "0.8"
serde = "1.0.89"
serde_derive = "1.0.89"
serde_json = "1.0.39"
validator = "0.8.0"
validator_derive = "0.8.0"

askama = { version = "0.8.0", features = ["with-rocket"] }
ammonia = "2.0.0"

diesel = { version = "1.4.1", features = ["postgres", "chrono", "r2d2"] }
diesel = { version = "1.4.1", features = ["postgres", "chrono", "serde_json", "r2d2"] }
diesel-derive-enum = { version = "0.4.4", features = ["postgres"] }
flaken = "0.2.2"
turnstile = { path = "lib/turnstile/" }

chrono = "0.4"
chrono = "0.4.6"
chrono-humanize = "0.0.11"

openssl = "0.10.16"
pwhash = "0.3"
openssl = "0.10.19"
pwhash = "0.3.0"

base32 = "0.4"
base32 = "0.4.0"

[dependencies.rocket_contrib]
version = "0.4"
version = "0.4.0"
default-features = false
features = ["json"]
1 change: 1 addition & 0 deletions diesel.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[print_schema]
file = "src/db/schema.rs"
with_docs = true
import_types = ["diesel::sql_types::*", "crate::db::types::Job_status"]
16 changes: 16 additions & 0 deletions docs/internals/workers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
okay so this is how jobs are gonna work
* spawn "dispatch" thread
* this boy just sits there in a nice little loop {}
* tries to grab N jobs from the db
* if it can't, just yields its time slice
* figure out what jobs we should run _now_ (`can_run: Vec<&dyn Job>`)
* ```sql
UPDATE jobs SET jobstatus = INFLIGHT, updated = #{time.now} WHERE id IN can_run
```
* push into thread queue
* spawn "timeout" thread
* also doing a loop {}
* ```sql
SELECT jobid FROM jobs WHERE jobstatus = INFLIGHT, "updated" + "timeout" < #{time.now}
```
* do we need to kill inflight jobs?
11 changes: 11 additions & 0 deletions lib/turnstile/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "turnstile"
version = "0.1.0"
edition = "2018"
authors = ["Erin Moon <[email protected]>"]

[dependencies]
threadpool = "1.7"
serde = "1.0"
serde_json = "1.0"
failure = "0.1.5"
5 changes: 5 additions & 0 deletions lib/turnstile/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# turnstile
thanks for the name, tef and cosine (っ´ω`)っ

## what's this?
turnstile is a crate that wraps up a bunch of the types we use for job scheduling in one nice package.
46 changes: 46 additions & 0 deletions lib/turnstile/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use failure::Fail;
use serde_json;
use std::any::Any;
use std::fmt::{self, Debug, Display};
use std::sync::Mutex;

#[derive(Fail, Debug)]
pub enum Error {
#[fail(display = "Inner error in job: {}", _0)]
JobInnerError(#[fail(cause)] failure::Error),

#[fail(display = "Job panicked: {:?}", _0)]
JobPanicked(#[fail(cause)] SyncPanicError),

#[fail(display = "Error deserializing job data: {}", _0)]
DeserializeError(#[fail(cause)] serde_json::Error),

#[fail(display = "Invalid kind for job")]
InvalidKind,
}

pub struct SyncPanicError {
inner: Mutex<Box<dyn Any + Send + 'static>>,
}

impl SyncPanicError {
pub(crate) fn new(inner: Box<dyn Any + Send + 'static>) -> Self {
Self {
inner: Mutex::new(inner),
}
}
}

impl Display for SyncPanicError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
(*self.inner.lock().unwrap()).fmt(f)
}
}

impl Debug for SyncPanicError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
(*self.inner.lock().unwrap()).fmt(f)
}
}

impl Fail for SyncPanicError {}
55 changes: 55 additions & 0 deletions lib/turnstile/src/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use failure::Fallible;
use std::time::Duration;

pub trait Job {
/// Returns a textual identifier for this job.
fn kind() -> &'static str;

/// Returns true, if this job is due to execute.
fn should_run(&self) -> bool;

/// Returns the execution contract of this job.
fn execution_contract() -> ExecutionContract;
}

#[derive(Debug, Clone, Copy)]
pub enum Backoff {
ConstantWait(Duration),
Exponential { base: Duration },
}

#[derive(Debug, Clone, Copy)]
pub enum RetryBehavior {
Backoff(Backoff),
Immediate,
}

#[derive(Debug, Clone, Copy)]
pub enum PanicBehavior {
Fail,
Retry(RetryBehavior),
}

#[derive(Debug, Clone, Copy)]
pub struct ExecutionContract {
pub timeout: Option<Duration>,
pub retry_behavior: RetryBehavior,
pub autoretry: Option<RetryBehavior>,
pub panic: PanicBehavior,
}

impl ExecutionContract {
pub const fn new() -> Self {
Self {
panic: PanicBehavior::Fail,
retry_behavior: RetryBehavior::Immediate,
autoretry: None,
timeout: None,
}
}
}

pub trait Perform {
/// Runs this job's action.
fn perform(&self) -> Fallible<()>;
}
7 changes: 7 additions & 0 deletions lib/turnstile/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod error;
pub mod job;
pub mod worker;

pub use crate::error::Error;
pub use crate::job::{ExecutionContract, Job, PanicBehavior, Perform};
pub use crate::worker::Worker;
91 changes: 91 additions & 0 deletions lib/turnstile/src/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use serde::de::Deserialize;
use serde_json::{self, Value};
use std::collections::HashMap;
use std::panic::{self, RefUnwindSafe};
use std::sync::Arc;
use threadpool::{Builder as ThreadPoolBuilder, ThreadPool};

use crate::error::{Error, SyncPanicError};
use crate::job::{ExecutionContract, Job, Perform};

type HandlerFn = Box<(Fn(Value) -> Result<(), Error> + Send + Sync + 'static)>;
type ShouldRunFn = Box<(Fn(Value) -> Result<bool, Error> + Send + Sync + 'static)>;

pub struct Worker {
handlers: HashMap<&'static str, Arc<HandlerFn>>,
run_checks: HashMap<&'static str, Arc<ShouldRunFn>>,
execution_contracts: HashMap<&'static str, ExecutionContract>,
thread_pool: ThreadPool,
}

impl Worker {
pub fn new() -> Worker {
Worker {
handlers: HashMap::new(),
run_checks: HashMap::new(),
execution_contracts: HashMap::new(),
thread_pool: ThreadPoolBuilder::new()
.thread_name("worker_thread".to_string())
.build(),
}
}

pub fn register_job<J>(&mut self)
where
for<'de> J: Job + Perform + Deserialize<'de> + RefUnwindSafe,
{
self.handlers.insert(
J::kind(),
Arc::new(Box::new(|value| {
let job: J =
serde_json::from_value(value).map_err(|e| Error::DeserializeError(e.into()))?;

panic::catch_unwind(|| Perform::perform(&job))
.map_err(|panic| Error::JobPanicked(SyncPanicError::new(panic)))?
.map_err(Error::JobInnerError)?;

Ok(())
})),
);

self.run_checks.insert(
J::kind(),
Arc::new(Box::new(|value| {
let job: J =
serde_json::from_value(value).map_err(|e| Error::DeserializeError(e.into()))?;

Ok(job.should_run())
})),
);

self.execution_contracts
.insert(J::kind(), J::execution_contract());
}

pub fn job_tick(
&mut self,
kind: &str,
data: Value,
on_final: impl Fn(Result<(), Error>, ExecutionContract) + Send + 'static,
) -> Result<(), Error> {
let handler = self.handlers.get(kind).ok_or(Error::InvalidKind)?.clone();
let execution_contract = self
.execution_contracts
.get(kind)
.ok_or(Error::InvalidKind)?
.clone();
self.thread_pool.execute(move || {
let result = handler(data);

on_final(result, execution_contract);
});

Ok(())
}

pub fn should_run(&mut self, kind: &str, data: Value) -> Result<bool, Error> {
let run_check = self.run_checks.get(kind).ok_or(Error::InvalidKind)?.clone();

run_check(data)
}
}
2 changes: 2 additions & 0 deletions migrations/2018-07-31-044227_jobs_table/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE jobs;
DROP TYPE job_status;
12 changes: 12 additions & 0 deletions migrations/2018-07-31-044227_jobs_table/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TYPE job_status AS ENUM ('waiting', 'running', 'dead');

CREATE TABLE jobs (
id BIGINT PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,

status job_status NOT NULL,

queue VARCHAR NOT NULL,
kind VARCHAR NOT NULL,
data JSONB NOT NULL
)
2 changes: 2 additions & 0 deletions migrations/2019-05-14-225400_job_attempt_tracking/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE jobs
DROP COLUMN last_attempt;
2 changes: 2 additions & 0 deletions migrations/2019-05-14-225400_job_attempt_tracking/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE jobs
ADD COLUMN last_attempt TIMESTAMP WITH TIME ZONE NULL;