-
-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use migration system for JobQueue and Persist frameworks #8
Merged
Merged
Changes from 10 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
49b26a9
Add migrations for HBPostgresJobQueue
adam-fowler cace687
Add persist migration
adam-fowler 28a831e
Set job queue group
adam-fowler 77ae661
Persist driver uses migrations
adam-fowler 7c0470f
Set migration group for jobs queue and persist
adam-fowler 3c94c17
Add Job last modified
adam-fowler 345f0b6
Revert migrations in tests, don't flag migrations as complete in a re…
adam-fowler 1de57aa
Set job queue label as test name
adam-fowler 2936688
Hopefully fix testRerunAtStartup
adam-fowler f027180
Update log level for onInit logging
adam-fowler 7b45b60
Rename tables to include pg
adam-fowler File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
48 changes: 48 additions & 0 deletions
48
Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
//===----------------------------------------------------------------------===// | ||
// | ||
// This source file is part of the Hummingbird server framework project | ||
// | ||
// Copyright (c) 2024 the Hummingbird authors | ||
// Licensed under Apache License v2.0 | ||
// | ||
// See LICENSE.txt for license information | ||
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
//===----------------------------------------------------------------------===// | ||
|
||
import HummingbirdPostgres | ||
import Logging | ||
@_spi(ConnectionPool) import PostgresNIO | ||
|
||
struct CreateJobQueue: HBPostgresMigration { | ||
func apply(connection: PostgresConnection, logger: Logger) async throws { | ||
try await connection.query( | ||
""" | ||
CREATE TABLE IF NOT EXISTS _hb_job_queue ( | ||
job_id uuid PRIMARY KEY, | ||
createdAt timestamp with time zone | ||
) | ||
""", | ||
logger: logger | ||
) | ||
try await connection.query( | ||
""" | ||
CREATE INDEX IF NOT EXISTS _hb_job_queueidx | ||
ON _hb_job_queue (createdAt ASC) | ||
""", | ||
logger: logger | ||
) | ||
} | ||
|
||
func revert(connection: PostgresConnection, logger: Logger) async throws { | ||
try await connection.query( | ||
"DROP TABLE _hb_job_queue", | ||
logger: logger | ||
) | ||
} | ||
|
||
var name: String { "_Create_JobQueue_Table_" } | ||
var group: HBMigrationGroup { .jobQueue } | ||
} |
48 changes: 48 additions & 0 deletions
48
Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
//===----------------------------------------------------------------------===// | ||
// | ||
// This source file is part of the Hummingbird server framework project | ||
// | ||
// Copyright (c) 2024 the Hummingbird authors | ||
// Licensed under Apache License v2.0 | ||
// | ||
// See LICENSE.txt for license information | ||
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
//===----------------------------------------------------------------------===// | ||
|
||
import HummingbirdPostgres | ||
import Logging | ||
@_spi(ConnectionPool) import PostgresNIO | ||
|
||
struct CreateJobs: HBPostgresMigration { | ||
func apply(connection: PostgresConnection, logger: Logger) async throws { | ||
try await connection.query( | ||
""" | ||
CREATE TABLE IF NOT EXISTS _hb_jobs ( | ||
id uuid PRIMARY KEY, | ||
job bytea, | ||
status smallint, | ||
lastModified TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP | ||
) | ||
""", | ||
logger: logger | ||
) | ||
} | ||
|
||
func revert(connection: PostgresConnection, logger: Logger) async throws { | ||
try await connection.query( | ||
"DROP TABLE _hb_jobs", | ||
logger: logger | ||
) | ||
} | ||
|
||
var name: String { "_Create_Jobs_Table_" } | ||
var group: HBMigrationGroup { .jobQueue } | ||
} | ||
|
||
extension HBMigrationGroup { | ||
/// JobQueue migration group | ||
public static var jobQueue: Self { .init("_hb_jobqueue") } | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,17 @@ | ||
//===----------------------------------------------------------------------===// | ||
// | ||
// This source file is part of the Hummingbird server framework project | ||
// | ||
// Copyright (c) 2024 the Hummingbird authors | ||
// Licensed under Apache License v2.0 | ||
// | ||
// See LICENSE.txt for license information | ||
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
//===----------------------------------------------------------------------===// | ||
|
||
import Foundation | ||
import HummingbirdJobs | ||
@_spi(ConnectionPool) import HummingbirdPostgres | ||
|
@@ -38,23 +52,17 @@ public final class HBPostgresQueue: HBJobQueueDriver { | |
|
||
/// Queue configuration | ||
public struct Configuration: Sendable { | ||
let jobTable: String | ||
let jobQueueTable: String | ||
let pendingJobsInitialization: JobInitialization | ||
let failedJobsInitialization: JobInitialization | ||
let processingJobsInitialization: JobInitialization | ||
let pollTime: Duration | ||
|
||
public init( | ||
jobTable: String = "_hb_jobs", | ||
jobQueueTable: String = "_hb_job_queue", | ||
pendingJobsInitialization: HBPostgresQueue.JobInitialization = .doNothing, | ||
failedJobsInitialization: HBPostgresQueue.JobInitialization = .rerun, | ||
processingJobsInitialization: HBPostgresQueue.JobInitialization = .rerun, | ||
pendingJobsInitialization: JobInitialization = .doNothing, | ||
failedJobsInitialization: JobInitialization = .rerun, | ||
processingJobsInitialization: JobInitialization = .rerun, | ||
pollTime: Duration = .milliseconds(100) | ||
) { | ||
self.jobTable = jobTable | ||
self.jobQueueTable = jobQueueTable | ||
self.pendingJobsInitialization = pendingJobsInitialization | ||
self.failedJobsInitialization = failedJobsInitialization | ||
self.processingJobsInitialization = processingJobsInitialization | ||
|
@@ -68,50 +76,28 @@ public final class HBPostgresQueue: HBJobQueueDriver { | |
public let configuration: Configuration | ||
/// Logger used by queue | ||
public let logger: Logger | ||
|
||
let migrations: HBPostgresMigrations | ||
let isStopped: NIOLockedValueBox<Bool> | ||
|
||
/// Initialize a HBPostgresJobQueue | ||
/// - Parameters: | ||
/// - client: Postgres client | ||
/// - configuration: Queue configuration | ||
/// - logger: Logger used by queue | ||
public init(client: PostgresClient, configuration: Configuration = .init(), logger: Logger) { | ||
public init(client: PostgresClient, migrations: HBPostgresMigrations, configuration: Configuration = .init(), logger: Logger) async { | ||
self.client = client | ||
self.configuration = configuration | ||
self.logger = logger | ||
self.isStopped = .init(false) | ||
self.migrations = migrations | ||
await migrations.add(CreateJobs()) | ||
await migrations.add(CreateJobQueue()) | ||
} | ||
|
||
/// Run on initialization of the job queue | ||
public func onInit() async throws { | ||
do { | ||
self.logger.info("Waiting for JobQueue migrations") | ||
try await self.migrations.waitUntilCompleted() | ||
_ = try await self.client.withConnection { connection in | ||
try await connection.query( | ||
""" | ||
CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobTable) ( | ||
id uuid PRIMARY KEY, | ||
job bytea, | ||
status smallint | ||
) | ||
""", | ||
logger: self.logger | ||
) | ||
try await connection.query( | ||
""" | ||
CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable) ( | ||
job_id uuid PRIMARY KEY, | ||
createdAt timestamp with time zone | ||
) | ||
""", | ||
logger: self.logger | ||
) | ||
try await connection.query( | ||
""" | ||
CREATE INDEX IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable)idx | ||
ON \(unescaped: self.configuration.jobQueueTable) (createdAt ASC) | ||
""", | ||
logger: self.logger | ||
) | ||
self.logger.info("Update Jobs at initialization") | ||
try await self.updateJobsOnInit(withStatus: .pending, onInit: self.configuration.pendingJobsInitialization, connection: connection) | ||
try await self.updateJobsOnInit(withStatus: .processing, onInit: self.configuration.processingJobsInitialization, connection: connection) | ||
try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection) | ||
|
@@ -163,10 +149,10 @@ public final class HBPostgresQueue: HBJobQueueDriver { | |
let stream = try await connection.query( | ||
""" | ||
DELETE | ||
FROM \(unescaped: self.configuration.jobQueueTable) pse | ||
FROM _hb_job_queue pse | ||
WHERE pse.job_id = | ||
(SELECT pse_inner.job_id | ||
FROM \(unescaped: self.configuration.jobQueueTable) pse_inner | ||
FROM _hb_job_queue pse_inner | ||
ORDER BY pse_inner.createdAt ASC | ||
FOR UPDATE SKIP LOCKED | ||
LIMIT 1) | ||
|
@@ -180,7 +166,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { | |
} | ||
// select job from job table | ||
let stream2 = try await connection.query( | ||
"SELECT job FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)", | ||
"SELECT job FROM _hb_jobs WHERE id = \(jobId)", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These tables should probably be namespaced to pg right? |
||
logger: self.logger | ||
) | ||
|
||
|
@@ -206,7 +192,7 @@ public final class HBPostgresQueue: HBJobQueueDriver { | |
func add(_ job: HBQueuedJob<JobID>, connection: PostgresConnection) async throws { | ||
try await connection.query( | ||
""" | ||
INSERT INTO \(unescaped: self.configuration.jobTable) (id, job, status) | ||
INSERT INTO _hb_jobs (id, job, status) | ||
VALUES (\(job.id), \(job.jobBuffer), \(Status.pending)) | ||
""", | ||
logger: self.logger | ||
|
@@ -215,31 +201,31 @@ public final class HBPostgresQueue: HBJobQueueDriver { | |
|
||
func delete(jobId: JobID, connection: PostgresConnection) async throws { | ||
try await connection.query( | ||
"DELETE FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)", | ||
"DELETE FROM _hb_jobs WHERE id = \(jobId)", | ||
logger: self.logger | ||
) | ||
} | ||
|
||
func addToQueue(jobId: JobID, connection: PostgresConnection) async throws { | ||
try await connection.query( | ||
""" | ||
INSERT INTO \(unescaped: self.configuration.jobQueueTable) (job_id, createdAt) VALUES (\(jobId), \(Date.now)) | ||
INSERT INTO _hb_job_queue (job_id, createdAt) VALUES (\(jobId), \(Date.now)) | ||
""", | ||
logger: self.logger | ||
) | ||
} | ||
|
||
func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws { | ||
try await connection.query( | ||
"UPDATE \(unescaped: self.configuration.jobTable) SET status = \(status) WHERE id = \(jobId)", | ||
"UPDATE _hb_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)", | ||
logger: self.logger | ||
) | ||
} | ||
|
||
func getJobs(withStatus status: Status) async throws -> [JobID] { | ||
return try await self.client.withConnection { connection in | ||
let stream = try await connection.query( | ||
"SELECT id FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)", | ||
"SELECT id FROM _hb_jobs WHERE status = \(status)", | ||
logger: self.logger | ||
) | ||
var jobs: [JobID] = [] | ||
|
@@ -254,12 +240,13 @@ public final class HBPostgresQueue: HBJobQueueDriver { | |
switch onInit { | ||
case .remove: | ||
try await connection.query( | ||
"DELETE FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)", | ||
"DELETE FROM _hb_jobs WHERE status = \(status)", | ||
logger: self.logger | ||
) | ||
case .rerun: | ||
guard status != .pending else { return } | ||
let jobs = try await getJobs(withStatus: status) | ||
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue") | ||
for jobId in jobs { | ||
try await self.addToQueue(jobId: jobId, connection: connection) | ||
} | ||
|
@@ -300,7 +287,7 @@ extension HBJobQueueDriver where Self == HBPostgresQueue { | |
/// - client: Postgres client | ||
/// - configuration: Queue configuration | ||
/// - logger: Logger used by queue | ||
public static func postgres(client: PostgresClient, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) -> Self { | ||
.init(client: client, configuration: configuration, logger: logger) | ||
public static func postgres(client: PostgresClient, migrations: HBPostgresMigrations, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) async -> Self { | ||
await Self(client: client, migrations: migrations, configuration: configuration, logger: logger) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,46 @@ | ||||||
//===----------------------------------------------------------------------===// | ||||||
// | ||||||
// This source file is part of the Hummingbird server framework project | ||||||
// | ||||||
// Copyright (c) 2024 the Hummingbird authors | ||||||
// Licensed under Apache License v2.0 | ||||||
// | ||||||
// See LICENSE.txt for license information | ||||||
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors | ||||||
// | ||||||
// SPDX-License-Identifier: Apache-2.0 | ||||||
// | ||||||
//===----------------------------------------------------------------------===// | ||||||
|
||||||
import Logging | ||||||
@_spi(ConnectionPool) import PostgresNIO | ||||||
|
||||||
struct CreatePersistTable: HBPostgresMigration { | ||||||
func apply(connection: PostgresConnection, logger: Logger) async throws { | ||||||
try await connection.query( | ||||||
""" | ||||||
CREATE TABLE IF NOT EXISTS _hb_persist ( | ||||||
"id" text PRIMARY KEY, | ||||||
"data" json NOT NULL, | ||||||
"expires" timestamp with time zone NOT NULL | ||||||
) | ||||||
""", | ||||||
logger: logger | ||||||
) | ||||||
} | ||||||
|
||||||
func revert(connection: PostgresConnection, logger: Logger) async throws { | ||||||
try await connection.query( | ||||||
"DROP TABLE _hb_persist", | ||||||
logger: logger | ||||||
) | ||||||
} | ||||||
|
||||||
var name: String { "_Create_Persist_Table_" } | ||||||
var group: HBMigrationGroup { .persist } | ||||||
} | ||||||
|
||||||
extension HBMigrationGroup { | ||||||
/// Persist driver migration group | ||||||
public static var persist: Self { .init("_hb_persist") } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Likewise
Suggested change
|
||||||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this better off as such: