Skip to content

Commit

Permalink
Merge pull request #7 from hummingbird-project/jobs-refactor
Browse files Browse the repository at this point in the history
Changes for jobs refactor in Hummingbird
  • Loading branch information
adam-fowler authored Mar 6, 2024
2 parents 8e7f196 + de29b3d commit de58ed4
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 264 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ let package = Package(
.library(name: "HummingbirdPostgres", targets: ["HummingbirdPostgres"]),
],
dependencies: [
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-alpha.3"),
.package(url: "https://github.com/hummingbird-project/hummingbird.git", branch: "2.x.x-jobs-refactor"),
.package(url: "https://github.com/vapor/postgres-nio", from: "1.20.0"),
],
targets: [
Expand Down
50 changes: 34 additions & 16 deletions Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ import HummingbirdJobs
@_spi(ConnectionPool) import HummingbirdPostgres
import Logging
import NIOConcurrencyHelpers
import NIOCore
@_spi(ConnectionPool) import PostgresNIO

@_spi(ConnectionPool)
public final class HBPostgresJobQueue: HBJobQueue {
public final class HBPostgresQueue: HBJobQueueDriver {
public typealias JobID = UUID

/// what to do with failed/processing jobs from last time queue was handled
Expand Down Expand Up @@ -47,9 +48,9 @@ public final class HBPostgresJobQueue: HBJobQueue {
public init(
jobTable: String = "_hb_jobs",
jobQueueTable: String = "_hb_job_queue",
pendingJobsInitialization: HBPostgresJobQueue.JobInitialization = .doNothing,
failedJobsInitialization: HBPostgresJobQueue.JobInitialization = .rerun,
processingJobsInitialization: HBPostgresJobQueue.JobInitialization = .rerun,
pendingJobsInitialization: HBPostgresQueue.JobInitialization = .doNothing,
failedJobsInitialization: HBPostgresQueue.JobInitialization = .rerun,
processingJobsInitialization: HBPostgresQueue.JobInitialization = .rerun,
pollTime: Duration = .milliseconds(100)
) {
self.jobTable = jobTable
Expand All @@ -61,12 +62,19 @@ public final class HBPostgresJobQueue: HBJobQueue {
}
}

let client: PostgresClient
let configuration: Configuration
let logger: Logger
/// Postgres client used by Job queue
public let client: PostgresClient
/// Job queue configuration
public let configuration: Configuration
/// Logger used by queue
public let logger: Logger
let isStopped: NIOLockedValueBox<Bool>

/// Initialize a HBPostgresJobQueue
/// - Parameters:
/// - client: Postgres client
/// - configuration: Queue configuration
/// - logger: Logger used by queue
public init(client: PostgresClient, configuration: Configuration = .init(), logger: Logger) {
self.client = client
self.configuration = configuration
Expand All @@ -82,7 +90,7 @@ public final class HBPostgresJobQueue: HBJobQueue {
"""
CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobTable) (
id uuid PRIMARY KEY,
job json,
job bytea,
status smallint
)
""",
Expand Down Expand Up @@ -116,9 +124,9 @@ public final class HBPostgresJobQueue: HBJobQueue {

/// Push Job onto queue
/// - Returns: Identifier of queued job
@discardableResult public func push(_ job: HBJob) async throws -> JobID {
@discardableResult public func push(_ buffer: ByteBuffer) async throws -> JobID {
try await self.client.withConnection { connection in
let queuedJob = HBQueuedJob<JobID>(id: .init(), job: job)
let queuedJob = HBQueuedJob<JobID>(id: .init(), jobBuffer: buffer)
try await add(queuedJob, connection: connection)
try await addToQueue(jobId: queuedJob.id, connection: connection)
return queuedJob.id
Expand Down Expand Up @@ -179,10 +187,10 @@ public final class HBPostgresJobQueue: HBJobQueue {
do {
try await self.setStatus(jobId: jobId, status: .processing, connection: connection)
// if failed to find a job in the job table try getting another index
guard let job = try await stream2.decode(HBAnyCodableJob.self, context: .default).first(where: { _ in true }) else {
guard let buffer = try await stream2.decode(ByteBuffer.self, context: .default).first(where: { _ in true }) else {
continue
}
return HBQueuedJob(id: jobId, job: job.job)
return HBQueuedJob(id: jobId, jobBuffer: buffer)
} catch {
try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
throw JobQueueError.decodeJobFailed
Expand All @@ -199,7 +207,7 @@ public final class HBPostgresJobQueue: HBJobQueue {
try await connection.query(
"""
INSERT INTO \(unescaped: self.configuration.jobTable) (id, job, status)
VALUES (\(job.id), \(job.anyCodableJob), \(Status.pending))
VALUES (\(job.id), \(job.jobBuffer), \(Status.pending))
""",
logger: self.logger
)
Expand Down Expand Up @@ -262,9 +270,9 @@ public final class HBPostgresJobQueue: HBJobQueue {
}

/// extend HBPostgresJobQueue to conform to AsyncSequence
extension HBPostgresJobQueue {
extension HBPostgresQueue {
public struct AsyncIterator: AsyncIteratorProtocol {
let queue: HBPostgresJobQueue
let queue: HBPostgresQueue

public func next() async throws -> Element? {
while true {
Expand All @@ -285,4 +293,14 @@ extension HBPostgresJobQueue {
}
}

extension HBAnyCodableJob: PostgresCodable {}
@_spi(ConnectionPool)
extension HBJobQueueDriver where Self == HBPostgresQueue {
/// Return Postgres driver for Job Queue
/// - Parameters:
/// - client: Postgres client
/// - configuration: Queue configuration
/// - logger: Logger used by queue
public static func postgres(client: PostgresClient, configuration: HBPostgresQueue.Configuration = .init(), logger: Logger) -> Self {
.init(client: client, configuration: configuration, logger: logger)
}
}
Loading

0 comments on commit de58ed4

Please sign in to comment.