Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use migration system for JobQueue and Persist frameworks #8

Merged
merged 11 commits into from
Mar 7, 2024
48 changes: 48 additions & 0 deletions Sources/HummingbirdJobsPostgres/Migrations/CreateJobQueue.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import HummingbirdPostgres
import Logging
@_spi(ConnectionPool) import PostgresNIO

struct CreateJobQueue: HBPostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS _hb_job_queue (
job_id uuid PRIMARY KEY,
createdAt timestamp with time zone
)
""",
logger: logger
)
try await connection.query(
"""
CREATE INDEX IF NOT EXISTS _hb_job_queueidx
ON _hb_job_queue (createdAt ASC)
""",
logger: logger
)
}

func revert(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"DROP TABLE _hb_job_queue",
logger: logger
)
}

var name: String { "_Create_JobQueue_Table_" }
var group: HBMigrationGroup { .jobQueue }
}
48 changes: 48 additions & 0 deletions Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import HummingbirdPostgres
import Logging
@_spi(ConnectionPool) import PostgresNIO

struct CreateJobs: HBPostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS _hb_jobs (
id uuid PRIMARY KEY,
job bytea,
status smallint,
lastModified TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
)
""",
logger: logger
)
}

func revert(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"DROP TABLE _hb_jobs",
logger: logger
)
}

var name: String { "_Create_Jobs_Table_" }
var group: HBMigrationGroup { .jobQueue }
}

