Merge pull request #100 from ecohealthalliance/feature/tidymodels
Feature/tidymodels
n8layman authored Oct 21, 2024
2 parents b95e826 + 2f8f2cc commit a2d5b6e
Showing 143 changed files with 17,034 additions and 3,540 deletions.
Binary file modified .env
213 changes: 213 additions & 0 deletions R/AWS_get_folder.R
@@ -0,0 +1,213 @@
#' Download files from an AWS S3 bucket to a local folder
#'
#' This function downloads files from a specified S3 bucket and prefix to a local folder.
#' It only downloads files that are not already present in the local folder.
#' Additionally, it ensures that AWS credentials and region are set in the environment.
#'
#' @author Nathan Layman
#'
#' @param local_folder Character. The path to the local folder into which files should be downloaded; also used as the S3 key prefix.
#' @param ... Additional arguments (currently unused).
#'
#' @return A character vector of the files downloaded from AWS
#'
#' @note
#' The AWS environment variables `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`, and `AWS_BUCKET_ID`
#' must be set correctly in order to access the S3 bucket. If any of these are missing, the function will stop with an error.
#' Files in the S3 bucket will be deleted if they cannot be successfully read as parquet files.
#'
#'
#' @examples
#' \dontrun{
#' # Ensure the AWS environment variables are set in your system or .env file:
#' # AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION, and AWS_BUCKET_ID
#'
#' # Download files from an S3 bucket folder to a local directory
#' downloaded_files <- AWS_get_folder("my/local/folder")
#' }
#'
#' @export
AWS_get_folder <- function(local_folder, ...) {

# Check if AWS credentials and region are set in the environment
if (any(Sys.getenv(c("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION")) == "")) {
msg <- paste(
"AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION environment variables",
"must all be set to access AWS. Please ensure they are configured correctly,",
"probably in the .env file or system environment."
)
stop(msg)
}

# Create an S3 client
s3 <- paws::s3()

# List all objects in the S3 bucket, handling pagination
s3_files <- c() # Initialize an empty vector to hold all S3 object keys
continuation_token <- NULL

repeat {
response <- s3$list_objects_v2(
Bucket = Sys.getenv("AWS_BUCKET_ID"),
Prefix = local_folder,
ContinuationToken = continuation_token)

# Append the files from this response to the main list
s3_files <- c(s3_files, map_vec(response$Contents, ~.x$Key))

# Check if there's a continuation token for further pages
if (!length(response$NextContinuationToken)) break # No more pages to fetch, exit the loop
continuation_token <- response$NextContinuationToken
}

# Check if S3 has files to download
if (length(s3_files) == 0) {
cat("No files found in the specified S3 bucket and prefix.\n")
return(NULL)
}

# List local files in your folder
local_files <- list.files(local_folder, recursive = TRUE, full.names = TRUE)
downloaded_files <- c()

# Loop through S3 files and download if they don't exist locally
for (file in s3_files) {

# Check if file already exists locally
if (!file %in% local_files) {

# Download the file from S3
s3_download <- s3$get_object(
Bucket = Sys.getenv("AWS_BUCKET_ID"),
Key = file)

# Write output to file
writeBin(s3_download$Body, con = file)

cat("Downloaded:", file, "\n")

# Create an error safe way to test if the parquet file can be read
error_safe_read_parquet <- possibly(arrow::read_parquet, NULL)

# Check if the downloaded file can be read as parquet.
# If not, clean it up locally and also remove it from AWS
# so it will be regenerated and picked up next time.
if(is.null(error_safe_read_parquet(file))) {
unlink(file)
s3$delete_object(
Bucket = Sys.getenv("AWS_BUCKET_ID"),
Key = file)
} else {
downloaded_files <- c(downloaded_files, file)
}
}
}

downloaded_files
}


#' Upload and Sync Files with AWS S3
#'
#' This function synchronizes a local folder with an AWS S3 bucket. It checks for AWS credentials,
#' lists existing files in the S3 bucket, and compares them with the local files to upload new files
#' or remove files that are no longer needed.
#'
#' @author Nathan Layman
#'
#' @param transformed_file_list A character vector of file paths that should be present on AWS S3.
#' @param local_folder A character string specifying the path to the local folder to be synced with AWS S3.
#'
#' @examples
#' \dontrun{
#' AWS_put_files(transformed_file_list = c("file1.parquet", "file2.parquet"),
#' local_folder = "path/to/local/folder")
#' }
#'
#' @return A character vector describing the actions taken
#'
#' @export
AWS_put_files <- function(transformed_file_list,
local_folder,
...) {

transformed_file_list <- basename(transformed_file_list |> unlist())

# Check if AWS credentials and region are set in the environment
if (any(Sys.getenv(c("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION")) == "")) {
msg <- paste(
"AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION environment variables",
"must all be set to access AWS. Please ensure they are configured correctly,",
"probably in the .env file or system environment."
)
stop(msg)
}

# Create an S3 client
s3 <- paws::s3()

# List all objects in the S3 bucket, handling pagination
s3_files <- c() # Initialize an empty vector to hold all S3 object keys
continuation_token <- NULL

repeat {
response <- s3$list_objects_v2(
Bucket = Sys.getenv("AWS_BUCKET_ID"),
Prefix = local_folder,
ContinuationToken = continuation_token)

# Append the files from this response to the main list
s3_files <- c(s3_files, map_vec(response$Contents, ~.x$Key))

# Check if there's a continuation token for further pages
if (!length(response$NextContinuationToken)) break # No more pages to fetch, exit the loop
continuation_token <- response$NextContinuationToken
}

# Get files in local folder
local_folder_files <- list.files(path = local_folder, recursive = TRUE)

# Collect outcomes
outcome <- c()

# Walk through local_folder_files
for(file in local_folder_files) {

# Is the file in the transformed_file_list?
if(file %in% transformed_file_list) {

# Is the file already on AWS? (S3 keys include the local folder prefix)
if(file.path(local_folder, file) %in% s3_files) {

outcome <- c(outcome, glue::glue("{file} already present on AWS"))

} else {

outcome <- c(outcome, glue::glue("Uploading {file} to AWS"))

# Put the file on S3, reading the local file in as raw bytes for the request body
s3_upload <- s3$put_object(
Bucket = Sys.getenv("AWS_BUCKET_ID"),
Key = file.path(local_folder, file),
Body = readBin(file.path(local_folder, file), "raw", n = file.size(file.path(local_folder, file))))
}
} else {

# Remove the file from AWS if it's present in the folder and on AWS
# but not in the list of successfully transformed files. This file is
# not relevant to the pipeline
if(file.path(local_folder, file) %in% s3_files) {

outcome <- c(outcome, glue::glue("Cleaning up dangling file {file} from AWS"))

# Remove the file from AWS
s3_delete <- s3$delete_object(
Bucket = Sys.getenv("AWS_BUCKET_ID"),
Key = file.path(local_folder, file))
}
}
}

outcome

}
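Both helpers are written so they can be dropped into a {targets}-style pipeline: they take their inputs as arguments, return file paths or a log of actions, and accept `...` so extra arguments can be ignored. A minimal sketch of how they might be wired into a plan is shown below; the target names, the transformed_forecasts upstream target, and the "data/forecasts" folder are illustrative assumptions, not taken from this commit.

library(targets)

list(
  # Pull any parquet outputs already on S3 down to the local folder before other targets run
  tar_target(aws_downloaded_files, AWS_get_folder("data/forecasts")),

  # After the transformation targets finish, push new parquet files to S3 and
  # clean up any S3 objects that are no longer in the transformed file list
  tar_target(aws_sync_log, AWS_put_files(transformed_file_list = transformed_forecasts,
                                         local_folder = "data/forecasts"))
)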
4 changes: 2 additions & 2 deletions R/aggregate_augmented_data_by_adm.R
@@ -9,8 +9,8 @@
#' @author Emma Mendelsohn
#' @export
aggregate_augmented_data_by_adm <- function(augmented_data,
-rsa_polygon,
-model_dates_selected) {
+rsa_polygon,
+model_dates_selected) {

r <- arrow::read_parquet(glue::glue("{augmented_data}/date={model_dates_selected}/part-0.parquet")) |>
rast()
44 changes: 33 additions & 11 deletions R/augment_data.R
@@ -1,16 +1,38 @@
-#' @title
-#' @param weather_anomalies
-#' @param forecasts_anomalies
-#' @param ndvi_anomalies
-#' @param augmented_data_directory
-#' @return
#' Augment Weather Data
#'
#' This function collects data from three different sources, checks for missing values,
#' combines them into a single dataset, and saves the augmented data as a partitioned dataset
#' in parquet format to a specified directory.
#'
#' @author Emma Mendelsohn
#'
#' @param weather_anomalies File path to the weather anomalies dataset.
#' @param forecasts_anomalies File path to the forecasts anomalies dataset.
#' @param ndvi_anomalies File path to the NDVI anomalies dataset.
#' @param augmented_data_directory Directory where the augmented data will be saved in parquet format.
#'
#' @return A string containing the file path to the directory where the augmented data is saved.
#'
#' @note This function performs a left join of the three datasets on the date, x, and y variables.
#' Any NA values in the 'date', 'x', and 'y' columns of the dataset will be dropped. The function
#' saves the resulting dataset in the specified directory using hive partitioning by date.
#'
#' @examples
#' augment_data(weather_anomalies = 'path/to/weather_data',
#' forecasts_anomalies = 'path/to/forecast_data',
#' ndvi_anomalies = 'path/to/ndvi_data',
#' augmented_data_directory = 'path/to/save/augmented_data')
#'
#' @export
-augment_data <- function(weather_anomalies, forecasts_anomalies,
-ndvi_anomalies, augmented_data_directory) {
+augment_data <- function(weather_anomalies,
+forecasts_anomalies,
+ndvi_anomalies,
+augmented_data_directory,
+overwrite = FALSE,
+...) {

-message("Load datasets into memory")
+# Figure out how to do all this OUT of memory.
+message("Loading datasets into memory")
weather <- arrow::open_dataset(weather_anomalies) |> dplyr::collect()
forecasts <- arrow::open_dataset(forecasts_anomalies) |> dplyr::collect()
ndvi <- arrow::open_dataset(ndvi_anomalies) |> dplyr::collect()
@@ -43,4 +65,4 @@ augment_data <- function(weather_anomalies, forecasts_anomalies,

return(augmented_data_directory)

-}
+}
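The @note above summarizes what the collapsed function body does: a left join of the three anomaly tables on date, x, and y, NA key rows dropped, then a hive-partitioned parquet write. A rough sketch of that pattern, assuming the collected weather, forecasts, and ndvi data frames from the lines above (the actual body is hidden behind the collapsed hunk):

library(dplyr)

# Join the three anomaly sources on their shared keys and drop rows with missing keys
augmented_data <- weather |>
  left_join(forecasts, by = c("date", "x", "y")) |>
  left_join(ndvi, by = c("date", "x", "y")) |>
  tidyr::drop_na(date, x, y)

# Write a hive-partitioned parquet dataset, one folder per date
# (this is the layout read back later as "{augmented_data}/date=.../part-0.parquet")
arrow::write_dataset(augmented_data, augmented_data_directory, partitioning = "date")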
54 changes: 35 additions & 19 deletions R/calculate_forecasts_anomalies.R
@@ -1,35 +1,51 @@
-#' .. content for \description{} (no empty lines) ..
+#' Calculate and Save Anomalies from Forecast Data
#'
-#' .. content for \details{} ..
+#' This function takes transformed ECMWF forecast and historical weather mean data,
+#' calculates anomalies, and saves them in a specified directory. If the file already exists and `overwrite` is FALSE,
+#' the existing file's path is returned. Otherwise, the existing file is overwritten.
#'
-#' @title
-#' @param ecmwf_forecasts_transformed
-#' @param ecmwf_forecasts_transformed_directory
-#' @param weather_historical_means
-#' @param forecast_anomalies_directory
-#' @param model_dates
-#' @param model_dates_selected
-#' @param overwrite
-#' @return
#' @author Emma Mendelsohn
#'
#' @param ecmwf_forecasts_transformed_directory Directory containing the transformed forecasts.
#' @param weather_historical_means Filepath to the historical weather means data.
#' @param forecasts_anomalies_directory Directory in which to save the anomalies data.
#' @param model_dates_selected Dates for models that have been selected.
#' @param lead_intervals Lead times for forecasts, which will determine the interval over which anomalies are averaged.
#' @param overwrite Boolean flag indicating whether existing file should be overwritten. Default is FALSE.
#' @param ... Additional unused arguments for future extensibility and function compatibility.
#'
#' @return A string containing the filepath to the anomalies data.
#'
#' @note The returned path either points to an existing file (when overwrite is FALSE and the file already exists)
#' or to a newly created file with calculated anomalies (when overwrite is TRUE or the file didn't exist).
#'
#' @examples
#' calculate_forecasts_anomalies(ecmwf_forecasts_transformed_directory = './forecasts',
#' weather_historical_means = './historical_means.parquet',
#' forecasts_anomalies_directory = './anomalies',
#' model_dates_selected = as.Date('2000-01-01'),
#' lead_intervals = c(1, 10),
#' overwrite = TRUE)
#'
#' @export
-calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,
-ecmwf_forecasts_transformed_directory,
+calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed_directory,
weather_historical_means,
forecasts_anomalies_directory,
model_dates_selected,
lead_intervals,
-overwrite = FALSE) {
+overwrite = FALSE,
+...) {

# Set filename
date_selected <- model_dates_selected
save_filename <- glue::glue("forecast_anomaly_{date_selected}.gz.parquet")
message(paste0("Calculating forecast anomalies for ", date_selected))

-# Check if file already exists
-existing_files <- list.files(forecasts_anomalies_directory)
-if(save_filename %in% existing_files & !overwrite) {
-message("file already exists, skipping download")
+# Check if file already exists and can be read
+error_safe_read_parquet <- possibly(arrow::read_parquet, NULL)
+
+if(!is.null(error_safe_read_parquet(file.path(forecasts_anomalies_directory, save_filename))) & !overwrite) {
+message("file already exists and can be loaded, skipping download")
return(file.path(forecasts_anomalies_directory, save_filename))
}

@@ -131,4 +147,4 @@ calculate_forecasts_anomalies <- function(ecmwf_forecasts_transformed,

return(file.path(forecasts_anomalies_directory, save_filename))

-}
+}
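The main behavioral change in this file is the skip check: instead of only testing whether the output file exists, the function now also verifies it can be read as parquet before skipping the calculation, using purrr::possibly to turn read errors into NULL. A small standalone sketch of that idiom (the "outputs" folder and file name are illustrative):

library(purrr)

# Returns the parquet contents, or NULL if the file is missing or corrupt,
# instead of throwing an error
error_safe_read_parquet <- possibly(arrow::read_parquet, otherwise = NULL)

output_is_valid <- function(path, overwrite = FALSE) {
  !overwrite && !is.null(error_safe_read_parquet(path))
}

output_file <- file.path("outputs", "forecast_anomaly_2007-03-01.gz.parquet")
if (output_is_valid(output_file)) {
  message("file already exists and can be loaded, skipping calculation")
}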