diff --git a/Cargo.toml b/Cargo.toml index f379b6a..94d2aeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,4 +46,5 @@ lazy_static = "1.4.0" httpmock = "0.6.8" cd = "0.2.1" indicatif = { version = "0.17.6", features = ["futures"] } +tokio-util = { version = "0.7.8", features = ["codec"] } diff --git a/src/lib/api/figshare.rs b/src/lib/api/figshare.rs index a27fabc..6b58c6e 100644 --- a/src/lib/api/figshare.rs +++ b/src/lib/api/figshare.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use serde_derive::{Serialize,Deserialize}; use serde_json::Value; use reqwest::{Method, header::{HeaderMap, HeaderValue}}; -use reqwest::{Client, Response}; +use reqwest::{Client, Response, Body}; use colored::Colorize; use futures_util::StreamExt; use tokio::fs::File; @@ -27,6 +27,8 @@ use crate::lib::data::{DataFile, MergedFile}; use crate::lib::remote::{AuthKeys, RemoteFile, DownloadInfo,RequestData}; use crate::lib::project::LocalMetadata; +use super::zenodo::ZenodoDeposition; + pub const FIGSHARE_BASE_URL: &str = "https://api.figshare.com/v2/"; // for testing: @@ -304,6 +306,11 @@ impl FigShareAPI { Some(RequestData::Json(json_data)) => request.json(&json_data), Some(RequestData::Binary(bin_data)) => request.body(bin_data), Some(RequestData::File(file)) => request.body(file), + Some(RequestData::Stream(file)) => { + let stream = tokio_util::io::ReaderStream::new(file); + let body = Body::wrap_stream(stream); + request.body(body) + }, Some(RequestData::Empty) => request.json(&serde_json::Value::Object(serde_json::Map::new())), None => request, }; @@ -407,26 +414,39 @@ impl FigShareAPI { Ok(()) } + pub async fn find_article(&self) -> Result> { + let articles = self.get_articles().await?; + let matches_found: Vec<_> = articles.into_iter().filter(|a| a.title == self.name).collect(); + if !matches_found.is_empty() { + if matches_found.len() > 1 { + return Err(anyhow!("Found multiple FigShare Articles with the \ + title '{}'", self.name)); + } else { + return 
Ok(Some(matches_found[0].clone())); + } + } else { + return Ok(None); + } + } + // FigShare Remote initialization // // This creates a FigShare article for the tracked directory. #[allow(unused)] - pub async fn remote_init(&mut self, local_metadata: LocalMetadata) -> Result<()> { + pub async fn remote_init(&mut self, local_metadata: LocalMetadata, link_only: bool) -> Result<()> { // (1) Let's make sure there is no Article that exists // with this same name - let articles = self.get_articles().await?; - - let matches_found: Vec<_> = articles.iter().filter(|&a| a.title == self.name).collect(); - - if !matches_found.is_empty() { - return Err(anyhow!("An existing FigShare Article with the title \ - '{}' was found. Either delete it on figshare.com \ - or chose a different name.", self.name)); - } - - // (2) Now that no Article name clash has occurred, let's - // create the new article and get the ID - let article = self.create_article(&self.name).await?; + let found_match = self.find_article().await?; + let article = if let Some(existing_info) = found_match { + if !link_only { + return Err(anyhow!("An existing FigShare Article with the title \ + '{}' was found. Use --link-only to link.", self.name)); + } + existing_info + } else { + // Step 2: Create a new FigShare article if none exists + self.create_article(&self.name).await? 
+ }; // (3) Set the Article ID, which is the only state needed // for later queries diff --git a/src/lib/api/zenodo.rs b/src/lib/api/zenodo.rs index 3408e36..cb766c7 100644 --- a/src/lib/api/zenodo.rs +++ b/src/lib/api/zenodo.rs @@ -1,7 +1,7 @@ use anyhow::{anyhow,Result,Context}; -use std::path::Path; +use std::{path::Path, fmt::Binary}; use reqwest::{Method, header::{HeaderMap, HeaderValue, CONTENT_TYPE}}; -use reqwest::{Client, Response}; +use reqwest::{Client, Response, Body}; use std::collections::HashMap; use serde_derive::{Serialize,Deserialize}; #[allow(unused_imports)] @@ -22,7 +22,7 @@ const BASE_URL: &str = "https://zenodo.org/api"; // for testing: const TEST_TOKEN: &str = "test-token"; -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct ZenodoDeposition { conceptrecid: String, created: String, @@ -92,7 +92,7 @@ pub struct ZenodoLinks { self_link: Option, } -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] struct Creator { name: String, affiliation: Option @@ -100,12 +100,12 @@ struct Creator { // We need this wrapper to provide the metadata // for the Zenodo Deposition. 
-#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] struct ZenodoDepositionData { metadata: ZenodoMetadata, } -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] struct ZenodoMetadata { #[serde(skip_serializing_if = "Option::is_none")] prereserve_doi: Option, @@ -126,7 +126,10 @@ impl TryInto for LocalMetadata { Ok(ZenodoDepositionData { metadata: ZenodoMetadata { prereserve_doi: None, - title: self.title.ok_or(anyhow!("Zenodo requires a title be set."))?, + title: self.title.ok_or(anyhow!("Zenodo requires a title be set.\n\ + Either: \n\ + - set this manually in data_manifest.yml\n\ + - specify with 'sdf link --name '\n"))?, upload_type: Some("dataset".to_string()), description: Some(description), creators: Some(vec![Creator { @@ -138,7 +141,7 @@ impl TryInto for LocalMetadata { } } -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] struct PrereserveDoi { doi: String, recid: usize, @@ -217,25 +220,24 @@ impl ZenodoAPI { let client = Client::new(); let mut request = client.request(method, &url); - trace!("request: {:?}", request); if let Some(h) = headers { - trace!("Request Headers: {:?}", h); request = request.headers(h); } - if let Some(data) = &data { // Use the cloned data for logging - let data_clone = data.clone(); // Clone the data - trace!("Request Data: {:?}", data_clone); - } - let request = match data { Some(RequestData::Json(json_data)) => request.json(&json_data), Some(RequestData::Binary(bin_data)) => request.body(bin_data), Some(RequestData::File(file)) => request.body(file), + Some(RequestData::Stream(file)) => { + let stream = tokio_util::io::ReaderStream::new(file); + let body = Body::wrap_stream(stream); + request.body(body) + }, Some(RequestData::Empty) => request.json(&serde_json::Value::Object(serde_json::Map::new())), None => request, }; + trace!("request 
(before send): {:?}", request); let response = request.send().await?; let response_status = response.status(); @@ -246,25 +248,78 @@ impl ZenodoAPI { } } + pub async fn get_depositions(&self) -> Result> { + let response = self.issue_request::>(Method::GET, "/deposit/depositions", None, None).await?; + let info: Vec = response.json().await?; + Ok(info) + } - // Initialize the data collection on the Remote + pub async fn get_deposition_exists(&self) -> Result { + let depositions = self.get_depositions().await?; + let matches_found: Vec<_> = depositions.iter().filter(|&a| a.title == self.name).collect(); + Ok(!matches_found.is_empty()) + } + + pub async fn find_deposition(&self) -> Result> { + let depositions = self.get_depositions().await?; + let matches_found: Vec<_> = depositions.into_iter().filter(|a| a.title == self.name).collect(); + if !matches_found.is_empty() { + if matches_found.len() > 1 { + return Err(anyhow!("Found multiple Zenodo Depositions with the \ + title '{}'", self.name)); + } else { + return Ok(Some(matches_found[0].clone())); + } + } else { + return Ok(None); + } + } + + // Create a new Zenodo Deposition // - // For Zenodo, this creates a new "deposition" - #[allow(unused)] - pub async fn remote_init(&mut self, local_metadata: LocalMetadata) -> Result<()> { - // TODO URGENT: check for existing entries! + // Note that this uses LocalMetadata to propagate some of the Zenodo metadata fields + // However, the title field is overwritten by ZenodoAPI.name. + pub async fn create_deposition(&self, local_metadata: LocalMetadata) -> Result { let mut headers = HeaderMap::new(); headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); - let metadata: ZenodoDepositionData = local_metadata.try_into()?; - let data = Some(RequestData::Json(metadata)); + + // overwrite the name with the Remote::ZenodoAPI.name. 
+ let mut metadata_copy = local_metadata.clone(); + metadata_copy.title = Some(self.name.clone()); + + let deposition_data: ZenodoDepositionData = metadata_copy.try_into()?; + + let data = Some(RequestData::Json(deposition_data)); let response = self.issue_request(Method::POST, "/deposit/depositions", Some(headers), data).await?; - let info: ZenodoDeposition = response.json().await?; - trace!("ZenodoDeposition: {:?}", info); + let deposition: ZenodoDeposition = response.json().await?; + Ok(deposition) + } + + // Initialize the data collection on the Remote + // + // For Zenodo, this creates a new "deposition" + pub async fn remote_init(&mut self, local_metadata: LocalMetadata, link_only: bool) -> Result<()> { + // Step 1: Check if a deposition already exists + let found_match = self.find_deposition().await?; + + let info = if let Some(existing_info) = found_match { + if !link_only { + return Err(anyhow!("An existing Zenodo Deposition with the title \ + '{}' was found. Use --link-only to link.", self.name)); + } + existing_info + } else { + // Step 2: Create a new deposition if none exists + self.create_deposition(local_metadata).await? + }; + self.deposition_id = Some(info.id as u64); self.bucket_url = info.links.bucket; + Ok(()) } + // Check if file exists, returning None if not, // and the ZenodoFile if so // TODO: could be part of higher Remote API, e.g. through generics? @@ -290,7 +345,6 @@ impl ZenodoAPI { // Upload the file, deleting any existing files if overwrite is true. // // Returns true/false if upload was completed or not. Will Error in other cases. - #[allow(unused_variables)] pub async fn upload(&self, data_file: &DataFile, path_context: &Path, overwrite: bool) -> Result { let bucket_url = self.bucket_url.as_ref().ok_or(anyhow!("Internal Error: Zenodo bucket_url not set. Please report."))?; let full_path = path_context.join(&data_file.path); @@ -317,13 +371,18 @@ impl ZenodoAPI { } } - // upload the file + // Upload the file. 
+ // First build the headers -- octet stream + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/octet-stream")); + let file = tokio::fs::File::open(full_path).await?; - let response = self.issue_request::>(Method::PUT, &bucket_endpoint, None, Some(RequestData::File(file))).await?; + let response = self.issue_request::>(Method::PUT, &bucket_endpoint, + Some(headers), Some(RequestData::Stream(file))).await?; let info: ZenodoFileUpload = response.json().await?; let msg = "After upload, the local and remote MD5s differed. SciDataFlow\n\ - automatically deletes the remote file in this case"; + automatically deletes the remote file in this case"; // let's compare the MD5s let remote_md5 = info.checksum; if remote_md5 != data_file.md5 { @@ -441,6 +500,14 @@ mod tests { description: Some("Let's build infrastructure so science can build off itself.".to_string()), }; + // Create a mock for listing existing depositions (GET) that returns an empty list + let deposition_get_mock = server.mock(|when, then| { + when.method(GET) + .path("/deposit/depositions"); + then.status(200) + .json_body(json!([])); + }); + // Create a mock deposition endpoint with a simulated success response let deposition_mock = server.mock(|when, then| { when.method(POST) @@ -497,10 +564,11 @@ mod tests { let mut api = ZenodoAPI::new("test", Some(server.url("/"))).unwrap(); // Main call to test - let _result = api.remote_init(local_metadata).await; + let _result = api.remote_init(local_metadata,false).await; //info!("result: {:?}", result); - // ensure the specified mock was called exactly one time (or fail). + // ensure the specified mocks were called exactly one time (or fail). + deposition_get_mock.assert(); deposition_mock.assert(); // Assert that the deposition_id and bucket_url have been set correctly @@ -613,8 +681,10 @@ mod tests { }; // Mock for the upload method + // NOTE: this mock matches on the octet-stream Content-Type header, but does not test binary file contents. 
let upload_file_mock = server.mock(|when, then| { when.method(PUT) + .header("Content-Type", "application/octet-stream") .path_matches(Regex::new(&format!(r"{}/([^/]+)", bucket_endpoint)).unwrap()); then.status(201) .json_body(json!({ diff --git a/src/lib/project.rs b/src/lib/project.rs index 7bf486a..789fba2 100644 --- a/src/lib/project.rs +++ b/src/lib/project.rs @@ -69,6 +69,7 @@ pub struct Config { // that Remote.remote_init() can access, so we can pass // a single object to Remote.remote_init(). E.g. includes // User and DataCollectionMetadata. +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct LocalMetadata { pub author_name: Option, pub email: Option, @@ -138,7 +139,15 @@ impl Project { } + + // This tries to figure out a good default name to use, e.g. for + // remote titles or names. + // + // The precedence is local metadata in manifest > project directory pub fn name(&self) -> String { + if let Some(t) = &self.data.metadata.title { + return t.to_string(); + } Project::get_parent_dir(&self.manifest) } @@ -315,9 +324,10 @@ impl Project { // (2) create a new remote, with a name // Associate a project (either by creating it, or finding it on FigShare) - let name = match name { - None => self.name(), - Some(n) => n.to_string() + let name = if let Some(n) = name { + n.to_string() + } else { + self.name() }; let service = service.to_lowercase(); @@ -337,15 +347,13 @@ impl Project { // is already done. self.data.validate_remote_directory(&dir)?; - if !link_only { - // (5) initialize the remote (e.g. for FigShare, this - // checks that the article doesn't exist (error if it - // does), creates it, and sets the FigShare.article_id - // once it is assigned by the remote). - // Note: we pass the Project to remote_init - let local_metadata = LocalMetadata::from_project(self); - remote.remote_init(local_metadata).await?; - } + // (5) initialize the remote (e.g. 
for FigShare, this + // checks that the article doesn't exist (error if it + // does), creates it, and sets the FigShare.article_id + // once it is assigned by the remote). + // Note: we pass the Project to remote_init + let local_metadata = LocalMetadata::from_project(self); + remote.remote_init(local_metadata, *link_only).await?; // (6) register the remote in the manifest self.data.register_remote(&dir, remote)?; diff --git a/src/lib/remote.rs b/src/lib/remote.rs index cee0bce..92229e9 100644 --- a/src/lib/remote.rs +++ b/src/lib/remote.rs @@ -171,10 +171,10 @@ impl Remote { } } // initialize the remote (i.e. tell it we have a new empty data set) - pub async fn remote_init(&mut self, local_metadata: LocalMetadata) -> Result<()> { + pub async fn remote_init(&mut self, local_metadata: LocalMetadata, link_only: bool) -> Result<()> { match self { - Remote::FigShareAPI(fgsh_api) => fgsh_api.remote_init(local_metadata).await, - Remote::ZenodoAPI(znd_api) => znd_api.remote_init(local_metadata).await, + Remote::FigShareAPI(fgsh_api) => fgsh_api.remote_init(local_metadata, link_only).await, + Remote::ZenodoAPI(znd_api) => znd_api.remote_init(local_metadata, link_only).await, Remote::DataDryadAPI(_) => service_not_implemented!("DataDryad"), } } @@ -242,11 +242,15 @@ pub fn authenticate_remote(remote: &mut Remote) -> Result<()> { // Common enum for issue_request() methods of APIs +// +// Notes: Binary() should be used only for small amounts of data, +// that can be read into memory, e.g. FigShare's upload_parts(). #[derive(Debug)] pub enum RequestData { Json(T), Binary(Vec), File(tokio::fs::File), + Stream(tokio::fs::File), Empty }