Skip to content

Commit

Permalink
Merge pull request #9157 from OpenMined/eelco/deposit-result-l2
Browse files Browse the repository at this point in the history
L2 accept by depositing result
  • Loading branch information
eelcovdw authored Aug 13, 2024
2 parents d81d7fe + 1e18fcb commit e203f03
Showing 1 changed file with 265 additions and 8 deletions.
273 changes: 265 additions & 8 deletions packages/syft/src/syft/service/request/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@
from ..context import AuthedServiceContext
from ..context import ChangeContext
from ..job.job_stash import Job
from ..job.job_stash import JobInfo
from ..job.job_stash import JobStatus
from ..notification.notifications import Notification
from ..policy.policy import UserPolicy
from ..response import SyftError
from ..response import SyftSuccess
from ..user.user import UserView
Expand Down Expand Up @@ -804,6 +806,8 @@ def deposit_result(
result: Any,
log_stdout: str = "",
log_stderr: str = "",
approve: bool | None = None,
**kwargs: dict[str, Any],
) -> Job | SyftError:
"""
Adds a result to this Request:
Expand All @@ -812,15 +816,40 @@ def deposit_result(
- Create Job with new result and logs
- Update the output history
If this is a L2 request, the old accept_by_deposit_result will be used.
Args:
result (Any): ActionObject or any object to be saved as an ActionObject.
logs (str | None, optional): Optional logs to be saved with the Job. Defaults to None.
log_stdout (str): stdout logs.
log_stderr (str): stderr logs.
approve (bool, optional): Only supported for L2 requests. If True, the request will be approved.
Defaults to None.
Returns:
Job | SyftError: Job object if successful, else SyftError.
"""

# TODO check if this is a low-side request. If not, SyftError
# L2 request
# TODO specify behavior and rewrite old flow
if not self.is_l0_deployment:
if approve is None:
approve = prompt_warning_message(
"Depositing a result on this request will approve it.",
confirm=True,
)
if approve is False:
return SyftError(
message="Cannot deposit result without approving the request."
)
else:
return self._deposit_result_l2(result, **kwargs)

# L0 request
if approve:
return SyftError(
message="This is a request from the low side, it can only be approved by syncing the results."
)