extension HBMigrationGroup {
/// JobQueue migration group
public static var jobQueue: Self { .init("_hb_jobqueue") }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this better off as such:

Suggested change
public static var jobQueue: Self { .init("_hb_jobqueue") }
public static var jobQueue: Self { .init("_hb_pg_jobqueue") }

}
89 changes: 38 additions & 51 deletions Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
import HummingbirdJobs
@_spi(ConnectionPool) import HummingbirdPostgres
Expand Down Expand Up @@ -38,23 +52,17 @@ public final class HBPostgresQueue: HBJobQueueDriver {

/// Queue configuration
public struct Configuration: Sendable {
let jobTable: String
let jobQueueTable: String
let pendingJobsInitialization: JobInitialization
let failedJobsInitialization: JobInitialization
let processingJobsInitialization: JobInitialization
let pollTime: Duration

public init(
jobTable: String = "_hb_jobs",
jobQueueTable: String = "_hb_job_queue",
pendingJobsInitialization: HBPostgresQueue.JobInitialization = .doNothing,
failedJobsInitialization: HBPostgresQueue.JobInitialization = .rerun,
processingJobsInitialization: HBPostgresQueue.JobInitialization = .rerun,
pendingJobsInitialization: JobInitialization = .doNothing,
failedJobsInitialization: JobInitialization = .rerun,
processingJobsInitialization: JobInitialization = .rerun,
pollTime: Duration = .milliseconds(100)
) {
self.jobTable = jobTable
self.jobQueueTable = jobQueueTable
self.pendingJobsInitialization = pendingJobsInitialization
self.failedJobsInitialization = failedJobsInitialization
self.processingJobsInitialization = processingJobsInitialization
Expand All @@ -68,50 +76,28 @@ public final class HBPostgresQueue: HBJobQueueDriver {
public let configuration: Configuration
/// Logger used by queue
public let logger: Logger

let migrations: HBPostgresMigrations
let isStopped: NIOLockedValueBox<Bool>

/// Initialize a HBPostgresJobQueue
/// - Parameters:
/// - client: Postgres client
/// - configuration: Queue configuration
/// - logger: Logger used by queue
public init(client: PostgresClient, configuration: Configuration = .init(), logger: Logger) {
public init(client: PostgresClient, migrations: HBPostgresMigrations, configuration: Configuration = .init(), logger: Logger) async {
self.client = client
self.configuration = configuration
self.logger = logger
self.isStopped = .init(false)
self.migrations = migrations
await migrations.add(CreateJobs())
await migrations.add(CreateJobQueue())
}

/// Run on initialization of the job queue
public func onInit() async throws {
do {
self.logger.info("Waiting for JobQueue migrations")
try await self.migrations.waitUntilCompleted()
_ = try await self.client.withConnection { connection in
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobTable) (
id uuid PRIMARY KEY,
job bytea,
status smallint
)
""",
logger: self.logger
)
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable) (
job_id uuid PRIMARY KEY,
createdAt timestamp with time zone
)
""",
logger: self.logger
)
try await connection.query(
"""
CREATE INDEX IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable)idx
ON \(unescaped: self.configuration.jobQueueTable) (createdAt ASC)
""",
logger: self.logger
)
self.logger.info("Update Jobs at initialization")
try await self.updateJobsOnInit(withStatus: .pending, onInit: self.configuration.pendingJobsInitialization, connection: connection)
try await self.updateJobsOnInit(withStatus: .processing, onInit: self.configuration.processingJobsInitialization, connection: connection)
try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection)
Expand Down Expand Up @@ -163,10 +149,10 @@ public final class HBPostgresQueue: HBJobQueueDriver {
let stream = try await connection.query(
"""
DELETE
FROM \(unescaped: self.configuration.jobQueueTable) pse
FROM _hb_job_queue pse
WHERE pse.job_id =
(SELECT pse_inner.job_id
FROM \(unescaped: self.configuration.jobQueueTable) pse_inner
FROM _hb_job_queue pse_inner
ORDER BY pse_inner.createdAt ASC
FOR UPDATE SKIP LOCKED
LIMIT 1)
Expand All @@ -180,7 +166,7 @@ public final class HBPostgresQueue: HBJobQueueDriver {
}
// select job from job table
let stream2 = try await connection.query(
"SELECT job FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)",
"SELECT job FROM _hb_jobs WHERE id = \(jobId)",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tables should probably be namespaced to pg right?

logger: self.logger
)

Expand All @@ -206,7 +192,7 @@ public final class HBPostgresQueue: HBJobQueueDriver {
func add(_ job: HBQueuedJob<JobID>, connection: PostgresConnection) async throws {
try await connection.query(
"""
INSERT INTO \(unescaped: self.configuration.jobTable) (id, job, status)
INSERT INTO _hb_jobs (id, job, status)
VALUES (\(job.id), \(job.jobBuffer), \(Status.pending))
""",
logger: self.logger
Expand All @@ -215,31 +201,31 @@ public final class HBPostgresQueue: HBJobQueueDriver {

func delete(jobId: JobID, connection: PostgresConnection) async throws {
try await connection.query(
"DELETE FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)",
"DELETE FROM _hb_jobs WHERE id = \(jobId)",
logger: self.logger
)
}

func addToQueue(jobId: JobID, connection: PostgresConnection) async throws {
try await connection.query(
"""
INSERT INTO \(unescaped: self.configuration.jobQueueTable) (job_id, createdAt) VALUES (\(jobId), \(Date.now))
INSERT INTO _hb_job_queue (job_id, createdAt) VALUES (\(jobId), \(Date.now))
""",
logger: self.logger
)
}

func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws {
try await connection.query(
"UPDATE \(unescaped: self.configuration.jobTable) SET status = \(status) WHERE id = \(jobId)",
"UPDATE _hb_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",
logger: self.logger
)
}

func getJobs(withStatus status: Status) async throws -> [JobID] {
return try await self.client.withConnection { connection in
let stream = try await connection.query(
"SELECT id FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)",
"SELECT id FROM _hb_jobs WHERE status = \(status)",
logger: self.logger
)
var jobs: [JobID] = []
Expand All @@ -254,12 +240,13 @@ public final class HBPostgresQueue: HBJobQueueDriver {
switch onInit {
case .remove:
try await connection.query(
"DELETE FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)",
"DELETE FROM _hb_jobs WHERE status = \(status)",
logger: self.logger
)
case .rerun:
guard status != .pending else { return }
let jobs = try await getJobs(withStatus: status)
self.logger.info("Moving \(jobs.count) jobs with status: \(status) to job queue")
for jobId in jobs {
try await self.addToQueue(jobId: jobId, connection: connection)
}
Expand Down Expand Up @@ -300,7 +287,7 @@ extension HBJobQueueDriver where Self == HBPostgresQueue {
/// - client: Postgres client
/// - configuration: Queue configuration
/// - logger: Logger used by queue
public static func postgres(client: PostgresClient, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) -> Self {
.init(client: client, configuration: configuration, logger: logger)
public static func postgres(client: PostgresClient, migrations: HBPostgresMigrations, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) async -> Self {
await Self(client: client, migrations: migrations, configuration: configuration, logger: logger)
}
}
46 changes: 46 additions & 0 deletions Sources/HummingbirdPostgres/CreatePersistTable.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Logging
@_spi(ConnectionPool) import PostgresNIO

struct CreatePersistTable: HBPostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS _hb_persist (
"id" text PRIMARY KEY,
"data" json NOT NULL,
"expires" timestamp with time zone NOT NULL
)
""",
logger: logger
)
}

func revert(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"DROP TABLE _hb_persist",
logger: logger
)
}

var name: String { "_Create_Persist_Table_" }
var group: HBMigrationGroup { .persist }
}

extension HBMigrationGroup {
/// Persist driver migration group
public static var persist: Self { .init("_hb_persist") }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise

Suggested change
public static var persist: Self { .init("_hb_persist") }
public static var persist: Self { .init("_hb_pg_persist") }

}
9 changes: 6 additions & 3 deletions Sources/HummingbirdPostgres/Migrations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public actor HBPostgresMigrations {
/// - dryRun: Should migrations actually be applied, or should we just report what would be applied and reverted
@_spi(ConnectionPool)
public func apply(client: PostgresClient, groups: [HBMigrationGroup] = [], logger: Logger, dryRun: Bool) async throws {
try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, dryRun: dryRun)
try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, completeMigrations: true, dryRun: dryRun)
}

/// Revery database migrations
Expand All @@ -75,14 +75,15 @@ public actor HBPostgresMigrations {
/// - dryRun: Should migrations actually be reverted, or should we just report what would be reverted
@_spi(ConnectionPool)
public func revert(client: PostgresClient, groups: [HBMigrationGroup] = [], logger: Logger, dryRun: Bool) async throws {
try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, dryRun: dryRun)
try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, completeMigrations: false, dryRun: dryRun)
}

private func migrate(
client: PostgresClient,
migrations: [HBPostgresMigration],
groups: [HBMigrationGroup],
logger: Logger,
completeMigrations: Bool,
dryRun: Bool
) async throws {
switch self.state {
Expand Down Expand Up @@ -151,7 +152,9 @@ public actor HBPostgresMigrations {
self.setFailed(error)
throw error
}
self.setCompleted()
if completeMigrations {
self.setCompleted()
}
}

/// Report if the migration process has completed
Expand Down
Loading
Loading