
POC - batched queue processing #99

Draft · gregplaysguitar wants to merge 12 commits into main
Conversation

@gregplaysguitar (Contributor) commented Mar 16, 2020

@benjie this PR updates the job retrieval and update logic so it can handle batching. The batch size is hardcoded in the get_jobs function for now (limit 1); once the concept is proven and accepted, this would become a parameter to get_jobs and be passed through from the calling code as needed.

All existing tests pass with the batch size set to 1. With batch size > 1, all pass except "runs jobs in parallel", which I guess makes some implicit assumptions about the worker picking up a single job at a time.

Performance-wise, running the included perf test I'm seeing job throughput increase with larger batches, as expected, but the baseline performance with batch size 1 is quite a bit slower than before (~600 vs ~1800 jobs/s). My assumption is that non-batched performance needs to stay pretty much the same, so I'm keen for any feedback on what might be causing this. I'll keep playing around with it.

Performance impacts still need to be quantified for a real workload, which I'm planning to do in our app soon.

Let me know what you think, and in particular whether this is something you're keen to support in worker. If not, that's ok; we may end up maintaining a fork if we decide we definitely need batching.


TODO

Master branch is getting ~2150 jobs/s for me

The original batched get_jobs was performing really badly at ~680 jobs/s. Using a composite type instead of json internally increased that to ~840

Replacing the select array … into with a CTE got it right back up to ~1840 jobs/s at batch size 1. With batch size 4 that jumps to ~4400 (the general shape of this is sketched below).
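For context, a minimal sketch of the CTE pattern referred to above (my illustration, not the PR's exact SQL): lock a limited set of candidate rows with FOR UPDATE SKIP LOCKED, then update and return them in a single statement. Queue locking, task-identifier filtering, and flag checks are omitted, and worker_id is assumed to be a function parameter.

-- Illustrative only: check out a batch of jobs in one statement.
with candidate as (
  select id
  from graphile_worker.jobs
  where locked_at is null
    and run_at <= now()
    and attempts < max_attempts
  order by priority asc, run_at asc, id asc
  limit 4 -- the batch size; the PR would make this a get_jobs parameter
  for update skip locked
)
update graphile_worker.jobs
set attempts = jobs.attempts + 1,
    locked_by = worker_id,
    locked_at = now()
from candidate
where jobs.id = candidate.id
returning jobs.*;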
@gregplaysguitar (Contributor, Author) commented:

Update: I made some big perf improvements, see details in 863184c

@benjie (Member) commented Mar 18, 2020

Definitely interesting; I'm going to want to pull this down myself and have a play with it. In the meantime, could you look at the changes in #73 and consider (but don't necessarily do it) putting them into this? In particular, I think releasing the jobs in batches (e.g. once per second rather than every time a job completes) could make a difference, although the performance testing I did on that before did not show this to be the case (I think I did something wrong in the perf tests). Releasing jobs in a batch should be independent from the jobs that were checked out (i.e. different batches).

Excellent work, and well done for noticing the issue with json_build_object being evaluated for the rows before filtering; this is an issue we suffer from in PostGraphile from time to time as well.

@gregplaysguitar (Contributor, Author) commented:

Thanks @benjie - I've had a look at #73 and made a couple of comments there.

Batched job completion does seem like it would help. In my observation, though, it's the get_job query that seems to be causing the bulk of the load when the system is under strain, which is why I looked into batching that. But combining both ideas sounds like a good plan.

I think we would want to consolidate some APIs; there are a few different ways of doing things floating around now (e.g. complete_batch vs complete_jobs & fail_jobs). I'm keen to take a steer from you on that, as I have less context around the different use-cases. If you do want to go ahead with this, perhaps it would be worth documenting the interfaces before we do any further work? I'd be happy to review them.

For my part, I will be doing some deeper perf analysis (i.e. production-scale load testing) at some point soon and can include these changes, as well as #73, in that.

@benjie (Member) commented Mar 20, 2020

Can you prove complete_batch is significantly faster than calling the complete_job / fail_job functions? If not, I’d drop it (it’s why I never merged #73).

@benjie (Member) commented Mar 20, 2020

As for get_jobs... what if we were to drop the DB function and inline it into the JS as a prepared statement? I don’t think we need get_job either. TBH only add_job is needed - it’s the only part of our public interface, everything else is internal. Perhaps prepared statements rather than function calls would perform better.

@gregplaysguitar (Contributor, Author) commented:

Can you prove complete_batch is significantly faster than calling the complete_job / fail_job functions?

Sure, I'll try that next time I'm working on this and let you know. I'm not sure it will be any different though, and I reckon having them separate is probably more efficient, given that failing jobs should be a rare case. Also, having them separate means we don't need to use plpgsql. I'd tend towards your implementation at this stage.

Perhaps prepared statements rather than function calls would perform better.

Yes, this might be true, but could we achieve the same thing by just making it a sql function instead of plpgsql? If we remove the worker_id check at the top, then get_jobs becomes a single SQL statement with CTEs.

@benjie (Member) commented Mar 20, 2020

If we remove the worker_id check at the top then get_jobs becomes a single SQL statement with CTEs

Yeah, I considered this; I think for this to be viable we should make it STRICT, and if we do so, the default for task_identifiers should be an empty array rather than NULL, and the checks should be updated so that an empty array counts as "all identifiers". I'm not super comfortable with this, though.

Actually, having given it more thought, let's just change it to SQL, drop that check, and add a CHECK constraint to the tables asserting that (locked_by is null) = (locked_at is null). I don't think the CHECK will add significant performance cost; and besides, it'd be a write cost rather than a read cost.
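For reference, a sketch of the constraint described above (not code from this PR; the constraint names are made up, and applying the same invariant to job_queues is my assumption):

-- Enforce that a row is locked_by someone if and only if it has a locked_at.
alter table graphile_worker.jobs
  add constraint jobs_locked_by_locked_at_consistent
  check ((locked_by is null) = (locked_at is null));

alter table graphile_worker.job_queues
  add constraint job_queues_locked_by_locked_at_consistent
  check ((locked_by is null) = (locked_at is null));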

@benjie (Member) commented Apr 6, 2020

Hey @gregplaysguitar just checking in; how's your experimentation going?

@gregplaysguitar (Contributor, Author) commented:

@benjie sorry about the lack of action here. This is still something I want to look at, but priorities have shifted a bit, so I haven't got time right now. If you're happy to leave this PR open, I'll come back to it when I can?

@benjie (Member) commented Apr 28, 2020

Sure 👍

@benjie changed the base branch from master to main on June 24, 2020 at 11:32
@benjie (Member) commented Aug 4, 2020

(Gentle nudge @gregplaysguitar)

@gregplaysguitar (Contributor, Author) commented:

@benjie I've finally been able to run some proper tests on this, and I'm seeing some good improvements with batching. I'm keen to invest the time to finish this implementation - do you have any suggestions for how to go about it, or should I just follow my nose?

@benjie (Member) left a comment:

I'm so excited you're working on this again Greg! Really love to squeeze every ounce of performance out of Postgres that we can!

I'm not happy with the current flushing of jobs. What seems to be happening right now is that you pull down a batch of jobs (let's say 20 jobs); then you execute these jobs, then you report back.

One of these jobs might take 5 minutes; all the rest might take 10ms. So if the process is killed 2.5 minutes in, none of the 20 jobs will be released, even though 19 of them completed successfully a couple of minutes ago. It also means the queue will carry more locked jobs, which makes finding new jobs more expensive, giving a performance impact.

Let's solve it instead by batching the release of the jobs. Each job should release as soon as it's done (success or failure), but that release can go to a queue that is flushed every [configurable period], which defaults to, say, 1 second. For 20 very fast tasks, 1 second should catch all of them. You can see my initial thoughts on this approach in #73.

I'm also concerned about the linearity of these job executions. Say job 2 takes 5 minutes; that means jobs 3-20 don't even start until 5 minutes later. I wonder if we should just run all the tasks pulled down in parallel; user-configurable, with a default of pulling down just 1 task to maintain current behaviour. This would give a different concurrency control setting (and maybe we'd deprecate the old one). What do you think?

@@ -125,6 +125,49 @@ begin
end;
$$;
ALTER FUNCTION graphile_worker.add_job(identifier text, payload json, queue_name text, run_at timestamp with time zone, max_attempts integer, job_key text, priority integer, flags text[]) OWNER TO graphile_worker_role;
CREATE FUNCTION graphile_worker.complete_batch(worker_id text, success_ids bigint[], failures json[]) RETURNS void
@benjie (Member) commented:

I don't like the mixture of json and array here; I'd rather that the argument was just json and we used json_array_elements.

Suggested change:
- CREATE FUNCTION graphile_worker.complete_batch(worker_id text, success_ids bigint[], failures json[]) RETURNS void
+ CREATE FUNCTION graphile_worker.complete_batch(worker_id text, success_ids bigint[], failures json) RETURNS void

However, I note a more troubling issue. PostgreSQL's JSON supports arbitrary-precision numbers (I think); however, Node is limited to IEEE 754 64-bit floats, which gives ~53 bits of integer safety - not enough to cover the bigint size of Graphile Worker's PKs. We can get around this by encoding the ID as a string, but we must be very careful to always do so on the calling side. I've not finished reading the code yet so haven't checked, but if there isn't already a note on this, we should add one.
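To make those two points concrete, here is a sketch (my own, with an assumed payload shape) of how a plain json failures argument could be consumed, with IDs sent from Node as strings and cast back to bigint in SQL:

-- Assumed payload shape: [{"id": "123", "message": "something went wrong"}, ...]
-- Sending ids as strings sidesteps Node's ~53-bit integer limit; the cast restores bigint.
select (el ->> 'id')::bigint as job_id,
       el ->> 'message' as failure_message
from json_array_elements(failures) as el; -- failures json, per the suggested change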

@benjie (Member) commented:

I wonder if it would be better to use a composite type here for failures?

create type graphile_worker.failure_info as (
  job_id bigint,
  failure_message text
);

@gregplaysguitar (Contributor, Author) commented:

failures json seems a bit ambiguous to me, because we will always have an array here. I think a composite type makes sense; I'll have to double-check what my original reasoning was for not doing that, but it might not have been anything in particular.
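For comparison (a sketch assuming the failure_info type suggested above), a composite-type array needs no JSON parsing at all; unnest expands it into one row per failure with the composite's fields as ordinary columns:

-- failures graphile_worker.failure_info[]
select f.job_id, f.failure_message
from unnest(failures) as f;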

) and locked_by = worker_id;
end if;
end;
$$;
@benjie (Member) commented:

If you drop the if statements you could convert this to sql rather than plpgsql which might have an impact on performance. Maybe.

@gregplaysguitar (Contributor, Author) commented:

I think this is a good idea anyway; always better to use plain SQL if possible, imo.
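As an illustration of that plain-SQL direction (a sketch only: the complete_jobs name is borrowed from the discussion above, and job_queues bookkeeping and failure handling are deliberately omitted):

-- A success-only batch completion as a plain "language sql" function: no plpgsql, no if-blocks.
create function graphile_worker.complete_jobs(worker_id text, success_ids bigint[])
returns void
language sql
as $$
  delete from graphile_worker.jobs
  where id = any(success_ids)
    and locked_by = worker_id;
$$;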

and (forbidden_flags is null or (flags ?| forbidden_flags) is not true)
order by priority asc, run_at asc, id asc
-- TODO make this a configurable value
limit 1
@benjie (Member) commented:

Make the job_count the first argument to get_jobs.

Also, maintaining both get_job and get_jobs is likely to be a pain; let's switch worker to using get_jobs (assuming no performance cost) and replace get_job(...) with a call to get_jobs(1, ...) for legacy support.
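Something along these lines could keep get_job around as a thin legacy wrapper (the signature and parameter order here are illustrative assumptions; they would need to match whatever get_jobs ends up taking):

-- Sketch: get_job delegates to the batched function with a batch size of 1 and
-- returns the single row, or NULL if no job was available.
create function graphile_worker.get_job(
  worker_id text,
  task_identifiers text[] default null
) returns graphile_worker.jobs
language sql
as $$
  select *
  from graphile_worker.get_jobs(1, worker_id, task_identifiers)
  limit 1;
$$;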

@@ -111,7 +109,9 @@ export function makeNewWorker(

// `doNext` cannot be executed concurrently, so we know this is safe.
// eslint-disable-next-line require-atomic-updates
activeJob = jobRow && jobRow.id ? jobRow : null;
const validJobs = jobRows.filter((r) => r.id);
@benjie (Member) commented:

???

@benjie (Member) commented Sep 23, 2020

@gregplaysguitar This is exciting! I suspect you're seeing good results in throughput, but potentially an increase in latency? Let's see if we can achieve a throughput increase without trading too much latency - make sure you benchmark with some slower tasks (e.g. try throwing an if (Math.random() < 0.2) { await sleep(1000) } in there) to see how the metrics change.

@gregplaysguitar (Contributor, Author) commented:

Ok, sounds good, and good suggestions. I'll aim to chip away at this among my other stuff.

Re latency - my testing methodology is to gradually increase load (and hence throughput) while monitoring our key operational latency metric, which encompasses graphile tasks plus a lot of other stuff going on in the db. With batching enabled, the system coped with around 10-15% more load before breaching the latency SLO. I did see a little more latency early on with batching, as you suggested, but not enough to be an issue. Hopefully with some of the suggested improvements we can get that back down.

@benjie (Member) commented Oct 23, 2023

Hey @gregplaysguitar do you have any interest in picking this back up? We've moved a lot of the logic from the migrations into the JS (e.g. getJob is now in JS: https://github.com/graphile/worker/blob/main/src/sql/getJob.ts) so iterating should be significantly less effort than before.
