From 2f1303fabe913c9da4fe2fd05a2339dccbf505d4 Mon Sep 17 00:00:00 2001 From: eelcovdw Date: Mon, 12 Aug 2024 08:58:32 +0200 Subject: [PATCH 1/4] deposit L2 --- .../syft/src/syft/service/request/request.py | 39 ++++++++++++++++--- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index e6ea6f77bb9..e9229edd66a 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -590,7 +590,7 @@ def approve( disable_warnings: bool = False, approve_nested: bool = False, **kwargs: dict, - ) -> Result[SyftSuccess, SyftError]: + ) -> SyftSuccess | SyftError: api = self._get_api() if isinstance(api, SyftError): return api @@ -804,6 +804,7 @@ def deposit_result( result: Any, log_stdout: str = "", log_stderr: str = "", + approve: bool | None = None, ) -> Job | SyftError: """ Adds a result to this Request: @@ -815,6 +816,8 @@ def deposit_result( Args: result (Any): ActionObject or any object to be saved as an ActionObject. logs (str | None, optional): Optional logs to be saved with the Job. Defaults to None. + approve (bool | None, optional): Optional approval flag. + Can only be used if this is a high-side request. Defaults to None. Returns: Job | SyftError: Job object if successful, else SyftError. @@ -829,11 +832,37 @@ def deposit_result( if isinstance(code, SyftError): return code + # By default, do not add permissions for code owner + # Permissions are added: + # - when syncing the Job (for l0 deployments) + # - when depositing the result with approve=True (for high-side requests) + add_permissions_for_code_owner = False + if self.is_l0_deployment: + if approve is not None: + return SyftError( + message="Approve is only available for high side code requests." + "Please use request.deposit_result() without approve instead, and approve by syncing." + ) if not self.is_l0_deployment: - return SyftError( - message="deposit_result is only available for low side code requests. " - "Please use request.approve() instead." - ) + if approve is None: + return SyftError( + message="Approve flag is required for high-side code requests." + ) + if approve: + approve_res = self.approve() + if isinstance(approve_res, SyftError): + return approve_res + add_permissions_for_code_owner = True + else: + prompt_res = prompt_warning_message( + message=( + "By not approving this request, the data scientist will not be able to " + "access the results. Are you sure you want to continue?" + ), + confirm=True, + ) + if not prompt_res: + return SyftError(message="Result not deposited.") # Create ActionObject action_object = self._create_action_object_for_deposited_result(result) From 51ba9e323539c0df3c8b7a370f6a82d27d83dadc Mon Sep 17 00:00:00 2001 From: eelcovdw Date: Mon, 12 Aug 2024 09:12:41 +0200 Subject: [PATCH 2/4] add old deposit result --- .../syft/src/syft/service/request/request.py | 198 +++++++++++++++++- 1 file changed, 197 insertions(+), 1 deletion(-) diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index e9229edd66a..a2dc47bd6ce 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -49,7 +49,7 @@ from ..code.user_code import UserCodeStatusCollection from ..context import AuthedServiceContext from ..context import ChangeContext -from ..job.job_stash import Job +from ..job.job_stash import Job, JobInfo from ..job.job_stash import JobStatus from ..notification.notifications import Notification from ..response import SyftError @@ -889,6 +889,202 @@ def deposit_result( return job + def accept_by_depositing_result( + self, result: Any, force: bool = False + ) -> SyftError | SyftSuccess: + # this code is extremely brittle because its a work around that relies on + # the type of request being very specifically tied to code which needs approving + + # Special case for results from Jobs (High-low side async) + if isinstance(result, JobInfo): + job_info = result + if not job_info.includes_result: + return SyftError( + message="JobInfo should not include result. Use sync_job instead." + ) + result = job_info.result + if isinstance(result, ActionObject): + # Do not allow accepting a result produced by a Job, + # This can cause an inconsistent Job state + action_object_job = self._get_job_from_action_object(result) + if action_object_job is not None: + return SyftError( + message=f"This ActionObject is the result of Job {action_object_job.id}, " + f"please use the `Job.info` instead." + ) + else: + job_info = JobInfo( + includes_metadata=True, + includes_result=True, + status=JobStatus.COMPLETED, + resolved=True, + ) + else: + # NOTE result is added at the end of function (once ActionObject is created) + job_info = JobInfo( + includes_metadata=True, + includes_result=True, + status=JobStatus.COMPLETED, + resolved=True, + ) + + user_code_status_change: UserCodeStatusChange = self.changes[0] + code = user_code_status_change.code + output_history = code.output_history + if isinstance(output_history, SyftError): + return output_history + output_policy = code.output_policy + if isinstance(output_policy, SyftError): + return output_policy + if isinstance(user_code_status_change.code.output_policy_type, UserPolicy): + return SyftError( + message="UserCode uses an user-submitted custom policy. Please use .approve()" + ) + + if not user_code_status_change.change_object_is_type(UserCodeStatusCollection): + raise TypeError( + f"accept_by_depositing_result can only be run on {UserCodeStatusCollection} not " + f"{user_code_status_change.linked_obj.object_type}" + ) + if not type(user_code_status_change) == UserCodeStatusChange: + raise TypeError( + f"accept_by_depositing_result can only be run on {UserCodeStatusChange} not " + f"{type(user_code_status_change)}" + ) + + api = APIRegistry.api_for(self.node_uid, self.syft_client_verify_key) + if not api: + raise Exception( + f"No access to Syft API. Please login to {self.node_uid} first." + ) + if api.signing_key is None: + raise ValueError(f"{api}'s signing key is None") + is_approved = user_code_status_change.approved + + permission_request = self.approve(approve_nested=True) + if isinstance(permission_request, SyftError): + return permission_request + + job = self._get_latest_or_create_job() + if isinstance(job, SyftError): + return job + + # This weird order is due to the fact that state is None before calling approve + # we could fix it in a future release + if is_approved: + if not force: + return SyftError( + message="Already approved, if you want to force updating the result use force=True" + ) + # TODO: this should overwrite the output history instead + action_obj_id = output_history[0].output_ids[0] # type: ignore + + if not isinstance(result, ActionObject): + action_object = ActionObject.from_obj( + result, + id=action_obj_id, + syft_client_verify_key=api.signing_key.verify_key, + syft_node_location=api.node_uid, + ) + else: + action_object = result + action_object_is_from_this_node = ( + self.syft_node_location == action_object.syft_node_location + ) + if ( + action_object.syft_blob_storage_entry_id is None + or not action_object_is_from_this_node + ): + action_object.reload_cache() + action_object.syft_node_location = self.syft_node_location + action_object.syft_client_verify_key = self.syft_client_verify_key + blob_store_result = action_object._save_to_blob_storage() + if isinstance(blob_store_result, SyftError): + return blob_store_result + result = api.services.action.set(action_object) + if isinstance(result, SyftError): + return result + else: + if not isinstance(result, ActionObject): + action_object = ActionObject.from_obj( + result, + syft_client_verify_key=api.signing_key.verify_key, + syft_node_location=api.node_uid, + ) + else: + action_object = result + + # TODO: proper check for if actionobject is already uploaded + # we also need this for manualy syncing + action_object_is_from_this_node = ( + self.syft_node_location == action_object.syft_node_location + ) + if ( + action_object.syft_blob_storage_entry_id is None + or not action_object_is_from_this_node + ): + action_object.reload_cache() + action_object.syft_node_location = self.syft_node_location + action_object.syft_client_verify_key = self.syft_client_verify_key + blob_store_result = action_object._save_to_blob_storage() + if isinstance(blob_store_result, SyftError): + return blob_store_result + result = api.services.action.set(action_object) + if isinstance(result, SyftError): + return result + + action_object_link = LinkedObject.from_obj(result, node_uid=self.node_uid) + permission_change = ActionStoreChange( + linked_obj=action_object_link, + apply_permission_type=ActionPermission.READ, + ) + + new_changes = [permission_change] + result_request = api.services.request.add_changes( + uid=self.id, changes=new_changes + ) + if isinstance(result_request, SyftError): + return result_request + self = result_request + + approved = self.approve(disable_warnings=True, approve_nested=True) + if isinstance(approved, SyftError): + return approved + + input_ids = {} + if code.input_policy is not None: + for inps in code.input_policy.inputs.values(): + input_ids.update(inps) + + res = api.services.code.store_as_history( + user_code_id=code.id, + outputs=result, + job_id=job.id, + input_ids=input_ids, + ) + if isinstance(res, SyftError): + return res + + job_info.result = action_object + job_info.status = ( + JobStatus.ERRORED + if isinstance(action_object.syft_action_data, Err) + else JobStatus.COMPLETED + ) + + existing_result = job.result.id if job.result is not None else None + print( + f"Job({job.id}) Setting new result {existing_result} -> {job_info.result.id}" + ) + job.apply_info(job_info) + + job_service = api.services.job + res = job_service.update(job) + if isinstance(res, SyftError): + return res + + return SyftSuccess(message="Request submitted for updating result.") + @deprecated( return_syfterror=True, reason="accept_by_depositing_result has been removed. Use approve instead to " From feffebb89bfaaadf7389a0c8eca556b1b37d83ea Mon Sep 17 00:00:00 2001 From: eelcovdw Date: Mon, 12 Aug 2024 10:59:18 +0200 Subject: [PATCH 3/4] add old L2 deposit result flow --- .../syft/src/syft/service/request/request.py | 133 +++++++++++------- 1 file changed, 80 insertions(+), 53 deletions(-) diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index a2dc47bd6ce..4ff3fa7c32c 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -49,9 +49,11 @@ from ..code.user_code import UserCodeStatusCollection from ..context import AuthedServiceContext from ..context import ChangeContext -from ..job.job_stash import Job, JobInfo +from ..job.job_stash import Job +from ..job.job_stash import JobInfo from ..job.job_stash import JobStatus from ..notification.notifications import Notification +from ..policy.policy import UserPolicy from ..response import SyftError from ..response import SyftSuccess from ..user.user import UserView @@ -590,7 +592,7 @@ def approve( disable_warnings: bool = False, approve_nested: bool = False, **kwargs: dict, - ) -> SyftSuccess | SyftError: + ) -> Result[SyftSuccess, SyftError]: api = self._get_api() if isinstance(api, SyftError): return api @@ -805,6 +807,7 @@ def deposit_result( log_stdout: str = "", log_stderr: str = "", approve: bool | None = None, + **kwargs: dict[str, Any], ) -> Job | SyftError: """ Adds a result to this Request: @@ -813,18 +816,36 @@ def deposit_result( - Create Job with new result and logs - Update the output history + If this is a L2 request, the old accept_by_deposit_result will be used. + Args: result (Any): ActionObject or any object to be saved as an ActionObject. - logs (str | None, optional): Optional logs to be saved with the Job. Defaults to None. - approve (bool | None, optional): Optional approval flag. - Can only be used if this is a high-side request. Defaults to None. + log_stdout (str): stdout logs. + log_stderr (str): stderr logs. + approve (bool, optional): Only supported for L2 requests. If True, the request will be approved. + Defaults to None. + Returns: Job | SyftError: Job object if successful, else SyftError. """ - # TODO check if this is a low-side request. If not, SyftError + # L2 request + # TODO specify behavior and rewrite old flow + if not self.is_l0_deployment: + if approve is None: + approve = prompt_warning_message( + "Depositing a result on this request will approve it.", + confirm=True, + ) + if approve is False: + return SyftError( + message="Cannot deposit result without approving the request." + ) + else: + return self._deposit_result_l2(result, **kwargs) + # L0 request api = self._get_api() if isinstance(api, SyftError): return api @@ -832,38 +853,6 @@ def deposit_result( if isinstance(code, SyftError): return code - # By default, do not add permissions for code owner - # Permissions are added: - # - when syncing the Job (for l0 deployments) - # - when depositing the result with approve=True (for high-side requests) - add_permissions_for_code_owner = False - if self.is_l0_deployment: - if approve is not None: - return SyftError( - message="Approve is only available for high side code requests." - "Please use request.deposit_result() without approve instead, and approve by syncing." - ) - if not self.is_l0_deployment: - if approve is None: - return SyftError( - message="Approve flag is required for high-side code requests." - ) - if approve: - approve_res = self.approve() - if isinstance(approve_res, SyftError): - return approve_res - add_permissions_for_code_owner = True - else: - prompt_res = prompt_warning_message( - message=( - "By not approving this request, the data scientist will not be able to " - "access the results. Are you sure you want to continue?" - ), - confirm=True, - ) - if not prompt_res: - return SyftError(message="Result not deposited.") - # Create ActionObject action_object = self._create_action_object_for_deposited_result(result) if isinstance(action_object, SyftError): @@ -889,9 +878,45 @@ def deposit_result( return job - def accept_by_depositing_result( - self, result: Any, force: bool = False - ) -> SyftError | SyftSuccess: + def _get_job_from_action_object(self, action_object: ActionObject) -> Job | None: + api = self._get_api() + if isinstance(api, SyftError): + return None + + job = api.services.job.get_by_result_id(action_object.id.id) + return job + + def _get_latest_or_create_job(self) -> Job | SyftError: + """Get the latest job for this requests user_code, or creates one if no jobs exist""" + api = self._get_api() + if isinstance(api, SyftError): + return api + job_service = api.services.job + + existing_jobs = job_service.get_by_user_code_id(self.code.id) + if isinstance(existing_jobs, SyftError): + return existing_jobs + + if len(existing_jobs) == 0: + job = job_service.create_job_for_user_code_id( + user_code_id=self.code.id, + add_code_owner_read_permissions=True, + ) + else: + job = existing_jobs[-1] + res = job_service.add_read_permission_job_for_code_owner(job, self.code) + res = job_service.add_read_permission_log_for_code_owner( + job.log_id, self.code + ) + print(res) + + return job + + def _deposit_result_l2( + self, + result: Any, + force: bool = False, + ) -> Job | SyftError: # this code is extremely brittle because its a work around that relies on # the type of request being very specifically tied to code which needs approving @@ -903,7 +928,7 @@ def accept_by_depositing_result( message="JobInfo should not include result. Use sync_job instead." ) result = job_info.result - if isinstance(result, ActionObject): + elif isinstance(result, ActionObject): # Do not allow accepting a result produced by a Job, # This can cause an inconsistent Job state action_object_job = self._get_job_from_action_object(result) @@ -952,10 +977,10 @@ def accept_by_depositing_result( f"{type(user_code_status_change)}" ) - api = APIRegistry.api_for(self.node_uid, self.syft_client_verify_key) + api = APIRegistry.api_for(self.server_uid, self.syft_client_verify_key) if not api: raise Exception( - f"No access to Syft API. Please login to {self.node_uid} first." + f"No access to Syft API. Please login to {self.server_uid} first." ) if api.signing_key is None: raise ValueError(f"{api}'s signing key is None") @@ -984,19 +1009,19 @@ def accept_by_depositing_result( result, id=action_obj_id, syft_client_verify_key=api.signing_key.verify_key, - syft_node_location=api.node_uid, + syft_server_location=api.server_uid, ) else: action_object = result action_object_is_from_this_node = ( - self.syft_node_location == action_object.syft_node_location + self.syft_server_location == action_object.syft_server_location ) if ( action_object.syft_blob_storage_entry_id is None or not action_object_is_from_this_node ): action_object.reload_cache() - action_object.syft_node_location = self.syft_node_location + action_object.syft_server_location = self.syft_server_location action_object.syft_client_verify_key = self.syft_client_verify_key blob_store_result = action_object._save_to_blob_storage() if isinstance(blob_store_result, SyftError): @@ -1009,7 +1034,7 @@ def accept_by_depositing_result( action_object = ActionObject.from_obj( result, syft_client_verify_key=api.signing_key.verify_key, - syft_node_location=api.node_uid, + syft_server_location=api.server_uid, ) else: action_object = result @@ -1017,14 +1042,14 @@ def accept_by_depositing_result( # TODO: proper check for if actionobject is already uploaded # we also need this for manualy syncing action_object_is_from_this_node = ( - self.syft_node_location == action_object.syft_node_location + self.syft_server_location == action_object.syft_server_location ) if ( action_object.syft_blob_storage_entry_id is None or not action_object_is_from_this_node ): action_object.reload_cache() - action_object.syft_node_location = self.syft_node_location + action_object.syft_server_location = self.syft_server_location action_object.syft_client_verify_key = self.syft_client_verify_key blob_store_result = action_object._save_to_blob_storage() if isinstance(blob_store_result, SyftError): @@ -1033,7 +1058,9 @@ def accept_by_depositing_result( if isinstance(result, SyftError): return result - action_object_link = LinkedObject.from_obj(result, node_uid=self.node_uid) + action_object_link = LinkedObject.from_obj( + result, server_uid=self.server_uid + ) permission_change = ActionStoreChange( linked_obj=action_object_link, apply_permission_type=ActionPermission.READ, @@ -1056,7 +1083,7 @@ def accept_by_depositing_result( for inps in code.input_policy.inputs.values(): input_ids.update(inps) - res = api.services.code.store_as_history( + res = api.services.code.store_execution_output( user_code_id=code.id, outputs=result, job_id=job.id, @@ -1083,7 +1110,7 @@ def accept_by_depositing_result( if isinstance(res, SyftError): return res - return SyftSuccess(message="Request submitted for updating result.") + return job @deprecated( return_syfterror=True, From b79363ebb7d15d2d125dfc349990341dee4bfa2d Mon Sep 17 00:00:00 2001 From: eelcovdw Date: Mon, 12 Aug 2024 11:02:57 +0200 Subject: [PATCH 4/4] error handling --- packages/syft/src/syft/service/request/request.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index 4ff3fa7c32c..087f15627df 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -846,6 +846,11 @@ def deposit_result( return self._deposit_result_l2(result, **kwargs) # L0 request + if approve: + return SyftError( + message="This is a request from the low side, it can only be approved by syncing the results." + ) + api = self._get_api() if isinstance(api, SyftError): return api