Skip to content

Commit

Permalink
feat(scan): allow to set parallel scan or not
Browse files Browse the repository at this point in the history
  • Loading branch information
vnghia committed Apr 9, 2024
1 parent 1b85671 commit 2294c47
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 38 deletions.
2 changes: 2 additions & 0 deletions src/config/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub struct ArtistConfig {
#[derive(Debug, Clone, Serialize, Deserialize, Derivative)]
#[derivative(Default)]
pub struct ScanConfig {
#[derivative(Default(value = "false"))]
pub parallel: bool,
#[derivative(Default(value = "10"))]
pub channel_size: usize,
#[derivative(Default(value = "10"))]
Expand Down
3 changes: 2 additions & 1 deletion src/open_subsonic/scan/run_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,10 @@ pub async fn run_scan(

let span = tracing::Span::current();
let (tx, rx) = flume::bounded(scan_config.channel_size);
let scan_parallel = scan_config.parallel;
scan_media_files_tasks.push(tokio::task::spawn_blocking(move || {
let _enter = span.enter();
scan_media_files(music_folder_path, tx)
scan_media_files(music_folder_path, tx, scan_parallel)
}));

while let Ok(scanned_media_file) = rx.recv_async().await {
Expand Down
116 changes: 79 additions & 37 deletions src/utils/fs/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::path::{Path, PathBuf};

use flume::Sender;
use ignore::types::TypesBuilder;
use ignore::WalkBuilder;
use ignore::{DirEntry, Error, WalkBuilder};
use tracing::instrument;

use super::super::song::file_type::SONG_FILE_TYPES;
Expand All @@ -27,10 +27,37 @@ impl ScannedMediaFile {
}
}

fn process_dir_entry<P: AsRef<Path>>(
root: P,
tx: &Sender<ScannedMediaFile>,
entry: Result<DirEntry, Error>,
) -> ignore::WalkState {
match try {
let entry = entry?;
let metadata = entry.metadata()?;
let path = entry.path();
if metadata.is_file()
&& let Err(e) = tx.send(ScannedMediaFile::new(root, path.to_path_buf(), metadata.len()))
{
tracing::error!(sending_walkdir_result = ?e);
ignore::WalkState::Quit
} else {
ignore::WalkState::Continue
}
} {
Ok(r) => r,
Err::<_, anyhow::Error>(e) => {
tracing::error!(walking_media_directory = ?e);
ignore::WalkState::Continue
}
}
}

#[instrument(skip(tx))]
pub fn scan_media_files<P: AsRef<Path> + Clone + Send + std::fmt::Debug>(
root: P,
tx: Sender<ScannedMediaFile>,
scan_parallel: bool,
) {
tracing::debug!("start scanning");

Expand All @@ -48,36 +75,21 @@ pub fn scan_media_files<P: AsRef<Path> + Clone + Send + std::fmt::Debug>(
}
};

WalkBuilder::new(&root).types(types).build_parallel().run(|| {
let span = tracing::Span::current();
let tx = tx.clone();
let root = root.clone();

Box::new(move |entry| {
let _enter = span.enter();

match try {
let entry = entry?;
let metadata = entry.metadata()?;
let path = entry.path();
if metadata.is_file()
&& let Err(e) =
tx.send(ScannedMediaFile::new(&root, path.to_path_buf(), metadata.len()))
{
tracing::error!(sending_walkdir_result = ?e);
ignore::WalkState::Quit
} else {
ignore::WalkState::Continue
}
} {
Ok(r) => r,
Err::<_, anyhow::Error>(e) => {
tracing::error!(walking_media_directory = ?e);
ignore::WalkState::Continue
}
}
})
});
if scan_parallel {
WalkBuilder::new(&root).types(types).build_parallel().run(|| {
let span = tracing::Span::current();
let tx = tx.clone();
let root = root.clone();
Box::new(move |entry| {
let _enter = span.enter();
process_dir_entry(&root, &tx, entry)
})
});
} else {
for entry in WalkBuilder::new(&root).types(types).build() {
process_dir_entry(&root, &tx, entry);
}
}

tracing::debug!("finish scanning");
}
Expand All @@ -90,11 +102,12 @@ mod tests {
use crate::utils::song::file_type::to_extensions;
use crate::utils::test::fs::TemporaryFs;

async fn wrap_scan_media_file(fs: &TemporaryFs) -> Vec<ScannedMediaFile> {
async fn wrap_scan_media_file(fs: &TemporaryFs, scan_parallel: bool) -> Vec<ScannedMediaFile> {
let (tx, rx) = flume::bounded(100);
let root_path = fs.root_path().to_path_buf();

let scan_thread = tokio::task::spawn_blocking(move || scan_media_files(&root_path, tx));
let scan_thread =
tokio::task::spawn_blocking(move || scan_media_files(&root_path, tx, scan_parallel));
let mut result = vec![];
while let Ok(r) = rx.recv_async().await {
result.push(r);
Expand All @@ -113,7 +126,7 @@ mod tests {
.map(|path| fs.create_file(path))
.collect_vec();

let scanned_results = wrap_scan_media_file(&fs).await;
let scanned_results = wrap_scan_media_file(&fs, false).await;
let scanned_lens =
scanned_results.iter().cloned().map(|result| result.song_file_size).collect_vec();
let scanned_paths =
Expand Down Expand Up @@ -142,7 +155,7 @@ mod tests {
.map(|path| fs.create_file(path).strip_prefix(fs.root_path()).unwrap().to_path_buf())
.collect_vec();

let scanned_paths = wrap_scan_media_file(&fs)
let scanned_paths = wrap_scan_media_file(&fs, false)
.await
.iter()
.cloned()
Expand Down Expand Up @@ -178,7 +191,7 @@ mod tests {
})
.collect_vec();

let scanned_paths = wrap_scan_media_file(&fs)
let scanned_paths = wrap_scan_media_file(&fs, false)
.await
.into_iter()
.map(|result| result.song_absolute_path)
Expand Down Expand Up @@ -206,7 +219,7 @@ mod tests {
})
.collect_vec();

let scanned_paths = wrap_scan_media_file(&fs)
let scanned_paths = wrap_scan_media_file(&fs, false)
.await
.into_iter()
.map(|result| result.song_absolute_path)
Expand All @@ -217,4 +230,33 @@ mod tests {
scanned_paths.into_iter().sorted().collect_vec()
);
}

#[tokio::test]
async fn test_scan_media_files_parallel() {
let fs = TemporaryFs::default();

let media_paths = TemporaryFs::create_random_relative_paths(50, 3, &to_extensions())
.into_iter()
.map(|path| fs.create_file(path))
.collect_vec();

let scanned_results = wrap_scan_media_file(&fs, true).await;
let scanned_lens =
scanned_results.iter().cloned().map(|result| result.song_file_size).collect_vec();
let scanned_paths =
scanned_results.iter().cloned().map(|result| result.song_absolute_path).collect_vec();

assert_eq!(
media_paths
.iter()
.map(|path| std::fs::metadata(path).unwrap().len())
.sorted()
.collect_vec(),
scanned_lens.into_iter().sorted().collect_vec()
);
assert_eq!(
media_paths.into_iter().sorted().collect_vec(),
scanned_paths.into_iter().sorted().collect_vec()
);
}
}

0 comments on commit 2294c47

Please sign in to comment.