refactor: Use opendal to replace object_store #978

Open · wants to merge 21 commits into base: master
8 changes: 4 additions & 4 deletions Cargo.toml
@@ -37,9 +37,9 @@ testing = ["dep:axum-test", "dep:scraper"]
with-db = ["dep:sea-orm", "dep:sea-orm-migration", "loco-gen/with-db"]
# Storage features
all_storage = ["storage_aws_s3", "storage_azure", "storage_gcp"]
storage_aws_s3 = ["object_store/aws"]
storage_azure = ["object_store/azure"]
storage_gcp = ["object_store/gcp"]
storage_aws_s3 = ["opendal/services-s3"]
storage_azure = ["opendal/services-azblob"]
storage_gcp = ["opendal/services-gcs"]
# Cache feature
cache_inmem = ["dep:moka"]
bg_redis = ["dep:rusty-sidekiq", "dep:bb8"]
@@ -130,7 +130,7 @@ cfg-if = "1"
uuid = { version = "1.10.0", features = ["v4", "fast-rng"] }

# File Upload
object_store = { version = "0.11.0", default-features = false }
opendal = { version = "0.50.2", default-features = false, features = ["services-memory", "services-fs"] }

# cache
moka = { version = "0.12.7", features = ["sync"], optional = true }
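The dependency swap above drives the rest of the PR: each storage feature now maps to an opendal service instead of an object_store backend. A minimal sketch of the builder-to-Operator pattern every driver below follows, assuming opendal 0.50's API:

```rust
use opendal::{services::Memory, Operator};

// Configure a service builder, wrap it in Operator::new, then finish() into a
// ready-to-use Operator. Every driver in this PR repeats this shape.
fn build_memory_operator() -> opendal::Result<Operator> {
    Ok(Operator::new(Memory::default())?.finish())
}
```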
63 changes: 22 additions & 41 deletions src/storage/drivers/aws.rs
@@ -1,16 +1,7 @@
#[cfg(test)]
use core::time::Duration;
use std::sync::Arc;

use object_store::{
aws::{AmazonS3Builder, AwsCredential},
StaticCredentialProvider,
};
#[cfg(test)]
use object_store::{BackoffConfig, RetryConfig};
use opendal::{services::S3, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::{opendal_adapter::OpendalAdapter, StoreDriver};
use crate::storage::StorageResult;

/// A set of AWS security credentials
#[derive(Debug)]
@@ -34,14 +25,10 @@ pub struct Credential {
/// # Errors
///
/// When could not initialize the client instance
pub fn new(bucket_name: &str, region: &str) -> Result<Box<dyn StoreDriver>> {
let s3 = AmazonS3Builder::new()
.with_bucket_name(bucket_name)
.with_region(region)
.build()
.map_err(Box::from)?;
pub fn new(bucket_name: &str, region: &str) -> StorageResult<Box<dyn StoreDriver>> {
let s3 = S3::default().bucket(bucket_name).region(region);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3))))
Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish())))
}

/// Create new AWS s3 storage with bucket, region and credentials.
@@ -64,18 +51,16 @@ pub fn with_credentials(
bucket_name: &str,
region: &str,
credentials: Credential,
) -> Result<Box<dyn StoreDriver>> {
let s3 = AmazonS3Builder::new()
.with_bucket_name(bucket_name)
.with_region(region)
.with_credentials(Arc::new(StaticCredentialProvider::new(AwsCredential {
key_id: credentials.key_id.to_string(),
secret_key: credentials.secret_key.to_string(),
token: credentials.token,
})))
.build()
.map_err(Box::from)?;
Ok(Box::new(ObjectStoreAdapter::new(Box::new(s3))))
) -> StorageResult<Box<dyn StoreDriver>> {
let mut s3 = S3::default()
.bucket(bucket_name)
.region(region)
.access_key_id(&credentials.key_id)
.secret_access_key(&credentials.secret_key);
if let Some(token) = credentials.token {
s3 = s3.session_token(&token);
}
Ok(Box::new(OpendalAdapter::new(Operator::new(s3)?.finish())))
}

/// Build store with failure
@@ -86,15 +71,11 @@ pub fn with_credentials(
#[cfg(test)]
#[must_use]
pub fn with_failure() -> Box<dyn StoreDriver> {
let s3 = AmazonS3Builder::new()
.with_bucket_name("loco-test")
.with_retry(RetryConfig {
backoff: BackoffConfig::default(),
max_retries: 0,
retry_timeout: Duration::from_secs(0),
})
.build()
.unwrap();
let s3 = S3::default()
.bucket("loco-test")
.region("ap-south-1")
.allow_anonymous()
.disable_ec2_metadata();

Box::new(ObjectStoreAdapter::new(Box::new(s3)))
Box::new(OpendalAdapter::new(Operator::new(s3).unwrap().finish()))
}
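For reference, a hedged usage sketch of the reworked credentials path. The bucket, region, and credential values are placeholders, and it assumes the `Credential` fields shown above (key_id, secret_key, token) are publicly constructible:

```rust
use loco_rs::storage::drivers::aws::{self, Credential};
use loco_rs::storage::drivers::StoreDriver;
use loco_rs::storage::StorageResult;

// Placeholder values; Credential field names mirror the diff above and are
// assumed to be public.
fn s3_driver() -> StorageResult<Box<dyn StoreDriver>> {
    aws::with_credentials(
        "my-bucket",
        "us-east-1",
        Credential {
            key_id: "AKIAEXAMPLE".to_string(),
            secret_key: "example-secret".to_string(),
            token: None,
        },
    )
}
```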
22 changes: 11 additions & 11 deletions src/storage/drivers/azure.rs
@@ -1,7 +1,7 @@
use object_store::azure::MicrosoftAzureBuilder;
use opendal::{services::Azblob, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult};

/// Create new Azure storage.
///
@@ -18,13 +18,13 @@ pub fn new(
container_name: &str,
account_name: &str,
access_key: &str,
) -> Result<Box<dyn StoreDriver>> {
let azure = MicrosoftAzureBuilder::new()
.with_container_name(container_name)
.with_account(account_name)
.with_access_key(access_key)
.build()
.map_err(Box::from)?;
) -> StorageResult<Box<dyn StoreDriver>> {
let azure = Azblob::default()
.container(container_name)
.account_name(account_name)
.account_key(access_key);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(azure))))
Ok(Box::new(OpendalAdapter::new(
Operator::new(azure)?.finish(),
)))
}
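A usage sketch for the Azure driver (behind the storage_azure feature); all values are placeholders:

```rust
use loco_rs::storage::drivers::{azure, StoreDriver};
use loco_rs::storage::StorageResult;

// Placeholder container, account, and access-key values for illustration only.
fn azblob_driver() -> StorageResult<Box<dyn StoreDriver>> {
    azure::new("my-container", "my-account", "my-access-key")
}
```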
25 changes: 9 additions & 16 deletions src/storage/drivers/gcp.rs
@@ -1,30 +1,23 @@
use object_store::gcp::GoogleCloudStorageBuilder;
use opendal::{services::Gcs, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult};

/// Create new GCP storage.
///
/// # Examples
///```
/// use loco_rs::storage::drivers::gcp;
/// let gcp_driver = gcp::new("key", "account_key", "service_account");
/// let gcp_driver = gcp::new("key", "credential_path");
/// ```
///
/// # Errors
///
/// When could not initialize the client instance
pub fn new(
bucket_name: &str,
service_account_key: &str,
service_account: &str,
) -> Result<Box<dyn StoreDriver>> {
let gcs = GoogleCloudStorageBuilder::new()
.with_bucket_name(bucket_name)
.with_service_account_key(service_account_key)
.with_service_account_path(service_account)
.build()
.map_err(Box::from)?;
pub fn new(bucket_name: &str, credential_path: &str) -> StorageResult<Box<dyn StoreDriver>> {
let gcs = Gcs::default()
.bucket(bucket_name)
.credential_path(credential_path);

Ok(Box::new(ObjectStoreAdapter::new(Box::new(gcs))))
Ok(Box::new(OpendalAdapter::new(Operator::new(gcs)?.finish())))
}
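A usage sketch for the new two-argument GCP signature (behind the storage_gcp feature); the bucket name and credential path are placeholders:

```rust
use loco_rs::storage::drivers::{gcp, StoreDriver};
use loco_rs::storage::StorageResult;

// Placeholder values: first argument is the GCS bucket, second is a path to a
// service-account JSON credential file.
fn gcs_driver() -> StorageResult<Box<dyn StoreDriver>> {
    gcp::new("my-bucket", "/etc/gcp/service-account.json")
}
```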
24 changes: 16 additions & 8 deletions src/storage/drivers/local.rs
@@ -1,7 +1,7 @@
use object_store::local::LocalFileSystem;
use opendal::{services::Fs, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use crate::Result;
use super::StoreDriver;
use crate::storage::{drivers::opendal_adapter::OpendalAdapter, StorageResult};

/// Create new filesystem storage with no prefix
///
@@ -10,9 +10,18 @@ use crate::Result;
/// use loco_rs::storage::drivers::local;
/// let file_system_driver = local::new();
/// ```
///
/// # Panics
///
/// Panics if the filesystem service fails to build.
#[must_use]
pub fn new() -> Box<dyn StoreDriver> {
Box::new(ObjectStoreAdapter::new(Box::new(LocalFileSystem::new())))
let fs = Fs::default().root("/");
Box::new(OpendalAdapter::new(
Operator::new(fs)
.expect("fs service should build with success")
.finish(),
Contributor:
Can we handle this without a panic?
Would it make more sense to return a Result instead?

Author:
Hi, I'm willing to return an error if changing the signature is allowed. This PR tries its best not to change the exposed public API.

Contributor:
I think that would be better than I expected. What do you think? It shouldn't affect users, right? Only framework changes.

))
}

/// Create new filesystem storage with `prefix` applied to all paths
@@ -26,8 +35,7 @@ pub fn new() -> Box<dyn StoreDriver> {
/// # Errors
///
/// Returns an error if the path does not exist
pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Box<dyn StoreDriver>> {
Ok(Box::new(ObjectStoreAdapter::new(Box::new(
LocalFileSystem::new_with_prefix(prefix).map_err(Box::from)?,
))))
pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> StorageResult<Box<dyn StoreDriver>> {
let fs = Fs::default().root(&prefix.as_ref().display().to_string());
Ok(Box::new(OpendalAdapter::new(Operator::new(fs)?.finish())))
}
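On the panic question raised in the thread above, one option is a fallible constructor alongside (or instead of) new(). A minimal sketch with a hypothetical try_new name, reusing the same opendal error conversion the other drivers already rely on:

```rust
use loco_rs::storage::drivers::{opendal_adapter::OpendalAdapter, StoreDriver};
use loco_rs::storage::StorageResult;
use opendal::{services::Fs, Operator};

// Hypothetical `try_new`: same construction as `new()` above, but surfaces the
// builder error instead of panicking.
pub fn try_new() -> StorageResult<Box<dyn StoreDriver>> {
    let fs = Fs::default().root("/");
    Ok(Box::new(OpendalAdapter::new(Operator::new(fs)?.finish())))
}
```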
15 changes: 12 additions & 3 deletions src/storage/drivers/mem.rs
@@ -1,6 +1,7 @@
use object_store::memory::InMemory;
use opendal::{services::Memory, Operator};

use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
use super::StoreDriver;
use crate::storage::drivers::opendal_adapter::OpendalAdapter;

/// Create new in-memory storage.
///
@@ -9,7 +10,15 @@ use super::{object_store_adapter::ObjectStoreAdapter, StoreDriver};
/// use loco_rs::storage::drivers::mem;
/// let mem_storage = mem::new();
/// ```
///
/// # Panics
///
/// Panics if the memory service fails to build.
#[must_use]
pub fn new() -> Box<dyn StoreDriver> {
Box::new(ObjectStoreAdapter::new(Box::new(InMemory::new())))
Box::new(OpendalAdapter::new(
Operator::new(Memory::default())
.expect("memory service must build with success")
.finish(),
Contributor:
same question

))
}
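A wiring sketch showing where the in-memory driver is typically useful (tests). It assumes loco's Storage::single constructor accepts any boxed StoreDriver; treat that as an assumption, not a confirmed API:

```rust
use loco_rs::storage::{self, Storage};

// Assumed constructor: Storage::single wraps one driver; the in-memory driver
// keeps all data in RAM for the lifetime of the process, which suits tests.
fn test_storage() -> Storage {
    Storage::single(storage::drivers::mem::new())
}
```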
29 changes: 25 additions & 4 deletions src/storage/drivers/mod.rs
@@ -2,6 +2,8 @@ use std::path::Path;

use async_trait::async_trait;
use bytes::Bytes;
use opendal::Reader;

#[cfg(feature = "storage_aws_s3")]
pub mod aws;
#[cfg(feature = "storage_azure")]
@@ -11,7 +13,7 @@ pub mod gcp;
pub mod local;
pub mod mem;
pub mod null;
pub mod object_store_adapter;
pub mod opendal_adapter;

use super::StorageResult;

@@ -21,9 +23,28 @@ pub struct UploadResponse {
pub version: Option<String>,
}

// TODO: need to properly abstract the object_store type in order to not
// strongly depend on it
pub type GetResponse = object_store::GetResult;
/// TODO: Add more methods to `GetResponse` to read the content in different
/// ways
///
/// For example, we can read a specific range of bytes from the stream.
pub struct GetResponse {
stream: Reader,
}

impl GetResponse {
pub(crate) fn new(stream: Reader) -> Self {
Self { stream }
}

/// Read all content from the stream and return as `Bytes`.
///
/// # Errors
///
/// Returns a `StorageError` with the reason for the failure.
pub async fn bytes(&self) -> StorageResult<Bytes> {
Ok(self.stream.read(..).await?.to_bytes())
}
}

#[async_trait]
pub trait StoreDriver: Sync + Send {
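To make the new GetResponse concrete, a small sketch of reading a downloaded object into memory via the bytes() helper added above:

```rust
use bytes::Bytes;
use loco_rs::storage::drivers::GetResponse;
use loco_rs::storage::StorageResult;

// Drains the underlying opendal Reader into a single Bytes buffer, mirroring
// the `read(..)` call in the implementation above.
async fn read_all(response: GetResponse) -> StorageResult<Bytes> {
    response.bytes().await
}
```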