Skip to content

Commit

Permalink
backend/stream: fix a race condition in read lock (#553)
Browse files Browse the repository at this point in the history
* backend/stream: fix a race condition in read lock

* use builtin file lock feature
  • Loading branch information
vnghia authored Dec 8, 2024
1 parent 1115fed commit b4aebd3
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ jobs:
with:
operating_system: freebsd
version: ${{ matrix.version }}
environment_variables: "CARGO_TERM_COLOR RUST_BACKTRACE RUST_LOG DATABASE_URL AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY AWS_REGION AWS_ENDPOINT_URL AWS_USE_PATH_STYLE_ENDPOINT"
environment_variables: "CARGO_TERM_COLOR RUST_BACKTRACE RUST_LOG COLORBT_SHOW_HIDDEN DATABASE_URL AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY AWS_REGION AWS_ENDPOINT_URL AWS_USE_PATH_STYLE_ENDPOINT"
shell: "bash"
cpu_count: 4
image_url: https://github.com/vnghia/nghe-freebsd-builder/releases/download/v0.0.12/freebsd-${{ matrix.version }}-x86-64.qcow2
Expand Down
11 changes: 0 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion nghe-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ diesel-async = { version = "0.5.2", features = [
diesel_full_text_search = { version = "2.2.0", default-features = false }
diesel_migrations = { version = "2.2.0", features = ["postgres"] }
figment = { version = "0.10.19", features = ["env"] }
fs4 = { version = "0.12.0", features = ["sync"] }
futures-lite = { version = "2.5.0" }
hyper = { version = "0.14.31" }
hyper-tls = { version = "0.5.0" }
Expand Down
9 changes: 9 additions & 0 deletions nghe-backend/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ pub enum Kind {
#[into(OpensubsonicCode| OpensubsonicCode::AGenericError)]
EmptyFileEncountered,

#[error("File is already locked")]
#[into(StatusCode| StatusCode::INTERNAL_SERVER_ERROR)]
#[into(OpensubsonicCode| OpensubsonicCode::AGenericError)]
FileAlreadyLocked,
#[error("File is already exclusively locked")]
#[into(StatusCode| StatusCode::INTERNAL_SERVER_ERROR)]
#[into(OpensubsonicCode| OpensubsonicCode::AGenericError)]
FileAlreadyExclusivelyLocked,

// Media error
#[error("Could not found {0} tag in format {1}")]
#[into(StatusCode| StatusCode::INTERNAL_SERVER_ERROR)]
Expand Down
1 change: 1 addition & 0 deletions nghe-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#![feature(if_let_guard)]
#![feature(integer_sign_cast)]
#![feature(iterator_try_collect)]
#![feature(file_lock)]
#![feature(let_chains)]
#![feature(proc_macro_hygiene)]
#![feature(stmt_expr_attributes)]
Expand Down
5 changes: 2 additions & 3 deletions nghe-backend/src/route/media_retrieval/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn handler(
let span = tracing::Span::current();
let (can_acquire_lock, output) = tokio::task::spawn_blocking(move || {
let _entered = span.enter();
(transcode::Sink::lock_read(&output).is_ok(), output)
(transcode::Lock::lock_read(&output).is_ok(), output)
})
.await?;

Expand Down Expand Up @@ -117,7 +117,6 @@ mod tests {
use itertools::Itertools;
use nghe_api::common::{filesystem, format};
use rstest::rstest;
use transcode::Sink;

use super::*;
use crate::file::audio;
Expand Down Expand Up @@ -255,7 +254,7 @@ mod tests {
.replace(format)
.path(config.cache_dir.as_ref().unwrap(), bitrate.to_string().as_str());
tokio::task::spawn_blocking(move || {
Sink::lock_read_blocking(&cache_path).unwrap();
transcode::Lock::lock_read_blocking(&cache_path).unwrap();
})
.await
.unwrap();
Expand Down
60 changes: 60 additions & 0 deletions nghe-backend/src/transcode/lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::fmt::Debug;
use std::io::Seek;

use tracing::instrument;
use typed_path::Utf8NativePath;

use crate::{error, Error};

pub struct Lock;

impl Lock {
fn open_read(path: impl AsRef<Utf8NativePath>) -> Result<std::fs::File, Error> {
if cfg!(windows) {
// On Windows, the file must be open with write permissions to lock it.
std::fs::OpenOptions::new().read(true).write(true).open(path.as_ref())
} else {
std::fs::OpenOptions::new().read(true).open(path.as_ref())
}
.map_err(Error::from)
}

#[instrument(err(Debug, level = "debug"))]
pub fn lock_read(path: impl AsRef<Utf8NativePath> + Debug) -> Result<std::fs::File, Error> {
let mut file = Self::open_read(path)?;
// The read lock might be acquired with an empty file since creating and locking exclusively
// a file are two separate operations. We need to check if the file is empty before trying
// to acquiring the read lock. If the file is empty, don't lock it so the write lock
// can be acquired by the process that has created this file.
if file.seek(std::io::SeekFrom::End(0))? > 0 {
if file.try_lock_shared()? {
Ok(file)
} else {
error::Kind::FileAlreadyExclusivelyLocked.into()
}
} else {
error::Kind::EmptyFileEncountered.into()
}
}

#[instrument(err(Debug, level = "debug"))]
pub fn lock_write(path: impl AsRef<Utf8NativePath> + Debug) -> Result<std::fs::File, Error> {
let file = std::fs::OpenOptions::new().write(true).create_new(true).open(path.as_ref())?;
if file.try_lock()? { Ok(file) } else { error::Kind::FileAlreadyLocked.into() }
}
}

#[cfg(test)]
mod test {
use super::*;

impl Lock {
pub fn lock_read_blocking(
path: impl AsRef<Utf8NativePath>,
) -> Result<std::fs::File, Error> {
let file = Self::open_read(path)?;
file.lock_shared()?;
Ok(file)
}
}
}
2 changes: 2 additions & 0 deletions nghe-backend/src/transcode/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod format;
mod lock;
mod sink;
mod transcoder;

pub use lock::Lock;
pub use sink::Sink;
pub use transcoder::Transcoder;
37 changes: 2 additions & 35 deletions nghe-backend/src/transcode/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ use std::fmt::Debug;
use std::io::Write;

use educe::Educe;
use fs4::fs_std::FileExt;
use loole::{Receiver, Sender};
use nghe_api::common::format;
use rsmpeg::avformat::{AVIOContextContainer, AVIOContextCustom};
use rsmpeg::avutil::AVMem;
use rsmpeg::ffi;
use tracing::instrument;
use typed_path::Utf8NativePath;

use super::Lock;
use crate::{config, Error};

#[derive(Educe)]
Expand Down Expand Up @@ -39,7 +38,7 @@ impl Sink {
let span = tracing::Span::current();
let file = tokio::task::spawn_blocking(move || {
let _entered = span.enter();
output.map(Self::lock_write).transpose().ok().flatten()
output.map(Lock::lock_write).transpose().ok().flatten()
})
.await?;
Ok((Self { tx, buffer_size: config.buffer_size, format, file }, rx))
Expand All @@ -57,30 +56,6 @@ impl Sink {
}
}

#[instrument(err(Debug, level = "debug"))]
pub fn lock_write(path: impl AsRef<Utf8NativePath> + Debug) -> Result<std::fs::File, Error> {
let file = std::fs::OpenOptions::new().write(true).create_new(true).open(path.as_ref())?;
file.try_lock_exclusive()?;
Ok(file)
}

fn open_read(path: impl AsRef<Utf8NativePath>) -> Result<std::fs::File, Error> {
if cfg!(windows) {
// On Windows, the file must be open with write permissions to lock it.
std::fs::OpenOptions::new().read(true).write(true).open(path.as_ref())
} else {
std::fs::OpenOptions::new().read(true).open(path.as_ref())
}
.map_err(Error::from)
}

#[instrument(err(Debug, level = "debug"))]
pub fn lock_read(path: impl AsRef<Utf8NativePath> + Debug) -> Result<std::fs::File, Error> {
let file = Self::open_read(path)?;
FileExt::try_lock_shared(&file)?;
Ok(file)
}

fn write(&mut self, data: &[u8]) -> i32 {
let write_len = data.len().try_into().unwrap_or(ffi::AVERROR_BUG2);

Expand Down Expand Up @@ -126,13 +101,5 @@ mod test {
_ => status,
}
}

pub fn lock_read_blocking(
path: impl AsRef<Utf8NativePath>,
) -> Result<std::fs::File, Error> {
let file = Self::open_read(path)?;
FileExt::lock_shared(&file)?;
Ok(file)
}
}
}

0 comments on commit b4aebd3

Please sign in to comment.