Skip to content

Commit

Permalink
some more progress on zenodo uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
vsbuffalo committed Aug 30, 2023
1 parent cc35b79 commit 367247e
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 60 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ lazy_static = "1.4.0"
httpmock = "0.6.8"
cd = "0.2.1"
indicatif = { version = "0.17.6", features = ["futures"] }
tokio-util = { version = "0.7.8", features = ["codec"] }

50 changes: 35 additions & 15 deletions src/lib/api/figshare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::collections::HashMap;
use serde_derive::{Serialize,Deserialize};
use serde_json::Value;
use reqwest::{Method, header::{HeaderMap, HeaderValue}};
use reqwest::{Client, Response};
use reqwest::{Client, Response, Body};
use colored::Colorize;
use futures_util::StreamExt;
use tokio::fs::File;
Expand All @@ -27,6 +27,8 @@ use crate::lib::data::{DataFile, MergedFile};
use crate::lib::remote::{AuthKeys, RemoteFile, DownloadInfo,RequestData};
use crate::lib::project::LocalMetadata;

use super::zenodo::ZenodoDeposition;

pub const FIGSHARE_BASE_URL: &str = "https://api.figshare.com/v2/";

// for testing:
Expand Down Expand Up @@ -304,6 +306,11 @@ impl FigShareAPI {
Some(RequestData::Json(json_data)) => request.json(&json_data),
Some(RequestData::Binary(bin_data)) => request.body(bin_data),
Some(RequestData::File(file)) => request.body(file),
Some(RequestData::Stream(file)) => {
let stream = tokio_util::io::ReaderStream::new(file);
let body = Body::wrap_stream(stream);
request.body(body)
},
Some(RequestData::Empty) => request.json(&serde_json::Value::Object(serde_json::Map::new())),
None => request,
};
Expand Down Expand Up @@ -407,26 +414,39 @@ impl FigShareAPI {
Ok(())
}

/// Find the FigShare Article whose title is exactly `self.name`.
///
/// The candidates are the articles returned by `get_articles()`.
///
/// # Returns
/// - `Ok(None)` when no article has this title.
/// - `Ok(Some(article))` when exactly one article has it.
///
/// # Errors
/// Propagates any error from `get_articles()`, and fails when more than
/// one article shares the title, because the match would be ambiguous.
pub async fn find_article(&self) -> Result<Option<FigShareArticle>> {
    let articles = self.get_articles().await?;
    // The articles are owned here, so the single match can be moved out
    // instead of cloned; pulling at most two hits from the lazy filter is
    // enough to tell "exactly one" from "several".
    let mut matches = articles.into_iter().filter(|a| a.title == self.name);
    match (matches.next(), matches.next()) {
        (Some(_), Some(_)) => Err(anyhow!("Found multiple FigShare Articles with the \
                                          title '{}'", self.name)),
        (only_or_none, _) => Ok(only_or_none),
    }
}

