diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5189e0a..5100843 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,6 +35,7 @@ jobs: # file: info.lcov linux: runs-on: ubuntu-latest + timeout-minutes: 15 strategy: matrix: image: diff --git a/.gitignore b/.gitignore index f8c5fd7..f0a40d9 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ DerivedData/ .swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata .netrc .vscode +Package.resolved diff --git a/Package.resolved b/Package.resolved deleted file mode 100644 index aac1cd6..0000000 --- a/Package.resolved +++ /dev/null @@ -1,194 +0,0 @@ -{ - "pins" : [ - { - "identity" : "async-http-client", - "kind" : "remoteSourceControl", - "location" : "https://github.com/swift-server/async-http-client.git", - "state" : { - "revision" : "291438696abdd48d2a83b52465c176efbd94512b", - "version" : "1.20.1" - } - }, - { - "identity" : "hummingbird", - "kind" : "remoteSourceControl", - "location" : "https://github.com/hummingbird-project/hummingbird.git", - "state" : { - "revision" : "5c1f2eab53fd03e6d290ac27f99b2549bac2c9f4", - "version" : "2.0.0-alpha.2" - } - }, - { - "identity" : "postgres-nio", - "kind" : "remoteSourceControl", - "location" : "https://github.com/vapor/postgres-nio", - "state" : { - "revision" : "69ccfdf4c80144d845e3b439961b7ec6cd7ae33f", - "version" : "1.20.2" - } - }, - { - "identity" : "swift-algorithms", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-algorithms", - "state" : { - "revision" : "f6919dfc309e7f1b56224378b11e28bab5bccc42", - "version" : "1.2.0" - } - }, - { - "identity" : "swift-async-algorithms", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-async-algorithms.git", - "state" : { - "revision" : "da4e36f86544cdf733a40d59b3a2267e3a7bbf36", - "version" : "1.0.0" - } - }, - { - "identity" : "swift-atomics", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-atomics.git", - "state" : { - "revision" : "cd142fd2f64be2100422d658e7411e39489da985", - "version" : "1.2.0" - } - }, - { - "identity" : "swift-collections", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-collections.git", - "state" : { - "revision" : "94cf62b3ba8d4bed62680a282d4c25f9c63c2efb", - "version" : "1.1.0" - } - }, - { - "identity" : "swift-crypto", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-crypto.git", - "state" : { - "revision" : "cc76b894169a3c86b71bac10c78a4db6beb7a9ad", - "version" : "3.2.0" - } - }, - { - "identity" : "swift-distributed-tracing", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-distributed-tracing.git", - "state" : { - "revision" : "7fbb8b23b77ee548b3d0686b6faf735c1b3c7cb8", - "version" : "1.1.0" - } - }, - { - "identity" : "swift-http-types", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-http-types.git", - "state" : { - "revision" : "12358d55a3824bd5fed310b999ea8cf83a9a1a65", - "version" : "1.0.3" - } - }, - { - "identity" : "swift-log", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-log.git", - "state" : { - "revision" : "e97a6fcb1ab07462881ac165fdbb37f067e205d5", - "version" : "1.5.4" - } - }, - { - "identity" : "swift-metrics", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-metrics.git", - "state" : { - "revision" : "971ba26378ab69c43737ee7ba967a896cb74c0d1", - "version" : "2.4.1" - } - }, - { - "identity" : "swift-nio", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-nio.git", - "state" : { - "revision" : "635b2589494c97e48c62514bc8b37ced762e0a62", - "version" : "2.63.0" - } - }, - { - "identity" : "swift-nio-extras", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-nio-extras.git", - "state" : { - "revision" : "363da63c1966405764f380c627409b2f9d9e710b", - "version" : "1.21.0" - } - }, - { - "identity" : "swift-nio-http2", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-nio-http2.git", - "state" : { - "revision" : "0904bf0feb5122b7e5c3f15db7df0eabe623dd87", - "version" : "1.30.0" - } - }, - { - "identity" : "swift-nio-ssl", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-nio-ssl.git", - "state" : { - "revision" : "7c381eb6083542b124a6c18fae742f55001dc2b5", - "version" : "2.26.0" - } - }, - { - "identity" : "swift-nio-transport-services", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-nio-transport-services.git", - "state" : { - "revision" : "6cbe0ed2b394f21ab0d46b9f0c50c6be964968ce", - "version" : "1.20.1" - } - }, - { - "identity" : "swift-numerics", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-numerics.git", - "state" : { - "revision" : "0a5bc04095a675662cf24757cc0640aa2204253b", - "version" : "1.0.2" - } - }, - { - "identity" : "swift-service-context", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-service-context.git", - "state" : { - "revision" : "ce0141c8f123132dbd02fd45fea448018762df1b", - "version" : "1.0.0" - } - }, - { - "identity" : "swift-service-lifecycle", - "kind" : "remoteSourceControl", - "location" : "https://github.com/swift-server/swift-service-lifecycle.git", - "state" : { - "revision" : "55f45e39dd23c6cad82d4d529e22961cb5e493aa", - "version" : "2.4.0" - } - }, - { - "identity" : "swift-system", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-system.git", - "state" : { - "revision" : "025bcb1165deab2e20d4eaba79967ce73013f496", - "version" : "1.2.1" - } - } - ], - "version" : 2 -} diff --git a/Package.swift b/Package.swift index 9140f16..b551780 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( .library(name: "HummingbirdPostgres", targets: ["HummingbirdPostgres"]), ], dependencies: [ - .package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-alpha.2"), + .package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-alpha.3"), .package(url: "https://github.com/vapor/postgres-nio", from: "1.20.0"), ], targets: [ @@ -21,10 +21,19 @@ let package = Package( .product(name: "PostgresNIO", package: "postgres-nio"), ] ), + .target( + name: "HummingbirdJobsPostgres", + dependencies: [ + "HummingbirdPostgres", + .product(name: "HummingbirdJobs", package: "hummingbird"), + .product(name: "PostgresNIO", package: "postgres-nio"), + ] + ), .testTarget( name: "HummingbirdPostgresTests", dependencies: [ "HummingbirdPostgres", + "HummingbirdJobsPostgres", .product(name: "HummingbirdXCT", package: "hummingbird"), ] ), diff --git a/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift new file mode 100644 index 0000000..d06c603 --- /dev/null +++ b/Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift @@ -0,0 +1,288 @@ +import Foundation +import HummingbirdJobs +@_spi(ConnectionPool) import HummingbirdPostgres +import Logging +import NIOConcurrencyHelpers +@_spi(ConnectionPool) import PostgresNIO + +@_spi(ConnectionPool) +public final class HBPostgresJobQueue: HBJobQueue { + public typealias JobID = UUID + + /// what to do with failed/processing jobs from last time queue was handled + public enum JobInitialization: Sendable { + case doNothing + case rerun + case remove + } + + /// Errors thrown by HBPostgresJobQueue + public enum PostgresQueueError: Error, CustomStringConvertible { + case failedToAdd + + public var description: String { + switch self { + case .failedToAdd: + return "Failed to add job to queue" + } + } + } + + /// Job Status + enum Status: Int16, PostgresCodable { + case pending = 0 + case processing = 1 + case failed = 2 + } + + /// Queue configuration + public struct Configuration: Sendable { + let jobTable: String + let jobQueueTable: String + let pendingJobsInitialization: JobInitialization + let failedJobsInitialization: JobInitialization + let processingJobsInitialization: JobInitialization + let pollTime: Duration + + public init( + jobTable: String = "_hb_jobs", + jobQueueTable: String = "_hb_job_queue", + pendingJobsInitialization: HBPostgresJobQueue.JobInitialization = .doNothing, + failedJobsInitialization: HBPostgresJobQueue.JobInitialization = .rerun, + processingJobsInitialization: HBPostgresJobQueue.JobInitialization = .rerun, + pollTime: Duration = .milliseconds(100) + ) { + self.jobTable = jobTable + self.jobQueueTable = jobQueueTable + self.pendingJobsInitialization = pendingJobsInitialization + self.failedJobsInitialization = failedJobsInitialization + self.processingJobsInitialization = processingJobsInitialization + self.pollTime = pollTime + } + } + + let client: PostgresClient + let configuration: Configuration + let logger: Logger + let isStopped: NIOLockedValueBox + + /// Initialize a HBPostgresJobQueue + public init(client: PostgresClient, configuration: Configuration = .init(), logger: Logger) { + self.client = client + self.configuration = configuration + self.logger = logger + self.isStopped = .init(false) + } + + /// Run on initialization of the job queue + public func onInit() async throws { + do { + _ = try await self.client.withConnection { connection in + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobTable) ( + id uuid PRIMARY KEY, + job json, + status smallint + ) + """, + logger: self.logger + ) + try await connection.query( + """ + CREATE TABLE IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable) ( + job_id uuid PRIMARY KEY, + createdAt timestamp with time zone + ) + """, + logger: self.logger + ) + try await connection.query( + """ + CREATE INDEX IF NOT EXISTS \(unescaped: self.configuration.jobQueueTable)idx + ON \(unescaped: self.configuration.jobQueueTable) (createdAt ASC) + """, + logger: self.logger + ) + try await self.updateJobsOnInit(withStatus: .pending, onInit: self.configuration.pendingJobsInitialization, connection: connection) + try await self.updateJobsOnInit(withStatus: .processing, onInit: self.configuration.processingJobsInitialization, connection: connection) + try await self.updateJobsOnInit(withStatus: .failed, onInit: self.configuration.failedJobsInitialization, connection: connection) + } + } catch let error as PSQLError { + print("\(String(reflecting: error))") + throw error + } + } + + /// Push Job onto queue + /// - Returns: Identifier of queued job + @discardableResult public func push(_ job: HBJob) async throws -> JobID { + try await self.client.withConnection { connection in + let queuedJob = HBQueuedJob(id: .init(), job: job) + try await add(queuedJob, connection: connection) + try await addToQueue(jobId: queuedJob.id, connection: connection) + return queuedJob.id + } + } + + /// This is called to say job has finished processing and it can be deleted + public func finished(jobId: JobID) async throws { + _ = try await self.client.withConnection { connection in + try await self.delete(jobId: jobId, connection: connection) + } + } + + /// This is called to say job has failed to run and should be put aside + public func failed(jobId: JobID, error: Error) async throws { + _ = try await self.client.withConnection { connection in + try await self.setStatus(jobId: jobId, status: .failed, connection: connection) + } + } + + /// stop serving jobs + public func stop() async { + self.isStopped.withLockedValue { $0 = true } + } + + /// shutdown queue once all active jobs have been processed + public func shutdownGracefully() async {} + + func popFirst() async throws -> HBQueuedJob? { + do { + return try await self.client.withConnection { connection -> HBQueuedJob? in + while true { + try Task.checkCancellation() + let stream = try await connection.query( + """ + DELETE + FROM \(unescaped: self.configuration.jobQueueTable) pse + WHERE pse.job_id = + (SELECT pse_inner.job_id + FROM \(unescaped: self.configuration.jobQueueTable) pse_inner + ORDER BY pse_inner.createdAt ASC + FOR UPDATE SKIP LOCKED + LIMIT 1) + RETURNING pse.job_id + """, + logger: self.logger + ) + // return nil if nothing in queue + guard let jobId = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else { + return nil + } + // select job from job table + let stream2 = try await connection.query( + "SELECT job FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)", + logger: self.logger + ) + + do { + try await self.setStatus(jobId: jobId, status: .processing, connection: connection) + // if failed to find a job in the job table try getting another index + guard let job = try await stream2.decode(HBAnyCodableJob.self, context: .default).first(where: { _ in true }) else { + continue + } + return HBQueuedJob(id: jobId, job: job.job) + } catch { + try await self.setStatus(jobId: jobId, status: .failed, connection: connection) + throw JobQueueError.decodeJobFailed + } + } + } + } catch let error as PSQLError { + print("\(String(reflecting: error))") + throw error + } + } + + func add(_ job: HBQueuedJob, connection: PostgresConnection) async throws { + try await connection.query( + """ + INSERT INTO \(unescaped: self.configuration.jobTable) (id, job, status) + VALUES (\(job.id), \(job.anyCodableJob), \(Status.pending)) + """, + logger: self.logger + ) + } + + func delete(jobId: JobID, connection: PostgresConnection) async throws { + try await connection.query( + "DELETE FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)", + logger: self.logger + ) + } + + func addToQueue(jobId: JobID, connection: PostgresConnection) async throws { + try await connection.query( + """ + INSERT INTO \(unescaped: self.configuration.jobQueueTable) (job_id, createdAt) VALUES (\(jobId), \(Date.now)) + """, + logger: self.logger + ) + } + + func setStatus(jobId: JobID, status: Status, connection: PostgresConnection) async throws { + try await connection.query( + "UPDATE \(unescaped: self.configuration.jobTable) SET status = \(status) WHERE id = \(jobId)", + logger: self.logger + ) + } + + func getJobs(withStatus status: Status) async throws -> [JobID] { + return try await self.client.withConnection { connection in + let stream = try await connection.query( + "SELECT id FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)", + logger: self.logger + ) + var jobs: [JobID] = [] + for try await id in stream.decode(JobID.self, context: .default) { + jobs.append(id) + } + return jobs + } + } + + func updateJobsOnInit(withStatus status: Status, onInit: JobInitialization, connection: PostgresConnection) async throws { + switch onInit { + case .remove: + try await connection.query( + "DELETE FROM \(unescaped: self.configuration.jobTable) WHERE status = \(status)", + logger: self.logger + ) + case .rerun: + guard status != .pending else { return } + let jobs = try await getJobs(withStatus: status) + for jobId in jobs { + try await self.addToQueue(jobId: jobId, connection: connection) + } + case .doNothing: + break + } + } +} + +/// extend HBPostgresJobQueue to conform to AsyncSequence +extension HBPostgresJobQueue { + public struct AsyncIterator: AsyncIteratorProtocol { + let queue: HBPostgresJobQueue + + public func next() async throws -> Element? { + while true { + if self.queue.isStopped.withLockedValue({ $0 }) { + return nil + } + if let job = try await queue.popFirst() { + return job + } + // we only sleep if we didn't receive a job + try await Task.sleep(for: self.queue.configuration.pollTime) + } + } + } + + public func makeAsyncIterator() -> AsyncIterator { + return .init(queue: self) + } +} + +extension HBAnyCodableJob: PostgresCodable {} diff --git a/Tests/HummingbirdPostgresTests/JobsTests.swift b/Tests/HummingbirdPostgresTests/JobsTests.swift new file mode 100644 index 0000000..eddc5c5 --- /dev/null +++ b/Tests/HummingbirdPostgresTests/JobsTests.swift @@ -0,0 +1,410 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Hummingbird server framework project +// +// Copyright (c) 2021-2021 the Hummingbird authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Atomics +import Hummingbird +import HummingbirdJobs +@testable @_spi(ConnectionPool) import HummingbirdJobsPostgres +import HummingbirdXCT +@_spi(ConnectionPool) import PostgresNIO +import ServiceLifecycle +import XCTest + +extension XCTestExpectation { + convenience init(description: String, expectedFulfillmentCount: Int) { + self.init(description: description) + self.expectedFulfillmentCount = expectedFulfillmentCount + } +} + +final class JobsTests: XCTestCase { + func wait(for expectations: [XCTestExpectation], timeout: TimeInterval) async { + #if (os(Linux) && swift(<5.9)) || swift(<5.8) + super.wait(for: expectations, timeout: timeout) + #else + await fulfillment(of: expectations, timeout: timeout) + #endif + } + + static let env = HBEnvironment() + + /// Helper function for test a server + /// + /// Creates test client, runs test function abd ensures everything is + /// shutdown correctly + @discardableResult public func testJobQueue( + numWorkers: Int, + configuration: HBPostgresJobQueue.Configuration = .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), + test: (HBPostgresJobQueue) async throws -> T + ) async throws -> T { + let logger = { + var logger = Logger(label: "JobsTests") + logger.logLevel = .debug + return logger + }() + let postgresClient = try await PostgresClient( + configuration: getPostgresConfiguration(), + backgroundLogger: logger + ) + let postgresJobQueue = HBPostgresJobQueue( + client: postgresClient, + configuration: configuration, + logger: logger + ) + let jobQueueHandler = HBJobQueueHandler( + queue: postgresJobQueue, + numWorkers: numWorkers, + logger: logger + ) + + do { + return try await withThrowingTaskGroup(of: Void.self) { group in + let serviceGroup = ServiceGroup( + configuration: .init( + services: [PostgresClientService(client: postgresClient), jobQueueHandler], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger + ) + ) + group.addTask { + try await serviceGroup.run() + } + try await Task.sleep(for: .seconds(1)) + do { + let value = try await test(postgresJobQueue) + await serviceGroup.triggerGracefulShutdown() + return value + } catch let error as PSQLError { + XCTFail("\(String(reflecting: error))") + await serviceGroup.triggerGracefulShutdown() + throw error + } catch { + await serviceGroup.triggerGracefulShutdown() + throw error + } + } + } catch let error as PSQLError { + XCTFail("\(String(reflecting: error))") + throw error + } + } + + func testBasic() async throws { + struct TestJob: HBJob { + static let name = "testBasic" + static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + + let value: Int + func execute(logger: Logger) async throws { + print(self.value) + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + Self.expectation.fulfill() + } + } + TestJob.register() + try await self.testJobQueue(numWorkers: 1) { jobQueue in + try await jobQueue.push(TestJob(value: 1)) + try await jobQueue.push(TestJob(value: 2)) + try await jobQueue.push(TestJob(value: 3)) + try await jobQueue.push(TestJob(value: 4)) + try await jobQueue.push(TestJob(value: 5)) + try await jobQueue.push(TestJob(value: 6)) + try await jobQueue.push(TestJob(value: 7)) + try await jobQueue.push(TestJob(value: 8)) + try await jobQueue.push(TestJob(value: 9)) + try await jobQueue.push(TestJob(value: 10)) + + await self.wait(for: [TestJob.expectation], timeout: 5) + + let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) + XCTAssertEqual(pendingJobs.count, 0) + } + } + + func testMultipleWorkers() async throws { + struct TestJob: HBJob { + static let name = "testMultipleWorkers" + static let runningJobCounter = ManagedAtomic(0) + static let maxRunningJobCounter = ManagedAtomic(0) + static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + + let value: Int + func execute(logger: Logger) async throws { + let runningJobs = Self.runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) + if runningJobs > Self.maxRunningJobCounter.load(ordering: .relaxed) { + Self.maxRunningJobCounter.store(runningJobs, ordering: .relaxed) + } + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<100))) + print(self.value) + Self.expectation.fulfill() + Self.runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) + } + } + TestJob.register() + + try await self.testJobQueue(numWorkers: 4) { jobQueue in + try await jobQueue.push(TestJob(value: 1)) + try await jobQueue.push(TestJob(value: 2)) + try await jobQueue.push(TestJob(value: 3)) + try await jobQueue.push(TestJob(value: 4)) + try await jobQueue.push(TestJob(value: 5)) + try await jobQueue.push(TestJob(value: 6)) + try await jobQueue.push(TestJob(value: 7)) + try await jobQueue.push(TestJob(value: 8)) + try await jobQueue.push(TestJob(value: 9)) + try await jobQueue.push(TestJob(value: 10)) + + await self.wait(for: [TestJob.expectation], timeout: 5) + + XCTAssertGreaterThan(TestJob.maxRunningJobCounter.load(ordering: .relaxed), 1) + XCTAssertLessThanOrEqual(TestJob.maxRunningJobCounter.load(ordering: .relaxed), 4) + + let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) + XCTAssertEqual(pendingJobs.count, 0) + } + } + + func testErrorRetryCount() async throws { + struct FailedError: Error {} + + struct TestJob: HBJob { + static let name = "testErrorRetryCount" + static let maxRetryCount = 3 + static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) + func execute(logger: Logger) async throws { + Self.expectation.fulfill() + throw FailedError() + } + } + TestJob.register() + + try await self.testJobQueue(numWorkers: 4) { jobQueue in + try await jobQueue.push(TestJob()) + + await self.wait(for: [TestJob.expectation], timeout: 5) + try await Task.sleep(for: .milliseconds(200)) + + let failedJobs = try await jobQueue.getJobs(withStatus: .failed) + XCTAssertEqual(failedJobs.count, 1) + let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) + XCTAssertEqual(pendingJobs.count, 0) + } + } + + /// Test job is cancelled on shutdown + func testShutdownJob() async throws { + struct TestJob: HBJob { + static let name = "testShutdownJob" + static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) + func execute(logger: Logger) async throws { + Self.expectation.fulfill() + try await Task.sleep(for: .seconds(10)) + } + } + TestJob.register() + try await self.testJobQueue(numWorkers: 4) { jobQueue in + try await jobQueue.push(TestJob()) + await self.wait(for: [TestJob.expectation], timeout: 5) + } + + try await self.testJobQueue( + numWorkers: 4, + configuration: .init(failedJobsInitialization: .doNothing, processingJobsInitialization: .doNothing) + ) { jobQueue in + let failedJobs = try await jobQueue.getJobs(withStatus: .processing) + XCTAssertEqual(failedJobs.count, 1) + let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) + XCTAssertEqual(pendingJobs.count, 0) + } + } + + /// test job fails to decode but queue continues to process + func testFailToDecode() async throws { + struct TestJob1: HBJob { + static let name = "testFailToDecode" + func execute(logger: Logger) async throws {} + } + struct TestJob2: HBJob { + static let name = "testFailToDecode2" + static var value: String? + static let expectation = XCTestExpectation(description: "TestJob2.execute was called") + let value: String + func execute(logger: Logger) async throws { + Self.value = self.value + Self.expectation.fulfill() + } + } + TestJob2.register() + + try await self.testJobQueue(numWorkers: 4) { jobQueue in + try await jobQueue.push(TestJob1()) + try await jobQueue.push(TestJob2(value: "test")) + // stall to give job chance to start running + await self.wait(for: [TestJob2.expectation], timeout: 5) + + let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) + XCTAssertEqual(pendingJobs.count, 0) + } + + XCTAssertEqual(TestJob2.value, "test") + } + + /// creates job that errors on first attempt, and is left on processing queue and + /// is then rerun on startup of new server + func testRerunAtStartup() async throws { + struct RetryError: Error {} + struct TestJob: HBJob { + static let name = "testRerunAtStartup" + static let maxRetryCount: Int = 0 + static var firstTime = ManagedAtomic(true) + static var finished = ManagedAtomic(false) + static let failedExpectation = XCTestExpectation(description: "TestJob failed", expectedFulfillmentCount: 1) + static let succeededExpectation = XCTestExpectation(description: "TestJob2 succeeded", expectedFulfillmentCount: 1) + func execute(logger: Logger) async throws { + if Self.firstTime.compareExchange(expected: true, desired: false, ordering: .relaxed).original { + Self.failedExpectation.fulfill() + throw RetryError() + } + Self.succeededExpectation.fulfill() + Self.finished.store(true, ordering: .relaxed) + } + } + TestJob.register() + + try await self.testJobQueue(numWorkers: 4) { jobQueue in + try await jobQueue.push(TestJob()) + + await self.wait(for: [TestJob.failedExpectation], timeout: 10) + + // stall to give job chance to start running + try await Task.sleep(for: .milliseconds(50)) + + XCTAssertFalse(TestJob.firstTime.load(ordering: .relaxed)) + XCTAssertFalse(TestJob.finished.load(ordering: .relaxed)) + } + + try await self.testJobQueue(numWorkers: 4, configuration: .init(failedJobsInitialization: .rerun)) { _ in + await self.wait(for: [TestJob.succeededExpectation], timeout: 10) + XCTAssertTrue(TestJob.finished.load(ordering: .relaxed)) + } + } + + func testCustomTableNames() async throws { + struct TestJob: HBJob { + static let name = "testBasic" + static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) + + let value: Int + func execute(logger: Logger) async throws { + print(self.value) + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + Self.expectation.fulfill() + } + } + TestJob.register() + try await self.testJobQueue( + numWorkers: 4, + configuration: .init(jobTable: "_test_job_table", jobQueueTable: "_test_job_queue_table") + ) { jobQueue in + try await jobQueue.push(TestJob(value: 1)) + try await jobQueue.push(TestJob(value: 2)) + try await jobQueue.push(TestJob(value: 3)) + try await jobQueue.push(TestJob(value: 4)) + try await jobQueue.push(TestJob(value: 5)) + try await jobQueue.push(TestJob(value: 6)) + try await jobQueue.push(TestJob(value: 7)) + try await jobQueue.push(TestJob(value: 8)) + try await jobQueue.push(TestJob(value: 9)) + try await jobQueue.push(TestJob(value: 10)) + + await self.wait(for: [TestJob.expectation], timeout: 5) + + let pendingJobs = try await jobQueue.getJobs(withStatus: .pending) + XCTAssertEqual(pendingJobs.count, 0) + } + } + + func testMultipleJobQueueHandlers() async throws { + struct TestJob: HBJob { + static let name = "testMultipleJobQueues" + static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) + + let value: Int + func execute(logger: Logger) async throws { + try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) + Self.expectation.fulfill() + } + } + TestJob.register() + let logger = { + var logger = Logger(label: "HummingbirdJobsTests") + logger.logLevel = .debug + return logger + }() + let postgresClient = try await PostgresClient( + configuration: getPostgresConfiguration(), + backgroundLogger: logger + ) + let postgresJobQueue = HBPostgresJobQueue( + client: postgresClient, + configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), + logger: logger + ) + let jobQueueHandler = HBJobQueueHandler( + queue: postgresJobQueue, + numWorkers: 2, + logger: logger + ) + let postgresJobQueue2 = HBPostgresJobQueue( + client: postgresClient, + configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove), + logger: logger + ) + let jobQueueHandler2 = HBJobQueueHandler( + queue: postgresJobQueue2, + numWorkers: 3, + logger: logger + ) + + do { + try await withThrowingTaskGroup(of: Void.self) { group in + let serviceGroup = ServiceGroup( + configuration: .init( + services: [PostgresClientService(client: postgresClient), jobQueueHandler, jobQueueHandler2], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger + ) + ) + group.addTask { + try await serviceGroup.run() + } + try await Task.sleep(for: .seconds(1)) + do { + for i in 0..<200 { + try await postgresJobQueue.push(TestJob(value: i)) + } + await self.wait(for: [TestJob.expectation], timeout: 5) + await serviceGroup.triggerGracefulShutdown() + } catch { + XCTFail("\(String(reflecting: error))") + await serviceGroup.triggerGracefulShutdown() + throw error + } + } + } catch let error as PSQLError { + XCTFail("\(String(reflecting: error))") + throw error + } + } +} diff --git a/Tests/HummingbirdPostgresTests/PersistTests.swift b/Tests/HummingbirdPostgresTests/PersistTests.swift index 70247a6..dc85399 100644 --- a/Tests/HummingbirdPostgresTests/PersistTests.swift +++ b/Tests/HummingbirdPostgresTests/PersistTests.swift @@ -21,17 +21,6 @@ import ServiceLifecycle import XCTest final class PersistTests: XCTestCase { - /// Manage the lifecycle of a PostgresClient - struct PostgresClientService: Service { - let client: PostgresClient - - func run() async { - await cancelOnGracefulShutdown { - await self.client.run() - } - } - } - func createApplication(_ updateRouter: (HBRouter, HBPersistDriver) -> Void = { _, _ in }) async throws -> some HBApplicationProtocol { struct PostgresErrorMiddleware: HBMiddlewareProtocol { func handle(_ request: HBRequest, context: Context, next: (HBRequest, Context) async throws -> HBResponse) async throws -> HBResponse { diff --git a/Tests/HummingbirdPostgresTests/TestUtils.swift b/Tests/HummingbirdPostgresTests/TestUtils.swift index 78f1301..2d3d151 100644 --- a/Tests/HummingbirdPostgresTests/TestUtils.swift +++ b/Tests/HummingbirdPostgresTests/TestUtils.swift @@ -1,5 +1,17 @@ import Hummingbird @_spi(ConnectionPool) import PostgresNIO +import ServiceLifecycle + +/// Manage the lifecycle of a PostgresClient +struct PostgresClientService: Service { + let client: PostgresClient + + func run() async { + await cancelOnGracefulShutdown { + await self.client.run() + } + } +} func getPostgresConfiguration() async throws -> PostgresClient.Configuration { let env = try await HBEnvironment.shared.merging(with: .dotEnv())