Skip to content

Commit

Permalink
refactor: Use opendal to replace object_store
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Nov 9, 2024
1 parent 67cb2a2 commit cfad6c8
Show file tree
Hide file tree
Showing 15 changed files with 132 additions and 134 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ with-db = ["dep:sea-orm", "dep:sea-orm-migration", "loco-gen/with-db"]
channels = ["dep:socketioxide"]
# Storage features
all_storage = ["storage_aws_s3", "storage_azure", "storage_gcp"]
storage_aws_s3 = ["object_store/aws"]
storage_azure = ["object_store/azure"]
storage_gcp = ["object_store/gcp"]
storage_aws_s3 = ["opendal/services-s3"]
storage_azure = ["opendal/services-azblob"]
storage_gcp = ["opendal/services-gcs"]
# Cache feature
cache_inmem = ["dep:moka"]
bg_redis = ["dep:rusty-sidekiq", "dep:bb8"]
Expand Down Expand Up @@ -125,7 +125,7 @@ socketioxide = { version = "0.14.0", features = ["state"], optional = true }


# File Upload
object_store = { version = "0.11.0", default-features = false }
opendal = { version = "0.50.2", default-features = false,features = ["services-memory","services-fs"] }

# cache
moka = { version = "0.12.7", features = ["sync"], optional = true }
Expand Down
4 changes: 2 additions & 2 deletions src/bgworker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ impl Queue {

/// # Errors
///
/// Does not currently return an error, but the postgres or other future queue implementations
/// might, so using Result here as return type.
/// Does not currently return an error, but the postgres or other future
/// queue implementations might, so using Result here as return type.
pub fn shutdown(&self) -> Result<()> {
println!("waiting for running jobs to finish...");
match self {
Expand Down
12 changes: 8 additions & 4 deletions src/bgworker/sqlt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ async fn dequeue(client: &SqlitePool) -> Result<Option<Task>> {

if let Some(task) = row {
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'processing', updated_at = CURRENT_TIMESTAMP WHERE id = $1",
"UPDATE sqlt_loco_queue SET status = 'processing', updated_at = CURRENT_TIMESTAMP \
WHERE id = $1",
)
.bind(&task.id)
.execute(&mut *tx)
Expand Down Expand Up @@ -325,15 +326,17 @@ async fn complete_task(
if let Some(interval_ms) = interval_ms {
let next_run_at = Utc::now() + chrono::Duration::milliseconds(interval_ms);
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'queued', updated_at = CURRENT_TIMESTAMP, run_at = DATETIME($1) WHERE id = $2",
"UPDATE sqlt_loco_queue SET status = 'queued', updated_at = CURRENT_TIMESTAMP, run_at \
= DATETIME($1) WHERE id = $2",
)
.bind(next_run_at)
.bind(task_id)
.execute(pool)
.await?;
} else {
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'completed', updated_at = CURRENT_TIMESTAMP WHERE id = $1",
"UPDATE sqlt_loco_queue SET status = 'completed', updated_at = CURRENT_TIMESTAMP \
WHERE id = $1",
)
.bind(task_id)
.execute(pool)
Expand All @@ -347,7 +350,8 @@ async fn fail_task(pool: &SqlitePool, task_id: &TaskId, error: &crate::Error) ->
error!(err = msg, "failed task");
let error_json = serde_json::json!({ "error": msg });
sqlx::query(
"UPDATE sqlt_loco_queue SET status = 'failed', updated_at = CURRENT_TIMESTAMP, task_data = json_patch(task_data, $1) WHERE id = $2",
"UPDATE sqlt_loco_queue SET status = 'failed', updated_at = CURRENT_TIMESTAMP, task_data \
= json_patch(task_data, $1) WHERE id = $2",
)
.bind(error_json)
.bind(task_id)
Expand Down
3 changes: 1 addition & 2 deletions src/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use std::path::PathBuf;
use axum::Router;
#[cfg(feature = "with-db")]
use sea_orm_migration::MigratorTrait;
use tokio::task::JoinHandle;
use tokio::{select, signal};
use tokio::{select, signal, task::JoinHandle};
use tracing::{debug, error, info, warn};

#[cfg(feature = "with-db")]
Expand Down
3 changes: 2 additions & 1 deletion src/mailer/email_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ impl EmailSender {
///
/// # Errors
///
/// When email doesn't send successfully or has an error to build the message
/// When email doesn't send successfully or has an error to build the
/// message
pub async fn mail(&self, email: &Email) -> Result<()> {
let content = MultiPart::alternative_plain_html(email.text.clone(), email.html.clone());
let mut builder = Message::builder()
Expand Down
59 changes: 18 additions & 41 deletions src/storage/drivers/aws.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
#[cfg(test)]
use core::time::Duration;
use std::sync::Arc;

use object_store::{
aws::{AmazonS3Builder, AwsCredential},
StaticCredentialProvider,
};
#[cfg(test)]
use object_store::{BackoffConfig, RetryConfig};
use opendal::{services::S3, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::{opendal_adapter::OpendalAdapter, StoreDriver};
use crate::storage::StorageResult;

/// A set of AWS security credentials
#[derive(Debug)]
Expand All @@ -34,14 +25,10 @@ pub struct Credential {
/// # Errors
///
/// When could not initialize the client instance
pub fn new(bucket_name: &str, region: &str) -> Result<Box<dyn StoreDriver>> {
let s3 = AmazonS3Builder::new()
.with_bucket_name(bucket_name)
.with_region(region)
.build()
.map_err(Box::from)?;
pub fn new(bucket_name: &str, region: &str) -> StorageResult<Box<dyn StoreDriver>> {
let s3 = S3::default().bucket(bucket_name).region(region);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3))))
Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish())))
}

/// Create new AWS s3 storage with bucket, region and credentials.
Expand All @@ -64,18 +51,16 @@ pub fn with_credentials(
bucket_name: &str,
region: &str,
credentials: Credential,
) -> Result<Box<dyn StoreDriver>> {
let s3 = AmazonS3Builder::new()
.with_bucket_name(bucket_name)
.with_region(region)
.with_credentials(Arc::new(StaticCredentialProvider::new(AwsCredential {
key_id: credentials.key_id.to_string(),
secret_key: credentials.secret_key.to_string(),
token: credentials.token,
})))
.build()
.map_err(Box::from)?;
Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3))))
) -> StorageResult<Box<dyn StoreDriver>> {
let mut s3 = S3::default()
.bucket(bucket_name)
.region(region)
.access_key_id(&credentials.key_id)
.secret_access_key(&credentials.secret_key);
if let Some(token) = credentials.token {
s3 = s3.session_token(&token);
}
Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish())))
}

