From 42c9700ea3f6f02522b37d0c38572051c62321b8 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Tue, 16 Jan 2024 14:17:56 +0530 Subject: [PATCH 01/18] add create_image_request api to submit request to create images - create CreateImageChange class to create image on approve - add a method on DockerWokerConfig to check if we can build image locally - Co-authored-by: Khoa Nguyen --- notebooks/api/0.8/10-container-images.ipynb | 113 +++++++++++++++++- packages/syft/src/syft/__init__.py | 1 + .../syft/src/syft/custom_worker/builder.py | 4 +- .../syft/src/syft/custom_worker/config.py | 16 +++ .../src/syft/protocol/protocol_version.json | 7 ++ packages/syft/src/syft/service/context.py | 7 ++ .../syft/src/syft/service/request/request.py | 56 ++++++++- .../service/worker/worker_image_service.py | 44 +++++++ 8 files changed, 242 insertions(+), 6 deletions(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index ae4842fb67b..9e8a6060f9e 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -31,7 +31,6 @@ "\n", "sy.requires(SYFT_VERSION)\n", "# syft absolute\n", - "from syft.custom_worker.config import DockerWorkerConfig\n", "from syft.service.worker.image_registry import SyftImageRegistry\n", "from syft.service.worker.worker_image import SyftWorkerImage\n", "\n", @@ -131,9 +130,117 @@ "metadata": {}, "outputs": [], "source": [ - "docker_config = DockerWorkerConfig(dockerfile=custom_dockerfile_str)" + "docker_config = sy.DockerWorkerConfig(dockerfile=custom_dockerfile_str)" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "09bd3d35-f995-41f4-8dc6-565621597e94", + "metadata": {}, + "outputs": [], + "source": [ + "docker_config.check_build(tag=\"openmined/my-tag:version\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "845d3577-4b80-4968-aa8c-ea609b194f7e", + "metadata": {}, + "outputs": [], + "source": [ + "result = domain_client.api.services.worker_image.create_image_request(\n", + " config=docker_config,\n", + " tag=\"openmined/my-tag:version2\",\n", + " reason=\"I Want to do some DS stuff with pydicom\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11dcf42f-9df5-4e10-af4b-ad33c4623141", + "metadata": {}, + "outputs": [], + "source": [ + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4f7b6220-d162-4854-b2b3-a443df258c89", + "metadata": {}, + "outputs": [], + "source": [ + "res = domain_client.requests[0].approve()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bef26c9d-d9e5-44f2-ba11-11936d4587a1", + "metadata": {}, + "outputs": [], + "source": [ + "res" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18319ecf-6b45-4bbf-a489-1cac37acc9ad", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client.api.services.worker_image" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b7c2bd78-9740-41bb-b441-d88c6360f7db", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client.code.__dict__.keys()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1900c7c5-11c0-4279-8ca6-49c865661c3d", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client.api.services.worker_image.submit_dockerfile?" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b05506fb-0872-4f3c-8875-7def7df5067b", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "08b81390-60b1-404c-ba71-728ba091c982", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7a18b662-f20c-41f5-b191-3db083c28ca0", + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "code", "execution_count": null, @@ -1059,7 +1166,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.13" + "version": "3.9.7" } }, "nbformat": 4, diff --git a/packages/syft/src/syft/__init__.py b/packages/syft/src/syft/__init__.py index cee87942ce3..05cc75a28cc 100644 --- a/packages/syft/src/syft/__init__.py +++ b/packages/syft/src/syft/__init__.py @@ -24,6 +24,7 @@ from .client.search import SearchResults # noqa: F401 from .client.user_settings import UserSettings # noqa: F401 from .client.user_settings import settings # noqa: F401 +from .custom_worker.config import DockerWorkerConfig # noqa: F401 from .external import OBLV # noqa: F401 from .external import enable_external_lib # noqa: F401 from .node.credentials import SyftSigningKey # noqa: F401 diff --git a/packages/syft/src/syft/custom_worker/builder.py b/packages/syft/src/syft/custom_worker/builder.py index f1e148287d6..6f5638ec637 100644 --- a/packages/syft/src/syft/custom_worker/builder.py +++ b/packages/syft/src/syft/custom_worker/builder.py @@ -93,12 +93,12 @@ def _build_template(self, config: CustomWorkerConfig, **kwargs: Any): def _build_image(self, tag: str, **build_opts) -> Tuple[Image, Iterable]: # Core docker build call. Func signature should match with Docker SDK's BuildApiMixin with contextlib.closing(docker.from_env()) as client: - image = client.images.build( + image_result = client.images.build( tag=tag, timeout=self.BUILD_MAX_WAIT, **build_opts, ) - return image + return image_result def _push_image( self, diff --git a/packages/syft/src/syft/custom_worker/config.py b/packages/syft/src/syft/custom_worker/config.py index 40ef66606c7..286e42e2704 100644 --- a/packages/syft/src/syft/custom_worker/config.py +++ b/packages/syft/src/syft/custom_worker/config.py @@ -5,9 +5,11 @@ from typing import Dict from typing import List from typing import Optional +from typing import Tuple from typing import Union # third party +from docker.models.images import Image from packaging import version from pydantic import validator from typing_extensions import Self @@ -15,6 +17,8 @@ # relative from ..serde.serializable import serializable +from ..service.response import SyftError +from ..service.response import SyftSuccess from ..types.base import SyftBaseModel PYTHON_DEFAULT_VER = "3.11" @@ -124,3 +128,15 @@ def __hash__(self) -> int: def __str__(self) -> str: return self.dockerfile + + def check_build(self, tag: str, **kwargs) -> Tuple[Image, SyftSuccess]: + # relative + from ..service.worker.utils import parse_output + from .builder import CustomWorkerBuilder + + builder = CustomWorkerBuilder() + try: + _, logs = builder.build_image(config=self, tag=tag, **kwargs) + return SyftSuccess(message=parse_output(logs)) + except Exception as e: + return SyftError(message=f"Failed to build image !! Error: {str(e)}.") diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index e4d30477721..7e3ea26ee62 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -1082,6 +1082,13 @@ "hash": "911d3e550721c78702aa819082d431305b491c597f23b70d8a7118c801e69694", "action": "add" } + }, + "CreateCustomImageChange": { + "1": { + "version": 1, + "hash": "197b765666d605cf601bd69bff21b4a31896b6acd14af38739a5047ca29ff015", + "action": "add" + } } } } diff --git a/packages/syft/src/syft/service/context.py b/packages/syft/src/syft/service/context.py index ee44738a93f..bc05e6b97fd 100644 --- a/packages/syft/src/syft/service/context.py +++ b/packages/syft/src/syft/service/context.py @@ -84,3 +84,10 @@ def from_service(context: AuthedServiceContext) -> Self: approving_user_credentials=context.credentials, extra_kwargs=context.extra_kwargs, ) + + def to_service_ctx(self) -> AuthedServiceContext: + return AuthedServiceContext( + node=self.node, + credentials=self.approving_user_credentials, + extra_kwargs=self.extra_kwargs, + ) diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index 6278fa4b997..ea9871c7130 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -19,6 +19,7 @@ # relative from ...abstract_node import NodeSideType from ...client.api import APIRegistry +from ...custom_worker.config import WorkerConfig from ...node.credentials import SyftVerifyKey from ...serde.serializable import serializable from ...serde.serialize import _serialize @@ -182,6 +183,57 @@ def __repr_syft_nested__(self): {self.linked_obj.object_type.__canonical_name__}:{self.linked_obj.object_uid.short()}" +@serializable() +class CreateCustomImageChange(Change): + __canonical_name__ = "CreateCustomImageChange" + __version__ = SYFT_OBJECT_VERSION_1 + + config: WorkerConfig + tag: str + + __repr_attrs__ = ["config", "tag"] + + def _run( + self, context: ChangeContext, apply: bool + ) -> Result[SyftSuccess, SyftError]: + try: + worker_image_service = context.node.get_service("SyftWorkerImageService") + + service_context = context.to_service_ctx() + result = worker_image_service.submit_dockerfile( + service_context, docker_config=self.config + ) + + if isinstance(result, SyftError): + return Err(result) + + result = worker_image_service.stash.get_by_docker_config( + service_context.credentials, config=self.config + ) + + if result.is_err(): + return Ok(SyftError(message=f"{result.err()}")) + + worker_image = result.ok() + + build_result = worker_image_service.build( + service_context, image_uid=worker_image.id, tag=self.tag + ) + return Ok(build_result) + + except Exception as e: + return Err(SyftError(message=f"Failed to create/build image: {e}")) + + def apply(self, context: ChangeContext) -> Result[SyftSuccess, SyftError]: + return self._run(context=context, apply=True) + + def undo(self, context: ChangeContext) -> Result[SyftSuccess, SyftError]: + return self._run(context=context, apply=False) + + def __repr_syft_nested__(self): + return f"Create Image for Config: {self.config} with tag: {self.tag}" + + @serializable() class Request(SyftObject): __canonical_name__ = "Request" @@ -355,7 +407,9 @@ def approve(self, disable_warnings: bool = False, approve_nested: bool = False): metadata = api.connection.get_node_metadata(api.signing_key) message, is_enclave = None, False - if len(self.codes) > 1 and not approve_nested: + is_code_request = not isinstance(self.codes, SyftError) + + if is_code_request and len(self.codes) > 1 and not approve_nested: return SyftError( message="Multiple codes detected, please use approve_nested=True" ) diff --git a/packages/syft/src/syft/service/worker/worker_image_service.py b/packages/syft/src/syft/service/worker/worker_image_service.py index 9b8daf5c192..b37436c0bcf 100644 --- a/packages/syft/src/syft/service/worker/worker_image_service.py +++ b/packages/syft/src/syft/service/worker/worker_image_service.py @@ -10,6 +10,7 @@ import pydantic # relative +from ...custom_worker.config import CustomWorkerConfig from ...custom_worker.config import DockerWorkerConfig from ...serde.serializable import serializable from ...store.document_store import DocumentStore @@ -17,6 +18,9 @@ from ...types.dicttuple import DictTuple from ...types.uid import UID from ..context import AuthedServiceContext +from ..request.request import CreateCustomImageChange +from ..request.request import SubmitRequest +from ..request.request_service import RequestService from ..response import SyftError from ..response import SyftSuccess from ..service import AbstractService @@ -62,6 +66,46 @@ def submit_dockerfile( message=f"Dockerfile ID: {worker_image.id} successfully submitted." ) + @service_method( + path="worker_image.create_image_request", + name="create_image_request", + roles=DATA_OWNER_ROLE_LEVEL, + ) + def create_image_request( + self, + context: AuthedServiceContext, + config: DockerWorkerConfig, + tag: str, + reason: Optional[str] = "", + ) -> Union[SyftSuccess, SyftError]: + """Request Image creation for the given config.""" + + if isinstance(config, CustomWorkerConfig): + return SyftError(message="We only support DockerWorkerConfig.") + + search_result = self.stash.get_by_docker_config( + credentials=context.credentials, config=config + ) + + if search_result.is_err(): + return SyftError(message=str(search_result.err())) + + worker_image: SyftWorkerImage = search_result.ok() + + if worker_image is not None: + return SyftError(message=f"Image already exists for config: {config}") + + create_image_change = CreateCustomImageChange(config=config, tag=tag) + + changes = [create_image_change] + + request = SubmitRequest(changes=changes) + method = context.node.get_service_method(RequestService.submit) + result = method(context=context, request=request, reason=reason) + + # The Request service already returns either a SyftSuccess or SyftError + return result + @service_method( path="worker_image.build", name="build", From 5322389ca6a277b2bbea756180c457a322488f6b Mon Sep 17 00:00:00 2001 From: khoaguin Date: Tue, 16 Jan 2024 18:41:46 +0700 Subject: [PATCH 02/18] [syft/worker_image] add `pool_creation_request` to make request to create pools for DS - create `CreateCustomWorkerPoolChange` class to create a pool upon approval Co-authored-by: Shubham Gupta --- notebooks/api/0.8/10-container-images.ipynb | 44 ++++++++++++---- .../src/syft/protocol/protocol_version.json | 7 +++ .../syft/src/syft/service/request/request.py | 32 +++++++++++- .../service/worker/worker_image_service.py | 5 +- .../service/worker/worker_pool_service.py | 51 +++++++++++++++++++ 5 files changed, 125 insertions(+), 14 deletions(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index 92125c4749d..fdd51e8a63b 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -184,43 +184,67 @@ "metadata": {}, "outputs": [], "source": [ - "res" + "domain_client.requests" ] }, { "cell_type": "code", "execution_count": null, - "id": "18319ecf-6b45-4bbf-a489-1cac37acc9ad", + "id": "d1c2ca5b-942b-4eac-a6a3-e2d361f0b373", "metadata": {}, "outputs": [], "source": [ - "domain_client.api.services.worker_image" + "domain_client.images" ] }, { "cell_type": "code", "execution_count": null, - "id": "b7c2bd78-9740-41bb-b441-d88c6360f7db", + "id": "b05506fb-0872-4f3c-8875-7def7df5067b", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1c596b15-4af6-4834-b9ba-436f14367235", + "metadata": {}, + "outputs": [], + "source": [ + "result = domain_client.api.services.worker_pool.pool_creation_request(\n", + " pool_name=\"my-custom-pool-1\",\n", + " num_workers=3,\n", + " reason=\"I want to create a pool to run some DS stuff with pydicom\",\n", + " image_uid=domain_client.images[1].id,\n", + ")\n", + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "330df48c-f630-45e5-a96d-9adca9f9c87b", "metadata": {}, "outputs": [], "source": [ - "domain_client.code.__dict__.keys()" + "domain_client.requests" ] }, { "cell_type": "code", "execution_count": null, - "id": "1900c7c5-11c0-4279-8ca6-49c865661c3d", + "id": "08b81390-60b1-404c-ba71-728ba091c982", "metadata": {}, "outputs": [], "source": [ - "domain_client.api.services.worker_image.submit_dockerfile?" + "domain_client.requests[1].approve()" ] }, { "cell_type": "code", "execution_count": null, - "id": "b05506fb-0872-4f3c-8875-7def7df5067b", + "id": "842bab25-5680-4409-b722-5c1638f2d15d", "metadata": {}, "outputs": [], "source": [] @@ -228,7 +252,7 @@ { "cell_type": "code", "execution_count": null, - "id": "08b81390-60b1-404c-ba71-728ba091c982", + "id": "c88b3396-6e78-4c21-ac7f-086b621cbc1c", "metadata": {}, "outputs": [], "source": [] @@ -1166,7 +1190,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.7" + "version": "3.10.13" } }, "nbformat": 4, diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 7e3ea26ee62..56d2b43adc9 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -1089,6 +1089,13 @@ "hash": "197b765666d605cf601bd69bff21b4a31896b6acd14af38739a5047ca29ff015", "action": "add" } + }, + "CreateCustomWorkerPoolChange": { + "1": { + "version": 1, + "hash": "bb35cefd4feebfaed80f767f0fe60223af9e9c36013cd222c85c88481f4fa681", + "action": "add" + } } } } diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index ea9871c7130..17ba94d5f81 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -234,6 +234,34 @@ def __repr_syft_nested__(self): return f"Create Image for Config: {self.config} with tag: {self.tag}" +@serializable() +class CreateCustomWorkerPoolChange(Change): + __canonical_name__ = "CreateCustomWorkerPoolChange" + __version__ = SYFT_OBJECT_VERSION_1 + + pool_name: str + num_workers: int + image_uid: UID + + __repr_attrs__ = ["pool_name", "num_workers", "image_uid"] + + def _run( + self, context: ChangeContext, apply: bool + ) -> Result[SyftSuccess, SyftError]: + return Err(SyftError(message="Not implemented yet!")) + + def apply(self, context: ChangeContext) -> Result[SyftSuccess, SyftError]: + return self._run(context=context, apply=True) + + def undo(self, context: ChangeContext) -> Result[SyftSuccess, SyftError]: + return self._run(context=context, apply=False) + + def __repr_syft_nested__(self): + return ( + f"Create Worker Pool '{self.pool_name}' for Image with id {self.image_uid}" + ) + + @serializable() class Request(SyftObject): __canonical_name__ = "Request" @@ -428,7 +456,7 @@ def approve(self, disable_warnings: bool = False, approve_nested: bool = False): if message and metadata.show_warnings and not disable_warnings: prompt_warning_message(message=message, confirm=True) - print(f"Request approved for domain {api.node_name}") + print(f"Approving request for domain {api.node_name}") return api.services.request.apply(self.id) def deny(self, reason: str): @@ -444,7 +472,7 @@ def deny(self, reason: str): return api.services.request.undo(uid=self.id, reason=reason) def approve_with_client(self, client): - print(f"Request approved for domain {client.name}") + print(f"Approving request for domain {client.name}") return client.api.services.request.apply(self.id) def apply(self, context: AuthedServiceContext) -> Result[SyftSuccess, SyftError]: diff --git a/packages/syft/src/syft/service/worker/worker_image_service.py b/packages/syft/src/syft/service/worker/worker_image_service.py index 08dd8ea4581..e90ef5f89cd 100644 --- a/packages/syft/src/syft/service/worker/worker_image_service.py +++ b/packages/syft/src/syft/service/worker/worker_image_service.py @@ -18,6 +18,7 @@ from ...types.dicttuple import DictTuple from ...types.uid import UID from ..context import AuthedServiceContext +from ..request.request import Change from ..request.request import CreateCustomImageChange from ..request.request import SubmitRequest from ..request.request_service import RequestService @@ -69,7 +70,7 @@ def submit_dockerfile( @service_method( path="worker_image.create_image_request", name="create_image_request", - roles=DATA_OWNER_ROLE_LEVEL, + roles=DATA_SCIENTIST_ROLE_LEVEL, ) def create_image_request( self, @@ -97,7 +98,7 @@ def create_image_request( create_image_change = CreateCustomImageChange(config=config, tag=tag) - changes = [create_image_change] + changes: List[Change] = [create_image_change] request = SubmitRequest(changes=changes) method = context.node.get_service_method(RequestService.submit) diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index d21d41c578c..08bfc181547 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -18,6 +18,10 @@ from ...types.dicttuple import DictTuple from ...types.uid import UID from ..context import AuthedServiceContext +from ..request.request import Change +from ..request.request import CreateCustomWorkerPoolChange +from ..request.request import SubmitRequest +from ..request.request_service import RequestService from ..response import SyftError from ..response import SyftSuccess from ..service import AbstractService @@ -133,6 +137,53 @@ def launch( return container_statuses + @service_method( + path="worker_pool.pool_creation_request", + name="pool_creation_request", + roles=DATA_SCIENTIST_ROLE_LEVEL, + ) + def pool_creation_request( + self, + context: AuthedServiceContext, + pool_name: str, + num_workers: int, + image_uid: Optional[UID], + reason: Optional[str] = "", + ) -> Union[SyftError, SyftSuccess]: + """ + Create a request to launch the worker pool based on a built image. + + Args: + context (AuthedServiceContext): The authenticated service context. + pool_name (str): The name of the worker pool. + num_workers (int): The number of workers in the pool. + image_uid (Optional[UID]): The UID of the built image. + reason (Optional[str], optional): The reason for creating the + worker pool. Defaults to "". + """ + # check if the worker pool with the given name already exists? + result = self.stash.get_by_name(context.credentials, pool_name=pool_name) + if result.is_err(): + return SyftError(message=f"{result.err()}") + if result.ok() is not None: + return SyftError( + message=f"Worker Pool with name: {pool_name} already " + f"exists. Please choose another name!" + ) + + create_worker_pool_change = CreateCustomWorkerPoolChange( + pool_name=pool_name, + num_workers=num_workers, + image_uid=image_uid, + ) + changes: List[Change] = [create_worker_pool_change] + + request = SubmitRequest(changes=changes) + method = context.node.get_service_method(RequestService.submit) + result = method(context=context, request=request, reason=reason) + + return result + @service_method( path="worker_pool.get_all", name="get_all", From 3336712282911efcc2ca190548ae0bac5787ad7d Mon Sep 17 00:00:00 2001 From: khoaguin Date: Wed, 17 Jan 2024 08:54:23 +0700 Subject: [PATCH 03/18] [syft/worker_image] DO now can approve and create a pool from a DS's pool creation request --- notebooks/api/0.8/10-container-images.ipynb | 40 ++++++++++++++++++- .../syft/src/syft/service/request/request.py | 25 +++++++++++- .../service/worker/worker_pool_service.py | 3 +- 3 files changed, 64 insertions(+), 4 deletions(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index fdd51e8a63b..868151a531e 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -247,7 +247,9 @@ "id": "842bab25-5680-4409-b722-5c1638f2d15d", "metadata": {}, "outputs": [], - "source": [] + "source": [ + "domain_client.worker_pools" + ] }, { "cell_type": "code", @@ -255,7 +257,9 @@ "id": "c88b3396-6e78-4c21-ac7f-086b621cbc1c", "metadata": {}, "outputs": [], - "source": [] + "source": [ + "domain_client.requests" + ] }, { "cell_type": "code", @@ -265,6 +269,38 @@ "outputs": [], "source": [] }, + { + "cell_type": "code", + "execution_count": null, + "id": "1f1493dc-fdf3-485f-b3e5-91b41bceb5c6", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "74a29b31-d729-422e-a324-fafe01ab07ee", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8d3a6873-dbbd-4bc1-97cf-4506253e3f4e", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "982344a0-1390-4dff-b9db-7aba19f38e32", + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "code", "execution_count": null, diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index 17ba94d5f81..b4a59f51e8b 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -248,7 +248,30 @@ class CreateCustomWorkerPoolChange(Change): def _run( self, context: ChangeContext, apply: bool ) -> Result[SyftSuccess, SyftError]: - return Err(SyftError(message="Not implemented yet!")) + """ + This function is run when the DO approves (apply=True) + or deny (apply=False) the request. + """ + if apply: + # get the worker pool service and try to launch a pool + worker_pool_service = context.node.get_service("SyftWorkerPoolService") + service_context: AuthedServiceContext = context.to_service_ctx() + result = worker_pool_service.launch( + context=service_context, + name=self.pool_name, + image_uid=self.image_uid, + num_workers=self.num_workers, + ) + if isinstance(result, SyftError): + return Err(result) + else: + return Ok(result) + else: + return Err( + SyftError( + message=f"Request to create a worker pool with name {self.name} denied" + ) + ) def apply(self, context: ChangeContext) -> Result[SyftSuccess, SyftError]: return self._run(context=context, apply=True) diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index 5cfb672e61f..e937802fc2d 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -170,13 +170,14 @@ def pool_creation_request( f"exists. Please choose another name!" ) + # create a list of Change objects and submit a + # request for these changes to the server create_worker_pool_change = CreateCustomWorkerPoolChange( pool_name=pool_name, num_workers=num_workers, image_uid=image_uid, ) changes: List[Change] = [create_worker_pool_change] - request = SubmitRequest(changes=changes) method = context.node.get_service_method(RequestService.submit) result = method(context=context, request=request, reason=reason) From 54dcad61d5d1e7b96689b387ad340d4e8a7c9d02 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 17 Jan 2024 14:26:34 +0530 Subject: [PATCH 04/18] create a common api for both image and pool creation - remove worker_image.create_image_request API --- .../src/syft/protocol/protocol_version.json | 2 +- .../syft/src/syft/service/request/request.py | 10 +- .../service/worker/worker_image_service.py | 45 -------- .../service/worker/worker_pool_service.py | 102 ++++++++++++++++-- 4 files changed, 106 insertions(+), 53 deletions(-) diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 56d2b43adc9..e38315e56c7 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -1093,7 +1093,7 @@ "CreateCustomWorkerPoolChange": { "1": { "version": 1, - "hash": "bb35cefd4feebfaed80f767f0fe60223af9e9c36013cd222c85c88481f4fa681", + "hash": "86894f8ccc037de61f44f9698fd113ba02c3cf3870a3048c00a46e15dcd1941c", "action": "add" } } diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index b4a59f51e8b..24bf743258a 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -241,7 +241,8 @@ class CreateCustomWorkerPoolChange(Change): pool_name: str num_workers: int - image_uid: UID + image_uid: Optional[UID] + config: Optional[WorkerConfig] __repr_attrs__ = ["pool_name", "num_workers", "image_uid"] @@ -256,6 +257,13 @@ def _run( # get the worker pool service and try to launch a pool worker_pool_service = context.node.get_service("SyftWorkerPoolService") service_context: AuthedServiceContext = context.to_service_ctx() + + if self.config is not None: + worker_image = worker_pool_service.stash.get_by_docker_config( + service_context.credentials, self.config + ) + self.image_uid = worker_image.id + result = worker_pool_service.launch( context=service_context, name=self.pool_name, diff --git a/packages/syft/src/syft/service/worker/worker_image_service.py b/packages/syft/src/syft/service/worker/worker_image_service.py index e90ef5f89cd..bcd83e8599c 100644 --- a/packages/syft/src/syft/service/worker/worker_image_service.py +++ b/packages/syft/src/syft/service/worker/worker_image_service.py @@ -10,7 +10,6 @@ import pydantic # relative -from ...custom_worker.config import CustomWorkerConfig from ...custom_worker.config import DockerWorkerConfig from ...serde.serializable import serializable from ...store.document_store import DocumentStore @@ -18,10 +17,6 @@ from ...types.dicttuple import DictTuple from ...types.uid import UID from ..context import AuthedServiceContext -from ..request.request import Change -from ..request.request import CreateCustomImageChange -from ..request.request import SubmitRequest -from ..request.request_service import RequestService from ..response import SyftError from ..response import SyftSuccess from ..service import AbstractService @@ -67,46 +62,6 @@ def submit_dockerfile( message=f"Dockerfile ID: {worker_image.id} successfully submitted." ) - @service_method( - path="worker_image.create_image_request", - name="create_image_request", - roles=DATA_SCIENTIST_ROLE_LEVEL, - ) - def create_image_request( - self, - context: AuthedServiceContext, - config: DockerWorkerConfig, - tag: str, - reason: Optional[str] = "", - ) -> Union[SyftSuccess, SyftError]: - """Request Image creation for the given config.""" - - if isinstance(config, CustomWorkerConfig): - return SyftError(message="We only support DockerWorkerConfig.") - - search_result = self.stash.get_by_docker_config( - credentials=context.credentials, config=config - ) - - if search_result.is_err(): - return SyftError(message=str(search_result.err())) - - worker_image: SyftWorkerImage = search_result.ok() - - if worker_image is not None: - return SyftError(message=f"Image already exists for config: {config}") - - create_image_change = CreateCustomImageChange(config=config, tag=tag) - - changes: List[Change] = [create_image_change] - - request = SubmitRequest(changes=changes) - method = context.node.get_service_method(RequestService.submit) - result = method(context=context, request=request, reason=reason) - - # The Request service already returns either a SyftSuccess or SyftError - return result - @service_method( path="worker_image.build", name="build", diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index e937802fc2d..a22bce412b9 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -12,6 +12,8 @@ from docker.models.containers import Container # relative +from ...custom_worker.config import CustomWorkerConfig +from ...custom_worker.config import WorkerConfig from ...serde.serializable import serializable from ...store.document_store import DocumentStore from ...store.linked_obj import LinkedObject @@ -19,6 +21,7 @@ from ...types.uid import UID from ..context import AuthedServiceContext from ..request.request import Change +from ..request.request import CreateCustomImageChange from ..request.request import CreateCustomWorkerPoolChange from ..request.request import SubmitRequest from ..request.request_service import RequestService @@ -137,16 +140,16 @@ def launch( return container_statuses @service_method( - path="worker_pool.pool_creation_request", + path="worker_pool.create_image_and_pool_request", name="pool_creation_request", roles=DATA_SCIENTIST_ROLE_LEVEL, ) - def pool_creation_request( + def create_pool_request( self, context: AuthedServiceContext, pool_name: str, num_workers: int, - image_uid: Optional[UID], + image_uid: UID, reason: Optional[str] = "", ) -> Union[SyftError, SyftSuccess]: """ @@ -160,10 +163,97 @@ def pool_creation_request( reason (Optional[str], optional): The reason for creating the worker pool. Defaults to "". """ - # check if the worker pool with the given name already exists? + + search_result = self.image_stash.get_by_uid( + credentials=context.credentials, uid=image_uid + ) + + if search_result.is_err(): + return SyftError(message=str(search_result.err())) + + worker_image: Optional[SyftWorkerImage] = search_result.ok() + + if worker_image is None: + return SyftError( + message=f"No image exists for given image uid : {image_uid}" + ) + result = self.stash.get_by_name(context.credentials, pool_name=pool_name) + if result.is_err(): return SyftError(message=f"{result.err()}") + + worker_pool = result.ok() + + if worker_pool is not None: + return SyftError( + message=f"Worker pool already exists for given pool name: {pool_name}" + ) + + create_worker_pool_change = CreateCustomWorkerPoolChange( + pool_name=pool_name, + num_workers=num_workers, + image_uid=image_uid, + ) + + changes: List[Change] = [create_worker_pool_change] + request = SubmitRequest(changes=changes) + method = context.node.get_service_method(RequestService.submit) + result = method(context=context, request=request, reason=reason) + + return result + + @service_method( + path="worker_pool.create_image_and_pool_request", + name="pool_creation_request", + roles=DATA_SCIENTIST_ROLE_LEVEL, + ) + def create_image_and_pool_request( + self, + context: AuthedServiceContext, + pool_name: str, + num_workers: int, + tag: str, + config: WorkerConfig, + reason: Optional[str] = "", + ) -> Union[SyftError, SyftSuccess]: + """ + Create a request to launch the worker pool based on a built image. + + Args: + context (AuthedServiceContext): The authenticated service context. + pool_name (str): The name of the worker pool. + num_workers (int): The number of workers in the pool. + image_uid (Optional[UID]): The UID of the built image. + reason (Optional[str], optional): The reason for creating the + worker pool. Defaults to "". + """ + + if isinstance(config, CustomWorkerConfig): + return SyftError(message="We only support DockerWorkerConfig.") + + search_result = self.image_stash.get_by_docker_config( + credentials=context.credentials, config=config + ) + + if search_result.is_err(): + return SyftError(message=str(search_result.err())) + + worker_image: Optional[SyftWorkerImage] = search_result.ok() + + changes: List[Change] = [] + + if tag is None: + return SyftError(message="Please provide a valid tag for the given config") + + if worker_image is None: + changes += [CreateCustomImageChange(config=config, tag=tag)] + + result = self.stash.get_by_name(context.credentials, pool_name=pool_name) + + if result.is_err(): + return SyftError(message=f"{result.err()}") + if result.ok() is not None: return SyftError( message=f"Worker Pool with name: {pool_name} already " @@ -175,9 +265,9 @@ def pool_creation_request( create_worker_pool_change = CreateCustomWorkerPoolChange( pool_name=pool_name, num_workers=num_workers, - image_uid=image_uid, + config=config, ) - changes: List[Change] = [create_worker_pool_change] + changes += [create_worker_pool_change] request = SubmitRequest(changes=changes) method = context.node.get_service_method(RequestService.submit) result = method(context=context, request=request, reason=reason) From b7e43dfa7551dc65a60213a9b076f5d8c757d6aa Mon Sep 17 00:00:00 2001 From: khoaguin Date: Wed, 17 Jan 2024 18:17:10 +0700 Subject: [PATCH 05/18] [syft/worker_image] add some small fixes and error handlings to WorkerPoolService - add some nb tests into api notebook 10 --- notebooks/api/0.8/10-container-images.ipynb | 341 +++++++++--------- .../syft/src/syft/service/request/request.py | 7 +- .../service/worker/worker_pool_service.py | 4 +- 3 files changed, 179 insertions(+), 173 deletions(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index 868151a531e..4a59f914974 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -133,174 +133,6 @@ "docker_config = sy.DockerWorkerConfig(dockerfile=custom_dockerfile_str)" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "09bd3d35-f995-41f4-8dc6-565621597e94", - "metadata": {}, - "outputs": [], - "source": [ - "docker_config.check_build(tag=\"openmined/my-tag:version\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "845d3577-4b80-4968-aa8c-ea609b194f7e", - "metadata": {}, - "outputs": [], - "source": [ - "result = domain_client.api.services.worker_image.create_image_request(\n", - " config=docker_config,\n", - " tag=\"openmined/my-tag:version2\",\n", - " reason=\"I Want to do some DS stuff with pydicom\",\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "11dcf42f-9df5-4e10-af4b-ad33c4623141", - "metadata": {}, - "outputs": [], - "source": [ - "result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "4f7b6220-d162-4854-b2b3-a443df258c89", - "metadata": {}, - "outputs": [], - "source": [ - "res = domain_client.requests[0].approve()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "bef26c9d-d9e5-44f2-ba11-11936d4587a1", - "metadata": {}, - "outputs": [], - "source": [ - "domain_client.requests" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "d1c2ca5b-942b-4eac-a6a3-e2d361f0b373", - "metadata": {}, - "outputs": [], - "source": [ - "domain_client.images" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "b05506fb-0872-4f3c-8875-7def7df5067b", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1c596b15-4af6-4834-b9ba-436f14367235", - "metadata": {}, - "outputs": [], - "source": [ - "result = domain_client.api.services.worker_pool.pool_creation_request(\n", - " pool_name=\"my-custom-pool-1\",\n", - " num_workers=3,\n", - " reason=\"I want to create a pool to run some DS stuff with pydicom\",\n", - " image_uid=domain_client.images[1].id,\n", - ")\n", - "result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "330df48c-f630-45e5-a96d-9adca9f9c87b", - "metadata": {}, - "outputs": [], - "source": [ - "domain_client.requests" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "08b81390-60b1-404c-ba71-728ba091c982", - "metadata": {}, - "outputs": [], - "source": [ - "domain_client.requests[1].approve()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "842bab25-5680-4409-b722-5c1638f2d15d", - "metadata": {}, - "outputs": [], - "source": [ - "domain_client.worker_pools" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c88b3396-6e78-4c21-ac7f-086b621cbc1c", - "metadata": {}, - "outputs": [], - "source": [ - "domain_client.requests" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7a18b662-f20c-41f5-b191-3db083c28ca0", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1f1493dc-fdf3-485f-b3e5-91b41bceb5c6", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "74a29b31-d729-422e-a324-fafe01ab07ee", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "8d3a6873-dbbd-4bc1-97cf-4506253e3f4e", - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "982344a0-1390-4dff-b9db-7aba19f38e32", - "metadata": {}, - "outputs": [], - "source": [] - }, { "cell_type": "code", "execution_count": null, @@ -524,6 +356,7 @@ "outputs": [], "source": [ "docker_tag = \"openmined/custom-worker:0.7.8\"\n", + "\n", "registry_uid = local_registry.id if running_as_container else local_registry.id\n", "\n", "docker_build_result = domain_client.api.services.worker_image.build(\n", @@ -1201,10 +1034,180 @@ " local_registry_container.teardown()" ] }, + { + "cell_type": "markdown", + "id": "f20a29df-2e63-484f-8b67-d6a397722e66", + "metadata": {}, + "source": [ + "#### Worker Pool and Image Creation Request/Approval" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2b8cd7a0-ba17-4ad0-b3de-5af1282a6dc6", + "metadata": {}, + "outputs": [], + "source": [ + "custom_dockerfile_str_2 = \"\"\"\n", + "FROM openmined/grid-backend:0.8.4-beta.12\n", + "\n", + "RUN pip install opendp\n", + "\"\"\"\n", + "\n", + "docker_config_2 = sy.DockerWorkerConfig(dockerfile=custom_dockerfile_str_2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "48a7a9b5-266d-4f22-9b99-061dbb3c83ab", + "metadata": {}, + "outputs": [], + "source": [ + "submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n", + " docker_config=docker_config_2\n", + ")\n", + "submit_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4b3880fe-d682-471d-a52b-364711bf8511", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client.images" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b62871bc-6c32-4fac-95af-5b062bc65992", + "metadata": {}, + "outputs": [], + "source": [ + "workerimage_2 = domain_client.images[1]" + ] + }, + { + "cell_type": "markdown", + "id": "35f8e35f-91f3-4d2b-8e70-386021e9a692", + "metadata": {}, + "source": [ + "##### Build image first then create pool" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f5a773e7-4dc1-4325-bc26-eb3c7d88969a", + "metadata": {}, + "outputs": [], + "source": [ + "docker_tag_2 = \"openmined/custom-worker-opendp:latest\"\n", + "\n", + "docker_build_result = domain_client.api.services.worker_image.build(\n", + " image_uid=workerimage_2.id,\n", + " tag=docker_tag_2,\n", + ")\n", + "docker_build_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7b0b2bb2-5612-463f-af88-f74e4f31719a", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client.api.services.worker_pool.pool_creation_request(\n", + " pool_name=\"first-opendp-pool\", num_workers=3, image_uid=workerimage_2.id\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0b59e175-76ba-46b8-a7cd-796a872969e4", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client.requests[-1].approve()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b0f8e4cb-6ccf-4c9f-866e-6e63fa67427c", + "metadata": {}, + "outputs": [], + "source": [ + "assert len(domain_client.worker_pools.get_all()) == 3" + ] + }, + { + "cell_type": "markdown", + "id": "1340b532-f3bb-4afb-b777-9fb2ba4bd02c", + "metadata": {}, + "source": [ + "##### Request to build the image and create the pool at the same time" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8ead0843-d250-409f-a546-8049d9103646", + "metadata": {}, + "outputs": [], + "source": [ + "# docker_tag_2 = \"openmined/custom-worker-opendp:latest\"\n", + "\n", + "request = domain_client.api.services.worker_pool.pool_image_creation_request(\n", + " pool_name=\"second-opendp-pool\",\n", + " num_workers=2,\n", + " tag=docker_tag_2,\n", + " config=docker_config_2,\n", + " reason=\"I want to do some more cool data science with PySyft and OpenDP\",\n", + ")\n", + "request" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f456f727-ca38-4872-9789-e457f211ce6d", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client.requests[-1].approve()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6c6760aa-f26b-49b6-9346-416b8e1cca1a", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client.images" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6391a086-604a-47a9-959d-d4a626ac57f2", + "metadata": {}, + "outputs": [], + "source": [ + "assert len(domain_client.worker_pools.get_all()) == 4" + ] + }, { "cell_type": "code", "execution_count": null, - "id": "c7e51ab6-f27a-4963-af56-9afcd3193d84", + "id": "a0c34b8a-17c9-404f-926d-5a4127479882", "metadata": {}, "outputs": [], "source": [] diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index 24bf743258a..b08a982f5ed 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -212,7 +212,7 @@ def _run( ) if result.is_err(): - return Ok(SyftError(message=f"{result.err()}")) + return Err(SyftError(message=f"{result.err()}")) worker_image = result.ok() @@ -259,9 +259,12 @@ def _run( service_context: AuthedServiceContext = context.to_service_ctx() if self.config is not None: - worker_image = worker_pool_service.stash.get_by_docker_config( + result = worker_pool_service.image_stash.get_by_docker_config( service_context.credentials, self.config ) + if result.is_err(): + return Err(SyftError(message=f"{result.err()}")) + worker_image = result.ok() self.image_uid = worker_image.id result = worker_pool_service.launch( diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index a22bce412b9..0a4fc349971 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -140,7 +140,7 @@ def launch( return container_statuses @service_method( - path="worker_pool.create_image_and_pool_request", + path="worker_pool.create_pool_request", name="pool_creation_request", roles=DATA_SCIENTIST_ROLE_LEVEL, ) @@ -205,7 +205,7 @@ def create_pool_request( @service_method( path="worker_pool.create_image_and_pool_request", - name="pool_creation_request", + name="pool_image_creation_request", roles=DATA_SCIENTIST_ROLE_LEVEL, ) def create_image_and_pool_request( From 02d5942ae36663a794665ad78bb7ce7e007dafa2 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 17 Jan 2024 17:44:51 +0530 Subject: [PATCH 06/18] validate tag and throw error if image exists for config in pool_image_creation_request --- .../service/worker/worker_pool_service.py | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index 0a4fc349971..e55be1929c0 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -10,6 +10,7 @@ # third party import docker from docker.models.containers import Container +import pydantic # relative from ...custom_worker.config import CustomWorkerConfig @@ -33,6 +34,7 @@ from ..service import service_method from ..user.user_roles import DATA_OWNER_ROLE_LEVEL from ..user.user_roles import DATA_SCIENTIST_ROLE_LEVEL +from .image_identifier import SyftWorkerImageIdentifier from .utils import DEFAULT_WORKER_POOL_NAME from .utils import run_containers from .utils import run_workers_in_threads @@ -224,9 +226,9 @@ def create_image_and_pool_request( context (AuthedServiceContext): The authenticated service context. pool_name (str): The name of the worker pool. num_workers (int): The number of workers in the pool. - image_uid (Optional[UID]): The UID of the built image. - reason (Optional[str], optional): The reason for creating the - worker pool. Defaults to "". + config: (WorkerConfig): Config of the image to be built. + tag (str): human-readable manifest identifier that is typically a specific version or variant of an image + reason (Optional[str], optional): The reason for creating the worker image and pool. Defaults to "". """ if isinstance(config, CustomWorkerConfig): @@ -241,13 +243,26 @@ def create_image_and_pool_request( worker_image: Optional[SyftWorkerImage] = search_result.ok() + try: + image_identifier = SyftWorkerImageIdentifier.from_str(tag=tag) + except pydantic.ValidationError as e: + return SyftError(message=f"Failed to create tag: {e}") + + # create a list of Change objects and submit a + # request for these changes to the server changes: List[Change] = [] - if tag is None: - return SyftError(message="Please provide a valid tag for the given config") + if worker_image is not None: + return SyftError( + message="Image already exists for given config. \ + Please use `worker_pool.create_pool_request` to request pool creation." + ) - if worker_image is None: - changes += [CreateCustomImageChange(config=config, tag=tag)] + # Add create custom image change + create_custom_image_change = CreateCustomImageChange( + config=config, + tag=image_identifier.full_name_with_tag, + ) result = self.stash.get_by_name(context.credentials, pool_name=pool_name) @@ -260,14 +275,13 @@ def create_image_and_pool_request( f"exists. Please choose another name!" ) - # create a list of Change objects and submit a - # request for these changes to the server + # Add create worker pool change create_worker_pool_change = CreateCustomWorkerPoolChange( pool_name=pool_name, num_workers=num_workers, config=config, ) - changes += [create_worker_pool_change] + changes += [create_custom_image_change, create_worker_pool_change] request = SubmitRequest(changes=changes) method = context.node.get_service_method(RequestService.submit) result = method(context=context, request=request, reason=reason) From 5b2a33b6cb54dc7510682e462bd40c8fd91dcd10 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 17 Jan 2024 18:08:43 +0530 Subject: [PATCH 07/18] fix pubic name of create_image_and_pool_request API - add some asserts to container image notebook --- notebooks/api/0.8/10-container-images.ipynb | 111 ++++++++++++++++-- .../service/worker/worker_pool_service.py | 2 +- 2 files changed, 104 insertions(+), 9 deletions(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index 4a59f914974..7099fdd1119 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -1127,6 +1127,16 @@ ")" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "2b337373-9486-426a-a282-b0b179139ba7", + "metadata": {}, + "outputs": [], + "source": [ + "assert len(request.changes) == 1" + ] + }, { "cell_type": "code", "execution_count": null, @@ -1137,6 +1147,17 @@ "domain_client.requests[-1].approve()" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "2ea69b17-eb3c-4f01-9a47-4895dd286e5e", + "metadata": {}, + "outputs": [], + "source": [ + "assert domain_client.worker_pools[\"first-opendp-pool\"]\n", + "assert len(domain_client.worker_pools[\"first-opendp-pool\"].worker_list) == 3" + ] + }, { "cell_type": "code", "execution_count": null, @@ -1162,18 +1183,47 @@ "metadata": {}, "outputs": [], "source": [ - "# docker_tag_2 = \"openmined/custom-worker-opendp:latest\"\n", + "custom_dockerfile_str_3 = \"\"\"\n", + "FROM openmined/grid-backend:0.8.4-beta.12\n", "\n", - "request = domain_client.api.services.worker_pool.pool_image_creation_request(\n", - " pool_name=\"second-opendp-pool\",\n", + "RUN pip install recordlinkage\n", + "\"\"\"\n", + "\n", + "docker_config_3 = sy.DockerWorkerConfig(dockerfile=custom_dockerfile_str_3)\n", + "\n", + "docker_tag_3 = \"openmined/custom-worker-recordlinkage:latest\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "441ff01a-6f0c-48db-a14d-deecb4518e18", + "metadata": {}, + "outputs": [], + "source": [ + "request = domain_client.api.services.worker_pool.create_image_and_pool_request(\n", + " pool_name=\"recordlinkage-pool\",\n", " num_workers=2,\n", - " tag=docker_tag_2,\n", - " config=docker_config_2,\n", + " tag=docker_tag_3,\n", + " config=docker_config_3,\n", " reason=\"I want to do some more cool data science with PySyft and OpenDP\",\n", ")\n", "request" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "3c1a1cf0-a31f-4dcc-bc34-8a232fb23b62", + "metadata": {}, + "outputs": [], + "source": [ + "assert len(request.changes) == 2\n", + "assert request.changes[0].config == docker_config_3\n", + "assert request.changes[1].num_workers == 2\n", + "assert request.changes[1].pool_name == \"recordlinkage-pool\"" + ] + }, { "cell_type": "code", "execution_count": null, @@ -1181,7 +1231,27 @@ "metadata": {}, "outputs": [], "source": [ - "domain_client.requests[-1].approve()" + "req_result = domain_client.requests[-1].approve()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6d358265-a2eb-4791-84c4-0e2d0cc88f8a", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(req_result, sy.SyftSuccess)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "83188182-1e58-4d6b-a361-b9ab4fcea356", + "metadata": {}, + "outputs": [], + "source": [ + "assert domain_client.requests[-1].status.value == 2" ] }, { @@ -1194,6 +1264,31 @@ "domain_client.images" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "e3c26241-028b-4f6d-a9dc-c16250f3ac6c", + "metadata": {}, + "outputs": [], + "source": [ + "image_exists = False\n", + "for im in domain_client.images.get_all():\n", + " if im.image_identifier.repo_with_tag == docker_tag_3:\n", + " image_exists = True\n", + "assert image_exists" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7016eccb-8830-4d9f-b1f6-da3dbafeb0f8", + "metadata": {}, + "outputs": [], + "source": [ + "assert domain_client.worker_pools[\"recordlinkage-pool\"]\n", + "assert len(domain_client.worker_pools[\"recordlinkage-pool\"].worker_list) == 2" + ] + }, { "cell_type": "code", "execution_count": null, @@ -1201,7 +1296,7 @@ "metadata": {}, "outputs": [], "source": [ - "assert len(domain_client.worker_pools.get_all()) == 4" + "domain.land()" ] }, { @@ -1229,7 +1324,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.13" + "version": "3.9.7" } }, "nbformat": 4, diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index e55be1929c0..31f79913f8d 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -207,7 +207,7 @@ def create_pool_request( @service_method( path="worker_pool.create_image_and_pool_request", - name="pool_image_creation_request", + name="create_image_and_pool_request", roles=DATA_SCIENTIST_ROLE_LEVEL, ) def create_image_and_pool_request( From cdf8f6d3708b83ca5a31d0f5ae243bd39460fb79 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 18 Jan 2024 09:48:06 +0700 Subject: [PATCH 08/18] [tests/worker_image] addd some tests for DS-DO image and pool creation request approval cycle - add some type annotations for `syft/tests/conftest` --- .../service/worker/worker_image_service.py | 7 +- .../service/worker/worker_pool_service.py | 10 +- packages/syft/tests/conftest.py | 10 +- .../syft/tests/syft/worker_pool/__init__.py | 0 .../worker_pool/worker_pool_serivice_test.py | 112 ++++++++++++++++++ 5 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 packages/syft/tests/syft/worker_pool/__init__.py create mode 100644 packages/syft/tests/syft/worker_pool/worker_pool_serivice_test.py diff --git a/packages/syft/src/syft/service/worker/worker_image_service.py b/packages/syft/src/syft/service/worker/worker_image_service.py index bcd83e8599c..604269fe138 100644 --- a/packages/syft/src/syft/service/worker/worker_image_service.py +++ b/packages/syft/src/syft/service/worker/worker_image_service.py @@ -22,7 +22,6 @@ from ..service import AbstractService from ..service import service_method from ..user.user_roles import DATA_OWNER_ROLE_LEVEL -from ..user.user_roles import DATA_SCIENTIST_ROLE_LEVEL from .image_registry import SyftImageRegistry from .image_registry_service import SyftImageRegistryService from .utils import docker_build @@ -41,10 +40,14 @@ def __init__(self, store: DocumentStore) -> None: self.store = store self.stash = SyftWorkerImageStash(store=store) + # relative + from ..user.user_roles import DATA_SCIENTIST_ROLE_LEVEL + from ..user.user_roles import GUEST_ROLE_LEVEL + @service_method( path="worker_image.submit_dockerfile", name="submit_dockerfile", - roles=DATA_OWNER_ROLE_LEVEL, + roles=GUEST_ROLE_LEVEL, ) def submit_dockerfile( self, context: AuthedServiceContext, docker_config: DockerWorkerConfig diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index 31f79913f8d..bae729c21d9 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -141,10 +141,13 @@ def launch( return container_statuses + # relative + from ..user.user_roles import GUEST_ROLE_LEVEL + @service_method( path="worker_pool.create_pool_request", name="pool_creation_request", - roles=DATA_SCIENTIST_ROLE_LEVEL, + roles=GUEST_ROLE_LEVEL, # TOASK: DATA_SCIENTIST_ROLE_LEVEL not working? ) def create_pool_request( self, @@ -205,10 +208,13 @@ def create_pool_request( return result + # relative + from ..user.user_roles import GUEST_ROLE_LEVEL + @service_method( path="worker_pool.create_image_and_pool_request", name="create_image_and_pool_request", - roles=DATA_SCIENTIST_ROLE_LEVEL, + roles=GUEST_ROLE_LEVEL, # TOASK: DATA_SCIENTIST_ROLE_LEVEL not working? ) def create_image_and_pool_request( self, diff --git a/packages/syft/tests/conftest.py b/packages/syft/tests/conftest.py index 15758739623..0ce969a38ea 100644 --- a/packages/syft/tests/conftest.py +++ b/packages/syft/tests/conftest.py @@ -9,6 +9,8 @@ # syft absolute import syft as sy +from syft.client.domain_client import DomainClient +from syft.node.worker import Worker from syft.protocol.data_protocol import bump_protocol_version from syft.protocol.data_protocol import get_data_protocol from syft.protocol.data_protocol import stage_protocol_changes @@ -69,12 +71,12 @@ def stage_protocol(protocol_file: Path): @pytest.fixture() -def worker(faker, stage_protocol): +def worker(faker) -> Worker: return sy.Worker.named(name=faker.name()) @pytest.fixture() -def root_domain_client(worker): +def root_domain_client(worker) -> DomainClient: return worker.root_client @@ -84,7 +86,7 @@ def root_verify_key(worker): @pytest.fixture() -def guest_client(worker): +def guest_client(worker) -> DomainClient: return worker.guest_client @@ -94,7 +96,7 @@ def guest_verify_key(worker): @pytest.fixture() -def guest_domain_client(root_domain_client): +def guest_domain_client(root_domain_client) -> DomainClient: return root_domain_client.guest() diff --git a/packages/syft/tests/syft/worker_pool/__init__.py b/packages/syft/tests/syft/worker_pool/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/packages/syft/tests/syft/worker_pool/worker_pool_serivice_test.py b/packages/syft/tests/syft/worker_pool/worker_pool_serivice_test.py new file mode 100644 index 00000000000..fc83c159cbc --- /dev/null +++ b/packages/syft/tests/syft/worker_pool/worker_pool_serivice_test.py @@ -0,0 +1,112 @@ +# third party +from faker import Faker + +# syft absolute +from syft.custom_worker.config import DockerWorkerConfig +from syft.node.worker import Worker +from syft.service.request.request import CreateCustomWorkerPoolChange +from syft.service.response import SyftSuccess +from syft.service.worker.worker_image import SyftWorkerImage +from syft.service.worker.worker_pool import WorkerPool + +# relative +from ..request.request_code_accept_deny_test import get_ds_client + + +def test_create_image_and_pool_request_accept(faker: Faker, worker: Worker): + """ + Test the functionality of `SyftWorkerPoolService.create_image_and_pool_request` + when the request is accepted + """ + # construct a root client and data scientist client for a domain + root_client = worker.root_client + ds_client = get_ds_client(faker, root_client, worker.guest_client) + assert root_client.credentials != ds_client.credentials + + # the DS makes a request to create an image and a pool based on the image + custom_dockerfile = """ + FROM openmined/grid-backend:0.8.4-beta.12 + + RUN pip install recordlinkage + """ + docker_config = DockerWorkerConfig(dockerfile=custom_dockerfile) + docker_tag = "openmined/custom-worker-recordlinkage:latest" + request = ds_client.api.services.worker_pool.create_image_and_pool_request( + pool_name="recordlinkage-pool", + num_workers=2, + tag=docker_tag, + config=docker_config, + reason="I want to do some more cool data science with PySyft and Recordlinkage", + ) + assert len(request.changes) == 2 + assert request.changes[0].config == docker_config + assert request.changes[1].num_workers == 2 + assert request.changes[1].pool_name == "recordlinkage-pool" + + # the root client approve the request, so the image should be built + # and the worker pool should be launched + req_result = root_client.requests[-1].approve() + assert isinstance(req_result, SyftSuccess) + assert root_client.requests[-1].status.value == 2 + + all_image_tags = [ + im.image_identifier.repo_with_tag for im in root_client.images.get_all() + ] + assert docker_tag in all_image_tags + launched_pool = root_client.worker_pools["recordlinkage-pool"] + assert isinstance(launched_pool, WorkerPool) + assert len(launched_pool.worker_list) == 2 + + +def test_create_pool_request_accept(faker: Faker, worker: Worker): + """ + Test the functionality of `SyftWorkerPoolService.create_pool_request` + when the request is accepted + """ + # construct a root client and data scientist client for a domain + root_client = worker.root_client + ds_client = get_ds_client(faker, root_client, worker.guest_client) + assert root_client.credentials != ds_client.credentials + + # the DS submits the docker config to build an image + custom_dockerfile_str = """ + FROM openmined/grid-backend:0.8.4-beta.12 + + RUN pip install opendp + """ + docker_config = DockerWorkerConfig(dockerfile=custom_dockerfile_str) + submit_result = ds_client.api.services.worker_image.submit_dockerfile( + docker_config=docker_config + ) + assert isinstance(submit_result, SyftSuccess) + assert len(root_client.images.get_all()) == 2 + + # The root client builds the image + worker_image: SyftWorkerImage = root_client.images[1] + docker_tag = "openmined/custom-worker-opendp:latest" + docker_build_result = root_client.api.services.worker_image.build( + image_uid=worker_image.id, + tag=docker_tag, + ) + # update the worker image variable after the image was built + worker_image: SyftWorkerImage = root_client.images[1] + assert isinstance(docker_build_result, SyftSuccess) + assert worker_image.image_identifier.repo_with_tag == docker_tag + + # The DS client submits a request to build the image + request = ds_client.api.services.worker_pool.pool_creation_request( + pool_name="opendp-pool", num_workers=3, image_uid=worker_image.id + ) + assert len(request.changes) == 1 + change = request.changes[0] + assert isinstance(change, CreateCustomWorkerPoolChange) + assert change.num_workers == 3 + assert change.pool_name == "opendp-pool" + assert change.image_uid == worker_image.id + + # the root client approves the request, and the worker pool should be launched + req_result = root_client.requests[-1].approve() + assert isinstance(req_result, SyftSuccess) + launched_pool = root_client.worker_pools["opendp-pool"] + assert isinstance(launched_pool, WorkerPool) + assert len(launched_pool.worker_list) == 3 From a0af0432acb170ca5151b3b80394ee9d44c63ccd Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 18 Jan 2024 11:18:18 +0530 Subject: [PATCH 09/18] fix test not returning the data scientist client post login - revert API to use DATA_SCIENTIST_ROLE --- .../src/syft/service/worker/worker_image_service.py | 7 ++----- .../src/syft/service/worker/worker_pool_service.py | 10 ++-------- .../syft/request/request_code_accept_deny_test.py | 4 ++-- 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/packages/syft/src/syft/service/worker/worker_image_service.py b/packages/syft/src/syft/service/worker/worker_image_service.py index 604269fe138..bcd83e8599c 100644 --- a/packages/syft/src/syft/service/worker/worker_image_service.py +++ b/packages/syft/src/syft/service/worker/worker_image_service.py @@ -22,6 +22,7 @@ from ..service import AbstractService from ..service import service_method from ..user.user_roles import DATA_OWNER_ROLE_LEVEL +from ..user.user_roles import DATA_SCIENTIST_ROLE_LEVEL from .image_registry import SyftImageRegistry from .image_registry_service import SyftImageRegistryService from .utils import docker_build @@ -40,14 +41,10 @@ def __init__(self, store: DocumentStore) -> None: self.store = store self.stash = SyftWorkerImageStash(store=store) - # relative - from ..user.user_roles import DATA_SCIENTIST_ROLE_LEVEL - from ..user.user_roles import GUEST_ROLE_LEVEL - @service_method( path="worker_image.submit_dockerfile", name="submit_dockerfile", - roles=GUEST_ROLE_LEVEL, + roles=DATA_OWNER_ROLE_LEVEL, ) def submit_dockerfile( self, context: AuthedServiceContext, docker_config: DockerWorkerConfig diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index bae729c21d9..31f79913f8d 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -141,13 +141,10 @@ def launch( return container_statuses - # relative - from ..user.user_roles import GUEST_ROLE_LEVEL - @service_method( path="worker_pool.create_pool_request", name="pool_creation_request", - roles=GUEST_ROLE_LEVEL, # TOASK: DATA_SCIENTIST_ROLE_LEVEL not working? + roles=DATA_SCIENTIST_ROLE_LEVEL, ) def create_pool_request( self, @@ -208,13 +205,10 @@ def create_pool_request( return result - # relative - from ..user.user_roles import GUEST_ROLE_LEVEL - @service_method( path="worker_pool.create_image_and_pool_request", name="create_image_and_pool_request", - roles=GUEST_ROLE_LEVEL, # TOASK: DATA_SCIENTIST_ROLE_LEVEL not working? + roles=DATA_SCIENTIST_ROLE_LEVEL, ) def create_image_and_pool_request( self, diff --git a/packages/syft/tests/syft/request/request_code_accept_deny_test.py b/packages/syft/tests/syft/request/request_code_accept_deny_test.py index f7d3ff3eb73..242de50ff85 100644 --- a/packages/syft/tests/syft/request/request_code_accept_deny_test.py +++ b/packages/syft/tests/syft/request/request_code_accept_deny_test.py @@ -40,8 +40,8 @@ def get_ds_client(faker: Faker, root_client: SyftClient, guest_client: SyftClien password_verify=password, ) assert isinstance(result, SyftSuccess) - guest_client.login(email=guest_email, password=password) - return guest_client + ds_client = guest_client.login(email=guest_email, password=password) + return ds_client def test_object_mutation(worker: Worker): From 53a7609ae8f6784df850d549bbec55292eb021eb Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 18 Jan 2024 11:22:57 +0530 Subject: [PATCH 10/18] fix test_create_pool_request_accept --- .../tests/syft/worker_pool/worker_pool_serivice_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/syft/tests/syft/worker_pool/worker_pool_serivice_test.py b/packages/syft/tests/syft/worker_pool/worker_pool_serivice_test.py index fc83c159cbc..c81d7bd83e4 100644 --- a/packages/syft/tests/syft/worker_pool/worker_pool_serivice_test.py +++ b/packages/syft/tests/syft/worker_pool/worker_pool_serivice_test.py @@ -68,14 +68,14 @@ def test_create_pool_request_accept(faker: Faker, worker: Worker): ds_client = get_ds_client(faker, root_client, worker.guest_client) assert root_client.credentials != ds_client.credentials - # the DS submits the docker config to build an image + # the DO submits the docker config to build an image custom_dockerfile_str = """ FROM openmined/grid-backend:0.8.4-beta.12 RUN pip install opendp """ docker_config = DockerWorkerConfig(dockerfile=custom_dockerfile_str) - submit_result = ds_client.api.services.worker_image.submit_dockerfile( + submit_result = root_client.api.services.worker_image.submit_dockerfile( docker_config=docker_config ) assert isinstance(submit_result, SyftSuccess) @@ -93,7 +93,7 @@ def test_create_pool_request_accept(faker: Faker, worker: Worker): assert isinstance(docker_build_result, SyftSuccess) assert worker_image.image_identifier.repo_with_tag == docker_tag - # The DS client submits a request to build the image + # The DS client submits a request to create a pool from an existing image request = ds_client.api.services.worker_pool.pool_creation_request( pool_name="opendp-pool", num_workers=3, image_uid=worker_image.id ) From 734b3ca8ec5838a8b5a4f79d5440d79401fd618b Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 18 Jan 2024 14:01:44 +0700 Subject: [PATCH 11/18] [tests/worker_image] - add a check if `image_identifier` exist for images - rename the test file due to typo in filename --- notebooks/api/0.8/10-container-images.ipynb | 4 ++-- ...rker_pool_serivice_test.py => worker_pool_service_test.py} | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) rename packages/syft/tests/syft/worker_pool/{worker_pool_serivice_test.py => worker_pool_service_test.py} (97%) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index 7099fdd1119..ef6e261f2a3 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -1273,7 +1273,7 @@ "source": [ "image_exists = False\n", "for im in domain_client.images.get_all():\n", - " if im.image_identifier.repo_with_tag == docker_tag_3:\n", + " if im.image_identifier and im.image_identifier.repo_with_tag == docker_tag_3:\n", " image_exists = True\n", "assert image_exists" ] @@ -1324,7 +1324,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.7" + "version": "3.10.13" } }, "nbformat": 4, diff --git a/packages/syft/tests/syft/worker_pool/worker_pool_serivice_test.py b/packages/syft/tests/syft/worker_pool/worker_pool_service_test.py similarity index 97% rename from packages/syft/tests/syft/worker_pool/worker_pool_serivice_test.py rename to packages/syft/tests/syft/worker_pool/worker_pool_service_test.py index c81d7bd83e4..09b1026b35e 100644 --- a/packages/syft/tests/syft/worker_pool/worker_pool_serivice_test.py +++ b/packages/syft/tests/syft/worker_pool/worker_pool_service_test.py @@ -50,7 +50,9 @@ def test_create_image_and_pool_request_accept(faker: Faker, worker: Worker): assert root_client.requests[-1].status.value == 2 all_image_tags = [ - im.image_identifier.repo_with_tag for im in root_client.images.get_all() + im.image_identifier.repo_with_tag + for im in root_client.images.get_all() + if im.image_identifier ] assert docker_tag in all_image_tags launched_pool = root_client.worker_pools["recordlinkage-pool"] From 40574ba3c047916c9575a5b775df6e969545a090 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 18 Jan 2024 17:03:51 +0700 Subject: [PATCH 12/18] [notebook] fix container-images notebook stack failed due to picking the wrong image to launch worker pool --- notebooks/api/0.8/10-container-images.ipynb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index ef6e261f2a3..fb62096edf0 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -1088,7 +1088,9 @@ "metadata": {}, "outputs": [], "source": [ - "workerimage_2 = domain_client.images[1]" + "for im in domain_client.images:\n", + " if im.built_at is None:\n", + " workerimage_2 = im" ] }, { From 75a93e800f532e78b180e10a703b3c2ec91b87d2 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 18 Jan 2024 17:14:36 +0700 Subject: [PATCH 13/18] [notebook/container-images] getting a pending request and approve it instead of always picking the last one to approve since the order might be wrong when testing with container stack --- notebooks/api/0.8/10-container-images.ipynb | 25 +++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index fb62096edf0..e7cfdcc4809 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -1088,6 +1088,7 @@ "metadata": {}, "outputs": [], "source": [ + "# get the image that's not built\n", "for im in domain_client.images:\n", " if im.built_at is None:\n", " workerimage_2 = im" @@ -1146,7 +1147,22 @@ "metadata": {}, "outputs": [], "source": [ - "domain_client.requests[-1].approve()" + "# get the pending request and approve it\n", + "for r in domain_client.requests:\n", + " print(r.status, r.status.value)\n", + " if r.status.value == 0: # pending request\n", + " req = r\n", + "req_result = req.approve()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4ce90111-11bd-4ebd-bb4a-4217a57c7d8d", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(req_result, sy.SyftSuccess)" ] }, { @@ -1233,7 +1249,12 @@ "metadata": {}, "outputs": [], "source": [ - "req_result = domain_client.requests[-1].approve()" + "# get the pending request and approve it\n", + "for r in domain_client.requests:\n", + " print(r.status, r.status.value)\n", + " if r.status.value == 0: # pending request\n", + " req = r\n", + "req_result = req.approve()" ] }, { From 7abeb3429673f23cb4fe27c4c4d7a887c9fb08a5 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 18 Jan 2024 15:32:52 +0530 Subject: [PATCH 14/18] rename check_build to test_build_image --- notebooks/api/0.8/10-container-images.ipynb | 10 ++++++++++ packages/syft/src/syft/custom_worker/config.py | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index e7cfdcc4809..56971692446 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -1212,6 +1212,16 @@ "docker_tag_3 = \"openmined/custom-worker-recordlinkage:latest\"" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "6732056f", + "metadata": {}, + "outputs": [], + "source": [ + "custom_dockerfile_str_3.test_image_build(tag=docker_tag_3)" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/packages/syft/src/syft/custom_worker/config.py b/packages/syft/src/syft/custom_worker/config.py index 286e42e2704..8caa89e7e23 100644 --- a/packages/syft/src/syft/custom_worker/config.py +++ b/packages/syft/src/syft/custom_worker/config.py @@ -129,7 +129,7 @@ def __hash__(self) -> int: def __str__(self) -> str: return self.dockerfile - def check_build(self, tag: str, **kwargs) -> Tuple[Image, SyftSuccess]: + def test_image_build(self, tag: str, **kwargs) -> Tuple[Image, SyftSuccess]: # relative from ..service.worker.utils import parse_output from .builder import CustomWorkerBuilder From 5396888a50ff053e13f3a2a5bbba7ca7b4402457 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 18 Jan 2024 18:55:21 +0700 Subject: [PATCH 15/18] [notebook/container-images] small fix --- notebooks/api/0.8/10-container-images.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index 56971692446..33ed8b2da4e 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -1219,7 +1219,7 @@ "metadata": {}, "outputs": [], "source": [ - "custom_dockerfile_str_3.test_image_build(tag=docker_tag_3)" + "docker_config_3.test_image_build(tag=docker_tag_3)" ] }, { From 16e2d44bccb4a336ca5c032eb2caba5127510055 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 18 Jan 2024 18:52:59 +0530 Subject: [PATCH 16/18] update notebook to clean up containers post tests --- notebooks/api/0.8/10-container-images.ipynb | 94 +++++++++++++-------- 1 file changed, 60 insertions(+), 34 deletions(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index 33ed8b2da4e..de2326c33d8 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -45,8 +45,8 @@ "outputs": [], "source": [ "# Uncomment this to run the whole docker based custom workers\n", - "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"container_stack\"\n", - "# os.environ[\"DEV_MODE\"] = \"True\"\n", + "os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"container_stack\"\n", + "os.environ[\"DEV_MODE\"] = \"True\"\n", "\n", "\n", "# Disable inmemory worker for container stack\n", @@ -69,6 +69,7 @@ " create_producer=True,\n", " in_memory_workers=in_memory_workers,\n", " reset=True,\n", + " port=8081,\n", ")" ] }, @@ -1089,8 +1090,9 @@ "outputs": [], "source": [ "# get the image that's not built\n", + "workerimage_2 = None\n", "for im in domain_client.images:\n", - " if im.built_at is None:\n", + " if im.config == docker_config_2:\n", " workerimage_2 = im" ] }, @@ -1125,9 +1127,10 @@ "metadata": {}, "outputs": [], "source": [ - "domain_client.api.services.worker_pool.pool_creation_request(\n", + "pool_create_request = domain_client.api.services.worker_pool.pool_creation_request(\n", " pool_name=\"first-opendp-pool\", num_workers=3, image_uid=workerimage_2.id\n", - ")" + ")\n", + "pool_create_request" ] }, { @@ -1137,7 +1140,7 @@ "metadata": {}, "outputs": [], "source": [ - "assert len(request.changes) == 1" + "assert len(pool_create_request.changes) == 1" ] }, { @@ -1148,11 +1151,8 @@ "outputs": [], "source": [ "# get the pending request and approve it\n", - "for r in domain_client.requests:\n", - " print(r.status, r.status.value)\n", - " if r.status.value == 0: # pending request\n", - " req = r\n", - "req_result = req.approve()" + "req_result = pool_create_request.approve()\n", + "req_result" ] }, { @@ -1219,7 +1219,8 @@ "metadata": {}, "outputs": [], "source": [ - "docker_config_3.test_image_build(tag=docker_tag_3)" + "test_build_res = docker_config_3.test_image_build(tag=docker_tag_3)\n", + "assert isinstance(test_build_res, sy.SyftSuccess)" ] }, { @@ -1229,14 +1230,16 @@ "metadata": {}, "outputs": [], "source": [ - "request = domain_client.api.services.worker_pool.create_image_and_pool_request(\n", - " pool_name=\"recordlinkage-pool\",\n", - " num_workers=2,\n", - " tag=docker_tag_3,\n", - " config=docker_config_3,\n", - " reason=\"I want to do some more cool data science with PySyft and OpenDP\",\n", + "pool_image_create_request = (\n", + " domain_client.api.services.worker_pool.create_image_and_pool_request(\n", + " pool_name=\"recordlinkage-pool\",\n", + " num_workers=2,\n", + " tag=docker_tag_3,\n", + " config=docker_config_3,\n", + " reason=\"I want to do some more cool data science with PySyft and OpenDP\",\n", + " )\n", ")\n", - "request" + "pool_image_create_request" ] }, { @@ -1246,10 +1249,10 @@ "metadata": {}, "outputs": [], "source": [ - "assert len(request.changes) == 2\n", - "assert request.changes[0].config == docker_config_3\n", - "assert request.changes[1].num_workers == 2\n", - "assert request.changes[1].pool_name == \"recordlinkage-pool\"" + "assert len(pool_image_create_request.changes) == 2\n", + "assert pool_image_create_request.changes[0].config == docker_config_3\n", + "assert pool_image_create_request.changes[1].num_workers == 2\n", + "assert pool_image_create_request.changes[1].pool_name == \"recordlinkage-pool\"" ] }, { @@ -1260,11 +1263,8 @@ "outputs": [], "source": [ "# get the pending request and approve it\n", - "for r in domain_client.requests:\n", - " print(r.status, r.status.value)\n", - " if r.status.value == 0: # pending request\n", - " req = r\n", - "req_result = req.approve()" + "req_result = pool_image_create_request.approve()\n", + "req_result" ] }, { @@ -1284,7 +1284,12 @@ "metadata": {}, "outputs": [], "source": [ - "assert domain_client.requests[-1].status.value == 2" + "# Get updated request object and status\n", + "for req in domain_client.requests:\n", + " if req.id == pool_image_create_request.id:\n", + " pool_image_create_request = req\n", + "\n", + "assert pool_image_create_request.status.value == 2" ] }, { @@ -1322,23 +1327,44 @@ "assert len(domain_client.worker_pools[\"recordlinkage-pool\"].worker_list) == 2" ] }, + { + "cell_type": "markdown", + "id": "ca0febe0-ab67-441a-92c2-f3de243bf940", + "metadata": {}, + "source": [ + "#### Clean up workers" + ] + }, { "cell_type": "code", "execution_count": null, - "id": "6391a086-604a-47a9-959d-d4a626ac57f2", + "id": "c0317e06-fd94-43d4-88d5-af39033aafe0", "metadata": {}, "outputs": [], "source": [ - "domain.land()" + "# delete the remaining workers\n", + "for worker_pool in domain_client.worker_pools:\n", + " for worker in worker_pool.workers:\n", + " res = domain_client.api.services.worker_pool.delete_worker(\n", + " worker_pool_id=worker_pool.id,\n", + " worker_id=worker.id,\n", + " )\n", + " assert isinstance(res, sy.SyftSuccess)\n", + "\n", + "# Adding some sleep to allow containers to be fully removed,\n", + "# before removing the image\n", + "time.sleep(10)" ] }, { "cell_type": "code", "execution_count": null, - "id": "a0c34b8a-17c9-404f-926d-5a4127479882", + "id": "6391a086-604a-47a9-959d-d4a626ac57f2", "metadata": {}, "outputs": [], - "source": [] + "source": [ + "domain.land()" + ] } ], "metadata": { @@ -1357,7 +1383,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.13" + "version": "3.9.7" } }, "nbformat": 4, From 8d8108b17ee40b70256dfd002961023647e25340 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Fri, 19 Jan 2024 10:14:22 +0700 Subject: [PATCH 17/18] [notebook/container-images] fix cleaning up workers --- notebooks/api/0.8/10-container-images.ipynb | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index 05e5984d752..534ff09ab1b 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -1341,10 +1341,8 @@ "# delete the remaining workers\n", "for worker_pool in domain_client.worker_pools:\n", " for worker in worker_pool.workers:\n", - " res = domain_client.api.services.worker_pool.delete_worker(\n", - " worker_pool_id=worker_pool.id,\n", - " worker_id=worker.id,\n", - " )\n", + " res = domain_client.api.services.worker.delete(uid=worker.id, force=True)\n", + " print(res)\n", " assert isinstance(res, sy.SyftSuccess)\n", "\n", "# Adding some sleep to allow containers to be fully removed,\n", @@ -1352,6 +1350,17 @@ "time.sleep(10)" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "2c809521-cb0d-432f-b75a-7da6d635e85d", + "metadata": {}, + "outputs": [], + "source": [ + "for worker_pool in domain_client.worker_pools:\n", + " assert len(worker_pool.worker_list) == 0" + ] + }, { "cell_type": "code", "execution_count": null, @@ -1379,7 +1388,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.7" + "version": "3.10.13" } }, "nbformat": 4, From e973c9fcfcb9e9cafb75e12313a798c2886e1dfa Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Fri, 19 Jan 2024 15:53:28 +0530 Subject: [PATCH 18/18] add comments to the code for readability --- notebooks/api/0.8/10-container-images.ipynb | 6 +-- .../service/worker/worker_pool_service.py | 54 ++++++++++++++++--- 2 files changed, 50 insertions(+), 10 deletions(-) diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index 534ff09ab1b..0c9fe311dbc 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -45,8 +45,8 @@ "outputs": [], "source": [ "# Uncomment this to run the whole docker based custom workers\n", - "os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"container_stack\"\n", - "os.environ[\"DEV_MODE\"] = \"True\"\n", + "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"container_stack\"\n", + "# os.environ[\"DEV_MODE\"] = \"True\"\n", "\n", "\n", "# Disable inmemory worker for container stack\n", @@ -1388,7 +1388,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.13" + "version": "3.9.7" } }, "nbformat": 4, diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index 774abb5c1ad..0a9b48f8103 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -89,6 +89,8 @@ def launch( if result.ok() is not None: return SyftError(message=f"Worker Pool with name: {name} already exists !!") + # If image uid is not passed, then use the default worker image + # to create the worker pool if image_uid is None: result = self.stash.get_by_name( context.credentials, pool_name=DEFAULT_WORKER_POOL_NAME @@ -96,6 +98,7 @@ def launch( default_worker_pool = result.ok() image_uid = default_worker_pool.image_id + # Get the image object for the given image id result = self.image_stash.get_by_uid( credentials=context.credentials, uid=image_uid ) @@ -109,6 +112,8 @@ def launch( worker_service: WorkerService = context.node.get_service("WorkerService") worker_stash = worker_service.stash + # Create worker pool from given image, with the given worker pool + # and with the desired number of workers worker_list, container_statuses = _create_workers_in_pool( context=context, pool_name=name, @@ -120,6 +125,7 @@ def launch( reg_password=reg_password, ) + # Update the Database with the pool information worker_pool = WorkerPool( name=name, max_count=num_workers, @@ -160,6 +166,7 @@ def create_pool_request( worker pool. Defaults to "". """ + # Check if image exists for the given image id search_result = self.image_stash.get_by_uid( credentials=context.credentials, uid=image_uid ) @@ -169,11 +176,13 @@ def create_pool_request( worker_image: Optional[SyftWorkerImage] = search_result.ok() + # Raise error if worker image doesn't exists if worker_image is None: return SyftError( message=f"No image exists for given image uid : {image_uid}" ) + # Check if pool already exists for the given pool name result = self.stash.get_by_name(context.credentials, pool_name=pool_name) if result.is_err(): @@ -186,6 +195,9 @@ def create_pool_request( message=f"Worker pool already exists for given pool name: {pool_name}" ) + # If no worker pool exists for given pool name + # and image exists for given image uid, then create a change + # request object to create the pool with the desired number of workers create_worker_pool_change = CreateCustomWorkerPoolChange( pool_name=pool_name, num_workers=num_workers, @@ -193,6 +205,9 @@ def create_pool_request( ) changes: List[Change] = [create_worker_pool_change] + + # Create a the request object with the changes and submit it + # for approval. request = SubmitRequest(changes=changes) method = context.node.get_service_method(RequestService.submit) result = method(context=context, request=request, reason=reason) @@ -228,6 +243,7 @@ def create_image_and_pool_request( if isinstance(config, CustomWorkerConfig): return SyftError(message="We only support DockerWorkerConfig.") + # Check if an image already exists for given docker config search_result = self.image_stash.get_by_docker_config( credentials=context.credentials, config=config ) @@ -237,32 +253,36 @@ def create_image_and_pool_request( worker_image: Optional[SyftWorkerImage] = search_result.ok() + if worker_image is not None: + return SyftError( + message="Image already exists for given config. \ + Please use `worker_pool.create_pool_request` to request pool creation." + ) + + # Validate Image Tag try: image_identifier = SyftWorkerImageIdentifier.from_str(tag=tag) except pydantic.ValidationError as e: return SyftError(message=f"Failed to create tag: {e}") # create a list of Change objects and submit a - # request for these changes to the server + # request for these changes for approval changes: List[Change] = [] - if worker_image is not None: - return SyftError( - message="Image already exists for given config. \ - Please use `worker_pool.create_pool_request` to request pool creation." - ) - # Add create custom image change + # If this change is approved, then build an image using the config create_custom_image_change = CreateCustomImageChange( config=config, tag=image_identifier.full_name_with_tag, ) + # Check if a pool already exists for given pool name result = self.stash.get_by_name(context.credentials, pool_name=pool_name) if result.is_err(): return SyftError(message=f"{result.err()}") + # Raise an error if worker pool already exists for the given worker pool name if result.ok() is not None: return SyftError( message=f"Worker Pool with name: {pool_name} already " @@ -270,12 +290,16 @@ def create_image_and_pool_request( ) # Add create worker pool change + # If change is approved then worker pool is created and + # the desired number of workers are added to the pool create_worker_pool_change = CreateCustomWorkerPoolChange( pool_name=pool_name, num_workers=num_workers, config=config, ) changes += [create_custom_image_change, create_worker_pool_change] + + # Create a request object and submit a request for approval request = SubmitRequest(changes=changes) method = context.node.get_service_method(RequestService.submit) result = method(context=context, request=request, reason=reason) @@ -314,6 +338,21 @@ def add_workers( pool_id: Optional[UID] = None, pool_name: Optional[str] = None, ) -> Union[List[ContainerSpawnStatus], SyftError]: + """Add workers to existing worker pool. + + Worker pool is fetched either using the unique pool id or pool name. + + Args: + context (AuthedServiceContext): _description_ + number (int): number of workers to add + pool_id (Optional[UID], optional): Unique UID of the pool. Defaults to None. + pool_name (Optional[str], optional): Unique name of the pool. Defaults to None. + + Returns: + Union[List[ContainerSpawnStatus], SyftError]: List of spawned workers with their status and error if any. + """ + + # Extract pool using either using pool id or pool name if pool_id: result = self.stash.get_by_uid(credentials=context.credentials, uid=pool_id) elif pool_name: @@ -344,6 +383,7 @@ def add_workers( worker_service: WorkerService = context.node.get_service("WorkerService") worker_stash = worker_service.stash + # Add workers to given pool from the given image worker_list, container_statuses = _create_workers_in_pool( context=context, pool_name=worker_pool.name,