// FigShare Remote initialization
//
// This creates a FigShare article for the tracked directory.
#[allow(unused)]
pub async fn remote_init(&mut self, local_metadata: LocalMetadata) -> Result<()> {
pub async fn remote_init(&mut self, local_metadata: LocalMetadata, link_only: bool) -> Result<()> {
// (1) Let's make sure there is no Article that exists
// with this same name
let articles = self.get_articles().await?;

let matches_found: Vec<_> = articles.iter().filter(|&a| a.title == self.name).collect();

if !matches_found.is_empty() {
return Err(anyhow!("An existing FigShare Article with the title \
'{}' was found. Either delete it on figshare.com \
or chose a different name.", self.name));
}

// (2) Now that no Article name clash has occurred, let's
// create the new article and get the ID
let article = self.create_article(&self.name).await?;
let found_match = self.find_article().await?;
let article = if let Some(existing_info) = found_match {
if !link_only {
return Err(anyhow!("An existing FigShare Article with the title \
'{}' was found. Use --link-only to link.", self.name));
}
existing_info
} else {
// Step 2: Create a new article if none exists
self.create_article(&self.name).await?
};

// (3) Set the Article ID, which is the only state needed
// for later queries
Expand Down
130 changes: 100 additions & 30 deletions src/lib/api/zenodo.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{anyhow,Result,Context};
use std::path::Path;
use std::{path::Path, fmt::Binary};
use reqwest::{Method, header::{HeaderMap, HeaderValue, CONTENT_TYPE}};
use reqwest::{Client, Response};
use reqwest::{Client, Response, Body};
use std::collections::HashMap;
use serde_derive::{Serialize,Deserialize};
#[allow(unused_imports)]
Expand All @@ -22,7 +22,7 @@ const BASE_URL: &str = "https://zenodo.org/api";
// for testing:
const TEST_TOKEN: &str = "test-token";

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct ZenodoDeposition {
conceptrecid: String,
created: String,
Expand Down Expand Up @@ -92,20 +92,20 @@ pub struct ZenodoLinks {
self_link: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
/// One entry of the `creators` list in the Zenodo deposition metadata.
// NOTE(review): the two derive lines above look like the before/after
// versions of a single attribute (the second adds `Clone`) — confirm only
// one is present in the real file.
struct Creator {
// The creator's name.
name: String,
// The creator's affiliation, if any.
affiliation: Option<String>
}

// We need this wrapper to provide the metadata
// for the Zenodo Deposition: it nests the metadata under a
// top-level `metadata` key when serialized.
// NOTE(review): the two derive lines below look like the before/after
// versions of a single attribute (the second adds `Clone`) — confirm only
// one is present in the real file.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
struct ZenodoDepositionData {
// The deposition metadata payload.
metadata: ZenodoMetadata,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
struct ZenodoMetadata {
#[serde(skip_serializing_if = "Option::is_none")]
prereserve_doi: Option<PrereserveDoi>,
Expand All @@ -126,7 +126,10 @@ impl TryInto<ZenodoDepositionData> for LocalMetadata {
Ok(ZenodoDepositionData {
metadata: ZenodoMetadata {
prereserve_doi: None,
title: self.title.ok_or(anyhow!("Zenodo requires a title be set."))?,
title: self.title.ok_or(anyhow!("Zenodo requires a title be set.\n\
Either: \n\
- set this manually in data_manifest.yml\n\
- specify with 'sdf link --name <NAME>'\n"))?,
upload_type: Some("dataset".to_string()),
description: Some(description),
creators: Some(vec![Creator {
Expand All @@ -138,7 +141,7 @@ impl TryInto<ZenodoDepositionData> for LocalMetadata {
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
struct PrereserveDoi {
doi: String,
recid: usize,
Expand Down Expand Up @@ -217,25 +220,24 @@ impl ZenodoAPI {

let client = Client::new();
let mut request = client.request(method, &url);
trace!("request: {:?}", request);
if let Some(h) = headers {
trace!("Request Headers: {:?}", h);
request = request.headers(h);
}

if let Some(data) = &data { // Use the cloned data for logging
let data_clone = data.clone(); // Clone the data
trace!("Request Data: {:?}", data_clone);
}

let request = match data {
Some(RequestData::Json(json_data)) => request.json(&json_data),
Some(RequestData::Binary(bin_data)) => request.body(bin_data),
Some(RequestData::File(file)) => request.body(file),
Some(RequestData::Stream(file)) => {
let stream = tokio_util::io::ReaderStream::new(file);
let body = Body::wrap_stream(stream);
request.body(body)
},
Some(RequestData::Empty) => request.json(&serde_json::Value::Object(serde_json::Map::new())),
None => request,
};

trace!("request (before send): {:?}", request);
let response = request.send().await?;

let response_status = response.status();
Expand All @@ -246,25 +248,78 @@ impl ZenodoAPI {
}
}

/// Fetch the depositions listed by the `/deposit/depositions` endpoint.
///
/// Issues a `GET` with no extra headers and no request body, then decodes
/// the JSON response into a list of [`ZenodoDeposition`]s.
///
/// # Errors
/// Propagates any error from `issue_request` or from decoding the response.
pub async fn get_depositions(&self) -> Result<Vec<ZenodoDeposition>> {
    let depositions = self
        .issue_request::<HashMap<String, String>>(Method::GET, "/deposit/depositions", None, None)
        .await?
        .json::<Vec<ZenodoDeposition>>()
        .await?;
    Ok(depositions)
}

// Initialize the data collection on the Remote
/// Report whether any Zenodo Deposition has a title equal to `self.name`.
///
/// # Returns
/// `Ok(true)` if at least one deposition returned by `get_depositions()`
/// has this title, `Ok(false)` otherwise.
///
/// # Errors
/// Propagates any error from `get_depositions()`.
pub async fn get_deposition_exists(&self) -> Result<bool> {
    let depositions = self.get_depositions().await?;
    // `any` stops at the first hit; no need to collect every match just to
    // test whether the collection is empty.
    Ok(depositions.iter().any(|d| d.title == self.name))
}

/// Find the Zenodo Deposition whose title is exactly `self.name`.
///
/// The candidates are the depositions returned by `get_depositions()`.
///
/// # Returns
/// - `Ok(None)` when no deposition has this title.
/// - `Ok(Some(deposition))` when exactly one deposition has it.
///
/// # Errors
/// Propagates any error from `get_depositions()`, and fails when more than
/// one deposition shares the title, because the match would be ambiguous.
pub async fn find_deposition(&self) -> Result<Option<ZenodoDeposition>> {
    let depositions = self.get_depositions().await?;
    // The depositions are owned here, so the single match can be moved out
    // instead of cloned; pulling at most two hits from the lazy filter is
    // enough to tell "exactly one" from "several".
    let mut matches = depositions.into_iter().filter(|d| d.title == self.name);
    match (matches.next(), matches.next()) {
        (Some(_), Some(_)) => Err(anyhow!("Found multiple Zenodo Depositions with the \
                                          title '{}'", self.name)),
        (only_or_none, _) => Ok(only_or_none),
    }
}

// Create a new Zenodo Deposition
//
// For Zenodo, this creates a new "deposition"
#[allow(unused)]
pub async fn remote_init(&mut self, local_metadata: LocalMetadata) -> Result<()> {
// TODO URGENT: check for existing entries!
// Sends a POST to "/deposit/depositions" with a JSON body built from the
// metadata, and returns the ZenodoDeposition decoded from the response.
//
// Note that this uses LocalMetadata to propagate some of the Zenodo metadata fields
// However, the title field is overwritten by ZenodoAPI.name.
//
// NOTE(review): this listing appears to interleave the pre- and post-change
// versions of the body (`data` is bound twice and `response.json()` is
// called twice) — confirm which lines are current before relying on it.
pub async fn create_deposition(&self, local_metadata: LocalMetadata) -> Result<ZenodoDeposition> {
let mut headers = HeaderMap::new();
// The body is sent as JSON, so declare the content type explicitly.
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
// NOTE(review): presumably the superseded version of the bindings further
// down — `local_metadata` is consumed by `try_into()` here yet cloned
// afterwards; confirm.
let metadata: ZenodoDepositionData = local_metadata.try_into()?;
let data = Some(RequestData::Json(metadata));

// overwrite the name with the Remote::ZenodoAPI.name.
let mut metadata_copy = local_metadata.clone();
metadata_copy.title = Some(self.name.clone());

// Convert to the wrapper type that is serialized as the request body;
// this fails if the conversion from LocalMetadata returns an error.
let deposition_data: ZenodoDepositionData = metadata_copy.try_into()?;

let data = Some(RequestData::Json(deposition_data));
let response = self.issue_request(Method::POST, "/deposit/depositions", Some(headers), data).await?;
// NOTE(review): presumably the superseded decode + trace pair; the
// `deposition` binding below decodes the same response again — confirm.
let info: ZenodoDeposition = response.json().await?;
trace!("ZenodoDeposition: {:?}", info);
let deposition: ZenodoDeposition = response.json().await?;
Ok(deposition)
}

/// Initialize the data collection on the Remote.
///
/// For Zenodo, this links to, or creates, a "deposition" titled `self.name`:
/// - a deposition with that title exists and `link_only` is `true`: it is reused;
/// - a deposition with that title exists and `link_only` is `false`: an error
///   is returned;
/// - no such deposition exists: a new one is created from `local_metadata`.
///
/// On success the deposition's id and bucket link are stored in
/// `self.deposition_id` and `self.bucket_url`.
///
/// # Errors
/// Propagates errors from `find_deposition()` and `create_deposition()`.
pub async fn remote_init(&mut self, local_metadata: LocalMetadata, link_only: bool) -> Result<()> {
    let existing = self.find_deposition().await?;

    let deposition = match existing {
        // A title clash is only acceptable when the caller asked to link.
        Some(_) if !link_only => {
            return Err(anyhow!("An existing Zenodo Deposition with the title \
                               '{}' was found. Use --link-only to link.", self.name));
        }
        Some(found) => found,
        None => self.create_deposition(local_metadata).await?,
    };

    self.deposition_id = Some(deposition.id as u64);
    self.bucket_url = deposition.links.bucket;

    Ok(())
}


// Check if file exists, returning None if not,
// and the ZenodoFile if so
// TODO: could be part of higher Remote API, e.g. through generics?
Expand All @@ -290,7 +345,6 @@ impl ZenodoAPI {
// Upload the file, deleting any existing files if overwrite is true.
//
// Returns true/false if upload was completed or not. Will Error in other cases.
#[allow(unused_variables)]
pub async fn upload(&self, data_file: &DataFile, path_context: &Path, overwrite: bool) -> Result<bool> {
let bucket_url = self.bucket_url.as_ref().ok_or(anyhow!("Internal Error: Zenodo bucket_url not set. Please report."))?;
let full_path = path_context.join(&data_file.path);
Expand All @@ -317,13 +371,18 @@ impl ZenodoAPI {
}
}

// upload the file
// Upload the file.
// First build the headers -- octet stream
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"));

let file = tokio::fs::File::open(full_path).await?;
let response = self.issue_request::<HashMap<String, String>>(Method::PUT, &bucket_endpoint, None, Some(RequestData::File(file))).await?;
let response = self.issue_request::<HashMap<String, String>>(Method::PUT, &bucket_endpoint,
Some(headers), Some(RequestData::Stream(file))).await?;
let info: ZenodoFileUpload = response.json().await?;

let msg = "After upload, the local and remote MD5s differed. SciDataFlow\n\
automatically deletes the remote file in this case";
automatically deletes the remote file in this case";
// let's compare the MD5s
let remote_md5 = info.checksum;
if remote_md5 != data_file.md5 {
Expand Down Expand Up @@ -441,6 +500,14 @@ mod tests {
description: Some("Let's build infrastructure so science can build off itself.".to_string()),
};

// Create a mock deposition endpoint with a simulated success response
let deposition_get_mock = server.mock(|when, then| {
when.method(GET)
.path("/deposit/depositions");
then.status(200)
.json_body(json!([]));
});

// Create a mock deposition endpoint with a simulated success response
let deposition_mock = server.mock(|when, then| {
when.method(POST)
Expand Down Expand Up @@ -497,10 +564,11 @@ mod tests {
let mut api = ZenodoAPI::new("test", Some(server.url("/"))).unwrap();

// Main call to test
let _result = api.remote_init(local_metadata).await;
let _result = api.remote_init(local_metadata,false).await;
//info!("result: {:?}", result);

// ensure the specified mock was called exactly one time (or fail).
// ensure the specified mocks were called exactly one time (or fail).
deposition_get_mock.assert();
deposition_mock.assert();

// Assert that the deposition_id and bucket_url have been set correctly
Expand Down Expand Up @@ -613,8 +681,10 @@ mod tests {
};

// Mock for the upload method
// NOTE: this mock checks the octet-stream Content-Type header, but does not inspect the uploaded body.
let upload_file_mock = server.mock(|when, then| {
when.method(PUT)
.header("Content-Type", "application/octet-stream")
.path_matches(Regex::new(&format!(r"{}/([^/]+)", bucket_endpoint)).unwrap());
then.status(201)
.json_body(json!({
Expand Down
Loading

0 comments on commit 367247e

Please sign in to comment.