Skip to content

Commit

Permalink
Revert migrations in tests, don't flag migrations as complete in a re…
Browse files Browse the repository at this point in the history
…vert
  • Loading branch information
adam-fowler committed Mar 6, 2024
1 parent 3c94c17 commit 345f0b6
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,6 @@ import Logging

struct CreateJobQueue: HBPostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS _hb_jobs (
id uuid PRIMARY KEY,
job json,
status smallint,
lastModified TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
)
""",
logger: logger
)
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS _hb_job_queue (
Expand All @@ -48,10 +37,6 @@ struct CreateJobQueue: HBPostgresMigration {
}

func revert(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"DROP TABLE _hb_jobs",
logger: logger
)
try await connection.query(
"DROP TABLE _hb_job_queue",
logger: logger
Expand All @@ -61,8 +46,3 @@ struct CreateJobQueue: HBPostgresMigration {
var name: String { "_Create_JobQueue_Table_" }
var group: HBMigrationGroup { .jobQueue }
}

extension HBMigrationGroup {
/// JobQueue migration group
public static var jobQueue: Self { .init("_hb_jobqueue") }
}
48 changes: 48 additions & 0 deletions Sources/HummingbirdJobsPostgres/Migrations/CreateJobs.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Hummingbird server framework project
//
// Copyright (c) 2024 the Hummingbird authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import HummingbirdPostgres
import Logging
@_spi(ConnectionPool) import PostgresNIO

struct CreateJobs: HBPostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"""
CREATE TABLE IF NOT EXISTS _hb_jobs (
id uuid PRIMARY KEY,
job bytea,
status smallint,
lastModified TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
)
""",
logger: logger
)
}

func revert(connection: PostgresConnection, logger: Logger) async throws {
try await connection.query(
"DROP TABLE _hb_jobs",
logger: logger
)
}

var name: String { "_Create_Jobs_Table_" }
var group: HBMigrationGroup { .jobQueue }
}

extension HBMigrationGroup {
/// JobQueue migration group
public static var jobQueue: Self { .init("_hb_jobqueue") }
}
1 change: 1 addition & 0 deletions Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public final class HBPostgresQueue: HBJobQueueDriver {
self.logger = logger
self.isStopped = .init(false)
self.migrations = migrations
await migrations.add(CreateJobs())
await migrations.add(CreateJobQueue())
}

Expand Down
9 changes: 6 additions & 3 deletions Sources/HummingbirdPostgres/Migrations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public actor HBPostgresMigrations {
/// - dryRun: Should migrations actually be applied, or should we just report what would be applied and reverted
@_spi(ConnectionPool)
public func apply(client: PostgresClient, groups: [HBMigrationGroup] = [], logger: Logger, dryRun: Bool) async throws {
try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, dryRun: dryRun)
try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, completeMigrations: true, dryRun: dryRun)
}

/// Revery database migrations
Expand All @@ -75,14 +75,15 @@ public actor HBPostgresMigrations {
/// - dryRun: Should migrations actually be reverted, or should we just report what would be reverted
@_spi(ConnectionPool)
public func revert(client: PostgresClient, groups: [HBMigrationGroup] = [], logger: Logger, dryRun: Bool) async throws {
try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, dryRun: dryRun)
try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, completeMigrations: false, dryRun: dryRun)
}

private func migrate(
client: PostgresClient,
migrations: [HBPostgresMigration],
groups: [HBMigrationGroup],
logger: Logger,
completeMigrations: Bool,
dryRun: Bool
) async throws {
switch self.state {
Expand Down Expand Up @@ -151,7 +152,9 @@ public actor HBPostgresMigrations {
self.setFailed(error)
throw error
}
self.setCompleted()
if completeMigrations {
self.setCompleted()
}
}

/// Report if the migration process has completed
Expand Down
56 changes: 32 additions & 24 deletions Tests/HummingbirdPostgresTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ final class JobsTests: XCTestCase {
backgroundLogger: logger
)
let postgresMigrations = HBPostgresMigrations()
return HBJobQueue(
return await HBJobQueue(
HBPostgresQueue(
client: postgresClient,
migrations: postgresMigrations,
Expand All @@ -70,6 +70,7 @@ final class JobsTests: XCTestCase {
/// shutdown correctly
@discardableResult public func testJobQueue<T>(
jobQueue: HBJobQueue<HBPostgresQueue>,
revertMigrations: Bool = false,
test: (HBJobQueue<HBPostgresQueue>) async throws -> T
) async throws -> T {
do {
Expand All @@ -85,8 +86,14 @@ final class JobsTests: XCTestCase {
try await serviceGroup.run()
}
do {
try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false)
let value = try await test(postgresJobQueue)
let migrations = jobQueue.queue.migrations
let client = jobQueue.queue.client
let logger = jobQueue.queue.logger
if revertMigrations {
try await migrations.revert(client: client, groups: [.jobQueue], logger: logger, dryRun: false)
}
try await migrations.apply(client: client, groups: [.jobQueue], logger: logger, dryRun: false)
let value = try await test(jobQueue)
await serviceGroup.triggerGracefulShutdown()
return value
} catch let error as PSQLError {
Expand Down Expand Up @@ -114,7 +121,7 @@ final class JobsTests: XCTestCase {
test: (HBJobQueue<HBPostgresQueue>) async throws -> T
) async throws -> T {
let jobQueue = try await self.createJobQueue(numWorkers: numWorkers, configuration: configuration)
return try await self.testJobQueue(jobQueue: jobQueue, test: test)
return try await self.testJobQueue(jobQueue: jobQueue, revertMigrations: true, test: test)
}

func testBasic() async throws {
Expand Down Expand Up @@ -318,20 +325,21 @@ final class JobsTests: XCTestCase {
backgroundLogger: logger
)
let postgresMigrations = HBPostgresMigrations()
let jobQueue = HBJobQueue(
let jobQueue = await HBJobQueue(
.postgres(
client: postgresClient,
migrations: postgresMigrations,
client: postgresClient,
migrations: postgresMigrations,
configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
logger: logger
),
numWorkers: 2,
logger: logger
)
let jobQueue2 = HBJobQueue(
let postgresMigrations2 = HBPostgresMigrations()
let jobQueue2 = await HBJobQueue(
HBPostgresQueue(
client: postgresClient,
migrations: postgresMigrations,
migrations: postgresMigrations2,
configuration: .init(failedJobsInitialization: .remove, processingJobsInitialization: .remove),
logger: logger
),
Expand All @@ -348,22 +356,22 @@ final class JobsTests: XCTestCase {
gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger
)
group.addTask {
try await serviceGroup.run()
}
try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false)
try await postgresMigrations2.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false)
do {
for i in 0..<200 {
try await postgresJobQueue.push(id: jobIdentifer, parameters: i)
}
await self.wait(for: [expectation], timeout: 5)
await serviceGroup.triggerGracefulShutdown()
} catch {
XCTFail("\(String(reflecting: error))")
await serviceGroup.triggerGracefulShutdown()
throw error
)
group.addTask {
try await serviceGroup.run()
}
try await postgresMigrations.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false)
try await postgresMigrations2.apply(client: postgresClient, groups: [.jobQueue], logger: logger, dryRun: false)
do {
for i in 0..<200 {
try await jobQueue.push(id: jobIdentifer, parameters: i)
}
await self.wait(for: [expectation], timeout: 5)
await serviceGroup.triggerGracefulShutdown()
} catch {
XCTFail("\(String(reflecting: error))")
await serviceGroup.triggerGracefulShutdown()
throw error
}
}
}
Expand Down

0 comments on commit 345f0b6

Please sign in to comment.