Skip to content

Commit

Permalink
new get/bulk methods
Browse files Browse the repository at this point in the history
  • Loading branch information
vsbuffalo committed Sep 1, 2023
1 parent 362ff03 commit 29467a9
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 104 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ httpmock = "0.6.8"
cd = "0.2.1"
indicatif = { version = "0.17.6", features = ["futures"] }
tokio-util = { version = "0.7.8", features = ["codec"] }
csv = "1.2.2"

105 changes: 58 additions & 47 deletions src/lib/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::fs::{metadata};
use serde_derive::{Serialize,Deserialize};
use serde;
use crate::lib::data::serde::{Serializer,Deserializer};
use crate::lib::download::Downloads;
#[allow(unused_imports)]
use log::{info, trace, debug};
use chrono::prelude::*;
Expand Down Expand Up @@ -194,10 +195,13 @@ impl MergedFile {
self.local.as_ref().map(|data_file| data_file.tracked)
}

pub fn local_md5(&self, path_context: &Path) -> Option<String> {
self.local.as_ref()
.and_then(|local| local.get_md5(path_context).ok())
.flatten()
pub async fn local_md5(&self, path_context: &Path) -> Option<String> {
if let Some(local) = &self.local {
if let Ok(md5_result) = local.get_md5(path_context).await {
return md5_result;
}
}
None
}

pub fn remote_md5(&self) -> Option<String> {
Expand All @@ -212,8 +216,8 @@ impl MergedFile {
self.local.as_ref().map(|local| local.md5.clone())
}

pub fn local_remote_md5_mismatch(&self, path_context: &Path) -> Option<bool> {
let local_md5 = self.local_md5(path_context);
pub async fn local_remote_md5_mismatch(&self, path_context: &Path) -> Option<bool> {
let local_md5 = self.local_md5(path_context).await;
let remote_md5 = self.remote_md5();
match (remote_md5, local_md5) {
(Some(remote), Some(local)) => Some(remote != local),
Expand All @@ -227,18 +231,21 @@ impl MergedFile {
.get_mod_time(path_context).ok())
}

pub fn status(&self, path_context: &Path) -> Result<RemoteStatusCode> {
pub async fn status(&self, path_context: &Path) -> Result<RemoteStatusCode> {
//let tracked = self.local.as_ref().map_or(None,|df| Some(df.tracked));

// local status, None if no local file found
let local_status = self.local
.as_ref()
.and_then(|local| local.status(path_context).ok());
let local_status = if let Some(local) = &self.local {
local.status(path_context).await.ok()
} else {
None
};

// TODO fix path_context
//info!("{:?} local status: {:?} ({:?})", self.name(), local_status, &path_context);

let md5_mismatch = self.local_remote_md5_mismatch(path_context);
let md5_mismatch = self.local_remote_md5_mismatch(path_context).await;

if !self.has_remote().unwrap_or(false) {
return Ok(RemoteStatusCode::NotExists)
}
Expand Down Expand Up @@ -285,13 +292,15 @@ impl MergedFile {
// Create a StatusEntry, for printing the status to the user.
pub async fn status_entry(&self, path_context: &Path, include_remotes: bool) -> Result<StatusEntry> {
let tracked = self.local.as_ref().map(|df| df.tracked);
let local_status = self.local
.as_ref()
.and_then(|local| local.status(path_context).ok());
let local_status = if let Some(local) = self.local.as_ref() {
local.status(path_context).await.ok()
} else {
None
};

let remote_status = if include_remotes { Some(self.status(path_context)?) } else { None };
let remote_status = if include_remotes { Some(self.status(path_context).await?) } else { None };
//let remote_status = if self.remote_service.is_some() { Some(self.status(path_context)?) } else { None };

let remote_service = if include_remotes { self.remote_service.clone() } else { None };

if self.local.is_none() && self.remote.is_none() {
Expand All @@ -304,7 +313,7 @@ impl MergedFile {
remote_status,
tracked,
remote_service,
local_md5: self.local_md5(path_context),
local_md5: self.local_md5(path_context).await,
remote_md5: self.remote_md5(),
manifest_md5: self.manifest_md5(),
local_mod_time: self.local_mod_time(path_context)
Expand All @@ -314,12 +323,12 @@ impl MergedFile {


impl DataFile {
pub fn new(path: String, url: Option<&str>, path_context: &Path) -> Result<DataFile> {
pub async fn new(path: String, url: Option<&str>, path_context: &Path) -> Result<DataFile> {
let full_path = path_context.join(&path);
if !full_path.exists() {
return Err(anyhow!("File '{}' does not exist.", path))
}
let md5 = match compute_md5(&full_path)? {
let md5 = match compute_md5(&full_path).await? {
Some(md5) => md5,
None => return Err(anyhow!("Could not compute MD5 as file does not exist")),
};
Expand Down Expand Up @@ -357,8 +366,8 @@ impl DataFile {
.to_string())
}

pub fn get_md5(&self, path_context: &Path) -> Result<Option<String>> {
compute_md5(&self.full_path(path_context)?)
pub async fn get_md5(&self, path_context: &Path) -> Result<Option<String>> {
compute_md5(&self.full_path(path_context)?).await
}

pub fn get_mod_time(&self, path_context: &Path) -> Result<DateTime<Utc>> {
Expand All @@ -381,16 +390,16 @@ impl DataFile {


// Returns true if the file does not exist.
pub fn is_changed(&self, path_context: &Path) -> Result<bool> {
match self.get_md5(path_context)? {
pub async fn is_changed(&self, path_context: &Path) -> Result<bool> {
match self.get_md5(path_context).await? {
Some(new_md5) => Ok(new_md5 != self.md5),
None => Ok(true),
}
}

pub fn status(&self, path_context: &Path) -> Result<LocalStatusCode> {
pub async fn status(&self, path_context: &Path) -> Result<LocalStatusCode> {
let is_alive = self.is_alive(path_context);
let is_changed = self.is_changed(path_context)?;
let is_changed = self.is_changed(path_context).await?;
let local_status = match (is_changed, is_alive) {
(false, true) => LocalStatusCode::Current,
(true, true) => LocalStatusCode::Modified,
Expand All @@ -403,8 +412,8 @@ impl DataFile {
Ok(local_status)
}

pub fn update(&mut self, path_context: &Path) -> Result<()> {
self.update_md5(path_context)?;
pub async fn update(&mut self, path_context: &Path) -> Result<()> {
self.update_md5(path_context).await?;
self.update_size(path_context)?;
Ok(())
}
Expand All @@ -415,8 +424,8 @@ impl DataFile {
Ok(())
}

pub fn update_md5(&mut self, path_context: &Path) -> Result<()> {
let new_md5 = match self.get_md5(path_context)? {
pub async fn update_md5(&mut self, path_context: &Path) -> Result<()> {
let new_md5 = match self.get_md5(path_context).await? {
Some(md5) => md5,
None => return Err(anyhow!("Cannot update MD5: file does not exist")),
};
Expand Down Expand Up @@ -542,11 +551,11 @@ impl DataCollection {
}
}

pub fn update(&mut self, filename: Option<&String>, path_context: &Path) -> Result<()> {
pub async fn update(&mut self, filename: Option<&String>, path_context: &Path) -> Result<()> {
match filename {
Some(file) => {
if let Some(data_file) = self.files.get_mut(file) {
data_file.update(path_context)?;
data_file.update(path_context).await?;
debug!("rehashed file {:?}", data_file.path);
}
}
Expand All @@ -555,7 +564,7 @@ impl DataCollection {
let all_files: Vec<_> = self.files.keys().cloned().collect();
for file in all_files {
if let Some(data_file) = self.files.get_mut(&file) {
data_file.update(path_context)?;
data_file.update(path_context).await?;
debug!("rehashed file {:?}", data_file.path);
}

Expand Down Expand Up @@ -708,6 +717,7 @@ impl DataCollection {
pb.bar.finish_with_message("Fetching completed.");
Ok(all_remote_files)
}

// Merge all local and remote files.
//
// Use a fetch to get all remote files (as RemoteFile), and merge these
Expand Down Expand Up @@ -787,10 +797,9 @@ impl DataCollection {

let mut statuses = BTreeMap::new();

let pb = Progress::new(statuses.len() as u64)?;
let pb = Progress::new(statuses_futures.len() as u64)?;

// process the futures as they become ready
pb.bar.set_message("Calculating MD5s...");
while let Some(result) = statuses_futures.next().await {
if let Ok((key, value)) = result {
pb.bar.set_message(format!("Calculating MD5s... {} done.", &value.name));
Expand Down Expand Up @@ -836,7 +845,7 @@ impl DataCollection {
// now we need to figure out whether to push the file,
// which depends on the RemoteStatusCode and whether
// we should overwrite (TODO)
let do_upload = match merged_file.status(path_context)? {
let do_upload = match merged_file.status(path_context).await? {
RemoteStatusCode::NoLocal => {
// A file exists on the remote, but not locally: there
// is nothing to push in this case (or count!)
Expand Down Expand Up @@ -931,7 +940,7 @@ impl DataCollection {
pub async fn pull(&mut self, path_context: &Path, overwrite: bool) -> Result<()> {
let all_files = self.merge(true).await?;

let mut downloads = Vec::new();
let mut downloads = Downloads::new();

let mut current_skipped = Vec::new();
let mut messy_skipped = Vec::new();
Expand All @@ -944,7 +953,7 @@ impl DataCollection {

let path = merged_file.name()?;

let do_download = match merged_file.status(path_context)? {
let do_download = match merged_file.status(path_context).await? {
RemoteStatusCode::NoLocal => {
return Err(anyhow!("Internal error: execution should not have reached this point, please report.\n\
'sdf pull' filtered by MergedFile.can_download() but found a RemoteStatusCode::NoLocal status."));
Expand Down Expand Up @@ -986,12 +995,14 @@ impl DataCollection {
if do_download {
if let Some(remote) = self.remotes.get(dir) {
let download = remote.get_download_info(merged_file, path_context, overwrite)?;
downloads.push(download);
downloads.list.push(download);
}
}
}
}

// now retrieve all the files in the queue.
downloads.retrieve(Some(" - {}"), Some("No files downloaded.")).await?;

let num_skipped = overwrite_skipped.len() + current_skipped.len() +
messy_skipped.len();
Expand Down Expand Up @@ -1046,7 +1057,7 @@ mod tests {
let nonexistent_path = "some/nonexistent/path".to_string();
let path_context = Path::new("");

let result = DataFile::new(nonexistent_path, None, &path_context);
let result = DataFile::new(nonexistent_path, None, &path_context).await;
match result {
Ok(_) => assert!(false, "Expected an error, but got Ok"),
Err(err) => {
Expand All @@ -1066,11 +1077,11 @@ mod tests {

// Make a DataFile
let path = file.path().to_string_lossy().to_string();
let data_file = DataFile::new(path, None, &path_context).unwrap();
let data_file = DataFile::new(path, None, &path_context).await.unwrap();

// Compare MD5s
let expected_md5 = "d3feb335769173b2db573413b0f6abf4".to_string();
let observed_md5 = data_file.get_md5(&path_context).unwrap().unwrap();
let observed_md5 = data_file.get_md5(&path_context).await.unwrap().unwrap();
assert!(observed_md5 == expected_md5, "MD5 mismatch!");
}

Expand All @@ -1085,7 +1096,7 @@ mod tests {

// Make a DataFile
let path = file.path().to_string_lossy().to_string();
let data_file = DataFile::new(path, None, &path_context).unwrap();
let data_file = DataFile::new(path, None, &path_context).await.unwrap();

// Let's also check size
assert!(data_file.size == 11, "Size mismatch {:?} != {:?}!",
Expand All @@ -1104,22 +1115,22 @@ mod tests {

// Make a DataFile
let path = file.path().to_string_lossy().to_string();
let mut data_file = DataFile::new(path, None, &path_context).unwrap();
let mut data_file = DataFile::new(path, None, &path_context).await.unwrap();

// Now, we change the data.
writeln!(file, "Modified mock data.").unwrap();

// Make sure the file MD5 is right
let expected_md5 = "c6526ab1de615b49e53398ae5588bd00".to_string();
let observed_md5 = data_file.get_md5(&path_context).unwrap().unwrap();
let observed_md5 = data_file.get_md5(&path_context).await.unwrap().unwrap();
assert!(observed_md5 == expected_md5);

// Make sure the old MD5 is in the DataFile
let old_md5 = "d3feb335769173b2db573413b0f6abf4".to_string();
assert!(data_file.md5 == old_md5, "DataFile.md5 mismatch!");

// Now update
data_file.update_md5(path_context).unwrap();
data_file.update_md5(path_context).await.unwrap();
assert!(data_file.md5 == expected_md5, "DataFile.update_md5() failed!");
}

Expand All @@ -1133,7 +1144,7 @@ mod tests {

// Make a DataFile
let path = file.path().to_string_lossy().to_string();
let mut data_file = DataFile::new(path, None, &path_context).unwrap();
let mut data_file = DataFile::new(path, None, &path_context).await.unwrap();

// Now, we change the data.
writeln!(file, "Modified mock data.").unwrap();
Expand Down
13 changes: 9 additions & 4 deletions src/lib/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::lib::progress::{DEFAULT_PROGRESS_STYLE, DEFAULT_PROGRESS_INC};
use crate::lib::utils::pluralize;

pub struct Downloads {
list: Vec<Download>,
pub list: Vec<Download>,
}


Expand All @@ -36,7 +36,8 @@ impl Downloads {
Downloads { list }
}

pub fn add<T: Downloadable>(&mut self, item: T, filename: Option<&str>) -> Result<&Download> {
pub fn add<T: Downloadable>(&mut self, item: T, filename: Option<&str>,
overwrite: bool) -> Result<Option<&Download>> {
let url = item.to_url()?;

let resolved_filename = match filename {
Expand All @@ -50,10 +51,14 @@ impl Downloads {
}
};

let file_path = PathBuf::from(&resolved_filename);
if file_path.exists() && !overwrite {
return Ok(None);
}

let download = Download { url, filename: resolved_filename };
self.list.push(download);
Ok(self.list.last().ok_or(anyhow::anyhow!("Failed to add download"))?)
Ok(Some(self.list.last().ok_or(anyhow::anyhow!("Failed to add download"))?))
}

pub fn default_style(&self) -> Result<StyleOptions> {
Expand All @@ -67,7 +72,7 @@ impl Downloads {
}


pub async fn download_all(&self, success_status: Option<&str>,
pub async fn retrieve(&self, success_status: Option<&str>,
no_downloads_message: Option<&str>) -> Result<()> {
let downloads = &self.list;
let total_files = downloads.len();
Expand Down
Loading

0 comments on commit 29467a9

Please sign in to comment.