Skip to content

Commit

Permalink
Add new testing multiple job queue handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler committed Feb 16, 2024
1 parent 805f3b6 commit 29570b7
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 43 deletions.
9 changes: 0 additions & 9 deletions Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@
"version" : "1.20.1"
}
},
{
"identity" : "hummingbird",
"kind" : "remoteSourceControl",
"location" : "https://github.com/hummingbird-project/hummingbird.git",
"state" : {
"branch" : "2.x.x-jobs-refactor",
"revision" : "fb7e4dd6f5785fc5b21434e1cddf87b050760a48"
}
},
{
"identity" : "postgres-nio",
"kind" : "remoteSourceControl",
Expand Down
65 changes: 35 additions & 30 deletions Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,38 +147,43 @@ public final class HBPostgresJobQueue: HBJobQueue {
func popFirst() async throws -> HBQueuedJob<JobID>? {
do {
return try await self.client.withConnection { connection -> HBQueuedJob? in
try Task.checkCancellation()
let stream = try await connection.query(
"""
DELETE
FROM \(unescaped: self.configuration.jobQueueTable) pse
WHERE pse.job_id =
(SELECT pse_inner.job_id
FROM \(unescaped: self.configuration.jobQueueTable) pse_inner
ORDER BY pse_inner.createdAt ASC
FOR UPDATE SKIP LOCKED
LIMIT 1)
RETURNING pse.job_id
""",
logger: self.logger
)
guard let jobId = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
return nil
}
let stream2 = try await connection.query(
"SELECT job FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)",
logger: self.logger
)

do {
try await self.setStatus(jobId: jobId, status: .processing, connection: connection)
guard let job = try await stream2.decode(HBAnyCodableJob.self, context: .default).first(where: { _ in true }) else {
while true {
try Task.checkCancellation()
let stream = try await connection.query(
"""
DELETE
FROM \(unescaped: self.configuration.jobQueueTable) pse
WHERE pse.job_id =
(SELECT pse_inner.job_id
FROM \(unescaped: self.configuration.jobQueueTable) pse_inner
ORDER BY pse_inner.createdAt ASC
FOR UPDATE SKIP LOCKED
LIMIT 1)
RETURNING pse.job_id
""",
logger: self.logger
)
// return nil if nothing in queue
guard let jobId = try await stream.decode(UUID.self, context: .default).first(where: { _ in true }) else {
return nil
}
return HBQueuedJob(id: jobId, job: job.job)
} catch {
try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
throw JobQueueError.decodeJobFailed
// select job from job table
let stream2 = try await connection.query(
"SELECT job FROM \(unescaped: self.configuration.jobTable) WHERE id = \(jobId)",
logger: self.logger
)

do {
try await self.setStatus(jobId: jobId, status: .processing, connection: connection)
// if failed to find a job in the job table try getting another index
guard let job = try await stream2.decode(HBAnyCodableJob.self, context: .default).first(where: { _ in true }) else {
continue
}
return HBQueuedJob(id: jobId, job: job.job)
} catch {
try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
throw JobQueueError.decodeJobFailed
}
}
}
} catch let error as PSQLError {
Expand Down
83 changes: 79 additions & 4 deletions Tests/HummingbirdPostgresTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ final class JobsTests: XCTestCase {
}

static let env = HBEnvironment()
static let redisHostname = env.get("REDIS_HOSTNAME") ?? "localhost"

/// Helper function for test a server
///
Expand All @@ -49,8 +48,11 @@ final class JobsTests: XCTestCase {
configuration: HBPostgresJobQueue.Configuration = .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
test: (HBPostgresJobQueue) async throws -> T
) async throws -> T {
var logger = Logger(label: "HummingbirdJobsTests")
logger.logLevel = .debug
let logger = {
var logger = Logger(label: "JobsTests")
logger.logLevel = .debug
return logger
}()
let postgresClient = try await PostgresClient(
configuration: getPostgresConfiguration(),
backgroundLogger: logger
Expand All @@ -72,7 +74,7 @@ final class JobsTests: XCTestCase {
configuration: .init(
services: [PostgresClientService(client: postgresClient), jobQueueHandler],
gracefulShutdownSignals: [.sigterm, .sigint],
logger: Logger(label: "JobQueueService")
logger: logger
)
)
group.addTask {
Expand Down Expand Up @@ -332,4 +334,77 @@ final class JobsTests: XCTestCase {
XCTAssertEqual(pendingJobs.count, 0)
}
}

func testMultipleJobQueueHandlers() async throws {
struct TestJob: HBJob {
static let name = "testMultipleJobQueues"
static let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200)

let value: Int
func execute(logger: Logger) async throws {
try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50)))
Self.expectation.fulfill()
}
}
TestJob.register()
let logger = {
var logger = Logger(label: "HummingbirdJobsTests")
logger.logLevel = .debug
return logger
}()
let postgresClient = try await PostgresClient(
configuration: getPostgresConfiguration(),
backgroundLogger: logger
)
let postgresJobQueue = HBPostgresJobQueue(
client: postgresClient,
configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
logger: logger
)
let jobQueueHandler = HBJobQueueHandler(
queue: postgresJobQueue,
numWorkers: 2,
logger: logger
)
let postgresJobQueue2 = HBPostgresJobQueue(
client: postgresClient,
configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
logger: logger
)
let jobQueueHandler2 = HBJobQueueHandler(
queue: postgresJobQueue2,
numWorkers: 3,
logger: logger
)

do {
try await withThrowingTaskGroup(of: Void.self) { group in
let serviceGroup = ServiceGroup(
configuration: .init(
services: [PostgresClientService(client: postgresClient), jobQueueHandler, jobQueueHandler2],
gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger
)
)
group.addTask {
try await serviceGroup.run()
}
try await Task.sleep(for: .seconds(1))
do {
for i in 0..<200 {
try await postgresJobQueue.push(TestJob(value: i))
}
await self.wait(for: [TestJob.expectation], timeout: 5)
await serviceGroup.triggerGracefulShutdown()
} catch {
XCTFail("\(String(reflecting: error))")
await serviceGroup.triggerGracefulShutdown()
throw error
}
}
} catch let error as PSQLError {
XCTFail("\(String(reflecting: error))")
throw error
}
}
}

0 comments on commit 29570b7

Please sign in to comment.