Skip to content

Commit

Permalink
ensure transaction for lock acquisition
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcountryman committed Nov 11, 2024
1 parent 44a0e05 commit 6b5708a
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ use builder_states::{Initial, NameSet, PoolSet};
use jiff::{Span, ToSpan};
use sqlx::{
postgres::{PgAdvisoryLock, PgAdvisoryLockGuard},
PgConnection, PgExecutor, PgPool,
Acquire, PgConnection, PgExecutor, PgPool, Postgres, Transaction,
};
use tracing::instrument;

Expand Down Expand Up @@ -1298,8 +1298,11 @@ impl InProgressTask {
Ok(())
}

pub(crate) async fn try_acquire_lock(&self, conn: &mut PgConnection) -> Result<bool> {
try_acquire_advisory_xact_lock(conn, &self.lock_key()).await
pub(crate) async fn try_acquire_lock(
&self,
tx: &mut Transaction<'_, Postgres>,
) -> Result<bool> {
try_acquire_advisory_xact_lock(tx, &self.lock_key()).await
}

fn lock_key(&self) -> Cow<str> {
Expand All @@ -1310,14 +1313,14 @@ impl InProgressTask {
}
}

#[instrument(skip(executor), err)]
async fn try_acquire_advisory_xact_lock<'a, E>(executor: E, key: &str) -> Result<bool>
where
E: PgExecutor<'a>,
{
#[instrument(skip(tx), err)]
async fn try_acquire_advisory_xact_lock(
tx: &mut Transaction<'_, Postgres>,
key: &str,
) -> Result<bool> {
Ok(
sqlx::query_scalar!("select pg_try_advisory_xact_lock(hashtext($1))", key)
.fetch_one(executor)
.fetch_one(tx.acquire().await?)
.await?
.unwrap_or(false),
)
Expand Down

0 comments on commit 6b5708a

Please sign in to comment.