api = self._get_api()
if isinstance(api, SyftError):
Expand All @@ -829,12 +858,6 @@ def deposit_result(
if isinstance(code, SyftError):
return code

if not self.is_l0_deployment:
return SyftError(
message="deposit_result is only available for low side code requests. "
"Please use request.approve() instead."
)

# Create ActionObject
action_object = self._create_action_object_for_deposited_result(result)
if isinstance(action_object, SyftError):
Expand All @@ -860,6 +883,240 @@ def deposit_result(

return job

def _get_job_from_action_object(self, action_object: ActionObject) -> Job | None:
api = self._get_api()
if isinstance(api, SyftError):
return None

job = api.services.job.get_by_result_id(action_object.id.id)
return job

def _get_latest_or_create_job(self) -> Job | SyftError:
"""Get the latest job for this requests user_code, or creates one if no jobs exist"""
api = self._get_api()
if isinstance(api, SyftError):
return api
job_service = api.services.job

existing_jobs = job_service.get_by_user_code_id(self.code.id)
if isinstance(existing_jobs, SyftError):
return existing_jobs

if len(existing_jobs) == 0:
job = job_service.create_job_for_user_code_id(
user_code_id=self.code.id,
add_code_owner_read_permissions=True,
)
else:
job = existing_jobs[-1]
res = job_service.add_read_permission_job_for_code_owner(job, self.code)
res = job_service.add_read_permission_log_for_code_owner(
job.log_id, self.code
)
print(res)

return job

def _deposit_result_l2(
self,
result: Any,
force: bool = False,
) -> Job | SyftError:
# this code is extremely brittle because its a work around that relies on
# the type of request being very specifically tied to code which needs approving

# Special case for results from Jobs (High-low side async)
if isinstance(result, JobInfo):
job_info = result
if not job_info.includes_result:
return SyftError(
message="JobInfo should not include result. Use sync_job instead."
)
result = job_info.result
elif isinstance(result, ActionObject):
# Do not allow accepting a result produced by a Job,
# This can cause an inconsistent Job state
action_object_job = self._get_job_from_action_object(result)
if action_object_job is not None:
return SyftError(
message=f"This ActionObject is the result of Job {action_object_job.id}, "
f"please use the `Job.info` instead."
)
else:
job_info = JobInfo(
includes_metadata=True,
includes_result=True,
status=JobStatus.COMPLETED,
resolved=True,
)
else:
# NOTE result is added at the end of function (once ActionObject is created)
job_info = JobInfo(
includes_metadata=True,
includes_result=True,
status=JobStatus.COMPLETED,
resolved=True,
)

user_code_status_change: UserCodeStatusChange = self.changes[0]
code = user_code_status_change.code
output_history = code.output_history
if isinstance(output_history, SyftError):
return output_history
output_policy = code.output_policy
if isinstance(output_policy, SyftError):
return output_policy
if isinstance(user_code_status_change.code.output_policy_type, UserPolicy):
return SyftError(
message="UserCode uses an user-submitted custom policy. Please use .approve()"
)

if not user_code_status_change.change_object_is_type(UserCodeStatusCollection):
raise TypeError(
f"accept_by_depositing_result can only be run on {UserCodeStatusCollection} not "
f"{user_code_status_change.linked_obj.object_type}"
)
if not type(user_code_status_change) == UserCodeStatusChange:
raise TypeError(
f"accept_by_depositing_result can only be run on {UserCodeStatusChange} not "
f"{type(user_code_status_change)}"
)

api = APIRegistry.api_for(self.server_uid, self.syft_client_verify_key)
if not api:
raise Exception(
f"No access to Syft API. Please login to {self.server_uid} first."
)
if api.signing_key is None:
raise ValueError(f"{api}'s signing key is None")
is_approved = user_code_status_change.approved

permission_request = self.approve(approve_nested=True)
if isinstance(permission_request, SyftError):
return permission_request

job = self._get_latest_or_create_job()
if isinstance(job, SyftError):
return job

# This weird order is due to the fact that state is None before calling approve
# we could fix it in a future release
if is_approved:
if not force:
return SyftError(
message="Already approved, if you want to force updating the result use force=True"
)
# TODO: this should overwrite the output history instead
action_obj_id = output_history[0].output_ids[0] # type: ignore

if not isinstance(result, ActionObject):
action_object = ActionObject.from_obj(
result,
id=action_obj_id,
syft_client_verify_key=api.signing_key.verify_key,
syft_server_location=api.server_uid,
)
else:
action_object = result
action_object_is_from_this_node = (
self.syft_server_location == action_object.syft_server_location
)
if (
action_object.syft_blob_storage_entry_id is None
or not action_object_is_from_this_node
):
action_object.reload_cache()
action_object.syft_server_location = self.syft_server_location
action_object.syft_client_verify_key = self.syft_client_verify_key
blob_store_result = action_object._save_to_blob_storage()
if isinstance(blob_store_result, SyftError):
return blob_store_result
result = api.services.action.set(action_object)
if isinstance(result, SyftError):
return result
else:
if not isinstance(result, ActionObject):
action_object = ActionObject.from_obj(
result,
syft_client_verify_key=api.signing_key.verify_key,
syft_server_location=api.server_uid,
)
else:
action_object = result

# TODO: proper check for if actionobject is already uploaded
# we also need this for manualy syncing
action_object_is_from_this_node = (
self.syft_server_location == action_object.syft_server_location
)
if (
action_object.syft_blob_storage_entry_id is None
or not action_object_is_from_this_node
):
action_object.reload_cache()
action_object.syft_server_location = self.syft_server_location
action_object.syft_client_verify_key = self.syft_client_verify_key
blob_store_result = action_object._save_to_blob_storage()
if isinstance(blob_store_result, SyftError):
return blob_store_result
result = api.services.action.set(action_object)
if isinstance(result, SyftError):
return result

action_object_link = LinkedObject.from_obj(
result, server_uid=self.server_uid
)
permission_change = ActionStoreChange(
linked_obj=action_object_link,
apply_permission_type=ActionPermission.READ,
)

new_changes = [permission_change]
result_request = api.services.request.add_changes(
uid=self.id, changes=new_changes
)
if isinstance(result_request, SyftError):
return result_request
self = result_request

approved = self.approve(disable_warnings=True, approve_nested=True)
if isinstance(approved, SyftError):
return approved

input_ids = {}
if code.input_policy is not None:
for inps in code.input_policy.inputs.values():
input_ids.update(inps)

res = api.services.code.store_execution_output(
user_code_id=code.id,
outputs=result,
job_id=job.id,
input_ids=input_ids,
)
if isinstance(res, SyftError):
return res

job_info.result = action_object
job_info.status = (
JobStatus.ERRORED
if isinstance(action_object.syft_action_data, Err)
else JobStatus.COMPLETED
)

existing_result = job.result.id if job.result is not None else None
print(
f"Job({job.id}) Setting new result {existing_result} -> {job_info.result.id}"
)
job.apply_info(job_info)

job_service = api.services.job
res = job_service.update(job)
if isinstance(res, SyftError):
return res

return job

@deprecated(
return_syfterror=True,
reason="accept_by_depositing_result has been removed. Use approve instead to "
Expand Down

0 comments on commit e203f03

Please sign in to comment.