/// Build store with failure
Expand All @@ -86,15 +71,7 @@ pub fn with_credentials(
#[cfg(test)]
#[must_use]
pub fn with_failure() -> Box<dyn StoreDriver> {
let s3 = AmazonS3Builder::new()
.with_bucket_name("loco-test")
.with_retry(RetryConfig {
backoff: BackoffConfig::default(),
max_retries: 0,
retry_timeout: Duration::from_secs(0),
})
.build()
.unwrap();
let s3 = S3::default().bucket("loco-test");

Box::new(ObjectStoreAdapter::new(Box::new(s3)))
Box::new(OpendalAdapter::new(Operator::new(s3).unwrap().finish()))
}
22 changes: 11 additions & 11 deletions src/storage/drivers/azure.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use object_store::azure::MicrosoftAzureBuilder;
use opendal::{services::Azblob, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult};

/// Create new Azure storage.
///
Expand All @@ -18,13 +18,13 @@ pub fn new(
container_name: &str,
account_name: &str,
access_key: &str,
) -> Result<Box<dyn StoreDriver>> {
let azure = MicrosoftAzureBuilder::new()
.with_container_name(container_name)
.with_account(account_name)
.with_access_key(access_key)
.build()
.map_err(Box::from)?;
) -> StorageResult<Box<dyn StoreDriver>> {
let azure = Azblob::default()
.container(container_name)
.account_name(account_name)
.account_key(access_key);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(azure))))
Ok(Box::new(OpendalAdapter::new(
Operator::new(azure)?.finish(),
)))
}
25 changes: 9 additions & 16 deletions src/storage/drivers/gcp.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,23 @@
use object_store::gcp::GoogleCloudStorageBuilder;
use opendal::{services::Gcs, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult};

