Skip to content

Commit

Permalink
fixed subtle bug with base_url in zenodo setup
Browse files Browse the repository at this point in the history
  • Loading branch information
vsbuffalo committed Aug 22, 2023
1 parent 01751fa commit ac0bedb
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 43 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ rand = "0.8.5"
flate2 = "1.0.27"
lazy_static = "1.4.0"
httpmock = "0.6.8"
cd = "0.2.1"
indicatif = { version = "0.17.6", features = ["futures"] }

5 changes: 4 additions & 1 deletion src/lib/api/figshare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ pub struct FigShareArticle {

impl FigShareAPI {
pub fn new(name: &str, base_url: Option<String>) -> Result<Self> {
// Note: this constructor is not called often, except through
// Project::link(), since serde is usually deserializing the
// new FigShareAPI Remote variant from the manifest.
let auth_keys = if base_url.is_none() {
// using the default base_url means we're
// not using mock HTTP servers
Expand All @@ -255,7 +258,7 @@ impl FigShareAPI {
// If base_url is set, we're using mock HTTP servers,
// so we use the test-token
let mut auth_keys = AuthKeys::default();
auth_keys.add("figshare", TEST_TOKEN);
auth_keys.temporary_add("figshare", TEST_TOKEN);
auth_keys
};
let token = auth_keys.get("figshare".to_string())?;
Expand Down
36 changes: 31 additions & 5 deletions src/lib/api/zenodo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ use crate::lib::remote::{AuthKeys,RemoteFile,RequestData};

const BASE_URL: &str = "https://zenodo.org/api";

// for testing:
const TEST_TOKEN: &str = "test-token";

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct ZenodoDeposition {
conceptrecid: String,
Expand Down Expand Up @@ -136,9 +139,16 @@ struct PrereserveDoi {
recid: usize,
}


// Default value for `ZenodoAPI::base_url` when serde deserializes the
// struct from the manifest (the field itself is never serialized).
fn zenodo_api_url() -> String {
    String::from(BASE_URL)
}


#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct ZenodoAPI {
#[serde(skip_serializing, skip_deserializing)]
#[serde(skip_serializing, skip_deserializing,default="zenodo_api_url")]
base_url: String,
name: String,
#[serde(skip_serializing, skip_deserializing)]
Expand All @@ -152,9 +162,23 @@ pub struct ZenodoAPI {

impl ZenodoAPI {
pub fn new(name: &str, base_url: Option<String>) -> Result<Self> {
let auth_keys = AuthKeys::new();
let token = auth_keys.get("figshare".to_string())?;
// Note: this constructor is not called often, except through
// Project::link(), since serde is usually deserializing the
// new ZenodoAPI Remote variant from the manifest.
let auth_keys = if base_url.is_none() {
// using the default base_url means we're
// not using mock HTTP servers
AuthKeys::new()
} else {
// If base_url is set, we're using mock HTTP servers,
// so we use the test-token
let mut auth_keys = AuthKeys::default();
auth_keys.temporary_add("zenodo", TEST_TOKEN);
auth_keys
};
let token = auth_keys.get("zenodo".to_string())?;
let base_url = base_url.unwrap_or(BASE_URL.to_string());
println!("Base URL in constructor: {:?}", base_url);
Ok(ZenodoAPI {
base_url,
name: name.to_string(),
Expand All @@ -175,6 +199,8 @@ impl ZenodoAPI {
async fn issue_request<T: serde::Serialize + std::fmt::Debug>(&self, method: Method, endpoint: &str,
headers: Option<HeaderMap>,
data: Option<RequestData<T>>) -> Result<Response> {

println!("BASE_URL: {:?}", self.base_url);
let url = format!("{}/{}?access_token={}", self.base_url.trim_end_matches('/'), endpoint.trim_start_matches('/'), self.token);
trace!("request URL: {:?}", &url);

Expand Down Expand Up @@ -239,6 +265,7 @@ impl ZenodoAPI {
}

pub async fn get_files(&self) -> Result<Vec<ZenodoFile>> {
println!("self: {:?}", self);
let id = self.deposition_id.ok_or(anyhow!("Internal Error: Zenodo deposition_id not set."))?;
let url = format!("{}/{}/files", "/deposit/depositions", id);
let response = self.issue_request::<HashMap<String, String>>(Method::GET, &url, None, None).await?;
Expand Down Expand Up @@ -333,10 +360,9 @@ mod tests {
// Create an instance of ZenodoAPI
let mut api = ZenodoAPI::new("test", Some(server.url("/"))).unwrap();
info!("Test ZenodoAPI: {:?}", api);
api.set_token("fake_token".to_string());

// Main call to test
let result = api.remote_init(local_metadata).await;
let _result = api.remote_init(local_metadata).await;
//info!("result: {:?}", result);

// ensure the specified mock was called exactly one time (or fail).
Expand Down
117 changes: 80 additions & 37 deletions src/lib/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::fs;
use trauma::downloader::{DownloaderBuilder,StyleOptions,ProgressBarOpts};
use std::time::Duration;
use std::thread;
use indicatif::{ProgressBar, ProgressStyle};
use colored::*;

use crate::{print_warn,print_info};
Expand Down Expand Up @@ -605,22 +608,50 @@ impl DataCollection {
// Fetch all remote files.
//
// (remote service, path) -> { filename -> RemoteFile, ... }
pub async fn fetch(&mut self) -> Result<HashMap<(String,String), HashMap<String,RemoteFile>>> {
pub async fn fetch(&mut self) -> Result<HashMap<(String, String), HashMap<String, RemoteFile>>> {
self.authenticate_remotes()?;

let mut all_remote_files = HashMap::new();
for (path, remote) in &self.remotes {
let remote_files = remote.get_files_hashmap().await?;
all_remote_files.insert((remote.name().to_string(), path.clone()), remote_files);
let pb = ProgressBar::new(self.remotes.len() as u64);
pb.set_style(ProgressStyle::default_bar()
.progress_chars("=> ")
.template("{spinner:.green} [{bar:40.green/white}] {pos:>}/{len} ({percent}%) eta {eta_precise:.green} {msg}")?
);
pb.set_message(format!("Fetching remote files..."));

// Convert remotes into Futures, so that they can be awaited in parallel
let fetch_futures: Vec<_> = self.remotes.iter().map(|(path, remote)| {
let remote_name = remote.name().to_string();
let path_clone = path.clone();
async move {
let remote_files = remote.get_files_hashmap().await?;
Ok(((remote_name, path_clone), remote_files))
}
}).collect();

let results = join_all(fetch_futures).await;

for result in results {
match result {
Ok((key, value)) => {
pb.set_message(format!("Fetching remote files... {} done.", key.0));
all_remote_files.insert(key, value);
pb.inc(1);
},
Err(e) => return Err(e), // Handle errors as needed
}
}
//info!("fetch() remote files: {:?}", all_remote_files);

pb.finish_with_message("Fetching completed!");
Ok(all_remote_files)
}

// Merge all local and remote files.
//
// Use a fetch to get all remote files (as RemoteFile), and merge these
// in with the local data files (DataFile) into a MergedFile struct.
// Missing remote/local files are None.
//
// Returns: Result with HashMap of directory -> { File -> MergedFile, ... }
pub async fn merge(&mut self, include_remotes: bool) -> Result<HashMap<String, HashMap<String, MergedFile>>> {
// directory -> {(filename -> MergedFile), ...}
let mut result: HashMap<String, HashMap<String, MergedFile>> = HashMap::new();
Expand Down Expand Up @@ -673,44 +704,53 @@ impl DataCollection {

// Get the status of the DataCollection, optionally with remotes.
//
// Organized by directory
// Returns Result of BTreeMap of directory -> [ StatusEntry, ...]
pub async fn status(&mut self, path_context: &Path, include_remotes: bool) -> Result<BTreeMap<String, Vec<StatusEntry>>> {
// get all merged files, used to compute the status
let merged_files = self.merge(include_remotes).await?;
//info!("merged_file: {:?}", merged_files);

let mut statuses_futures = FuturesUnordered::new();

for (directory, inner_map) in merged_files.into_iter() {
// this clone is to prevent a borrow issue due to async move below
let files: Vec<_> = inner_map.values().cloned().collect();
for mf in files {
let directory_clone = directory.clone();
statuses_futures.push(async move {
let status_entry = mf.status_entry(path_context, include_remotes).await.map_err(anyhow::Error::from)?;
Ok::<(String, StatusEntry), anyhow::Error>((directory_clone, status_entry))
});
}
}

let mut statuses = BTreeMap::new();

// Get the StatusEntry async via join_all() for each
// MergedFile. The inner hash map has keys that are the
// file names (since this was use for join); these are not
// needed so they're ditched, leaving a
// BTreeMap<Vec<StatusEntry>>
let statuses_futures: FuturesUnordered<_> = merged_files
.into_iter()
.map(|(outer_key, inner_map)| {
// create a future for each merged_file, and collect the results
async move {
let status_entries: Result<Vec<_>, _> = join_all(
inner_map.values()
// get the StatusEntry for each MergedFile
.map(|mf| async { mf.status_entry(path_context, include_remotes).await })
.collect::<Vec<_>>()
).await.into_iter().collect();
status_entries.map(|entries| (outer_key, entries))
}
})
.collect();
let pb = ProgressBar::new(statuses_futures.len() as u64);
pb.set_style(ProgressStyle::default_bar()
.progress_chars("=> ")
.template("{spinner:.green} [{bar:40.green/white}] {pos:>}/{len} ({percent}%) eta {eta_precise:.green} {msg}")?
);

let statuses_results: Vec<_> = statuses_futures.collect().await;

for result in statuses_results {
let pb_clone = pb.clone();
thread::spawn(move || {
loop {
pb_clone.tick();
thread::sleep(Duration::from_millis(20));
}
});
// process the futures as they become ready
pb.set_message("Calculating MD5s...");
while let Some(result) = statuses_futures.next().await {
if let Ok((key, value)) = result {
statuses.insert(key, value);
pb.set_message(format!("Calculating MD5s... {} done.", &value.name));
statuses.entry(key).or_insert_with(Vec::new).push(value);
pb.inc(1);
} else {
// Handle the error as needed
result?;
}
}

pb.finish_with_message("Complete.");
Ok(statuses)
}

Expand Down Expand Up @@ -842,7 +882,7 @@ impl DataCollection {
let mut current_skipped = Vec::new();
let mut messy_skipped = Vec::new();
let mut overwrite_skipped = Vec::new();

for (dir, merged_files) in all_files.iter() {
for merged_file in merged_files.values().filter(|f| f.can_download()) {

Expand Down Expand Up @@ -896,10 +936,13 @@ impl DataCollection {
}
}

let style = ProgressBarOpts::new(
Some("{spinner:.green} [{bar:40.green/white}] {pos:>}/{len} ({percent}%) eta {eta_precise:.green} {msg}".to_string()),
Some("=> ".to_string()),
true, true);

let mut style_opts = StyleOptions::default();
style_opts.set_child(ProgressBarOpts::with_pip_style());

let style_clone = style.clone();
let style_opts = StyleOptions::new(style, style_clone);

let total_files = downloads.len();
if !downloads.is_empty() {
Expand Down
7 changes: 7 additions & 0 deletions src/lib/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ impl AuthKeys {
self.save();
}

// Register a key for `service` without persisting it.
//
// Unlike `add`, this never calls `save()` — intended for tests, so a
// developer's own on-disk keys are not overwritten.
pub fn temporary_add(&mut self, service: &str, key: &str) {
    self.keys.insert(service.to_lowercase(), key.to_owned());
}

pub fn get(&self, service: String) -> Result<String> {
match self.keys.get(&service) {
None => Err(anyhow!("no key found for service '{}'", service)),
Expand Down

0 comments on commit ac0bedb

Please sign in to comment.