Skip to content

Commit

Permalink
Remove @_spi(ConnectionPool), use PostgresClient.query (#12)
Browse files Browse the repository at this point in the history
* Remove @_spi, use PostgresClient.query

* Use PostgresClient.query where applicable

* Name variables in PostgresPersistDriver.get
  • Loading branch information
adam-fowler authored Mar 14, 2024
1 parent 59eb523 commit 5731af1
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 86 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ let package = Package(
],
dependencies: [
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-beta.1"),
.package(url: "https://github.com/vapor/postgres-nio", from: "1.20.0"),
.package(url: "https://github.com/vapor/postgres-nio", from: "1.21.0"),
],
targets: [
.target(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import HummingbirdPostgres
import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

struct CreateJobQueue: PostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import HummingbirdPostgres
import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

struct CreateJobs: PostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
Expand Down
43 changes: 21 additions & 22 deletions Sources/HummingbirdJobsPostgres/PostgresJobsQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@

import Foundation
import HummingbirdJobs
@_spi(ConnectionPool) import HummingbirdPostgres
import HummingbirdPostgres
import Logging
import NIOConcurrencyHelpers
import NIOCore
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

@_spi(ConnectionPool)
public final class PostgresQueue: JobQueueDriver {
public typealias JobID = UUID

Expand Down Expand Up @@ -121,16 +120,12 @@ public final class PostgresQueue: JobQueueDriver {

/// This is called to say job has finished processing and it can be deleted
public func finished(jobId: JobID) async throws {
_ = try await self.client.withConnection { connection in
try await self.delete(jobId: jobId, connection: connection)
}
try await self.delete(jobId: jobId)
}

/// This is called to say job has failed to run and should be put aside
public func failed(jobId: JobID, error: Error) async throws {
_ = try await self.client.withConnection { connection in
try await self.setStatus(jobId: jobId, status: .failed, connection: connection)
}
try await self.setStatus(jobId: jobId, status: .failed)
}

/// stop serving jobs
Expand Down Expand Up @@ -199,8 +194,8 @@ public final class PostgresQueue: JobQueueDriver {
)
}

func delete(jobId: JobID, connection: PostgresConnection) async throws {
try await connection.query(
func delete(jobId: JobID) async throws {
try await self.client.query(
"DELETE FROM _hb_pg_jobs WHERE id = \(jobId)",
logger: self.logger
)
Expand All @@ -222,18 +217,23 @@ public final class PostgresQueue: JobQueueDriver {
)
}

func setStatus(jobId: JobID, status: Status) async throws {
try await self.client.query(
"UPDATE _hb_pg_jobs SET status = \(status), lastModified = \(Date.now) WHERE id = \(jobId)",
logger: self.logger
)
}

func getJobs(withStatus status: Status) async throws -> [JobID] {
return try await self.client.withConnection { connection in
let stream = try await connection.query(
"SELECT id FROM _hb_pg_jobs WHERE status = \(status)",
logger: self.logger
)
var jobs: [JobID] = []
for try await id in stream.decode(JobID.self, context: .default) {
jobs.append(id)
}
return jobs
let stream = try await self.client.query(
"SELECT id FROM _hb_pg_jobs WHERE status = \(status)",
logger: self.logger
)
var jobs: [JobID] = []
for try await id in stream.decode(JobID.self, context: .default) {
jobs.append(id)
}
return jobs
}

func updateJobsOnInit(withStatus status: Status, onInit: JobInitialization, connection: PostgresConnection) async throws {
Expand Down Expand Up @@ -280,7 +280,6 @@ extension PostgresQueue {
}
}

@_spi(ConnectionPool)
extension JobQueueDriver where Self == PostgresQueue {
/// Return Postgres driver for Job Queue
/// - Parameters:
Expand Down
2 changes: 1 addition & 1 deletion Sources/HummingbirdPostgres/CreatePersistTable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

struct CreatePersistTable: PostgresMigration {
func apply(connection: PostgresConnection, logger: Logger) async throws {
Expand Down
2 changes: 1 addition & 1 deletion Sources/HummingbirdPostgres/Migration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

/// Protocol for a database migration
///
Expand Down
6 changes: 3 additions & 3 deletions Sources/HummingbirdPostgres/Migrations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

/// Database migration support
public actor PostgresMigrations {
Expand Down Expand Up @@ -63,7 +63,7 @@ public actor PostgresMigrations {
/// - client: Postgres client
/// - logger: Logger to use
/// - dryRun: Should migrations actually be applied, or should we just report what would be applied and reverted
@_spi(ConnectionPool)

public func apply(client: PostgresClient, groups: [MigrationGroup] = [], logger: Logger, dryRun: Bool) async throws {
try await self.migrate(client: client, migrations: self.migrations, groups: groups, logger: logger, completeMigrations: true, dryRun: dryRun)
}
Expand All @@ -73,7 +73,7 @@ public actor PostgresMigrations {
/// - client: Postgres client
/// - logger: Logger to use
/// - dryRun: Should migrations actually be reverted, or should we just report what would be reverted
@_spi(ConnectionPool)

public func revert(client: PostgresClient, groups: [MigrationGroup] = [], logger: Logger, dryRun: Bool) async throws {
try await self.migrate(client: client, migrations: [], groups: groups, logger: logger, completeMigrations: false, dryRun: dryRun)
}
Expand Down
86 changes: 38 additions & 48 deletions Sources/HummingbirdPostgres/PostgresPersistDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import AsyncAlgorithms
import Foundation
import Hummingbird
import NIOCore
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO

extension PSQLError {
public var serverError: PostgresError.Code? {
Expand Down Expand Up @@ -56,7 +56,7 @@ public final class PostgresPersistDriver: PersistDriver {
/// - Parameters:
/// - client: Postgres client
/// - tidyUpFrequequency: How frequently cleanup expired database entries should occur
@_spi(ConnectionPool)

public init(client: PostgresClient, migrations: PostgresMigrations, tidyUpFrequency: Duration = .seconds(600), logger: Logger) async {
self.client = client
self.logger = logger
Expand All @@ -68,72 +68,62 @@ public final class PostgresPersistDriver: PersistDriver {
/// Create new key. This doesn't check for the existence of this key already so may fail if the key already exists
public func create(key: String, value: some Codable, expires: Duration?) async throws {
let expires = expires.map { Date.now + Double($0.components.seconds) } ?? Date.distantFuture
try await self.client.withConnection { connection in
do {
try await connection.query(
"INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))",
logger: self.logger
)
} catch let error as PSQLError {
if error.serverError == .uniqueViolation {
throw PersistError.duplicate
} else {
throw error
}
do {
try await self.client.query(
"INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))",
logger: self.logger
)
} catch let error as PSQLError {
if error.serverError == .uniqueViolation {
throw PersistError.duplicate
} else {
throw error
}
}
}

/// Set value for key.
public func set(key: String, value: some Codable, expires: Duration?) async throws {
let expires = expires.map { Date.now + Double($0.components.seconds) } ?? Date.distantFuture
_ = try await self.client.withConnection { connection in
try await connection.query(
"""
INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))
ON CONFLICT (id)
DO UPDATE SET data = \(WrapperObject(value)), expires = \(expires)
""",
logger: self.logger
)
}
try await self.client.query(
"""
INSERT INTO _hb_pg_persist (id, data, expires) VALUES (\(key), \(WrapperObject(value)), \(expires))
ON CONFLICT (id)
DO UPDATE SET data = \(WrapperObject(value)), expires = \(expires)
""",
logger: self.logger
)
}

/// Get value for key
public func get<Object: Codable>(key: String, as object: Object.Type) async throws -> Object? {
try await self.client.withConnection { connection in
let stream = try await connection.query(
"SELECT data, expires FROM _hb_pg_persist WHERE id = \(key)",
logger: self.logger
)
guard let result = try await stream.decode((WrapperObject<Object>, Date).self)
.first(where: { _ in true })
else {
return nil
}
guard result.1 > .now else { return nil }
return result.0.value
let stream = try await self.client.query(
"SELECT data, expires FROM _hb_pg_persist WHERE id = \(key)",
logger: self.logger
)
guard let (object, expires) = try await stream.decode((WrapperObject<Object>, Date).self)
.first(where: { _ in true })
else {
return nil
}
guard expires > .now else { return nil }
return object.value
}

/// Remove key
public func remove(key: String) async throws {
_ = try await self.client.withConnection { connection in
try await connection.query(
"DELETE FROM _hb_pg_persist WHERE id = \(key)",
logger: self.logger
)
}
try await self.client.query(
"DELETE FROM _hb_pg_persist WHERE id = \(key)",
logger: self.logger
)
}

/// tidy up database by cleaning out expired keys
func tidy() async throws {
_ = try await self.client.withConnection { connection in
try await connection.query(
"DELETE FROM _hb_pg_persist WHERE expires < \(Date.now)",
logger: self.logger
)
}
try await self.client.query(
"DELETE FROM _hb_pg_persist WHERE expires < \(Date.now)",
logger: self.logger
)
}
}

Expand Down
6 changes: 3 additions & 3 deletions Tests/HummingbirdPostgresTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import Atomics
import Hummingbird
import HummingbirdJobs
@testable @_spi(ConnectionPool) import HummingbirdPostgres
@testable @_spi(ConnectionPool) import HummingbirdJobsPostgres
@testable import HummingbirdJobsPostgres
@testable import HummingbirdPostgres
import HummingbirdTesting
import NIOConcurrencyHelpers
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO
import ServiceLifecycle
import XCTest

Expand Down
4 changes: 2 additions & 2 deletions Tests/HummingbirdPostgresTests/MigrationTests.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Atomics
@testable @_spi(ConnectionPool) import HummingbirdPostgres
@testable import HummingbirdPostgres
import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO
import XCTest

final class MigrationTests: XCTestCase {
Expand Down
4 changes: 2 additions & 2 deletions Tests/HummingbirdPostgresTests/PersistTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
//===----------------------------------------------------------------------===//

import Hummingbird
@_spi(ConnectionPool) import HummingbirdPostgres
import HummingbirdPostgres
import HummingbirdTesting
import Logging
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO
import ServiceLifecycle
import XCTest

Expand Down
2 changes: 1 addition & 1 deletion Tests/HummingbirdPostgresTests/TestUtils.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Hummingbird
@_spi(ConnectionPool) import PostgresNIO
import PostgresNIO
import ServiceLifecycle

/// Manage the lifecycle of a PostgresClient
Expand Down

0 comments on commit 5731af1

Please sign in to comment.