From ecf3f04ef42a2a7f646fce74698bd73e02fa3613 Mon Sep 17 00:00:00 2001 From: dk Date: Tue, 11 Jun 2024 10:33:34 +0700 Subject: [PATCH 01/30] [syft/action_obj] refactor `convert_to_pointers` - getting size of twin object before client uploading dataset --- .../syft/src/syft/client/domain_client.py | 7 +++ .../src/syft/service/action/action_object.py | 57 +++++++------------ 2 files changed, 26 insertions(+), 38 deletions(-) diff --git a/packages/syft/src/syft/client/domain_client.py b/packages/syft/src/syft/client/domain_client.py index 1c768710040..84040b588bb 100644 --- a/packages/syft/src/syft/client/domain_client.py +++ b/packages/syft/src/syft/client/domain_client.py @@ -5,6 +5,7 @@ from pathlib import Path import re from string import Template +import sys from typing import TYPE_CHECKING from typing import cast @@ -17,6 +18,7 @@ # relative from ..abstract_node import NodeSideType from ..serde.serializable import serializable +from ..serde.serialize import _serialize as serialize from ..service.action.action_object import ActionObject from ..service.code_history.code_history import CodeHistoriesDict from ..service.code_history.code_history import UsersCodeHistoriesDict @@ -134,6 +136,11 @@ def upload_dataset(self, dataset: CreateDataset) -> SyftSuccess | SyftError: syft_node_location=self.id, syft_client_verify_key=self.verify_key, ) + serialized: bytes = serialize(twin, to_bytes=True) + size_mb: float = sys.getsizeof(serialized) / 1024 / 1024 + if size_mb < 16: + print(f"object's size = {size_mb} (MB), less than 16 MB") + # TODO: if less than 16 MB, save without using blob storage twin._save_to_blob_storage() except Exception as e: tqdm.write(f"Failed to create twin for {asset.name}. {e}") diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index dffa3d3d9de..05324ffdb1f 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -498,44 +498,25 @@ def convert_to_pointers( # relative from ..dataset.dataset import Asset - arg_list = [] - kwarg_dict = {} - if args is not None: - for arg in args: - if ( - not isinstance(arg, ActionObject | Asset | UID) - and api.signing_key is not None # type: ignore[unreachable] - ): - arg = ActionObject.from_obj( # type: ignore[unreachable] - syft_action_data=arg, - syft_client_verify_key=api.signing_key.verify_key, - syft_node_location=api.node_uid, - ) - arg.syft_node_uid = node_uid - r = arg._save_to_blob_storage() - if isinstance(r, SyftError): - print(r.message) - arg = api.services.action.set(arg) - arg_list.append(arg) - - if kwargs is not None: - for k, arg in kwargs.items(): - if ( - not isinstance(arg, ActionObject | Asset | UID) - and api.signing_key is not None # type: ignore[unreachable] - ): - arg = ActionObject.from_obj( # type: ignore[unreachable] - syft_action_data=arg, - syft_client_verify_key=api.signing_key.verify_key, - syft_node_location=api.node_uid, - ) - arg.syft_node_uid = node_uid - r = arg._save_to_blob_storage() - if isinstance(r, SyftError): - print(r.message) - arg = api.services.action.set(arg) - - kwarg_dict[k] = arg + def process_arg(arg: ActionObject | Asset | UID | Any) -> Any: + if ( + not isinstance(arg, ActionObject | Asset | UID) + and api.signing_key is not None # type: ignore[unreachable] + ): + arg = ActionObject.from_obj( # type: ignore[unreachable] + syft_action_data=arg, + syft_client_verify_key=api.signing_key.verify_key, + syft_node_location=api.node_uid, + ) + arg.syft_node_uid = node_uid + r = arg._save_to_blob_storage() + if isinstance(r, SyftError): + print(r.message) + arg = api.services.action.set(arg) + return arg + + arg_list = [process_arg(arg) for arg in args] if args else [] + kwarg_dict = {k: process_arg(v) for k, v in kwargs.items()} if kwargs else {} return arg_list, kwarg_dict From ed094f4e8e8488dd60dee60d1198a8848e4f7a7a Mon Sep 17 00:00:00 2001 From: khoaguin Date: Tue, 11 Jun 2024 15:38:41 +0700 Subject: [PATCH 02/30] [syft/action_obj] skip saving to blob storage if `syft_action_data` is less than `min_size_mb` (16 Mb by default) --- .../syft/src/syft/client/domain_client.py | 7 ------ .../src/syft/service/action/action_object.py | 24 +++++++++++++++---- packages/syft/src/syft/util/util.py | 5 ++++ 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/packages/syft/src/syft/client/domain_client.py b/packages/syft/src/syft/client/domain_client.py index 84040b588bb..1c768710040 100644 --- a/packages/syft/src/syft/client/domain_client.py +++ b/packages/syft/src/syft/client/domain_client.py @@ -5,7 +5,6 @@ from pathlib import Path import re from string import Template -import sys from typing import TYPE_CHECKING from typing import cast @@ -18,7 +17,6 @@ # relative from ..abstract_node import NodeSideType from ..serde.serializable import serializable -from ..serde.serialize import _serialize as serialize from ..service.action.action_object import ActionObject from ..service.code_history.code_history import CodeHistoriesDict from ..service.code_history.code_history import UsersCodeHistoriesDict @@ -136,11 +134,6 @@ def upload_dataset(self, dataset: CreateDataset) -> SyftSuccess | SyftError: syft_node_location=self.id, syft_client_verify_key=self.verify_key, ) - serialized: bytes = serialize(twin, to_bytes=True) - size_mb: float = sys.getsizeof(serialized) / 1024 / 1024 - if size_mb < 16: - print(f"object's size = {size_mb} (MB), less than 16 MB") - # TODO: if less than 16 MB, save without using blob storage twin._save_to_blob_storage() except Exception as e: tqdm.write(f"Failed to create twin for {asset.name}. {e}") diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 05324ffdb1f..34673bbd224 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -47,6 +47,7 @@ from ...types.uid import LineageID from ...types.uid import UID from ...util.logger import debug +from ...util.util import get_mb_serialized_size from ...util.util import prompt_warning_message from ..context import AuthedServiceContext from ..response import SyftException @@ -828,17 +829,30 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | None: return None - def _save_to_blob_storage(self) -> SyftError | None: + def _save_to_blob_storage(self, min_size_mb: int = 16) -> SyftError | None: + """ " + If less than min_size_mb, skip saving to blob storage + TODO: min_size_mb shoulb be passed as a env var + """ data = self.syft_action_data if isinstance(data, SyftError): return data if isinstance(data, ActionDataEmpty): - return SyftError(message=f"cannot store empty object {self.id}") - result = self._save_to_blob_storage_(data) - if isinstance(result, SyftError): - return result + return SyftError( + message=f"cannot store empty object {self.id} to the blob storage" + ) if not TraceResultRegistry.current_thread_is_tracing(): self.syft_action_data_cache = self.as_empty_data() + action_data_size_mb: float = get_mb_serialized_size(data) + if action_data_size_mb > min_size_mb: + result = self._save_to_blob_storage_(data) + if isinstance(result, SyftError): + return result + else: + debug( + f"self.syft_action_data's size = {action_data_size_mb:4f} (MB), less than {min_size_mb} (MB). " + f"Skip saving to blob storage." + ) return None @property diff --git a/packages/syft/src/syft/util/util.py b/packages/syft/src/syft/util/util.py index b0affa2b1a0..99aba539cd9 100644 --- a/packages/syft/src/syft/util/util.py +++ b/packages/syft/src/syft/util/util.py @@ -38,6 +38,7 @@ import requests # relative +from ..serde.serialize import _serialize as serialize from .logger import critical from .logger import debug from .logger import error @@ -92,6 +93,10 @@ def get_mb_size(data: Any) -> float: return sys.getsizeof(data) / (1024 * 1024) +def get_mb_serialized_size(data: Any) -> float: + return sys.getsizeof(serialize(data)) / (1024 * 1024) + + def extract_name(klass: type) -> str: name_regex = r".+class.+?([\w\._]+).+" regex2 = r"([\w\.]+)" From a6334ab8fc3ee12c9935e4d020df3e166c362505 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Tue, 11 Jun 2024 17:27:51 +0700 Subject: [PATCH 03/30] [syft/action_obj] set `syft_action_data_cache` for `ActionObject` if data is less than the min size to save to blob storage --- packages/syft/src/syft/service/action/action_object.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 34673bbd224..e77e3498cc6 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -841,8 +841,6 @@ def _save_to_blob_storage(self, min_size_mb: int = 16) -> SyftError | None: return SyftError( message=f"cannot store empty object {self.id} to the blob storage" ) - if not TraceResultRegistry.current_thread_is_tracing(): - self.syft_action_data_cache = self.as_empty_data() action_data_size_mb: float = get_mb_serialized_size(data) if action_data_size_mb > min_size_mb: result = self._save_to_blob_storage_(data) @@ -853,6 +851,9 @@ def _save_to_blob_storage(self, min_size_mb: int = 16) -> SyftError | None: f"self.syft_action_data's size = {action_data_size_mb:4f} (MB), less than {min_size_mb} (MB). " f"Skip saving to blob storage." ) + self.syft_action_data_cache = data + # if not TraceResultRegistry.current_thread_is_tracing(): + # self.syft_action_data_cache = self.as_empty_data() return None @property From 7335c567cfd6a63da15fa3eaec740d2a3153d788 Mon Sep 17 00:00:00 2001 From: dk Date: Wed, 12 Jun 2024 09:13:07 +0700 Subject: [PATCH 04/30] [syft/action_obj] remove redundant setting syft_action_data_cache to ActionDataEmpty when current thread is not tracing --- packages/syft/src/syft/service/action/action_object.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index e77e3498cc6..a9e68d68da6 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -785,8 +785,8 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | None: size = sys.getsizeof(serialized) storage_entry = CreateBlobStorageEntry.from_obj(data, file_size=size) - if not TraceResultRegistry.current_thread_is_tracing(): - self.syft_action_data_cache = self.as_empty_data() + # if not TraceResultRegistry.current_thread_is_tracing(): + # self.syft_action_data_cache = self.as_empty_data() if self.syft_blob_storage_entry_id is not None: # TODO: check if it already exists storage_entry.id = self.syft_blob_storage_entry_id @@ -846,14 +846,14 @@ def _save_to_blob_storage(self, min_size_mb: int = 16) -> SyftError | None: result = self._save_to_blob_storage_(data) if isinstance(result, SyftError): return result + if not TraceResultRegistry.current_thread_is_tracing(): + self.syft_action_data_cache = self.as_empty_data() else: debug( f"self.syft_action_data's size = {action_data_size_mb:4f} (MB), less than {min_size_mb} (MB). " f"Skip saving to blob storage." ) self.syft_action_data_cache = data - # if not TraceResultRegistry.current_thread_is_tracing(): - # self.syft_action_data_cache = self.as_empty_data() return None @property From 1505f22bccc5c8098361672d861b715b5224e078 Mon Sep 17 00:00:00 2001 From: dk Date: Wed, 12 Jun 2024 09:51:52 +0700 Subject: [PATCH 05/30] [syft/blob_storage] print out on-disk blob store path in dev mode --- packages/syft/src/syft/node/node.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 1e2c00c6f24..65a2cad07f3 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -497,6 +497,15 @@ def init_blob_storage(self, config: BlobStorageConfig | None = None) -> None: remote_profile.profile_name ] = remote_profile + if ( + isinstance(self.blob_store_config, OnDiskBlobStorageConfig) + and self.dev_mode + ): + print( + f"Using on-disk blob storage with path: " + f"{self.blob_store_config.client_config.base_directory}" + ) + def run_peer_health_checks(self, context: AuthedServiceContext) -> None: self.peer_health_manager = PeerHealthCheckTask() self.peer_health_manager.run(context=context) From 94793e5942b7339658e0b7cffd8c6e49691ebf9b Mon Sep 17 00:00:00 2001 From: dk Date: Wed, 12 Jun 2024 11:00:51 +0700 Subject: [PATCH 06/30] [syft/blob_storage] pass obj's min size to save to the blob storage in an env varialbe - print out this min size in dev mode when launching nodes --- packages/grid/backend/grid/core/config.py | 1 + packages/grid/backend/grid/core/node.py | 1 + packages/grid/default.env | 1 + packages/syft/src/syft/node/node.py | 30 ++++++++++++------- packages/syft/src/syft/orchestra.py | 4 +++ .../src/syft/service/action/action_object.py | 4 +-- .../src/syft/store/blob_storage/__init__.py | 1 + 7 files changed, 29 insertions(+), 13 deletions(-) diff --git a/packages/grid/backend/grid/core/config.py b/packages/grid/backend/grid/core/config.py index 8c55b8cd3f7..86f37533977 100644 --- a/packages/grid/backend/grid/core/config.py +++ b/packages/grid/backend/grid/core/config.py @@ -155,6 +155,7 @@ def get_emails_enabled(self) -> Self: ASSOCIATION_REQUEST_AUTO_APPROVAL: bool = str_to_bool( os.getenv("ASSOCIATION_REQUEST_AUTO_APPROVAL", "False") ) + MIN_SIZE_BLOB_STORAGE_MB: int = int(os.getenv("MIN_SIZE_BLOB_STORAGE_MB", 16)) model_config = SettingsConfigDict(case_sensitive=True) diff --git a/packages/grid/backend/grid/core/node.py b/packages/grid/backend/grid/core/node.py index cde36f8c5fe..45e33bfa669 100644 --- a/packages/grid/backend/grid/core/node.py +++ b/packages/grid/backend/grid/core/node.py @@ -105,5 +105,6 @@ def seaweedfs_config() -> SeaweedFSConfig: smtp_port=settings.SMTP_PORT, smtp_host=settings.SMTP_HOST, association_request_auto_approval=settings.ASSOCIATION_REQUEST_AUTO_APPROVAL, + min_size_blob_storage_mb=settings.MIN_SIZE_BLOB_STORAGE_MB, background_tasks=True, ) diff --git a/packages/grid/default.env b/packages/grid/default.env index 6ae9748bfef..755f53317d1 100644 --- a/packages/grid/default.env +++ b/packages/grid/default.env @@ -54,6 +54,7 @@ CREATE_PRODUCER=False N_CONSUMERS=1 INMEMORY_WORKERS=True ASSOCIATION_REQUEST_AUTO_APPROVAL=False +MIN_SIZE_BLOB_STORAGE_MB=16 # New Service Flag USE_NEW_SERVICE=False diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 65a2cad07f3..e11886a31de 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -345,6 +345,7 @@ def __init__( smtp_port: int | None = None, smtp_host: str | None = None, association_request_auto_approval: bool = False, + min_size_blob_storage_mb: int = 16, background_tasks: bool = False, ): # 🟡 TODO 22: change our ENV variable format and default init args to make this @@ -432,6 +433,7 @@ def __init__( self.init_queue_manager(queue_config=self.queue_config) + self.min_size_blob_storage_mb = min_size_blob_storage_mb self.init_blob_storage(config=blob_storage_config) context = AuthedServiceContext( @@ -464,7 +466,7 @@ def get_default_store(self, use_sqlite: bool, store_type: str) -> StoreConfig: path = self.get_temp_dir("db") file_name: str = f"{self.id}.sqlite" if self.dev_mode: - print(f"{store_type}'s SQLite DB path: {path/file_name}") + print(f"{store_type}'s SQLite DB path: {path/file_name}.") return SQLiteStoreConfig( client_config=SQLiteStoreClientConfig( filename=file_name, @@ -478,7 +480,9 @@ def init_blob_storage(self, config: BlobStorageConfig | None = None) -> None: client_config = OnDiskBlobStorageClientConfig( base_directory=self.get_temp_dir("blob") ) - config_ = OnDiskBlobStorageConfig(client_config=client_config) + config_ = OnDiskBlobStorageConfig( + client_config=client_config, min_size_mb=self.min_size_blob_storage_mb + ) else: config_ = config self.blob_store_config = config_ @@ -497,13 +501,16 @@ def init_blob_storage(self, config: BlobStorageConfig | None = None) -> None: remote_profile.profile_name ] = remote_profile - if ( - isinstance(self.blob_store_config, OnDiskBlobStorageConfig) - and self.dev_mode - ): + if self.dev_mode: + if isinstance(self.blob_store_config, OnDiskBlobStorageConfig): + print( + f"Using on-disk blob storage with path: " + f"{self.blob_store_config.client_config.base_directory}", + end=". ", + ) print( - f"Using on-disk blob storage with path: " - f"{self.blob_store_config.client_config.base_directory}" + f"Minimum object size to be saved to the blob storage: " + f"{self.blob_store_config.min_size_mb} (MB)." ) def run_peer_health_checks(self, context: AuthedServiceContext) -> None: @@ -1754,7 +1761,7 @@ def create_default_worker_pool(node: Node) -> SyftError | None: ) return default_worker_pool - print(f"Creating default worker image with tag='{default_worker_tag}'") + print(f"Creating default worker image with tag='{default_worker_tag}'", end=". ") # Get/Create a default worker SyftWorkerImage default_image = create_default_image( credentials=credentials, @@ -1767,7 +1774,7 @@ def create_default_worker_pool(node: Node) -> SyftError | None: return default_image if not default_image.is_built: - print(f"Building default worker image with tag={default_worker_tag}") + print(f"Building default worker image with tag={default_worker_tag}", end=". ") image_build_method = node.get_service_method(SyftWorkerImageService.build) # Build the Image for given tag result = image_build_method( @@ -1787,7 +1794,8 @@ def create_default_worker_pool(node: Node) -> SyftError | None: f"name={default_pool_name} " f"workers={worker_count} " f"image_uid={default_image.id} " - f"in_memory={node.in_memory_workers}" + f"in_memory={node.in_memory_workers}", + end=". ", ) if default_worker_pool is None: worker_to_add_ = worker_count diff --git a/packages/syft/src/syft/orchestra.py b/packages/syft/src/syft/orchestra.py index 1a08f594aa2..6f34a872099 100644 --- a/packages/syft/src/syft/orchestra.py +++ b/packages/syft/src/syft/orchestra.py @@ -165,6 +165,7 @@ def deploy_to_python( create_producer: bool = False, queue_port: int | None = None, association_request_auto_approval: bool = False, + min_size_blob_storage_mb: int = 16, background_tasks: bool = False, ) -> NodeHandle: worker_classes = { @@ -192,6 +193,7 @@ def deploy_to_python( "n_consumers": n_consumers, "create_producer": create_producer, "association_request_auto_approval": association_request_auto_approval, + "min_size_blob_storage_mb": min_size_blob_storage_mb, "background_tasks": background_tasks, } @@ -281,6 +283,7 @@ def launch( create_producer: bool = False, queue_port: int | None = None, association_request_auto_approval: bool = False, + min_size_blob_storage_mb: int = 16, background_tasks: bool = False, ) -> NodeHandle: if dev_mode is True: @@ -317,6 +320,7 @@ def launch( create_producer=create_producer, queue_port=queue_port, association_request_auto_approval=association_request_auto_approval, + min_size_blob_storage_mb=min_size_blob_storage_mb, background_tasks=background_tasks, ) elif deployment_type_enum == DeploymentType.REMOTE: diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index a9e68d68da6..981809df47e 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -850,8 +850,8 @@ def _save_to_blob_storage(self, min_size_mb: int = 16) -> SyftError | None: self.syft_action_data_cache = self.as_empty_data() else: debug( - f"self.syft_action_data's size = {action_data_size_mb:4f} (MB), less than {min_size_mb} (MB). " - f"Skip saving to blob storage." + f"self.syft_action_data's size = {action_data_size_mb:4f} (MB), " + f"less than {min_size_mb} (MB). Skip saving to blob storage." ) self.syft_action_data_cache = data return None diff --git a/packages/syft/src/syft/store/blob_storage/__init__.py b/packages/syft/src/syft/store/blob_storage/__init__.py index 663660e777c..0f52ebed642 100644 --- a/packages/syft/src/syft/store/blob_storage/__init__.py +++ b/packages/syft/src/syft/store/blob_storage/__init__.py @@ -273,6 +273,7 @@ def connect(self) -> BlobStorageConnection: class BlobStorageConfig(SyftBaseModel): client_type: type[BlobStorageClient] client_config: BlobStorageClientConfig + min_size_mb: int @migrate(BlobRetrievalByURLV4, BlobRetrievalByURL) From 44fee3adc36638fddd38b0354343371f16f70bfb Mon Sep 17 00:00:00 2001 From: khoaguin Date: Wed, 12 Jun 2024 16:27:16 +0700 Subject: [PATCH 07/30] [syft/blob_storage] stop passing min size to upload obj to blob storage to node and `BlobStorageConfig` - only pass the min size as an env varialbe so it can be read by both the client / server and can easily be configured by the client Co-authored-by: Shubham Gupta --- packages/grid/backend/grid/core/config.py | 1 - packages/grid/backend/grid/core/node.py | 1 - .../backend/backend-statefulset.yaml | 2 ++ packages/grid/helm/syft/values.yaml | 2 ++ packages/syft/src/syft/node/node.py | 11 ++++---- packages/syft/src/syft/orchestra.py | 4 --- .../src/syft/service/action/action_object.py | 27 ++++++++----------- .../src/syft/store/blob_storage/__init__.py | 1 - packages/syft/src/syft/util/util.py | 11 ++++++++ 9 files changed, 31 insertions(+), 29 deletions(-) diff --git a/packages/grid/backend/grid/core/config.py b/packages/grid/backend/grid/core/config.py index 86f37533977..8c55b8cd3f7 100644 --- a/packages/grid/backend/grid/core/config.py +++ b/packages/grid/backend/grid/core/config.py @@ -155,7 +155,6 @@ def get_emails_enabled(self) -> Self: ASSOCIATION_REQUEST_AUTO_APPROVAL: bool = str_to_bool( os.getenv("ASSOCIATION_REQUEST_AUTO_APPROVAL", "False") ) - MIN_SIZE_BLOB_STORAGE_MB: int = int(os.getenv("MIN_SIZE_BLOB_STORAGE_MB", 16)) model_config = SettingsConfigDict(case_sensitive=True) diff --git a/packages/grid/backend/grid/core/node.py b/packages/grid/backend/grid/core/node.py index 45e33bfa669..cde36f8c5fe 100644 --- a/packages/grid/backend/grid/core/node.py +++ b/packages/grid/backend/grid/core/node.py @@ -105,6 +105,5 @@ def seaweedfs_config() -> SeaweedFSConfig: smtp_port=settings.SMTP_PORT, smtp_host=settings.SMTP_HOST, association_request_auto_approval=settings.ASSOCIATION_REQUEST_AUTO_APPROVAL, - min_size_blob_storage_mb=settings.MIN_SIZE_BLOB_STORAGE_MB, background_tasks=True, ) diff --git a/packages/grid/helm/syft/templates/backend/backend-statefulset.yaml b/packages/grid/helm/syft/templates/backend/backend-statefulset.yaml index 106d2fee893..13c5ef82523 100644 --- a/packages/grid/helm/syft/templates/backend/backend-statefulset.yaml +++ b/packages/grid/helm/syft/templates/backend/backend-statefulset.yaml @@ -122,6 +122,8 @@ spec: name: {{ .Values.seaweedfs.secretKeyName | required "seaweedfs.secretKeyName is required" }} key: s3RootPassword {{- end }} + - name: MIN_SIZE_BLOB_STORAGE_MB + value: {{ .Values.seaweedfs.minSizeBlobStorageMB | quote }} # Tracing - name: TRACE value: "false" diff --git a/packages/grid/helm/syft/values.yaml b/packages/grid/helm/syft/values.yaml index 2644eac26e4..c2d13fd3cbe 100644 --- a/packages/grid/helm/syft/values.yaml +++ b/packages/grid/helm/syft/values.yaml @@ -66,6 +66,8 @@ seaweedfs: s3: rootUser: admin + minSizeBlobStorageMB: 16 + # Mount API mountApi: # automount: diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index e11886a31de..fb70f71d23b 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -345,7 +345,6 @@ def __init__( smtp_port: int | None = None, smtp_host: str | None = None, association_request_auto_approval: bool = False, - min_size_blob_storage_mb: int = 16, background_tasks: bool = False, ): # 🟡 TODO 22: change our ENV variable format and default init args to make this @@ -433,7 +432,6 @@ def __init__( self.init_queue_manager(queue_config=self.queue_config) - self.min_size_blob_storage_mb = min_size_blob_storage_mb self.init_blob_storage(config=blob_storage_config) context = AuthedServiceContext( @@ -480,9 +478,7 @@ def init_blob_storage(self, config: BlobStorageConfig | None = None) -> None: client_config = OnDiskBlobStorageClientConfig( base_directory=self.get_temp_dir("blob") ) - config_ = OnDiskBlobStorageConfig( - client_config=client_config, min_size_mb=self.min_size_blob_storage_mb - ) + config_ = OnDiskBlobStorageConfig(client_config=client_config) else: config_ = config self.blob_store_config = config_ @@ -502,6 +498,9 @@ def init_blob_storage(self, config: BlobStorageConfig | None = None) -> None: ] = remote_profile if self.dev_mode: + # relative + from ..util.util import min_size_for_blob_storage_upload + if isinstance(self.blob_store_config, OnDiskBlobStorageConfig): print( f"Using on-disk blob storage with path: " @@ -510,7 +509,7 @@ def init_blob_storage(self, config: BlobStorageConfig | None = None) -> None: ) print( f"Minimum object size to be saved to the blob storage: " - f"{self.blob_store_config.min_size_mb} (MB)." + f"{min_size_for_blob_storage_upload()} (MB)." ) def run_peer_health_checks(self, context: AuthedServiceContext) -> None: diff --git a/packages/syft/src/syft/orchestra.py b/packages/syft/src/syft/orchestra.py index 6f34a872099..1a08f594aa2 100644 --- a/packages/syft/src/syft/orchestra.py +++ b/packages/syft/src/syft/orchestra.py @@ -165,7 +165,6 @@ def deploy_to_python( create_producer: bool = False, queue_port: int | None = None, association_request_auto_approval: bool = False, - min_size_blob_storage_mb: int = 16, background_tasks: bool = False, ) -> NodeHandle: worker_classes = { @@ -193,7 +192,6 @@ def deploy_to_python( "n_consumers": n_consumers, "create_producer": create_producer, "association_request_auto_approval": association_request_auto_approval, - "min_size_blob_storage_mb": min_size_blob_storage_mb, "background_tasks": background_tasks, } @@ -283,7 +281,6 @@ def launch( create_producer: bool = False, queue_port: int | None = None, association_request_auto_approval: bool = False, - min_size_blob_storage_mb: int = 16, background_tasks: bool = False, ) -> NodeHandle: if dev_mode is True: @@ -320,7 +317,6 @@ def launch( create_producer=create_producer, queue_port=queue_port, association_request_auto_approval=association_request_auto_approval, - min_size_blob_storage_mb=min_size_blob_storage_mb, background_tasks=background_tasks, ) elif deployment_type_enum == DeploymentType.REMOTE: diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 981809df47e..41fe70f4b95 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -47,7 +47,7 @@ from ...types.uid import LineageID from ...types.uid import UID from ...util.logger import debug -from ...util.util import get_mb_serialized_size +from ...util.util import can_upload_to_blob_storage from ...util.util import prompt_warning_message from ..context import AuthedServiceContext from ..response import SyftException @@ -785,8 +785,8 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | None: size = sys.getsizeof(serialized) storage_entry = CreateBlobStorageEntry.from_obj(data, file_size=size) - # if not TraceResultRegistry.current_thread_is_tracing(): - # self.syft_action_data_cache = self.as_empty_data() + if not TraceResultRegistry.current_thread_is_tracing(): + self.syft_action_data_cache = self.as_empty_data() if self.syft_blob_storage_entry_id is not None: # TODO: check if it already exists storage_entry.id = self.syft_blob_storage_entry_id @@ -829,7 +829,7 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | None: return None - def _save_to_blob_storage(self, min_size_mb: int = 16) -> SyftError | None: + def _save_to_blob_storage(self) -> SyftError | None: """ " If less than min_size_mb, skip saving to blob storage TODO: min_size_mb shoulb be passed as a env var @@ -841,19 +841,14 @@ def _save_to_blob_storage(self, min_size_mb: int = 16) -> SyftError | None: return SyftError( message=f"cannot store empty object {self.id} to the blob storage" ) - action_data_size_mb: float = get_mb_serialized_size(data) - if action_data_size_mb > min_size_mb: - result = self._save_to_blob_storage_(data) - if isinstance(result, SyftError): - return result - if not TraceResultRegistry.current_thread_is_tracing(): - self.syft_action_data_cache = self.as_empty_data() - else: - debug( - f"self.syft_action_data's size = {action_data_size_mb:4f} (MB), " - f"less than {min_size_mb} (MB). Skip saving to blob storage." - ) + if not can_upload_to_blob_storage(data): self.syft_action_data_cache = data + return None + result = self._save_to_blob_storage_(data) + if isinstance(result, SyftError): + return result + if not TraceResultRegistry.current_thread_is_tracing(): + self.syft_action_data_cache = self.as_empty_data() return None @property diff --git a/packages/syft/src/syft/store/blob_storage/__init__.py b/packages/syft/src/syft/store/blob_storage/__init__.py index 0f52ebed642..663660e777c 100644 --- a/packages/syft/src/syft/store/blob_storage/__init__.py +++ b/packages/syft/src/syft/store/blob_storage/__init__.py @@ -273,7 +273,6 @@ def connect(self) -> BlobStorageConnection: class BlobStorageConfig(SyftBaseModel): client_type: type[BlobStorageClient] client_config: BlobStorageClientConfig - min_size_mb: int @migrate(BlobRetrievalByURLV4, BlobRetrievalByURL) diff --git a/packages/syft/src/syft/util/util.py b/packages/syft/src/syft/util/util.py index 99aba539cd9..0785e01f8c3 100644 --- a/packages/syft/src/syft/util/util.py +++ b/packages/syft/src/syft/util/util.py @@ -97,6 +97,17 @@ def get_mb_serialized_size(data: Any) -> float: return sys.getsizeof(serialize(data)) / (1024 * 1024) +def min_size_for_blob_storage_upload() -> int: + """ + Return the minimum size in MB for a blob storage upload. Default to 16 MB for now + """ + return int(os.getenv("MIN_SIZE_BLOB_STORAGE_MB", 16)) + + +def can_upload_to_blob_storage(data: Any) -> bool: + return get_mb_size(data) >= min_size_for_blob_storage_upload() + + def extract_name(klass: type) -> str: name_regex = r".+class.+?([\w\._]+).+" regex2 = r"([\w\.]+)" From 495357b62b3972ff8cb98a607a27be80894fec3a Mon Sep 17 00:00:00 2001 From: dk Date: Thu, 13 Jun 2024 11:13:30 +0700 Subject: [PATCH 08/30] [syft/test] test action object's saving to blob storage behavior when using `.send` --- .../src/syft/service/action/action_object.py | 4 --- .../syft/blob_storage/blob_storage_test.py | 36 +++++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 41fe70f4b95..52f9ab0f097 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -830,10 +830,6 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | None: return None def _save_to_blob_storage(self) -> SyftError | None: - """ " - If less than min_size_mb, skip saving to blob storage - TODO: min_size_mb shoulb be passed as a env var - """ data = self.syft_action_data if isinstance(data, SyftError): return data diff --git a/packages/syft/tests/syft/blob_storage/blob_storage_test.py b/packages/syft/tests/syft/blob_storage/blob_storage_test.py index 11942815529..3efd8fb5e18 100644 --- a/packages/syft/tests/syft/blob_storage/blob_storage_test.py +++ b/packages/syft/tests/syft/blob_storage/blob_storage_test.py @@ -1,18 +1,23 @@ # stdlib import io +import os import random # third party +import numpy as np import pytest # syft absolute import syft as sy +from syft import ActionObject +from syft.client.domain_client import DomainClient from syft.service.context import AuthedServiceContext from syft.service.response import SyftSuccess from syft.service.user.user import UserCreate from syft.store.blob_storage import BlobDeposit from syft.store.blob_storage import SyftObjectRetrieval from syft.types.blob_storage import CreateBlobStorageEntry +from syft.util.util import min_size_for_blob_storage_upload raw_data = {"test": "test"} data = sy.serialize(raw_data, to_bytes=True) @@ -99,3 +104,34 @@ def test_blob_storage_delete(authed_context, blob_storage): with pytest.raises(FileNotFoundError): blob_storage.read(authed_context, blob_deposit.blob_storage_entry_id) + + +def test_action_obj_send_save_to_blob_storage(worker): + # set this so we will always save action objects to blob storage + os.environ["MIN_SIZE_BLOB_STORAGE_MB"] = "0" + + orig_obj: np.ndarray = np.array([1, 2, 3]) + action_obj = ActionObject.from_obj(orig_obj) + assert action_obj.dtype == orig_obj.dtype + + root_client: DomainClient = worker.root_client + action_obj.send(root_client) + assert isinstance(action_obj.syft_blob_storage_entry_id, sy.UID) + root_authed_ctx = AuthedServiceContext( + node=worker, credentials=root_client.verify_key + ) + + blob_storage = worker.get_service("BlobStorageService") + syft_retrieved_data = blob_storage.read( + root_authed_ctx, action_obj.syft_blob_storage_entry_id + ) + assert isinstance(syft_retrieved_data, SyftObjectRetrieval) + assert all(syft_retrieved_data.read() == orig_obj) + + # stop saving small action objects to blob storage + del os.environ["MIN_SIZE_BLOB_STORAGE_MB"] + assert min_size_for_blob_storage_upload() == 16 + orig_obj_2: np.ndarray = np.array([1, 2, 4]) + action_obj_2 = ActionObject.from_obj(orig_obj_2) + action_obj_2.send(root_client) + assert action_obj_2.syft_blob_storage_entry_id is None From 217c097820590c5f63cd222ae0622336d1155807 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 13 Jun 2024 17:50:53 +0700 Subject: [PATCH 09/30] [syft/blob_storage] pass `MIN_SIZE_BLOB_STORAGE_MB` env var into `Settings` and `BlobStorageConfig` which then is passed into a `Node` - `Node` then pass this information around by `NodeMetadata` / `NodeMetadataJSON` - Update utility methods to see if a client can upload an object to the blob storage - Modify test in `blob_storage_test` to reflect these changes Co-authored-by: Shubham Gupta --- packages/grid/backend/grid/core/config.py | 1 + packages/grid/backend/grid/core/node.py | 5 ++- packages/syft/src/syft/node/node.py | 11 ++--- .../src/syft/service/action/action_object.py | 5 ++- .../src/syft/service/blob_storage/util.py | 21 ++++++++++ .../syft/service/metadata/node_metadata.py | 2 + .../src/syft/store/blob_storage/__init__.py | 1 + packages/syft/src/syft/util/util.py | 13 +----- .../syft/blob_storage/blob_storage_test.py | 40 +++++++++---------- 9 files changed, 58 insertions(+), 41 deletions(-) create mode 100644 packages/syft/src/syft/service/blob_storage/util.py diff --git a/packages/grid/backend/grid/core/config.py b/packages/grid/backend/grid/core/config.py index 8c55b8cd3f7..86f37533977 100644 --- a/packages/grid/backend/grid/core/config.py +++ b/packages/grid/backend/grid/core/config.py @@ -155,6 +155,7 @@ def get_emails_enabled(self) -> Self: ASSOCIATION_REQUEST_AUTO_APPROVAL: bool = str_to_bool( os.getenv("ASSOCIATION_REQUEST_AUTO_APPROVAL", "False") ) + MIN_SIZE_BLOB_STORAGE_MB: int = int(os.getenv("MIN_SIZE_BLOB_STORAGE_MB", 16)) model_config = SettingsConfigDict(case_sensitive=True) diff --git a/packages/grid/backend/grid/core/node.py b/packages/grid/backend/grid/core/node.py index cde36f8c5fe..926cbbc5556 100644 --- a/packages/grid/backend/grid/core/node.py +++ b/packages/grid/backend/grid/core/node.py @@ -66,7 +66,10 @@ def seaweedfs_config() -> SeaweedFSConfig: mount_port=settings.SEAWEED_MOUNT_PORT, ) - return SeaweedFSConfig(client_config=seaweed_client_config) + return SeaweedFSConfig( + client_config=seaweed_client_config, + min_blob_size=settings.MIN_SIZE_BLOB_STORAGE_MB, + ) node_type = NodeType(get_node_type()) diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index fb70f71d23b..3d0c8a376f7 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -478,7 +478,10 @@ def init_blob_storage(self, config: BlobStorageConfig | None = None) -> None: client_config = OnDiskBlobStorageClientConfig( base_directory=self.get_temp_dir("blob") ) - config_ = OnDiskBlobStorageConfig(client_config=client_config) + config_ = OnDiskBlobStorageConfig( + client_config=client_config, + min_blob_size=os.getenv("MIN_SIZE_BLOB_STORAGE_MB", 16), + ) else: config_ = config self.blob_store_config = config_ @@ -498,9 +501,6 @@ def init_blob_storage(self, config: BlobStorageConfig | None = None) -> None: ] = remote_profile if self.dev_mode: - # relative - from ..util.util import min_size_for_blob_storage_upload - if isinstance(self.blob_store_config, OnDiskBlobStorageConfig): print( f"Using on-disk blob storage with path: " @@ -509,7 +509,7 @@ def init_blob_storage(self, config: BlobStorageConfig | None = None) -> None: ) print( f"Minimum object size to be saved to the blob storage: " - f"{min_size_for_blob_storage_upload()} (MB)." + f"{self.blob_store_config.min_blob_size} (MB)." ) def run_peer_health_checks(self, context: AuthedServiceContext) -> None: @@ -1083,6 +1083,7 @@ def metadata(self) -> NodeMetadata: node_side_type=node_side_type, show_warnings=show_warnings, eager_execution_enabled=eager_execution_enabled, + min_size_blob_storage_mb=self.blob_store_config.min_blob_size, ) @property diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 52f9ab0f097..8419ff8c3e8 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -35,6 +35,7 @@ from ...node.credentials import SyftVerifyKey from ...serde.serializable import serializable from ...serde.serialize import _serialize as serialize +from ...service.blob_storage.util import can_upload_to_blob_storage from ...service.response import SyftError from ...store.linked_obj import LinkedObject from ...types.base import SyftBaseModel @@ -47,7 +48,6 @@ from ...types.uid import LineageID from ...types.uid import UID from ...util.logger import debug -from ...util.util import can_upload_to_blob_storage from ...util.util import prompt_warning_message from ..context import AuthedServiceContext from ..response import SyftException @@ -837,7 +837,8 @@ def _save_to_blob_storage(self) -> SyftError | None: return SyftError( message=f"cannot store empty object {self.id} to the blob storage" ) - if not can_upload_to_blob_storage(data): + api = APIRegistry.api_for(self.syft_node_location, self.syft_client_verify_key) + if not can_upload_to_blob_storage(data, api.metadata): self.syft_action_data_cache = data return None result = self._save_to_blob_storage_(data) diff --git a/packages/syft/src/syft/service/blob_storage/util.py b/packages/syft/src/syft/service/blob_storage/util.py new file mode 100644 index 00000000000..11d62f5eb97 --- /dev/null +++ b/packages/syft/src/syft/service/blob_storage/util.py @@ -0,0 +1,21 @@ +# stdlib +from typing import Any + +# relative +from ...util.util import get_mb_serialized_size +from ..metadata.node_metadata import NodeMetadata +from ..metadata.node_metadata import NodeMetadataJSON + + +def min_size_for_blob_storage_upload(metadata: NodeMetadata | NodeMetadataJSON) -> int: + if not isinstance(metadata, (NodeMetadata | NodeMetadataJSON)): + raise ValueError( + f"argument `metadata` is type {type(metadata)}, but it should be of type NodeMetadata or NodeMetadataJSON" + ) + return metadata.min_size_blob_storage_mb + + +def can_upload_to_blob_storage( + data: Any, metadata: NodeMetadata | NodeMetadataJSON +) -> bool: + return get_mb_serialized_size(data) >= min_size_for_blob_storage_upload(metadata) diff --git a/packages/syft/src/syft/service/metadata/node_metadata.py b/packages/syft/src/syft/service/metadata/node_metadata.py index de60b90a412..6aaae3fe69c 100644 --- a/packages/syft/src/syft/service/metadata/node_metadata.py +++ b/packages/syft/src/syft/service/metadata/node_metadata.py @@ -60,6 +60,7 @@ class NodeMetadata(SyftObject): node_side_type: str show_warnings: bool eager_execution_enabled: bool + min_size_blob_storage_mb: int def check_version(self, client_version: str) -> bool: return check_version( @@ -112,6 +113,7 @@ class NodeMetadataJSON(BaseModel, StorableObjectType): node_side_type: str show_warnings: bool supported_protocols: list = [] + min_size_blob_storage_mb: int @model_validator(mode="before") @classmethod diff --git a/packages/syft/src/syft/store/blob_storage/__init__.py b/packages/syft/src/syft/store/blob_storage/__init__.py index 663660e777c..8056be60207 100644 --- a/packages/syft/src/syft/store/blob_storage/__init__.py +++ b/packages/syft/src/syft/store/blob_storage/__init__.py @@ -273,6 +273,7 @@ def connect(self) -> BlobStorageConnection: class BlobStorageConfig(SyftBaseModel): client_type: type[BlobStorageClient] client_config: BlobStorageClientConfig + min_blob_size: int # in MB @migrate(BlobRetrievalByURLV4, BlobRetrievalByURL) diff --git a/packages/syft/src/syft/util/util.py b/packages/syft/src/syft/util/util.py index 0785e01f8c3..7d13e348f67 100644 --- a/packages/syft/src/syft/util/util.py +++ b/packages/syft/src/syft/util/util.py @@ -94,18 +94,7 @@ def get_mb_size(data: Any) -> float: def get_mb_serialized_size(data: Any) -> float: - return sys.getsizeof(serialize(data)) / (1024 * 1024) - - -def min_size_for_blob_storage_upload() -> int: - """ - Return the minimum size in MB for a blob storage upload. Default to 16 MB for now - """ - return int(os.getenv("MIN_SIZE_BLOB_STORAGE_MB", 16)) - - -def can_upload_to_blob_storage(data: Any) -> bool: - return get_mb_size(data) >= min_size_for_blob_storage_upload() + return sys.getsizeof(serialize(data, to_bytes=True)) / (1024 * 1024) def extract_name(klass: type) -> str: diff --git a/packages/syft/tests/syft/blob_storage/blob_storage_test.py b/packages/syft/tests/syft/blob_storage/blob_storage_test.py index 3efd8fb5e18..944b25628f2 100644 --- a/packages/syft/tests/syft/blob_storage/blob_storage_test.py +++ b/packages/syft/tests/syft/blob_storage/blob_storage_test.py @@ -1,6 +1,5 @@ # stdlib import io -import os import random # third party @@ -11,13 +10,14 @@ import syft as sy from syft import ActionObject from syft.client.domain_client import DomainClient +from syft.service.blob_storage.util import can_upload_to_blob_storage +from syft.service.blob_storage.util import min_size_for_blob_storage_upload from syft.service.context import AuthedServiceContext from syft.service.response import SyftSuccess from syft.service.user.user import UserCreate from syft.store.blob_storage import BlobDeposit from syft.store.blob_storage import SyftObjectRetrieval from syft.types.blob_storage import CreateBlobStorageEntry -from syft.util.util import min_size_for_blob_storage_upload raw_data = {"test": "test"} data = sy.serialize(raw_data, to_bytes=True) @@ -107,31 +107,29 @@ def test_blob_storage_delete(authed_context, blob_storage): def test_action_obj_send_save_to_blob_storage(worker): - # set this so we will always save action objects to blob storage - os.environ["MIN_SIZE_BLOB_STORAGE_MB"] = "0" - - orig_obj: np.ndarray = np.array([1, 2, 3]) - action_obj = ActionObject.from_obj(orig_obj) - assert action_obj.dtype == orig_obj.dtype - + # this small object should not be saved to blob storage + data_small: np.ndarray = np.array([1, 2, 3]) + action_obj = ActionObject.from_obj(data_small) + assert action_obj.dtype == data_small.dtype root_client: DomainClient = worker.root_client action_obj.send(root_client) - assert isinstance(action_obj.syft_blob_storage_entry_id, sy.UID) + assert action_obj.syft_blob_storage_entry_id is None + + # big object that should be saved to blob storage + assert min_size_for_blob_storage_upload(root_client.api.metadata) == 16 + num_elements = 50 * 1024 * 1024 + data_big = np.random.randint(0, 100, size=num_elements) # 4 bytes per int32 + action_obj_2 = ActionObject.from_obj(data_big) + assert can_upload_to_blob_storage(action_obj_2, root_client.api.metadata) + action_obj_2.send(root_client) + assert isinstance(action_obj_2.syft_blob_storage_entry_id, sy.UID) + # get back the object from blob storage to check if it is the same root_authed_ctx = AuthedServiceContext( node=worker, credentials=root_client.verify_key ) - blob_storage = worker.get_service("BlobStorageService") syft_retrieved_data = blob_storage.read( - root_authed_ctx, action_obj.syft_blob_storage_entry_id + root_authed_ctx, action_obj_2.syft_blob_storage_entry_id ) assert isinstance(syft_retrieved_data, SyftObjectRetrieval) - assert all(syft_retrieved_data.read() == orig_obj) - - # stop saving small action objects to blob storage - del os.environ["MIN_SIZE_BLOB_STORAGE_MB"] - assert min_size_for_blob_storage_upload() == 16 - orig_obj_2: np.ndarray = np.array([1, 2, 4]) - action_obj_2 = ActionObject.from_obj(orig_obj_2) - action_obj_2.send(root_client) - assert action_obj_2.syft_blob_storage_entry_id is None + assert all(syft_retrieved_data.read() == data_big) From fbcb4ea8a75b28de364644ba393c9bc6ae5a85f3 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 13 Jun 2024 20:46:04 +0700 Subject: [PATCH 10/30] [syft/tests] add `min_size_blob_storage_mb` to `NodeMetadataJSON` in fixture --- packages/syft/tests/syft/settings/fixtures.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/syft/tests/syft/settings/fixtures.py b/packages/syft/tests/syft/settings/fixtures.py index e083b93bd95..65cbf18deca 100644 --- a/packages/syft/tests/syft/settings/fixtures.py +++ b/packages/syft/tests/syft/settings/fixtures.py @@ -66,6 +66,7 @@ def metadata_json(faker) -> NodeMetadataJSON: node_side_type=NodeSideType.LOW_SIDE.value, show_warnings=False, node_type=NodeType.DOMAIN.value, + min_size_blob_storage_mb=16, ) From 63ee3ec010fa8e3a4116853c91c8e24b09429f68 Mon Sep 17 00:00:00 2001 From: dk Date: Fri, 14 Jun 2024 10:28:07 +0700 Subject: [PATCH 11/30] [syft/blob_storage] fix lint, add some debugging statements --- .../src/syft/protocol/protocol_version.json | 2 +- .../src/syft/service/action/action_object.py | 17 ++++++++++++++++- .../src/syft/service/action/action_service.py | 5 ++++- .../syft/src/syft/service/blob_storage/util.py | 3 ++- 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 375aa1af66b..6dfcf62f3a2 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -16,7 +16,7 @@ "NodeMetadata": { "5": { "version": 5, - "hash": "70197b4725dbdea0560ed8388e4d20b76808bee988f3630c5f916ee8f48761f8", + "hash": "f3927d167073a4db369a07e3bbbf756075bbb29e9addec324b8cd2c3597b75a1", "action": "add" } }, diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 8419ff8c3e8..22038f47d2f 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -837,7 +837,22 @@ def _save_to_blob_storage(self) -> SyftError | None: return SyftError( message=f"cannot store empty object {self.id} to the blob storage" ) - api = APIRegistry.api_for(self.syft_node_location, self.syft_client_verify_key) + + api = APIRegistry.api_for( + node_uid=self.syft_node_location, + user_verify_key=self.syft_client_verify_key, + ) + print() + print() + print("---- inside ActionObject._save_to_blob_storage() ----") + print(f"{self.syft_node_location = }") + print(f"{self.syft_client_verify_key = }") + print(f"{APIRegistry = }") + print(f"{api = }") + if api is None: + raise ValueError( + f"api is None. You must login to {self.syft_node_location}" + ) if not can_upload_to_blob_storage(data, api.metadata): self.syft_action_data_cache = data return None diff --git a/packages/syft/src/syft/service/action/action_service.py b/packages/syft/src/syft/service/action/action_service.py index 00f1414b247..ab92f29592f 100644 --- a/packages/syft/src/syft/service/action/action_service.py +++ b/packages/syft/src/syft/service/action/action_service.py @@ -448,6 +448,10 @@ def set_result_to_store( context.node.id, context.credentials, ) + print("---- inside ActionService.set_result_to_store() ----") + print(f"{result_action_object.__dict__ = }") + print("calling result_action_object._save_to_blob_storage()") + blob_store_result = result_action_object._save_to_blob_storage() if isinstance(blob_store_result, SyftError): return Err(blob_store_result.message) @@ -723,7 +727,6 @@ def execute( context.node.id, context.credentials, ) - blob_store_result = result_action_object._save_to_blob_storage() if isinstance(blob_store_result, SyftError): return blob_store_result diff --git a/packages/syft/src/syft/service/blob_storage/util.py b/packages/syft/src/syft/service/blob_storage/util.py index 11d62f5eb97..68f3250035c 100644 --- a/packages/syft/src/syft/service/blob_storage/util.py +++ b/packages/syft/src/syft/service/blob_storage/util.py @@ -10,7 +10,8 @@ def min_size_for_blob_storage_upload(metadata: NodeMetadata | NodeMetadataJSON) -> int: if not isinstance(metadata, (NodeMetadata | NodeMetadataJSON)): raise ValueError( - f"argument `metadata` is type {type(metadata)}, but it should be of type NodeMetadata or NodeMetadataJSON" + f"argument `metadata` is type {type(metadata)}, " + f"but it should be of type NodeMetadata or NodeMetadataJSON" ) return metadata.min_size_blob_storage_mb From d37606cf919419b262ce453b35ad9b1381844dd6 Mon Sep 17 00:00:00 2001 From: dk Date: Fri, 14 Jun 2024 15:00:25 +0700 Subject: [PATCH 12/30] [syft/action_obj] put action object's upload to blob storage code in a try-catch - if can't save the data to blob store, save the data to `syft_action_data_cache` --- .../src/syft/service/action/action_object.py | 44 +++++++++---------- .../src/syft/service/action/action_service.py | 4 -- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 87266358815..3453c86f4af 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -837,30 +837,28 @@ def _save_to_blob_storage(self) -> SyftError | None: return SyftError( message=f"cannot store empty object {self.id} to the blob storage" ) - - api = APIRegistry.api_for( - node_uid=self.syft_node_location, - user_verify_key=self.syft_client_verify_key, - ) - print() - print() - print("---- inside ActionObject._save_to_blob_storage() ----") - print(f"{self.syft_node_location = }") - print(f"{self.syft_client_verify_key = }") - print(f"{APIRegistry = }") - print(f"{api = }") - if api is None: - raise ValueError( - f"api is None. You must login to {self.syft_node_location}" + try: + api = APIRegistry.api_for( + node_uid=self.syft_node_location, + user_verify_key=self.syft_client_verify_key, ) - if not can_upload_to_blob_storage(data, api.metadata): - self.syft_action_data_cache = data - return None - result = self._save_to_blob_storage_(data) - if isinstance(result, SyftError): - return result - if not TraceResultRegistry.current_thread_is_tracing(): - self.syft_action_data_cache = self.as_empty_data() + if api is None: + raise ValueError( + f"api is None. You must login to {self.syft_node_location}" + ) + if can_upload_to_blob_storage(data, api.metadata): + result = self._save_to_blob_storage_(data) + if isinstance(result, SyftError): + return result + if not TraceResultRegistry.current_thread_is_tracing(): + self.syft_action_data_cache = self.as_empty_data() + return None + except Exception as e: + print( + f"Failed to save action object {self.id} to the blob store. Error: {e}" + ) + + self.syft_action_data_cache = data return None @property diff --git a/packages/syft/src/syft/service/action/action_service.py b/packages/syft/src/syft/service/action/action_service.py index ab92f29592f..a7b908c99fa 100644 --- a/packages/syft/src/syft/service/action/action_service.py +++ b/packages/syft/src/syft/service/action/action_service.py @@ -448,10 +448,6 @@ def set_result_to_store( context.node.id, context.credentials, ) - print("---- inside ActionService.set_result_to_store() ----") - print(f"{result_action_object.__dict__ = }") - print("calling result_action_object._save_to_blob_storage()") - blob_store_result = result_action_object._save_to_blob_storage() if isinstance(blob_store_result, SyftError): return Err(blob_store_result.message) From 759f66b68744d5272136aa41808b91c0773a7f13 Mon Sep 17 00:00:00 2001 From: dk Date: Fri, 14 Jun 2024 15:56:55 +0700 Subject: [PATCH 13/30] [syft/tests] test saving big objects to blob storage when uploading big datasets --- .../syft/blob_storage/blob_storage_test.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/packages/syft/tests/syft/blob_storage/blob_storage_test.py b/packages/syft/tests/syft/blob_storage/blob_storage_test.py index 944b25628f2..1889004a47e 100644 --- a/packages/syft/tests/syft/blob_storage/blob_storage_test.py +++ b/packages/syft/tests/syft/blob_storage/blob_storage_test.py @@ -133,3 +133,46 @@ def test_action_obj_send_save_to_blob_storage(worker): ) assert isinstance(syft_retrieved_data, SyftObjectRetrieval) assert all(syft_retrieved_data.read() == data_big) + + +def test_upload_dataset_save_to_blob_storage(worker): + root_client: DomainClient = worker.root_client + root_authed_ctx = AuthedServiceContext( + node=worker, credentials=root_client.verify_key + ) + dataset = sy.Dataset( + name="small_dataset", + asset_list=[ + sy.Asset( + name="small_dataset", + data=np.array([1, 2, 3]), + mock=np.array([1, 1, 1]), + ) + ], + ) + root_client.upload_dataset(dataset) + blob_storage = worker.get_service("BlobStorageService") + assert len(blob_storage.get_all_blob_storage_entries(context=root_authed_ctx)) == 0 + + num_elements = 50 * 1024 * 1024 + data_big = np.random.randint(0, 100, size=num_elements) + dataset_big = sy.Dataset( + name="big_dataset", + asset_list=[ + sy.Asset( + name="big_dataset", + data=data_big, + mock=np.array([1, 1, 1]), + ) + ], + ) + root_client.upload_dataset(dataset_big) + # the private data should be saved to the blob storage + blob_entries: list = blob_storage.get_all_blob_storage_entries( + context=root_authed_ctx + ) + assert len(blob_entries) == 1 + data_big_retrieved: SyftObjectRetrieval = blob_storage.read( + context=root_authed_ctx, uid=blob_entries[0].id + ) + assert all(data_big_retrieved.read() == data_big) From c9b5c3e3986f2a67ac35e634b15f07cbc6725643 Mon Sep 17 00:00:00 2001 From: dk Date: Mon, 24 Jun 2024 12:05:54 +0700 Subject: [PATCH 14/30] [syft/action_obj] - `_save_to_blob_store` returns a SyftWarning if the object is small and not saved to the blob store - passing flags telling action service to not clear the cache data of these small objects --- packages/syft/src/syft/client/domain_client.py | 13 +++++++++++-- .../src/syft/service/action/action_object.py | 18 ++++++++++++++---- .../src/syft/service/action/action_service.py | 13 ++++++++++++- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/packages/syft/src/syft/client/domain_client.py b/packages/syft/src/syft/client/domain_client.py index ca9ee53ce22..16a0170b556 100644 --- a/packages/syft/src/syft/client/domain_client.py +++ b/packages/syft/src/syft/client/domain_client.py @@ -25,6 +25,7 @@ from ..service.dataset.dataset import CreateDataset from ..service.response import SyftError from ..service.response import SyftSuccess +from ..service.response import SyftWarning from ..service.sync.diff_state import ResolvedSyncState from ..service.sync.sync_state import SyncState from ..service.user.roles import Roles @@ -128,7 +129,7 @@ def upload_dataset(self, dataset: CreateDataset) -> SyftSuccess | SyftError: ) as pbar: for asset in dataset.asset_list: try: - contains_empty = asset.contains_empty() + contains_empty: bool = asset.contains_empty() twin = TwinObject( private_obj=ActionObject.from_obj(asset.data), mock_obj=ActionObject.from_obj(asset.mock), @@ -142,8 +143,16 @@ def upload_dataset(self, dataset: CreateDataset) -> SyftSuccess | SyftError: tqdm.write(f"Failed to create twin for {asset.name}. {e}") return SyftError(message=f"Failed to create twin. {e}") + if isinstance(res, SyftWarning): + print(res.message) + skip_save_to_blob_store, skip_clear_cache = True, True + else: + skip_save_to_blob_store, skip_clear_cache = False, False response = self.api.services.action.set( - twin, ignore_detached_objs=contains_empty + twin, + ignore_detached_objs=contains_empty, + skip_save_to_blob_store=skip_save_to_blob_store, + skip_clear_cache=skip_clear_cache, ) if isinstance(response, SyftError): tqdm.write(f"Failed to upload asset: {asset.name}") diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 2f04d4a8295..7749410db28 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -36,7 +36,10 @@ from ...serde.serializable import serializable from ...serde.serialize import _serialize as serialize from ...service.blob_storage.util import can_upload_to_blob_storage +from ...service.blob_storage.util import min_size_for_blob_storage_upload from ...service.response import SyftError +from ...service.response import SyftSuccess +from ...service.response import SyftWarning from ...store.linked_obj import LinkedObject from ...types.base import SyftBaseModel from ...types.datetime import DateTime @@ -844,7 +847,9 @@ def _set_reprs(self, data: any) -> None: ) self.syft_action_data_str_ = truncate_str(str(data)) - def _save_to_blob_storage(self, allow_empty: bool = False) -> SyftError | None: + def _save_to_blob_storage( + self, allow_empty: bool = False + ) -> SyftError | SyftSuccess | SyftWarning: data = self.syft_action_data if isinstance(data, SyftError): return data @@ -867,15 +872,20 @@ def _save_to_blob_storage(self, allow_empty: bool = False) -> SyftError | None: return result if not TraceResultRegistry.current_thread_is_tracing(): self._clear_cache() - return None + return SyftSuccess( + message=f"Saved action object {self.id} to the blob store" + ) except Exception as e: print( f"Failed to save action object {self.id} to the blob store. Error: {e}" ) self.syft_action_data_cache = data - - return None + return SyftWarning( + message=f"The action object {self.id} was not saved to " + f"the blob store but to memory cache since it is " + f"smaller than {min_size_for_blob_storage_upload(api.metadata)} Mb." + ) def _clear_cache(self) -> None: self.syft_action_data_cache = self.as_empty_data() diff --git a/packages/syft/src/syft/service/action/action_service.py b/packages/syft/src/syft/service/action/action_service.py index c680c7003f7..6ecb9d97b6c 100644 --- a/packages/syft/src/syft/service/action/action_service.py +++ b/packages/syft/src/syft/service/action/action_service.py @@ -84,6 +84,8 @@ def set( action_object: ActionObject | TwinObject, add_storage_permission: bool = True, ignore_detached_objs: bool = False, + skip_clear_cache: bool = False, + skip_save_to_blob_store: bool = False, ) -> ActionObject | SyftError: res = self._set( context, @@ -91,6 +93,8 @@ def set( has_result_read_permission=True, add_storage_permission=add_storage_permission, ignore_detached_objs=ignore_detached_objs, + skip_clear_cache=skip_clear_cache, + skip_save_to_blob_store=skip_save_to_blob_store, ) if res.is_err(): return SyftError(message=res.value) @@ -102,6 +106,9 @@ def is_detached_obj( action_object: ActionObject | TwinObject, ignore_detached_obj: bool = False, ) -> bool: + """ + A detached object is an object that is not yet saved to the blob storage. + """ if ( isinstance(action_object, TwinObject) and ( @@ -125,8 +132,12 @@ def _set( add_storage_permission: bool = True, ignore_detached_objs: bool = False, skip_clear_cache: bool = False, + skip_save_to_blob_store: bool = False, ) -> Result[ActionObject, str]: - if self.is_detached_obj(action_object, ignore_detached_objs): + if ( + self.is_detached_obj(action_object, ignore_detached_objs) + and not skip_save_to_blob_store + ): return Err( "you uploaded an ActionObject that is not yet in the blob storage" ) From 033365ed323437cb0bbfce6ff1e63caa751ec438 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Mon, 24 Jun 2024 13:47:47 +0700 Subject: [PATCH 15/30] [syft/action_obj] set `skip_save_to_blob_stores` and `skip_clear_cache` in `ActionObject.send` --- .../syft/src/syft/service/action/action_object.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 7749410db28..d0366fd0f35 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -1237,8 +1237,17 @@ def _send( api = self._get_api() if isinstance(api, SyftError): return api + + if isinstance(blob_storage_res, SyftWarning): + print(blob_storage_res.message) + skip_save_to_blob_store, skip_clear_cache = True, True + else: + skip_save_to_blob_store, skip_clear_cache = False, False res = api.services.action.set( - self, add_storage_permission=add_storage_permission + self, + add_storage_permission=add_storage_permission, + skip_save_to_blob_store=skip_save_to_blob_store, + skip_clear_cache=skip_clear_cache, ) if isinstance(res, ActionObject): self.syft_created_at = res.syft_created_at From 4b2ec09583fc59c110bbe6d71716f713d9e9fef9 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Mon, 24 Jun 2024 14:07:56 +0700 Subject: [PATCH 16/30] [syft/action_obj] set skip_save_to_blob_stores and `skip_clear_cache` in `ActionObject.execute` --- .../syft/src/syft/service/action/action_service.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/service/action/action_service.py b/packages/syft/src/syft/service/action/action_service.py index 6ecb9d97b6c..79a453f4452 100644 --- a/packages/syft/src/syft/service/action/action_service.py +++ b/packages/syft/src/syft/service/action/action_service.py @@ -23,6 +23,7 @@ from ..policy.policy import retrieve_from_db from ..response import SyftError from ..response import SyftSuccess +from ..response import SyftWarning from ..service import AbstractService from ..service import SERVICE_TO_TYPES from ..service import TYPE_TO_SERVICE @@ -794,8 +795,17 @@ def execute( context.extra_kwargs = { "has_result_read_permission": has_result_read_permission } - - set_result = self._set(context, result_action_object) + if isinstance(blob_store_result, SyftWarning): + print(blob_store_result.message) + skip_save_to_blob_store, skip_clear_cache = True, True + else: + skip_save_to_blob_store, skip_clear_cache = False, False + set_result = self._set( + context, + result_action_object, + skip_save_to_blob_store=skip_save_to_blob_store, + skip_clear_cache=skip_clear_cache, + ) if set_result.is_err(): return Err( f"Failed executing action {action}, set result is an error: {set_result.err()}" From 92c4843d905ede2f8ebc20dc890e1bf222777366 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Mon, 24 Jun 2024 14:15:52 +0700 Subject: [PATCH 17/30] [syft/twin_obj] pass `skip_save_to_blob_stores` and `skip_clear_cache` to `ActionService.set` for `TwinObject.send` --- packages/syft/src/syft/types/twin_object.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/packages/syft/src/syft/types/twin_object.py b/packages/syft/src/syft/types/twin_object.py index f94d75744d6..82eae3590b8 100644 --- a/packages/syft/src/syft/types/twin_object.py +++ b/packages/syft/src/syft/types/twin_object.py @@ -17,6 +17,8 @@ from ..service.action.action_object import TwinMode from ..service.action.action_types import action_types from ..service.response import SyftError +from ..service.response import SyftSuccess +from ..service.response import SyftWarning from ..types.syft_object import SYFT_OBJECT_VERSION_2 from .syft_object import SyftObject from .uid import UID @@ -82,7 +84,9 @@ def mock(self) -> ActionObject: mock.id = twin_id return mock - def _save_to_blob_storage(self, allow_empty: bool = False) -> SyftError | None: + def _save_to_blob_storage( + self, allow_empty: bool = False + ) -> SyftError | SyftSuccess | SyftWarning: # Set node location and verify key self.private_obj._set_obj_location_( self.syft_node_location, @@ -99,8 +103,16 @@ def _save_to_blob_storage(self, allow_empty: bool = False) -> SyftError | None: def send(self, client: SyftClient, add_storage_permission: bool = True) -> Any: self._set_obj_location_(client.id, client.verify_key) - self._save_to_blob_storage() + blob_store_result = self._save_to_blob_storage() + if isinstance(blob_store_result, SyftWarning): + print(blob_store_result.message) + skip_save_to_blob_store, skip_clear_cache = True, True + else: + skip_save_to_blob_store, skip_clear_cache = False, False res = client.api.services.action.set( - self, add_storage_permission=add_storage_permission + self, + add_storage_permission=add_storage_permission, + skip_save_to_blob_store=skip_save_to_blob_store, + skip_clear_cache=skip_clear_cache, ) return res From f4c9c39d146c7edab6e6f248ddc9d2b501595150 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Mon, 24 Jun 2024 15:04:17 +0700 Subject: [PATCH 18/30] [syft/twin_obj] pass skip_save_to_blob_stores and `skip_clear_cache` to `ActionService.set` for `ActionService.set_result_to_store` --- .../syft/src/syft/service/action/action_service.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/syft/src/syft/service/action/action_service.py b/packages/syft/src/syft/service/action/action_service.py index 79a453f4452..26019d632f7 100644 --- a/packages/syft/src/syft/service/action/action_service.py +++ b/packages/syft/src/syft/service/action/action_service.py @@ -516,6 +516,11 @@ def set_result_to_store( blob_store_result = result_action_object._save_to_blob_storage() if isinstance(blob_store_result, SyftError): return Err(blob_store_result.message) + if isinstance(blob_store_result, SyftWarning): + print(blob_store_result.message) + skip_save_to_blob_store, skip_clear_cache = True, True + else: + skip_save_to_blob_store, skip_clear_cache = False, False # IMPORTANT: DO THIS ONLY AFTER ._save_to_blob_storage if isinstance(result_action_object, TwinObject): @@ -528,7 +533,11 @@ def set_result_to_store( # Since this just meta data about the result, they always have access to it. set_result = self._set( - context, result_action_object, has_result_read_permission=True + context, + result_action_object, + has_result_read_permission=True, + skip_save_to_blob_store=skip_save_to_blob_store, + skip_clear_cache=skip_clear_cache, ) if set_result.is_err(): From bbecc0743adcc47feb410eee0848d8970778ba9e Mon Sep 17 00:00:00 2001 From: khoaguin Date: Mon, 24 Jun 2024 15:16:24 +0700 Subject: [PATCH 19/30] [syft/twin_obj] pass skip_save_to_blob_stores and `skip_clear_cache` in the rest of the code base - unit tests passed --- .../syft/src/syft/service/action/action_object.py | 11 ++++++++++- .../syft/src/syft/service/action/action_service.py | 12 +++++++++++- packages/syft/src/syft/service/dataset/dataset.py | 8 +++++++- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index d0366fd0f35..036dc99fea6 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -523,7 +523,16 @@ def process_arg(arg: ActionObject | Asset | UID | Any) -> Any: r = arg._save_to_blob_storage() if isinstance(r, SyftError): print(r.message) - arg = api.services.action.set(arg) + if isinstance(r, SyftWarning): + print(r.message) + skip_save_to_blob_store, skip_clear_cache = True, True + else: + skip_save_to_blob_store, skip_clear_cache = False, False + arg = api.services.action.set( + arg, + skip_save_to_blob_store=skip_save_to_blob_store, + skip_clear_cache=skip_clear_cache, + ) return arg arg_list = [process_arg(arg) for arg in args] if args else [] diff --git a/packages/syft/src/syft/service/action/action_service.py b/packages/syft/src/syft/service/action/action_service.py index 26019d632f7..f7ca2d78efa 100644 --- a/packages/syft/src/syft/service/action/action_service.py +++ b/packages/syft/src/syft/service/action/action_service.py @@ -70,8 +70,18 @@ def np_array(self, context: AuthedServiceContext, data: Any) -> Any: blob_store_result = np_obj._save_to_blob_storage() if isinstance(blob_store_result, SyftError): return blob_store_result + if isinstance(blob_store_result, SyftWarning): + print(blob_store_result.message) + skip_save_to_blob_store, skip_clear_cache = True, True + else: + skip_save_to_blob_store, skip_clear_cache = False, False - np_pointer = self._set(context, np_obj) + np_pointer = self._set( + context, + np_obj, + skip_save_to_blob_store=skip_save_to_blob_store, + skip_clear_cache=skip_clear_cache, + ) return np_pointer @service_method( diff --git a/packages/syft/src/syft/service/dataset/dataset.py b/packages/syft/src/syft/service/dataset/dataset.py index 9dde84429c4..a345aea94bd 100644 --- a/packages/syft/src/syft/service/dataset/dataset.py +++ b/packages/syft/src/syft/service/dataset/dataset.py @@ -709,7 +709,11 @@ def create_and_store_twin(context: TransformContext) -> TransformContext: res = twin._save_to_blob_storage(allow_empty=contains_empty) if isinstance(res, SyftError): raise ValueError(res.message) - + if isinstance(res, SyftWarning): + print(res.message) + skip_save_to_blob_store, skip_clear_cache = True, True + else: + skip_save_to_blob_store, skip_clear_cache = False, False # TODO, upload to blob storage here if context.node is None: raise ValueError( @@ -719,6 +723,8 @@ def create_and_store_twin(context: TransformContext) -> TransformContext: result = action_service._set( context=context.to_node_context(), action_object=twin, + skip_save_to_blob_store=skip_save_to_blob_store, + skip_clear_cache=skip_clear_cache, ) if result.is_err(): raise RuntimeError(f"Failed to create and store twin. Error: {result}") From 1e2f9602d9fc1c83a838e852dc1021a9d43e4d56 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Mon, 24 Jun 2024 16:32:21 +0700 Subject: [PATCH 20/30] [syft] fix lint. Change using print "not saving action objs to blob store" to logger.debug --- packages/syft/src/syft/client/domain_client.py | 2 +- packages/syft/src/syft/service/action/action_object.py | 9 ++++----- packages/syft/src/syft/service/action/action_service.py | 7 ++++--- packages/syft/src/syft/service/dataset/dataset.py | 3 ++- packages/syft/src/syft/types/twin_object.py | 3 ++- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/packages/syft/src/syft/client/domain_client.py b/packages/syft/src/syft/client/domain_client.py index 16a0170b556..70543bd3649 100644 --- a/packages/syft/src/syft/client/domain_client.py +++ b/packages/syft/src/syft/client/domain_client.py @@ -144,7 +144,7 @@ def upload_dataset(self, dataset: CreateDataset) -> SyftSuccess | SyftError: return SyftError(message=f"Failed to create twin. {e}") if isinstance(res, SyftWarning): - print(res.message) + logger.debug(res.message) skip_save_to_blob_store, skip_clear_cache = True, True else: skip_save_to_blob_store, skip_clear_cache = False, False diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 036dc99fea6..58312bb822b 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -18,6 +18,7 @@ from typing import TYPE_CHECKING # third party +from loguru import logger from pydantic import ConfigDict from pydantic import Field from pydantic import field_validator @@ -36,7 +37,6 @@ from ...serde.serializable import serializable from ...serde.serialize import _serialize as serialize from ...service.blob_storage.util import can_upload_to_blob_storage -from ...service.blob_storage.util import min_size_for_blob_storage_upload from ...service.response import SyftError from ...service.response import SyftSuccess from ...service.response import SyftWarning @@ -524,7 +524,7 @@ def process_arg(arg: ActionObject | Asset | UID | Any) -> Any: if isinstance(r, SyftError): print(r.message) if isinstance(r, SyftWarning): - print(r.message) + logger.debug(r.message) skip_save_to_blob_store, skip_clear_cache = True, True else: skip_save_to_blob_store, skip_clear_cache = False, False @@ -892,8 +892,7 @@ def _save_to_blob_storage( self.syft_action_data_cache = data return SyftWarning( message=f"The action object {self.id} was not saved to " - f"the blob store but to memory cache since it is " - f"smaller than {min_size_for_blob_storage_upload(api.metadata)} Mb." + f"the blob store but to memory cache since it is small." ) def _clear_cache(self) -> None: @@ -1248,7 +1247,7 @@ def _send( return api if isinstance(blob_storage_res, SyftWarning): - print(blob_storage_res.message) + logger.debug(blob_storage_res.message) skip_save_to_blob_store, skip_clear_cache = True, True else: skip_save_to_blob_store, skip_clear_cache = False, False diff --git a/packages/syft/src/syft/service/action/action_service.py b/packages/syft/src/syft/service/action/action_service.py index f7ca2d78efa..419a6f1f42d 100644 --- a/packages/syft/src/syft/service/action/action_service.py +++ b/packages/syft/src/syft/service/action/action_service.py @@ -3,6 +3,7 @@ from typing import Any # third party +from loguru import logger import numpy as np from result import Err from result import Ok @@ -71,7 +72,7 @@ def np_array(self, context: AuthedServiceContext, data: Any) -> Any: if isinstance(blob_store_result, SyftError): return blob_store_result if isinstance(blob_store_result, SyftWarning): - print(blob_store_result.message) + logger.debug(blob_store_result.message) skip_save_to_blob_store, skip_clear_cache = True, True else: skip_save_to_blob_store, skip_clear_cache = False, False @@ -527,7 +528,7 @@ def set_result_to_store( if isinstance(blob_store_result, SyftError): return Err(blob_store_result.message) if isinstance(blob_store_result, SyftWarning): - print(blob_store_result.message) + logger.debug(blob_store_result.message) skip_save_to_blob_store, skip_clear_cache = True, True else: skip_save_to_blob_store, skip_clear_cache = False, False @@ -815,7 +816,7 @@ def execute( "has_result_read_permission": has_result_read_permission } if isinstance(blob_store_result, SyftWarning): - print(blob_store_result.message) + logger.debug(blob_store_result.message) skip_save_to_blob_store, skip_clear_cache = True, True else: skip_save_to_blob_store, skip_clear_cache = False, False diff --git a/packages/syft/src/syft/service/dataset/dataset.py b/packages/syft/src/syft/service/dataset/dataset.py index a345aea94bd..d583b5d28d7 100644 --- a/packages/syft/src/syft/service/dataset/dataset.py +++ b/packages/syft/src/syft/service/dataset/dataset.py @@ -8,6 +8,7 @@ # third party from IPython.display import display import itables +from loguru import logger import pandas as pd from pydantic import ConfigDict from pydantic import field_validator @@ -710,7 +711,7 @@ def create_and_store_twin(context: TransformContext) -> TransformContext: if isinstance(res, SyftError): raise ValueError(res.message) if isinstance(res, SyftWarning): - print(res.message) + logger.debug(res.message) skip_save_to_blob_store, skip_clear_cache = True, True else: skip_save_to_blob_store, skip_clear_cache = False, False diff --git a/packages/syft/src/syft/types/twin_object.py b/packages/syft/src/syft/types/twin_object.py index 82eae3590b8..ba9b94b30ab 100644 --- a/packages/syft/src/syft/types/twin_object.py +++ b/packages/syft/src/syft/types/twin_object.py @@ -6,6 +6,7 @@ from typing import ClassVar # third party +from loguru import logger from pydantic import field_validator from pydantic import model_validator from typing_extensions import Self @@ -105,7 +106,7 @@ def send(self, client: SyftClient, add_storage_permission: bool = True) -> Any: self._set_obj_location_(client.id, client.verify_key) blob_store_result = self._save_to_blob_storage() if isinstance(blob_store_result, SyftWarning): - print(blob_store_result.message) + logger.debug(blob_store_result.message) skip_save_to_blob_store, skip_clear_cache = True, True else: skip_save_to_blob_store, skip_clear_cache = False, False From 39fe4a6cedd4ed3ba9439663b45a26614aa5aa47 Mon Sep 17 00:00:00 2001 From: dk Date: Tue, 25 Jun 2024 09:57:22 +0700 Subject: [PATCH 21/30] fix logger --- packages/syft/src/syft/service/dataset/dataset.py | 3 ++- packages/syft/src/syft/types/twin_object.py | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/service/dataset/dataset.py b/packages/syft/src/syft/service/dataset/dataset.py index d583b5d28d7..19352d26377 100644 --- a/packages/syft/src/syft/service/dataset/dataset.py +++ b/packages/syft/src/syft/service/dataset/dataset.py @@ -2,13 +2,13 @@ from collections.abc import Callable from datetime import datetime from enum import Enum +import logging import textwrap from typing import Any # third party from IPython.display import display import itables -from loguru import logger import pandas as pd from pydantic import ConfigDict from pydantic import field_validator @@ -50,6 +50,7 @@ from ..response import SyftWarning NamePartitionKey = PartitionKey(key="name", type_=str) +logger = logging.getLogger(__name__) @serializable() diff --git a/packages/syft/src/syft/types/twin_object.py b/packages/syft/src/syft/types/twin_object.py index ba9b94b30ab..b5f7c90e42c 100644 --- a/packages/syft/src/syft/types/twin_object.py +++ b/packages/syft/src/syft/types/twin_object.py @@ -2,11 +2,11 @@ from __future__ import annotations # stdlib +import logging from typing import Any from typing import ClassVar # third party -from loguru import logger from pydantic import field_validator from pydantic import model_validator from typing_extensions import Self @@ -24,6 +24,8 @@ from .syft_object import SyftObject from .uid import UID +logger = logging.getLogger(__name__) + def to_action_object(obj: Any) -> ActionObject: if isinstance(obj, ActionObject): From 18ad21b1278ac776c94c9138aba2f46ba75fdf3c Mon Sep 17 00:00:00 2001 From: dk Date: Tue, 25 Jun 2024 10:05:19 +0700 Subject: [PATCH 22/30] fix logger in action_service.py --- packages/syft/src/syft/service/action/action_service.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/syft/src/syft/service/action/action_service.py b/packages/syft/src/syft/service/action/action_service.py index 419a6f1f42d..50504c34301 100644 --- a/packages/syft/src/syft/service/action/action_service.py +++ b/packages/syft/src/syft/service/action/action_service.py @@ -1,9 +1,9 @@ # stdlib import importlib +import logging from typing import Any # third party -from loguru import logger import numpy as np from result import Err from result import Ok @@ -49,6 +49,8 @@ from .pandas import PandasDataFrameObject # noqa: F401 from .pandas import PandasSeriesObject # noqa: F401 +logger = logging.getLogger(__name__) + @serializable() class ActionService(AbstractService): From 77fb1c20d9804f9aaf37ab2e935fd2eac08df732 Mon Sep 17 00:00:00 2001 From: dk Date: Thu, 27 Jun 2024 16:01:52 +0700 Subject: [PATCH 23/30] [syft/action_service] merging flags to save action objects to the blob store / clear cache --- .../syft/src/syft/client/domain_client.py | 5 ++-- .../src/syft/service/action/action_object.py | 10 +++---- .../src/syft/service/action/action_service.py | 26 +++++++------------ .../syft/src/syft/service/dataset/dataset.py | 5 ++-- packages/syft/src/syft/types/twin_object.py | 5 ++-- 5 files changed, 20 insertions(+), 31 deletions(-) diff --git a/packages/syft/src/syft/client/domain_client.py b/packages/syft/src/syft/client/domain_client.py index 7214d8bf966..eb677c6b2cd 100644 --- a/packages/syft/src/syft/client/domain_client.py +++ b/packages/syft/src/syft/client/domain_client.py @@ -148,14 +148,13 @@ def upload_dataset(self, dataset: CreateDataset) -> SyftSuccess | SyftError: if isinstance(res, SyftWarning): logger.debug(res.message) - skip_save_to_blob_store, skip_clear_cache = True, True + skip_save_to_blob_store = True else: - skip_save_to_blob_store, skip_clear_cache = False, False + skip_save_to_blob_store = False response = self.api.services.action.set( twin, ignore_detached_objs=contains_empty, skip_save_to_blob_store=skip_save_to_blob_store, - skip_clear_cache=skip_clear_cache, ) if isinstance(response, SyftError): tqdm.write(f"Failed to upload asset: {asset.name}") diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index bcdf3190fbc..f737c87fcb0 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -527,13 +527,12 @@ def process_arg(arg: ActionObject | Asset | UID | Any) -> Any: print(r.message) if isinstance(r, SyftWarning): logger.debug(r.message) - skip_save_to_blob_store, skip_clear_cache = True, True + skip_save_to_blob_store = True else: - skip_save_to_blob_store, skip_clear_cache = False, False + skip_save_to_blob_store = False arg = api.services.action.set( arg, skip_save_to_blob_store=skip_save_to_blob_store, - skip_clear_cache=skip_clear_cache, ) return arg @@ -1251,14 +1250,13 @@ def _send( if isinstance(blob_storage_res, SyftWarning): logger.debug(blob_storage_res.message) - skip_save_to_blob_store, skip_clear_cache = True, True + skip_save_to_blob_store = True else: - skip_save_to_blob_store, skip_clear_cache = False, False + skip_save_to_blob_store = False res = api.services.action.set( self, add_storage_permission=add_storage_permission, skip_save_to_blob_store=skip_save_to_blob_store, - skip_clear_cache=skip_clear_cache, ) if isinstance(res, ActionObject): self.syft_created_at = res.syft_created_at diff --git a/packages/syft/src/syft/service/action/action_service.py b/packages/syft/src/syft/service/action/action_service.py index 50504c34301..da273e2c12b 100644 --- a/packages/syft/src/syft/service/action/action_service.py +++ b/packages/syft/src/syft/service/action/action_service.py @@ -75,15 +75,14 @@ def np_array(self, context: AuthedServiceContext, data: Any) -> Any: return blob_store_result if isinstance(blob_store_result, SyftWarning): logger.debug(blob_store_result.message) - skip_save_to_blob_store, skip_clear_cache = True, True + skip_save_to_blob_store = True else: - skip_save_to_blob_store, skip_clear_cache = False, False + skip_save_to_blob_store = False np_pointer = self._set( context, np_obj, skip_save_to_blob_store=skip_save_to_blob_store, - skip_clear_cache=skip_clear_cache, ) return np_pointer @@ -98,7 +97,6 @@ def set( action_object: ActionObject | TwinObject, add_storage_permission: bool = True, ignore_detached_objs: bool = False, - skip_clear_cache: bool = False, skip_save_to_blob_store: bool = False, ) -> ActionObject | SyftError: res = self._set( @@ -107,7 +105,6 @@ def set( has_result_read_permission=True, add_storage_permission=add_storage_permission, ignore_detached_objs=ignore_detached_objs, - skip_clear_cache=skip_clear_cache, skip_save_to_blob_store=skip_save_to_blob_store, ) if res.is_err(): @@ -145,7 +142,6 @@ def _set( has_result_read_permission: bool = False, add_storage_permission: bool = True, ignore_detached_objs: bool = False, - skip_clear_cache: bool = False, skip_save_to_blob_store: bool = False, ) -> Result[ActionObject, str]: if ( @@ -153,19 +149,19 @@ def _set( and not skip_save_to_blob_store ): return Err( - "you uploaded an ActionObject that is not yet in the blob storage" + "You uploaded an ActionObject that is not yet in the blob storage" ) """Save an object to the action store""" # 🟡 TODO 9: Create some kind of type checking / protocol for SyftSerializable if isinstance(action_object, ActionObject): action_object.syft_created_at = DateTime.now() - if not skip_clear_cache: + if not skip_save_to_blob_store: action_object._clear_cache() - else: + else: # TwinObject action_object.private_obj.syft_created_at = DateTime.now() # type: ignore[unreachable] action_object.mock_obj.syft_created_at = DateTime.now() - if not skip_clear_cache: + if not skip_save_to_blob_store: action_object.private_obj._clear_cache() action_object.mock_obj._clear_cache() @@ -531,9 +527,9 @@ def set_result_to_store( return Err(blob_store_result.message) if isinstance(blob_store_result, SyftWarning): logger.debug(blob_store_result.message) - skip_save_to_blob_store, skip_clear_cache = True, True + skip_save_to_blob_store = True else: - skip_save_to_blob_store, skip_clear_cache = False, False + skip_save_to_blob_store = False # IMPORTANT: DO THIS ONLY AFTER ._save_to_blob_storage if isinstance(result_action_object, TwinObject): @@ -550,7 +546,6 @@ def set_result_to_store( result_action_object, has_result_read_permission=True, skip_save_to_blob_store=skip_save_to_blob_store, - skip_clear_cache=skip_clear_cache, ) if set_result.is_err(): @@ -819,14 +814,13 @@ def execute( } if isinstance(blob_store_result, SyftWarning): logger.debug(blob_store_result.message) - skip_save_to_blob_store, skip_clear_cache = True, True + skip_save_to_blob_store = True else: - skip_save_to_blob_store, skip_clear_cache = False, False + skip_save_to_blob_store = False set_result = self._set( context, result_action_object, skip_save_to_blob_store=skip_save_to_blob_store, - skip_clear_cache=skip_clear_cache, ) if set_result.is_err(): return Err( diff --git a/packages/syft/src/syft/service/dataset/dataset.py b/packages/syft/src/syft/service/dataset/dataset.py index 19352d26377..2bd7dc33b90 100644 --- a/packages/syft/src/syft/service/dataset/dataset.py +++ b/packages/syft/src/syft/service/dataset/dataset.py @@ -713,9 +713,9 @@ def create_and_store_twin(context: TransformContext) -> TransformContext: raise ValueError(res.message) if isinstance(res, SyftWarning): logger.debug(res.message) - skip_save_to_blob_store, skip_clear_cache = True, True + skip_save_to_blob_store = True else: - skip_save_to_blob_store, skip_clear_cache = False, False + skip_save_to_blob_store = False # TODO, upload to blob storage here if context.node is None: raise ValueError( @@ -726,7 +726,6 @@ def create_and_store_twin(context: TransformContext) -> TransformContext: context=context.to_node_context(), action_object=twin, skip_save_to_blob_store=skip_save_to_blob_store, - skip_clear_cache=skip_clear_cache, ) if result.is_err(): raise RuntimeError(f"Failed to create and store twin. Error: {result}") diff --git a/packages/syft/src/syft/types/twin_object.py b/packages/syft/src/syft/types/twin_object.py index b5f7c90e42c..eae86e9cb5b 100644 --- a/packages/syft/src/syft/types/twin_object.py +++ b/packages/syft/src/syft/types/twin_object.py @@ -109,13 +109,12 @@ def send(self, client: SyftClient, add_storage_permission: bool = True) -> Any: blob_store_result = self._save_to_blob_storage() if isinstance(blob_store_result, SyftWarning): logger.debug(blob_store_result.message) - skip_save_to_blob_store, skip_clear_cache = True, True + skip_save_to_blob_store = True else: - skip_save_to_blob_store, skip_clear_cache = False, False + skip_save_to_blob_store = False res = client.api.services.action.set( self, add_storage_permission=add_storage_permission, skip_save_to_blob_store=skip_save_to_blob_store, - skip_clear_cache=skip_clear_cache, ) return res From 719a9a5b7456d3de3c4bac034b27fe3f9cc1ba17 Mon Sep 17 00:00:00 2001 From: dk Date: Thu, 27 Jun 2024 17:32:24 +0700 Subject: [PATCH 24/30] [syft/action_obj] refactor `ActionObject._save_to_blob_storage` --- .../src/syft/service/action/action_object.py | 63 +++++++++---------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index f737c87fcb0..9f0c7baaf5d 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -793,17 +793,25 @@ def reload_cache(self) -> SyftError | None: return None - def _save_to_blob_storage_(self, data: Any) -> SyftError | None: + def _save_to_blob_storage_(self, data: Any) -> SyftError | SyftWarning | None: # relative from ...types.blob_storage import BlobFile from ...types.blob_storage import CreateBlobStorageEntry + api = APIRegistry.api_for(self.syft_node_location, self.syft_client_verify_key) + if api is None: + raise ValueError( + f"api is None. You must login to {self.syft_node_location}" + ) + if not can_upload_to_blob_storage(data, api.metadata): + return SyftWarning( + message=f"The action object {self.id} was not saved to " + f"the blob store but to memory cache since it is small." + ) + if not isinstance(data, ActionDataEmpty): if isinstance(data, BlobFile): if not data.uploaded: - api = APIRegistry.api_for( - self.syft_node_location, self.syft_client_verify_key - ) data._upload_to_blobstorage_from_api(api) else: serialized = serialize(data, to_bytes=True) @@ -843,21 +851,10 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | None: "skipping writing action object to store, passed data was empty." ) - self.syft_action_data_cache = data + # self.syft_action_data_cache = data return None - def _set_reprs(self, data: any) -> None: - if inspect.isclass(data): - self.syft_action_data_repr_ = truncate_str(repr_cls(data)) - else: - self.syft_action_data_repr_ = truncate_str( - data._repr_markdown_() - if hasattr(data, "_repr_markdown_") - else data.__repr__() - ) - self.syft_action_data_str_ = truncate_str(str(data)) - def _save_to_blob_storage( self, allow_empty: bool = False ) -> SyftError | SyftSuccess | SyftWarning: @@ -869,23 +866,14 @@ def _save_to_blob_storage( message=f"cannot store empty object {self.id} to the blob storage" ) try: - api = APIRegistry.api_for( - node_uid=self.syft_node_location, - user_verify_key=self.syft_client_verify_key, + result = self._save_to_blob_storage_(data) + if isinstance(result, SyftError | SyftWarning): + return result + if not TraceResultRegistry.current_thread_is_tracing(): + self._clear_cache() + return SyftSuccess( + message=f"Saved action object {self.id} to the blob store" ) - if api is None: - raise ValueError( - f"api is None. You must login to {self.syft_node_location}" - ) - if can_upload_to_blob_storage(data, api.metadata): - result = self._save_to_blob_storage_(data) - if isinstance(result, SyftError): - return result - if not TraceResultRegistry.current_thread_is_tracing(): - self._clear_cache() - return SyftSuccess( - message=f"Saved action object {self.id} to the blob store" - ) except Exception as e: print( f"Failed to save action object {self.id} to the blob store. Error: {e}" @@ -900,6 +888,17 @@ def _save_to_blob_storage( def _clear_cache(self) -> None: self.syft_action_data_cache = self.as_empty_data() + def _set_reprs(self, data: any) -> None: + if inspect.isclass(data): + self.syft_action_data_repr_ = truncate_str(repr_cls(data)) + else: + self.syft_action_data_repr_ = truncate_str( + data._repr_markdown_() + if hasattr(data, "_repr_markdown_") + else data.__repr__() + ) + self.syft_action_data_str_ = truncate_str(str(data)) + @property def is_pointer(self) -> bool: return self.syft_node_uid is not None From 2272447b80c8cb7dac92c57308b78ba3442834be Mon Sep 17 00:00:00 2001 From: dk Date: Fri, 28 Jun 2024 08:41:56 +0700 Subject: [PATCH 25/30] [syft/action_obj] stop saving data to cache if saving to the blob store --- packages/syft/src/syft/service/action/action_object.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 9f0c7baaf5d..51d208126b9 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -851,8 +851,6 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | SyftWarning | None: "skipping writing action object to store, passed data was empty." ) - # self.syft_action_data_cache = data - return None def _save_to_blob_storage( From a260de881ad279cd094e35d4d5f0ded2968ee63c Mon Sep 17 00:00:00 2001 From: khoaguin Date: Mon, 1 Jul 2024 15:26:12 +0700 Subject: [PATCH 26/30] [syft/chore] change print to `logger.debug` for blob store path and min size to upload --- packages/syft/src/syft/node/node.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index e056fd32147..977dfb07f70 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -510,12 +510,11 @@ def init_blob_storage(self, config: BlobStorageConfig | None = None) -> None: if self.dev_mode: if isinstance(self.blob_store_config, OnDiskBlobStorageConfig): - print( + logger.debug( f"Using on-disk blob storage with path: " f"{self.blob_store_config.client_config.base_directory}", - end=". ", ) - print( + logger.debug( f"Minimum object size to be saved to the blob storage: " f"{self.blob_store_config.min_blob_size} (MB)." ) From d3a6b6a9498e7f9425b3932131c559098a9121a4 Mon Sep 17 00:00:00 2001 From: dk Date: Wed, 3 Jul 2024 08:40:38 +0700 Subject: [PATCH 27/30] [syft/action_obj] saving action data cache immediately after trying to save to blob store - moving / remove some checks Co-authored-by: Shubham Gupta --- .../src/syft/service/action/action_object.py | 29 ++++++++++--------- .../src/syft/service/blob_storage/util.py | 5 ---- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 51d208126b9..4ef06a9bffc 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -798,22 +798,23 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | SyftWarning | None: from ...types.blob_storage import BlobFile from ...types.blob_storage import CreateBlobStorageEntry - api = APIRegistry.api_for(self.syft_node_location, self.syft_client_verify_key) - if api is None: - raise ValueError( - f"api is None. You must login to {self.syft_node_location}" - ) - if not can_upload_to_blob_storage(data, api.metadata): - return SyftWarning( - message=f"The action object {self.id} was not saved to " - f"the blob store but to memory cache since it is small." - ) - if not isinstance(data, ActionDataEmpty): + api = APIRegistry.api_for( + self.syft_node_location, self.syft_client_verify_key + ) if isinstance(data, BlobFile): if not data.uploaded: data._upload_to_blobstorage_from_api(api) else: + if api is None: + raise ValueError( + f"api is None. You must login to {self.syft_node_location}" + ) + if not can_upload_to_blob_storage(data, api.metadata): + return SyftWarning( + message=f"The action object {self.id} was not saved to " + f"the blob store but to memory cache since it is small." + ) serialized = serialize(data, to_bytes=True) size = sys.getsizeof(serialized) storage_entry = CreateBlobStorageEntry.from_obj(data, file_size=size) @@ -830,13 +831,13 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | SyftWarning | None: ) if allocate_method is not None: blob_deposit_object = allocate_method(storage_entry) - if isinstance(blob_deposit_object, SyftError): return blob_deposit_object result = blob_deposit_object.write(BytesIO(serialized)) if isinstance(result, SyftError): return result + self.syft_blob_storage_entry_id = ( blob_deposit_object.blob_storage_entry_id ) @@ -846,6 +847,7 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | SyftWarning | None: self.syft_action_data_type = type(data) self._set_reprs(data) self.syft_has_bool_attr = hasattr(data, "__bool__") + self.syft_action_data_cache = data else: logger.debug( "skipping writing action object to store, passed data was empty." @@ -859,10 +861,12 @@ def _save_to_blob_storage( data = self.syft_action_data if isinstance(data, SyftError): return data + if isinstance(data, ActionDataEmpty): return SyftError( message=f"cannot store empty object {self.id} to the blob storage" ) + try: result = self._save_to_blob_storage_(data) if isinstance(result, SyftError | SyftWarning): @@ -877,7 +881,6 @@ def _save_to_blob_storage( f"Failed to save action object {self.id} to the blob store. Error: {e}" ) - self.syft_action_data_cache = data return SyftWarning( message=f"The action object {self.id} was not saved to " f"the blob store but to memory cache since it is small." diff --git a/packages/syft/src/syft/service/blob_storage/util.py b/packages/syft/src/syft/service/blob_storage/util.py index 68f3250035c..e82d1de9cff 100644 --- a/packages/syft/src/syft/service/blob_storage/util.py +++ b/packages/syft/src/syft/service/blob_storage/util.py @@ -8,11 +8,6 @@ def min_size_for_blob_storage_upload(metadata: NodeMetadata | NodeMetadataJSON) -> int: - if not isinstance(metadata, (NodeMetadata | NodeMetadataJSON)): - raise ValueError( - f"argument `metadata` is type {type(metadata)}, " - f"but it should be of type NodeMetadata or NodeMetadataJSON" - ) return metadata.min_size_blob_storage_mb From 7f8517e01b6883d569622ac82aab7618cc67b2b4 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 3 Jul 2024 12:30:24 +0530 Subject: [PATCH 28/30] raise exception if exception raised in action object --- .../syft/src/syft/service/action/action_object.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 4ef06a9bffc..ea07a9d4dea 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -847,12 +847,13 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | SyftWarning | None: self.syft_action_data_type = type(data) self._set_reprs(data) self.syft_has_bool_attr = hasattr(data, "__bool__") - self.syft_action_data_cache = data else: logger.debug( "skipping writing action object to store, passed data was empty." ) + self.syft_action_data_cache = data + return None def _save_to_blob_storage( @@ -877,14 +878,7 @@ def _save_to_blob_storage( message=f"Saved action object {self.id} to the blob store" ) except Exception as e: - print( - f"Failed to save action object {self.id} to the blob store. Error: {e}" - ) - - return SyftWarning( - message=f"The action object {self.id} was not saved to " - f"the blob store but to memory cache since it is small." - ) + raise e def _clear_cache(self) -> None: self.syft_action_data_cache = self.as_empty_data() From 5718f0083b6a402e1d9bb4795e4c45434483a8ea Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 3 Jul 2024 14:37:39 +0530 Subject: [PATCH 29/30] get metadata from api_or_context method in action object --- .../src/syft/service/action/action_object.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index ea07a9d4dea..92f734c7a60 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -799,18 +799,19 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | SyftWarning | None: from ...types.blob_storage import CreateBlobStorageEntry if not isinstance(data, ActionDataEmpty): - api = APIRegistry.api_for( - self.syft_node_location, self.syft_client_verify_key - ) if isinstance(data, BlobFile): if not data.uploaded: + api = APIRegistry.api_for( + self.syft_node_location, self.syft_client_verify_key + ) data._upload_to_blobstorage_from_api(api) else: - if api is None: - raise ValueError( - f"api is None. You must login to {self.syft_node_location}" - ) - if not can_upload_to_blob_storage(data, api.metadata): + get_metadata = from_api_or_context( + func_or_path="metadata.get_metadata", + syft_node_location=self.syft_node_location, + syft_client_verify_key=self.syft_client_verify_key, + ) + if not can_upload_to_blob_storage(data, get_metadata()): return SyftWarning( message=f"The action object {self.id} was not saved to " f"the blob store but to memory cache since it is small." From 49ceece4c40ae20aec0b80ca88bed8ad94480188 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 3 Jul 2024 14:43:22 +0530 Subject: [PATCH 30/30] fix linting --- packages/syft/src/syft/service/action/action_object.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/syft/src/syft/service/action/action_object.py b/packages/syft/src/syft/service/action/action_object.py index 92f734c7a60..b9ffd16ebf6 100644 --- a/packages/syft/src/syft/service/action/action_object.py +++ b/packages/syft/src/syft/service/action/action_object.py @@ -811,7 +811,9 @@ def _save_to_blob_storage_(self, data: Any) -> SyftError | SyftWarning | None: syft_node_location=self.syft_node_location, syft_client_verify_key=self.syft_client_verify_key, ) - if not can_upload_to_blob_storage(data, get_metadata()): + if get_metadata is not None and not can_upload_to_blob_storage( + data, get_metadata() + ): return SyftWarning( message=f"The action object {self.id} was not saved to " f"the blob store but to memory cache since it is small."