diff --git a/Cargo.toml b/Cargo.toml index eccf958..88e0ff6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,4 +44,5 @@ rand = "0.8.5" flate2 = "1.0.27" lazy_static = "1.4.0" httpmock = "0.6.8" +indicatif = { version = "0.17.6", features = ["futures"] } diff --git a/src/lib/api/figshare.rs b/src/lib/api/figshare.rs index cd6af3e..ee60dbe 100644 --- a/src/lib/api/figshare.rs +++ b/src/lib/api/figshare.rs @@ -247,6 +247,9 @@ pub struct FigShareArticle { impl FigShareAPI { pub fn new(name: &str, base_url: Option) -> Result { + // Note: this constructor is not called often, except through + // Project::link(), since serde is usually deserializing the + // new FigShareAPI Remote variant from the manifest. let auth_keys = if base_url.is_none() { // using the default base_url means we're // not using mock HTTP servers @@ -255,7 +258,7 @@ impl FigShareAPI { // If base_url is set, we're using mock HTTP servers, // so we use the test-token let mut auth_keys = AuthKeys::default(); - auth_keys.add("figshare", TEST_TOKEN); + auth_keys.temporary_add("figshare", TEST_TOKEN); auth_keys }; let token = auth_keys.get("figshare".to_string())?; diff --git a/src/lib/api/zenodo.rs b/src/lib/api/zenodo.rs index e938493..f711bfa 100644 --- a/src/lib/api/zenodo.rs +++ b/src/lib/api/zenodo.rs @@ -14,6 +14,9 @@ use crate::lib::remote::{AuthKeys,RemoteFile,RequestData}; const BASE_URL: &str = "https://zenodo.org/api"; +// for testing: +const TEST_TOKEN: &str = "test-token"; + #[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct ZenodoDeposition { conceptrecid: String, @@ -136,9 +139,16 @@ struct PrereserveDoi { recid: usize, } + +// for serde deserialize default +fn zenodo_api_url() -> String { + BASE_URL.to_string() +} + + #[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct ZenodoAPI { - #[serde(skip_serializing, skip_deserializing)] + #[serde(skip_serializing, skip_deserializing,default="zenodo_api_url")] base_url: String, name: String,
#[serde(skip_serializing, skip_deserializing)] @@ -152,9 +162,23 @@ pub struct ZenodoAPI { impl ZenodoAPI { pub fn new(name: &str, base_url: Option) -> Result { - let auth_keys = AuthKeys::new(); - let token = auth_keys.get("figshare".to_string())?; + // Note: this constructor is not called often, except through + // Project::link(), since serde is usually deserializing the + // new ZenodoAPI Remote variant from the manifest. + let auth_keys = if base_url.is_none() { + // using the default base_url means we're + // not using mock HTTP servers + AuthKeys::new() + } else { + // If base_url is set, we're using mock HTTP servers, + // so we use the test-token + let mut auth_keys = AuthKeys::default(); + auth_keys.temporary_add("zenodo", TEST_TOKEN); + auth_keys + }; + let token = auth_keys.get("zenodo".to_string())?; let base_url = base_url.unwrap_or(BASE_URL.to_string()); + trace!("Base URL in constructor: {:?}", base_url); Ok(ZenodoAPI { base_url, name: name.to_string(), @@ -175,6 +199,8 @@ impl ZenodoAPI { async fn issue_request(&self, method: Method, endpoint: &str, headers: Option, data: Option>) -> Result { + + trace!("BASE_URL: {:?}", self.base_url); let url = format!("{}/{}?access_token={}", self.base_url.trim_end_matches('/'), endpoint.trim_start_matches('/'), self.token); trace!("request URL: {:?}", &url); @@ -239,6 +265,7 @@ impl ZenodoAPI { } pub async fn get_files(&self) -> Result> { + trace!("self: {:?}", self); let id = self.deposition_id.ok_or(anyhow!("Internal Error: Zenodo deposition_id not set."))?; let url = format!("{}/{}/files", "/deposit/depositions", id); let response = self.issue_request::>(Method::GET, &url, None, None).await?; @@ -333,10 +360,9 @@ mod tests { // Create an instance of ZenodoAPI let mut api = ZenodoAPI::new("test", Some(server.url("/"))).unwrap(); info!("Test ZenodoAPI: {:?}", api); - api.set_token("fake_token".to_string()); // Main call to test - let result = api.remote_init(local_metadata).await; + let _result =
api.remote_init(local_metadata).await; //info!("result: {:?}", result); // ensure the specified mock was called exactly one time (or fail). diff --git a/src/lib/data.rs b/src/lib/data.rs index 2ad4575..9a3dba4 100644 --- a/src/lib/data.rs +++ b/src/lib/data.rs @@ -13,6 +13,9 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use std::fs; use trauma::downloader::{DownloaderBuilder,StyleOptions,ProgressBarOpts}; +use std::time::Duration; +use std::thread; +use indicatif::{ProgressBar, ProgressStyle}; use colored::*; use crate::{print_warn,print_info}; @@ -605,22 +608,50 @@ impl DataCollection { // Fetch all remote files. // // (remote service, path) -> { filename -> RemoteFile, ... } - pub async fn fetch(&mut self) -> Result>> { + pub async fn fetch(&mut self) -> Result>> { self.authenticate_remotes()?; + let mut all_remote_files = HashMap::new(); - for (path, remote) in &self.remotes { - let remote_files = remote.get_files_hashmap().await?; - all_remote_files.insert((remote.name().to_string(), path.clone()), remote_files); + let pb = ProgressBar::new(self.remotes.len() as u64); + pb.set_style(ProgressStyle::default_bar() + .progress_chars("=> ") + .template("{spinner:.green} [{bar:40.green/white}] {pos:>}/{len} ({percent}%) eta {eta_precise:.green} {msg}")? + ); + pb.set_message(format!("Fetching remote files...")); + + // Convert remotes into Futures, so that they can be awaited in parallel + let fetch_futures: Vec<_> = self.remotes.iter().map(|(path, remote)| { + let remote_name = remote.name().to_string(); + let path_clone = path.clone(); + async move { + let remote_files = remote.get_files_hashmap().await?; + Ok(((remote_name, path_clone), remote_files)) + } + }).collect(); + + let results = join_all(fetch_futures).await; + + for result in results { + match result { + Ok((key, value)) => { + pb.set_message(format!("Fetching remote files... 
{} done.", key.0)); + all_remote_files.insert(key, value); + pb.inc(1); + }, + Err(e) => return Err(e), // Handle errors as needed + } } - //info!("fetch() remote files: {:?}", all_remote_files); + + pb.finish_with_message("Fetching completed!"); Ok(all_remote_files) } - // Merge all local and remote files. // // Use a fetch to get all remote files (as RemoteFile), and merge these // in with the local data files (DataFile) into a MergedFile struct. // Missing remote/local files are None. + // + // Returns: Result with HashMap of directory -> { File -> MergedFile, ... } pub async fn merge(&mut self, include_remotes: bool) -> Result>> { // directory -> {(filename -> MergedFile), ...} let mut result: HashMap> = HashMap::new(); @@ -673,44 +704,53 @@ impl DataCollection { // Get the status of the DataCollection, optionally with remotes. // - // Organized by directory + // Returns Result of BTreeMap of directory -> [ StatusEntry, ...] pub async fn status(&mut self, path_context: &Path, include_remotes: bool) -> Result>> { - // get all merged files, used to compute the status let merged_files = self.merge(include_remotes).await?; - //info!("merged_file: {:?}", merged_files); + + let mut statuses_futures = FuturesUnordered::new(); + + for (directory, inner_map) in merged_files.into_iter() { + // this clone is to prevent a borrow issue due to async move below + let files: Vec<_> = inner_map.values().cloned().collect(); + for mf in files { + let directory_clone = directory.clone(); + statuses_futures.push(async move { + let status_entry = mf.status_entry(path_context, include_remotes).await.map_err(anyhow::Error::from)?; + Ok::<(String, StatusEntry), anyhow::Error>((directory_clone, status_entry)) + }); + } + } let mut statuses = BTreeMap::new(); - // Get the StatusEntry async via join_all() for each - // MergedFile. 
The inner hash map has keys that are the - file names (since this was use for join); these are not - needed so they're ditched, leaving a - BTreeMap> - let statuses_futures: FuturesUnordered<_> = merged_files - .into_iter() - .map(|(outer_key, inner_map)| { - // create a future for each merged_file, and collect the results - async move { - let status_entries: Result, _> = join_all( - inner_map.values() - // get the StatusEntry for each MergedFile - .map(|mf| async { mf.status_entry(path_context, include_remotes).await }) - .collect::>() - ).await.into_iter().collect(); - status_entries.map(|entries| (outer_key, entries)) - } - }) - .collect(); + let pb = ProgressBar::new(statuses_futures.len() as u64); + pb.set_style(ProgressStyle::default_bar() + .progress_chars("=> ") + .template("{spinner:.green} [{bar:40.green/white}] {pos:>}/{len} ({percent}%) eta {eta_precise:.green} {msg}")? + ); - let statuses_results: Vec<_> = statuses_futures.collect().await; - for result in statuses_results { + // NOTE(review): a hand-rolled ticker thread (loop { tick(); sleep(20ms) }) + // never terminates: it keeps ticking the finished bar after status() + // returns, leaking one thread per call. indicatif's built-in steady + // tick animates the spinner on a timer that is stopped automatically + // when the progress bar is finished or dropped, so no manual thread + // management is needed here. + pb.enable_steady_tick(Duration::from_millis(20)); + // process the futures as they become ready + pb.set_message("Calculating MD5s..."); + while let Some(result) = statuses_futures.next().await { if let Ok((key, value)) = result { - statuses.insert(key, value); + pb.set_message(format!("Calculating MD5s... 
{} done.", &value.name)); + statuses.entry(key).or_insert_with(Vec::new).push(value); + pb.inc(1); } else { - // Handle the error as needed + result?; } } + + pb.finish_with_message("Complete."); Ok(statuses) } @@ -842,7 +882,7 @@ impl DataCollection { let mut current_skipped = Vec::new(); let mut messy_skipped = Vec::new(); let mut overwrite_skipped = Vec::new(); - + for (dir, merged_files) in all_files.iter() { for merged_file in merged_files.values().filter(|f| f.can_download()) { @@ -896,10 +936,13 @@ impl DataCollection { } } + let style = ProgressBarOpts::new( + Some("{spinner:.green} [{bar:40.green/white}] {pos:>}/{len} ({percent}%) eta {eta_precise:.green} {msg}".to_string()), + Some("=> ".to_string()), + true, true); - let mut style_opts = StyleOptions::default(); - style_opts.set_child(ProgressBarOpts::with_pip_style()); - + let style_clone = style.clone(); + let style_opts = StyleOptions::new(style, style_clone); let total_files = downloads.len(); if !downloads.is_empty() { diff --git a/src/lib/remote.rs b/src/lib/remote.rs index f42f2e5..ba0113d 100644 --- a/src/lib/remote.rs +++ b/src/lib/remote.rs @@ -121,6 +121,13 @@ impl AuthKeys { self.save(); } + pub fn temporary_add(&mut self, service: &str, key: &str) { + // no save, i.e. for testing -- we do *not* want to overwrite the + // dev's own keys. + let service = service.to_lowercase(); + self.keys.insert(service, key.to_owned()); + } + pub fn get(&self, service: String) -> Result { match self.keys.get(&service) { None => Err(anyhow!("no key found for service '{}'", service)),