Skip to content

Commit

Permalink
stop trying to recv or send pg framed messages if the connection is c…
Browse files Browse the repository at this point in the history
…anceled
  • Loading branch information
jeromegn committed Apr 1, 2024
1 parent 58763fb commit c6de652
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions crates/corro-pg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,11 @@ pub async fn start(
let cancel = cancel.clone();
async move {
// cancel stuff if this loop breaks
let _drop_guard = cancel.drop_guard();
let _drop_guard = cancel.clone().drop_guard();

while let Some(decode_res) = stream.next().await {
while let Outcome::Completed(Some(decode_res)) =
stream.next().preemptible(cancel.cancelled()).await
{
let msg = match decode_res {
Ok(msg) => msg,
Err(PgWireError::IoError(io_error)) => {
Expand All @@ -585,9 +587,9 @@ pub async fn start(
break;
}
};

front_tx.send(msg).await?;
}

debug!("frontend stream is done");

Ok::<_, BoxError>(())
Expand All @@ -597,8 +599,11 @@ pub async fn start(
tokio::spawn({
let cancel = cancel.clone();
async move {
let _drop_guard = cancel.drop_guard();
while let Some(back) = back_rx.recv().await {
let _drop_guard = cancel.clone().drop_guard();

while let Outcome::Completed(Some(back)) =
back_rx.recv().preemptible(cancel.cancelled()).await
{
match back {
BackendResponse::Message { message, flush } => {
if let PgWireBackendMessage::ErrorResponse(e) = &message {
Expand Down

0 comments on commit c6de652

Please sign in to comment.