/// Create new GCP storage.
///
/// # Examples
///```
/// use loco_rs::storage::drivers::gcp;
/// let gcp_driver = gcp::new("key", "account_key", "service_account");
/// let gcp_driver = gcp::new("key", "credential_path");
/// ```
///
/// # Errors
///
/// When could not initialize the client instance
pub fn new(
bucket_name: &str,
service_account_key: &str,
service_account: &str,
) -> Result<Box<dyn StoreDriver>> {
let gcs = GoogleCloudStorageBuilder::new()
.with_bucket_name(bucket_name)
.with_service_account_key(service_account_key)
.with_service_account_path(service_account)
.build()
.map_err(Box::from)?;
pub fn new(bucket_name: &str, credential_path: &str) -> StorageResult<Box<dyn StoreDriver>> {
let gcs = Gcs::default()
.bucket(bucket_name)
.credential_path(credential_path);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(gcs))))
Ok(Box::new(OpendalAdapter::new(Operator::new(gcs)?.finish())))
}
20 changes: 12 additions & 8 deletions src/storage/drivers/local.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use object_store::local::LocalFileSystem;
use opendal::{services::Fs, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult};

/// Create new filesystem storage with no prefix
///
Expand All @@ -12,7 +12,12 @@ use crate::Result;
/// ```
#[must_use]
pub fn new() -> Box<dyn StoreDriver> {
Box::new(ObjectStoreAdapter::new(Box::new(LocalFileSystem::new())))
let fs = Fs::default();
Box::new(OpendalAdapter::new(
Operator::new(fs)
.expect("fs service should build with success")
.finish(),
))
}

/// Create new filesystem storage with `prefix` applied to all paths
Expand All @@ -26,8 +31,7 @@ pub fn new() -> Box<dyn StoreDriver> {
/// # Errors
///
/// Returns an error if the path does not exist
pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Box<dyn StoreDriver>> {
Ok(Box::new(ObjectStoreAdapter::new(Box::new(
LocalFileSystem::new_with_prefix(prefix).map_err(Box::from)?,
))))
pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> StorageResult<Box<dyn StoreDriver>> {
let fs = Fs::default().root(&prefix.as_ref().display().to_string());
Ok(Box::new(OpendalAdapter::new(Operator::new(fs)?.finish())))
}
11 changes: 8 additions & 3 deletions src/storage/drivers/mem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use object_store::memory::InMemory;
use opendal::{services::Memory, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use super::StoreDriver;
use crate::storage::drivers::opendal_adapter::OpendalAdapter;

/// Create new in-memory storage.
///
Expand All @@ -11,5 +12,9 @@ use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
/// ```
#[must_use]
pub fn new() -> Box<dyn StoreDriver> {
Box::new(ObjectStoreAdapter::new(Box::new(InMemory::new())))
Box::new(OpendalAdapter::new(
Operator::new(Memory::default())
.expect("memory service must build with success")
.finish(),
))
}
25 changes: 21 additions & 4 deletions src/storage/drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::path::Path;

use async_trait::async_trait;
use bytes::Bytes;
use opendal::Reader;

#[cfg(feature = "storage_aws_s3")]
pub mod aws;
#[cfg(feature = "storage_azure")]
Expand All @@ -11,7 +13,7 @@ pub mod gcp;
pub mod local;
pub mod mem;
pub mod null;
pub mod object_store_adapter;
pub mod opendal_adapter;

use super::StorageResult;

Expand All @@ -21,9 +23,24 @@ pub struct UploadResponse {
pub version: Option<String>,
}

// TODO: need to properly abstract the object_store type in order to not
// strongly depend on it
pub type GetResponse = object_store::GetResult;
/// TODO: Add more methods to `GetResponse` to read the content in different
/// ways
///
/// For example, we can read a specific range of bytes from the stream.
pub struct GetResponse {
stream: Reader,
}

impl GetResponse {
pub(crate) fn new(stream: Reader) -> Self {
Self { stream }
}

/// Read all content from the stream and return as `Bytes`.
pub async fn bytes(&self) -> StorageResult<Bytes> {
Ok(self.stream.read(..).await?.to_bytes())
}
}

#[async_trait]
pub trait StoreDriver: Sync + Send {
Expand Down
Loading

0 comments on commit cfad6c8

Please sign in to comment.