diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f33fed5d7a2..ee4f4306f5c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -81,7 +81,6 @@ repos: # files: "^notebooks/(api|tutorials|admin)" hooks: - id: nbqa-isort - - id: nbqa-black - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. diff --git a/notebooks/admin/Custom API + Custom Worker.ipynb b/notebooks/admin/Custom API + Custom Worker.ipynb index ef6bde6ab9c..a27bb7a23c4 100644 --- a/notebooks/admin/Custom API + Custom Worker.ipynb +++ b/notebooks/admin/Custom API + Custom Worker.ipynb @@ -114,8 +114,8 @@ "metadata": {}, "outputs": [], "source": [ - "submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n", - " docker_config=docker_config\n", + "submit_result = domain_client.api.services.worker_image.submit(\n", + " worker_config=docker_config\n", ")\n", "submit_result" ] diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index cb4bd49cf86..870cb655d93 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -227,8 +227,8 @@ "metadata": {}, "outputs": [], "source": [ - "submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n", - " docker_config=docker_config\n", + "submit_result = domain_client.api.services.worker_image.submit(\n", + " worker_config=docker_config\n", ")" ] }, @@ -1095,8 +1095,8 @@ "metadata": {}, "outputs": [], "source": [ - "submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n", - " docker_config=docker_config_2\n", + "submit_result = domain_client.api.services.worker_image.submit(\n", + " worker_config=docker_config_2\n", ")\n", "submit_result" ] diff --git a/notebooks/api/0.8/11-container-images-k8s.ipynb b/notebooks/api/0.8/11-container-images-k8s.ipynb index 77cd71c7912..24f4a7c33df 100644 --- a/notebooks/api/0.8/11-container-images-k8s.ipynb +++ b/notebooks/api/0.8/11-container-images-k8s.ipynb @@ -265,8 +265,8 @@ "metadata": {}, "outputs": [], "source": [ - "submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n", - " docker_config=docker_config\n", + "submit_result = domain_client.api.services.worker_image.submit(\n", + " worker_config=docker_config\n", ")\n", "submit_result" ] @@ -935,8 +935,8 @@ "outputs": [], "source": [ "submit_result = None\n", - "submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n", - " docker_config=docker_config_opendp\n", + "submit_result = domain_client.api.services.worker_image.submit(\n", + " worker_config=docker_config_opendp\n", ")\n", "submit_result" ] diff --git a/packages/syft/src/syft/__init__.py b/packages/syft/src/syft/__init__.py index 280181d7c1b..11d0ae7212b 100644 --- a/packages/syft/src/syft/__init__.py +++ b/packages/syft/src/syft/__init__.py @@ -25,6 +25,7 @@ from .client.user_settings import UserSettings # noqa: F401 from .client.user_settings import settings # noqa: F401 from .custom_worker.config import DockerWorkerConfig # noqa: F401 +from .custom_worker.config import PrebuiltWorkerConfig # noqa: F401 from .node.credentials import SyftSigningKey # noqa: F401 from .node.domain import Domain # noqa: F401 from .node.enclave import Enclave # noqa: F401 diff --git a/packages/syft/src/syft/custom_worker/runner_k8s.py b/packages/syft/src/syft/custom_worker/runner_k8s.py index eeddacd9ebf..f9c48c6e394 100644 --- a/packages/syft/src/syft/custom_worker/runner_k8s.py +++ b/packages/syft/src/syft/custom_worker/runner_k8s.py @@ -13,7 +13,7 @@ from .k8s import get_kr8s_client JSONPATH_AVAILABLE_REPLICAS = "{.status.availableReplicas}" -CREATE_POOL_TIMEOUT_SEC = 60 +CREATE_POOL_TIMEOUT_SEC = 180 SCALE_POOL_TIMEOUT_SEC = 60 @@ -60,8 +60,6 @@ def create_pool( f"jsonpath='{JSONPATH_AVAILABLE_REPLICAS}'={replicas}", timeout=CREATE_POOL_TIMEOUT_SEC, ) - except Exception: - raise finally: if pull_secret: pull_secret.delete(propagation_policy="Foreground") diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index 88f035de0bb..3afbafee914 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -6,6 +6,7 @@ from typing import Any # third party +from pydantic import model_validator from result import Err from result import Ok from result import Result @@ -15,6 +16,7 @@ from ...abstract_node import NodeSideType from ...client.api import APIRegistry from ...client.client import SyftClient +from ...custom_worker.config import DockerWorkerConfig from ...custom_worker.config import WorkerConfig from ...custom_worker.k8s import IN_KUBERNETES from ...node.credentials import SyftVerifyKey @@ -187,7 +189,7 @@ def __repr_syft_nested__(self) -> str: @serializable() -class CreateCustomImageChange(Change): +class CreateCustomImageChangeV2(Change): __canonical_name__ = "CreateCustomImageChange" __version__ = SYFT_OBJECT_VERSION_2 @@ -198,6 +200,25 @@ class CreateCustomImageChange(Change): __repr_attrs__ = ["config", "tag"] + +@serializable() +class CreateCustomImageChange(Change): + __canonical_name__ = "CreateCustomImageChange" + __version__ = SYFT_OBJECT_VERSION_3 + + config: WorkerConfig + tag: str | None = None + registry_uid: UID | None = None + pull_image: bool = True + + __repr_attrs__ = ["config", "tag"] + + @model_validator(mode="after") + def _tag_required_for_dockerworkerconfig(self) -> Self: + if isinstance(self.config, DockerWorkerConfig) and self.tag is None: + raise ValueError("`tag` is required for `DockerWorkerConfig`.") + return self + def _run( self, context: ChangeContext, apply: bool ) -> Result[SyftSuccess, SyftError]: @@ -205,34 +226,44 @@ def _run( worker_image_service = context.node.get_service("SyftWorkerImageService") service_context = context.to_service_ctx() - result = worker_image_service.submit_dockerfile( - service_context, docker_config=self.config + result = worker_image_service.submit( + service_context, worker_config=self.config ) if isinstance(result, SyftError): return Err(result) - result = worker_image_service.stash.get_by_docker_config( + result = worker_image_service.stash.get_by_worker_config( service_context.credentials, config=self.config ) if result.is_err(): return Err(SyftError(message=f"{result.err()}")) - worker_image = result.ok() + if (worker_image := result.ok()) is None: + return Err(SyftError(message="The worker image does not exist.")) - build_result = worker_image_service.build( - service_context, - image_uid=worker_image.id, - tag=self.tag, - registry_uid=self.registry_uid, - pull=self.pull_image, - ) + build_success_message = "Image was pre-built." + + if not worker_image.is_prebuilt: + build_result = worker_image_service.build( + service_context, + image_uid=worker_image.id, + tag=self.tag, + registry_uid=self.registry_uid, + pull=self.pull_image, + ) + + if isinstance(build_result, SyftError): + return Err(build_result) - if isinstance(build_result, SyftError): - return Err(build_result) + build_success_message = build_result.message + + build_success = SyftSuccess( + message=f"Build result: {build_success_message}" + ) - if IN_KUBERNETES: + if IN_KUBERNETES and not worker_image.is_prebuilt: push_result = worker_image_service.push( service_context, image=worker_image.id, @@ -245,11 +276,11 @@ def _run( return Ok( SyftSuccess( - message=f"Build Result: {build_result.message} \n Push Result: {push_result.message}" + message=f"{build_success}\nPush result: {push_result.message}" ) ) - return Ok(build_result) + return Ok(build_success) except Exception as e: return Err(SyftError(message=f"Failed to create/build image: {e}")) @@ -291,7 +322,7 @@ def _run( service_context: AuthedServiceContext = context.to_service_ctx() if self.config is not None: - result = worker_pool_service.image_stash.get_by_docker_config( + result = worker_pool_service.image_stash.get_by_worker_config( service_context.credentials, self.config ) if result.is_err(): diff --git a/packages/syft/src/syft/service/worker/utils.py b/packages/syft/src/syft/service/worker/utils.py index 75d1c36d459..181774da837 100644 --- a/packages/syft/src/syft/service/worker/utils.py +++ b/packages/syft/src/syft/service/worker/utils.py @@ -331,7 +331,6 @@ def create_kubernetes_pool( **kwargs: Any, ) -> list[Pod] | SyftError: pool = None - error = False try: print( @@ -366,11 +365,14 @@ def create_kubernetes_pool( reg_url=reg_url, ) except Exception as e: - error = True - return SyftError(message=f"Failed to start workers {e}") - finally: - if error and pool: + if pool: pool.delete() + # stdlib + import traceback + + return SyftError( + message=f"Failed to start workers {e} {e.__class__} {e.args} {traceback.format_exc()}." + ) return runner.get_pool_pods(pool_name=pool_name) @@ -564,7 +566,7 @@ def create_default_image( image_identifier=SyftWorkerImageIdentifier.from_str(tag), ) - result = image_stash.get_by_docker_config( + result = image_stash.get_by_worker_config( credentials=credentials, config=worker_config, ) diff --git a/packages/syft/src/syft/service/worker/worker_image_service.py b/packages/syft/src/syft/service/worker/worker_image_service.py index da7c39f4582..afd21af9e50 100644 --- a/packages/syft/src/syft/service/worker/worker_image_service.py +++ b/packages/syft/src/syft/service/worker/worker_image_service.py @@ -6,7 +6,8 @@ import pydantic # relative -from ...custom_worker.config import DockerWorkerConfig +from ...custom_worker.config import PrebuiltWorkerConfig +from ...custom_worker.config import WorkerConfig from ...custom_worker.k8s import IN_KUBERNETES from ...serde.serializable import serializable from ...store.document_store import DocumentStore @@ -39,16 +40,27 @@ def __init__(self, store: DocumentStore) -> None: self.stash = SyftWorkerImageStash(store=store) @service_method( - path="worker_image.submit_dockerfile", - name="submit_dockerfile", + path="worker_image.submit", + name="submit", roles=DATA_OWNER_ROLE_LEVEL, ) - def submit_dockerfile( - self, context: AuthedServiceContext, docker_config: DockerWorkerConfig + def submit( + self, context: AuthedServiceContext, worker_config: WorkerConfig ) -> SyftSuccess | SyftError: + image_identifier: SyftWorkerImageIdentifier | None = None + if isinstance(worker_config, PrebuiltWorkerConfig): + try: + image_identifier = SyftWorkerImageIdentifier.from_str(worker_config.tag) + except Exception: + return SyftError( + f"Invalid Docker image name: {worker_config.tag}.\n" + + "Please specify the image name in this format /:." + ) + worker_image = SyftWorkerImage( - config=docker_config, + config=worker_config, created_by=context.credentials, + image_identifier=image_identifier, ) res = self.stash.set(context.credentials, worker_image) @@ -278,14 +290,14 @@ def get_by_uid( roles=DATA_SCIENTIST_ROLE_LEVEL, ) def get_by_config( - self, context: AuthedServiceContext, docker_config: DockerWorkerConfig + self, context: AuthedServiceContext, worker_config: WorkerConfig ) -> SyftWorkerImage | SyftError: - res = self.stash.get_by_docker_config( - credentials=context.credentials, config=docker_config + res = self.stash.get_by_worker_config( + credentials=context.credentials, config=worker_config ) if res.is_err(): return SyftError( - message=f"Failed to get image with docker config {docker_config}. Error: {res.err()}" + message=f"Failed to get image with docker config {worker_config}. Error: {res.err()}" ) image: SyftWorkerImage = res.ok() return image diff --git a/packages/syft/src/syft/service/worker/worker_image_stash.py b/packages/syft/src/syft/service/worker/worker_image_stash.py index 900bcdd7cd6..ab9dde6701f 100644 --- a/packages/syft/src/syft/service/worker/worker_image_stash.py +++ b/packages/syft/src/syft/service/worker/worker_image_stash.py @@ -48,7 +48,7 @@ def set( ) if isinstance(obj.config, DockerWorkerConfig): - result = self.get_by_docker_config( + result = self.get_by_worker_config( credentials=credentials, config=obj.config ) if result.is_ok() and result.ok() is not None: @@ -62,8 +62,8 @@ def set( ignore_duplicates=ignore_duplicates, ) - def get_by_docker_config( - self, credentials: SyftVerifyKey, config: DockerWorkerConfig + def get_by_worker_config( + self, credentials: SyftVerifyKey, config: WorkerConfig ) -> Result[SyftWorkerImage | None, str]: qks = QueryKeys(qks=[WorkerConfigPK.with_obj(config)]) return self.query_one(credentials=credentials, qks=qks) diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index a8a91965d3c..cef16fb5b72 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -6,7 +6,8 @@ from result import OkErr # relative -from ...custom_worker.config import CustomWorkerConfig +from ...custom_worker.config import DockerWorkerConfig +from ...custom_worker.config import PrebuiltWorkerConfig from ...custom_worker.config import WorkerConfig from ...custom_worker.k8s import IN_KUBERNETES from ...custom_worker.runner_k8s import KubernetesRunner @@ -232,8 +233,8 @@ def create_image_and_pool_request( context: AuthedServiceContext, pool_name: str, num_workers: int, - tag: str, config: WorkerConfig, + tag: str | None = None, registry_uid: UID | None = None, reason: str | None = "", pull_image: bool = True, @@ -246,18 +247,34 @@ def create_image_and_pool_request( pool_name (str): The name of the worker pool. num_workers (int): The number of workers in the pool. config: (WorkerConfig): Config of the image to be built. - tag (str): human-readable manifest identifier that is typically a specific version or variant of an image - reason (Optional[str], optional): The reason for creating the worker image and pool. Defaults to "". + tag (str | None, optional): + a human-readable manifest identifier that is typically a specific version or variant of an image, + only needed for `DockerWorkerConfig` to tag the image after it is built. + reason (str | None, optional): The reason for creating the worker image and pool. Defaults to "". """ - if isinstance(config, CustomWorkerConfig): - return SyftError(message="We only support DockerWorkerConfig.") + if not isinstance(config, DockerWorkerConfig | PrebuiltWorkerConfig): + return SyftError( + message="We only support either `DockerWorkerConfig` or `PrebuiltWorkerConfig`." + ) - if IN_KUBERNETES and registry_uid is None: - return SyftError(message="Registry UID is required in Kubernetes mode.") + if isinstance(config, DockerWorkerConfig): + if tag is None: + return SyftError(message="`tag` is required for `DockerWorkerConfig`.") + + # Validate image tag + try: + SyftWorkerImageIdentifier.from_str(tag=tag) + except pydantic.ValidationError as e: + return SyftError(message=f"Invalid `tag`: {e}.") + + if IN_KUBERNETES and registry_uid is None: + return SyftError( + message="`registry_uid` is required in Kubernetes mode for `DockerWorkerConfig`." + ) # Check if an image already exists for given docker config - search_result = self.image_stash.get_by_docker_config( + search_result = self.image_stash.get_by_worker_config( credentials=context.credentials, config=config ) @@ -272,12 +289,6 @@ def create_image_and_pool_request( Please use `worker_pool.create_pool_request` to request pool creation." ) - # Validate Image Tag - try: - SyftWorkerImageIdentifier.from_str(tag=tag) - except pydantic.ValidationError as e: - return SyftError(message=f"Failed to create tag: {e}") - # create a list of Change objects and submit a # request for these changes for approval changes: list[Change] = [] diff --git a/packages/syft/tests/syft/worker_pool/worker_pool_service_test.py b/packages/syft/tests/syft/worker_pool/worker_pool_service_test.py index 8aad9f5a27e..2b65105c160 100644 --- a/packages/syft/tests/syft/worker_pool/worker_pool_service_test.py +++ b/packages/syft/tests/syft/worker_pool/worker_pool_service_test.py @@ -1,9 +1,12 @@ # third party from faker import Faker +import pytest # syft absolute import syft as sy from syft.custom_worker.config import DockerWorkerConfig +from syft.custom_worker.config import PrebuiltWorkerConfig +from syft.custom_worker.config import WorkerConfig from syft.node.worker import Worker from syft.service.request.request import CreateCustomWorkerPoolChange from syft.service.response import SyftSuccess @@ -13,8 +16,35 @@ # relative from ..request.request_code_accept_deny_test import get_ds_client +PREBUILT_IMAGE_TAG = f"docker.io/openmined/grid-backend:{sy.__version__}" -def test_create_image_and_pool_request_accept(faker: Faker, worker: Worker): +CUSTOM_DOCKERFILE = f""" +FROM {PREBUILT_IMAGE_TAG} + +RUN pip install recordlinkage +""" + +CUSTOM_IMAGE_TAG = "docker.io/openmined/custom-worker-recordlinkage:latest" + +WORKER_CONFIG_TEST_CASES_WITH_N_IMAGES = [ + ( + CUSTOM_IMAGE_TAG, + DockerWorkerConfig(dockerfile=CUSTOM_DOCKERFILE), + 2, # total number of images. + # 2 since we pull a pre-built image (1) as the base image to build a custom image (2) + ), + (None, PrebuiltWorkerConfig(tag=PREBUILT_IMAGE_TAG), 1), +] + +WORKER_CONFIG_TEST_CASES = [ + test_case[:2] for test_case in WORKER_CONFIG_TEST_CASES_WITH_N_IMAGES +] + + +@pytest.mark.parametrize("docker_tag,worker_config", WORKER_CONFIG_TEST_CASES) +def test_create_image_and_pool_request_accept( + faker: Faker, worker: Worker, docker_tag: str, worker_config: WorkerConfig +) -> None: """ Test the functionality of `SyftWorkerPoolService.create_image_and_pool_request` when the request is accepted @@ -25,22 +55,15 @@ def test_create_image_and_pool_request_accept(faker: Faker, worker: Worker): assert root_client.credentials != ds_client.credentials # the DS makes a request to create an image and a pool based on the image - custom_dockerfile = f""" - FROM openmined/grid-backend:{sy.__version__} - - RUN pip install recordlinkage - """ - docker_config = DockerWorkerConfig(dockerfile=custom_dockerfile) - docker_tag = "openmined/custom-worker-recordlinkage:latest" request = ds_client.api.services.worker_pool.create_image_and_pool_request( pool_name="recordlinkage-pool", num_workers=2, tag=docker_tag, - config=docker_config, + config=worker_config, reason="I want to do some more cool data science with PySyft and Recordlinkage", ) assert len(request.changes) == 2 - assert request.changes[0].config == docker_config + assert request.changes[0].config == worker_config assert request.changes[1].num_workers == 2 assert request.changes[1].pool_name == "recordlinkage-pool" @@ -51,17 +74,32 @@ def test_create_image_and_pool_request_accept(faker: Faker, worker: Worker): assert root_client.requests[-1].status.value == 2 all_image_tags = [ - im.image_identifier.repo_with_tag + im.image_identifier.full_name_with_tag for im in root_client.images.get_all() if im.image_identifier ] - assert docker_tag in all_image_tags + tag = ( + worker_config.tag + if isinstance(worker_config, PrebuiltWorkerConfig) + else docker_tag + ) + assert tag in all_image_tags launched_pool = root_client.worker_pools["recordlinkage-pool"] assert isinstance(launched_pool, WorkerPool) assert len(launched_pool.worker_list) == 2 -def test_create_pool_request_accept(faker: Faker, worker: Worker): +@pytest.mark.parametrize( + "docker_tag,worker_config,n_images", + WORKER_CONFIG_TEST_CASES_WITH_N_IMAGES, +) +def test_create_pool_request_accept( + faker: Faker, + worker: Worker, + docker_tag: str, + worker_config: WorkerConfig, + n_images: int, +) -> None: """ Test the functionality of `SyftWorkerPoolService.create_pool_request` when the request is accepted @@ -72,29 +110,27 @@ def test_create_pool_request_accept(faker: Faker, worker: Worker): assert root_client.credentials != ds_client.credentials # the DO submits the docker config to build an image - custom_dockerfile_str = f""" - FROM openmined/grid-backend:{sy.__version__} - - RUN pip install opendp - """ - docker_config = DockerWorkerConfig(dockerfile=custom_dockerfile_str) - submit_result = root_client.api.services.worker_image.submit_dockerfile( - docker_config=docker_config + submit_result = root_client.api.services.worker_image.submit( + worker_config=worker_config ) assert isinstance(submit_result, SyftSuccess) - assert len(root_client.images.get_all()) == 2 + assert len(root_client.images.get_all()) == n_images # The root client builds the image - worker_image: SyftWorkerImage = root_client.images[1] - docker_tag = "openmined/custom-worker-opendp:latest" - docker_build_result = root_client.api.services.worker_image.build( - image_uid=worker_image.id, - tag=docker_tag, + worker_image: SyftWorkerImage = root_client.api.services.worker_image.get_by_config( + worker_config ) - # update the worker image variable after the image was built - worker_image: SyftWorkerImage = root_client.images[1] - assert isinstance(docker_build_result, SyftSuccess) - assert worker_image.image_identifier.repo_with_tag == docker_tag + if not worker_image.is_prebuilt: + docker_build_result = root_client.api.services.worker_image.build( + image_uid=worker_image.id, + tag=docker_tag, + ) + # update the worker image variable after the image was built + worker_image: SyftWorkerImage = ( + root_client.api.services.worker_image.get_by_config(worker_config) + ) + assert isinstance(docker_build_result, SyftSuccess) + assert worker_image.image_identifier.full_name_with_tag == docker_tag # The DS client submits a request to create a pool from an existing image request = ds_client.api.services.worker_pool.pool_creation_request( @@ -113,3 +149,19 @@ def test_create_pool_request_accept(faker: Faker, worker: Worker): launched_pool = root_client.worker_pools["opendp-pool"] assert isinstance(launched_pool, WorkerPool) assert len(launched_pool.worker_list) == 3 + + +WORKER_CONFIGS = [test_case[1] for test_case in WORKER_CONFIG_TEST_CASES] + + +@pytest.mark.parametrize("worker_config", WORKER_CONFIGS) +def test_get_by_worker_config( + worker: Worker, + worker_config: WorkerConfig, +) -> None: + root_client = worker.root_client + for config in WORKER_CONFIGS: + root_client.api.services.worker_image.submit(worker_config=config) + + worker_image = root_client.api.services.worker_image.get_by_config(worker_config) + assert worker_image.config == worker_config diff --git a/packages/syft/tests/syft/worker_pool/worker_test.py b/packages/syft/tests/syft/worker_pool/worker_test.py index 6503e51eb66..3c05875a009 100644 --- a/packages/syft/tests/syft/worker_pool/worker_test.py +++ b/packages/syft/tests/syft/worker_pool/worker_test.py @@ -23,8 +23,8 @@ def test_syft_worker(worker: Worker): """ root_client = worker.root_client docker_config = get_docker_config() - submit_result = root_client.api.services.worker_image.submit_dockerfile( - docker_config=docker_config + submit_result = root_client.api.services.worker_image.submit( + worker_config=docker_config ) assert isinstance(submit_result, SyftSuccess) diff --git a/tests/integration/container_workload/pool_image_test.py b/tests/integration/container_workload/pool_image_test.py index 96613240660..53f0b8b10a1 100644 --- a/tests/integration/container_workload/pool_image_test.py +++ b/tests/integration/container_workload/pool_image_test.py @@ -1,8 +1,8 @@ # stdlib import os +from uuid import uuid4 # third party -from faker import Faker import numpy as np import pytest import requests @@ -11,11 +11,14 @@ import syft as sy from syft.client.domain_client import DomainClient from syft.custom_worker.config import DockerWorkerConfig +from syft.custom_worker.config import PrebuiltWorkerConfig from syft.service.request.request import Request +from syft.service.response import SyftError from syft.service.response import SyftSuccess from syft.service.worker.worker_image import SyftWorkerImage from syft.service.worker.worker_pool import SyftWorker from syft.service.worker.worker_pool import WorkerPool +from syft.types.uid import UID registry = os.getenv("SYFT_BASE_IMAGE_REGISTRY", "docker.io") repo = "openmined/grid-backend" @@ -32,7 +35,7 @@ @pytest.fixture -def external_registry_uid(domain_1_port): +def external_registry_uid(domain_1_port: int) -> UID: domain_client: DomainClient = sy.login( port=domain_1_port, email="info@openmined.org", password="changethis" ) @@ -56,22 +59,25 @@ def external_registry_uid(domain_1_port): return image_registry_list[0].id +def make_docker_config_test_case(pkg: str) -> tuple[str, str]: + return ( + DockerWorkerConfig( + dockerfile=(f"FROM {registry}/{repo}:{tag}\nRUN pip install {pkg}\n") + ), + f"openmined/custom-worker-{pkg}:latest", + ) + + @pytest.mark.container_workload -def test_image_build(domain_1_port, external_registry_uid) -> None: +def test_image_build(domain_1_port: int, external_registry_uid: UID) -> None: domain_client: DomainClient = sy.login( port=domain_1_port, email="info@openmined.org", password="changethis" ) - # Submit Docker Worker Config. - docker_config_rl = f""" - FROM {registry}/{repo}:{tag} - RUN pip install recordlinkage - """ - docker_config = DockerWorkerConfig(dockerfile=docker_config_rl) + docker_config, docker_tag = make_docker_config_test_case("recordlinkage") - # Submit Worker Image - submit_result = domain_client.api.services.worker_image.submit_dockerfile( - docker_config=docker_config + submit_result = domain_client.api.services.worker_image.submit( + worker_config=docker_config ) assert isinstance(submit_result, SyftSuccess) assert len(domain_client.images.get_all()) == 2 @@ -81,7 +87,6 @@ def test_image_build(domain_1_port, external_registry_uid) -> None: assert not isinstance(workerimage, sy.SyftError) # Build docker image - docker_tag = "openmined/custom-worker-rl:latest" docker_build_result = domain_client.api.services.worker_image.build( image_uid=workerimage.id, tag=docker_tag, @@ -100,58 +105,58 @@ def test_image_build(domain_1_port, external_registry_uid) -> None: @pytest.mark.container_workload -def test_pool_launch(domain_1_port, external_registry_uid) -> None: +@pytest.mark.parametrize("prebuilt", [True, False]) +def test_pool_launch( + domain_1_port: int, external_registry_uid: UID, prebuilt: bool +) -> None: domain_client: DomainClient = sy.login( port=domain_1_port, email="info@openmined.org", password="changethis" ) - assert len(domain_client.worker_pools.get_all()) == 1 - - # Submit Docker Worker Config - docker_config_opendp = f""" - FROM {registry}/{repo}:{tag} - RUN pip install opendp - """ - docker_config = DockerWorkerConfig(dockerfile=docker_config_opendp) # Submit Worker Image - submit_result = domain_client.api.services.worker_image.submit_dockerfile( - docker_config=docker_config + worker_config, docker_tag = ( + (PrebuiltWorkerConfig(tag="docker.io/library/nginx:latest"), None) + if prebuilt + else make_docker_config_test_case("opendp") + ) + submit_result = domain_client.api.services.worker_image.submit( + worker_config=worker_config ) assert isinstance(submit_result, SyftSuccess) - worker_image = domain_client.api.services.worker_image.get_by_config(docker_config) + worker_image = domain_client.api.services.worker_image.get_by_config(worker_config) assert not isinstance(worker_image, sy.SyftError) assert worker_image is not None - assert not worker_image.is_built - # Build docker image - docker_tag = "openmined/custom-worker-opendp:latest" - docker_build_result = domain_client.api.services.worker_image.build( - image_uid=worker_image.id, - tag=docker_tag, - registry_uid=external_registry_uid, - ) - assert isinstance(docker_build_result, SyftSuccess) + if not worker_image.is_prebuilt: + assert not worker_image.is_built - # Push Image to External registry - push_result = domain_client.api.services.worker_image.push( - worker_image.id, - username=external_registry_username, - password=external_registry_password, - ) - assert isinstance(push_result, sy.SyftSuccess), str(push_result) + # Build docker image + docker_build_result = domain_client.api.services.worker_image.build( + image_uid=worker_image.id, + tag=docker_tag, + registry_uid=external_registry_uid, + ) + assert isinstance(docker_build_result, SyftSuccess) + + # Push Image to External registry + push_result = domain_client.api.services.worker_image.push( + worker_image.id, + username=external_registry_username, + password=external_registry_password, + ) + assert isinstance(push_result, sy.SyftSuccess), str(push_result) # Launch a worker pool - worker_pool_name = "custom-worker-pool-opendp" + worker_pool_name = f"custom-worker-pool-opendp{'-prebuilt' if prebuilt else ''}" worker_pool_res = domain_client.api.services.worker_pool.launch( name=worker_pool_name, image_uid=worker_image.id, num_workers=3, ) - assert len(worker_pool_res) == 3 + assert not isinstance(worker_pool_res, SyftError) assert all(worker.error is None for worker in worker_pool_res) - assert len(domain_client.worker_pools.get_all()) == 2 worker_pool = domain_client.worker_pools[worker_pool_name] assert len(worker_pool.worker_list) == 3 @@ -186,7 +191,10 @@ def test_pool_launch(domain_1_port, external_registry_uid) -> None: @pytest.mark.container_workload -def test_pool_image_creation_job_requests(domain_1_port, external_registry_uid) -> None: +@pytest.mark.parametrize("prebuilt", [True, False]) +def test_pool_image_creation_job_requests( + domain_1_port: int, external_registry_uid: UID, prebuilt: bool +) -> None: """ Test register ds client, ds requests to create an image and pool creation, do approves, then ds creates a function attached to the worker pool, then creates another @@ -196,8 +204,7 @@ def test_pool_image_creation_job_requests(domain_1_port, external_registry_uid) domain_client: DomainClient = sy.login( port=domain_1_port, email="info@openmined.org", password="changethis" ) - fake = Faker() - ds_username = fake.user_name() + ds_username = uuid4().hex[:8] ds_email = ds_username + "@example.com" res = domain_client.register( name=ds_username, @@ -206,27 +213,35 @@ def test_pool_image_creation_job_requests(domain_1_port, external_registry_uid) password_verify="secret_pw", ) assert isinstance(res, SyftSuccess) + + # Grant user permission to request code execution + ds = next(u for u in domain_client.users if u.email == ds_email) + ds.allow_mock_execution() + ds_client = sy.login(email=ds_email, password="secret_pw", port=domain_1_port) # the DS makes a request to create an image and a pool based on the image - docker_config_np = f""" - FROM {registry}/{repo}:{tag} - RUN pip install numpy - """ - docker_config = DockerWorkerConfig(dockerfile=docker_config_np) - docker_tag = "openmined/custom-worker-np:latest" - worker_pool_name = "custom-worker-pool-numpy" - request = ds_client.api.services.worker_pool.create_image_and_pool_request( - pool_name=worker_pool_name, - num_workers=1, - tag=docker_tag, - config=docker_config, - reason="I want to do some more cool data science with PySyft and Recordlinkage", - registry_uid=external_registry_uid, + worker_config, docker_tag = ( + (PrebuiltWorkerConfig(tag=f"{registry}/{repo}:{tag}"), None) + if prebuilt + else make_docker_config_test_case("numpy") ) + + worker_pool_name = f"custom-worker-pool-numpy{'-prebuilt' if prebuilt else ''}" + + kwargs = { + "pool_name": worker_pool_name, + "num_workers": 1, + "config": worker_config, + "reason": "I want to do some more cool data science with PySyft", + } + if not prebuilt: + kwargs.update({"tag": docker_tag, "registry_uid": external_registry_uid}) + + request = ds_client.api.services.worker_pool.create_image_and_pool_request(**kwargs) assert isinstance(request, Request) assert len(request.changes) == 2 - assert request.changes[0].config == docker_config + assert request.changes[0].config == worker_config assert request.changes[1].num_workers == 1 assert request.changes[1].pool_name == worker_pool_name @@ -251,7 +266,7 @@ def test_pool_image_creation_job_requests(domain_1_port, external_registry_uid) assert isinstance(worker.logs, str) assert worker.job_id is None - built_image = ds_client.api.services.worker_image.get_by_config(docker_config) + built_image = ds_client.api.services.worker_image.get_by_config(worker_config) assert isinstance(built_image, SyftWorkerImage) assert built_image.id == launched_pool.image.id assert worker.image.id == built_image.id @@ -259,7 +274,7 @@ def test_pool_image_creation_job_requests(domain_1_port, external_registry_uid) # Dataset data = np.array([1, 2, 3]) data_action_obj = sy.ActionObject.from_obj(data) - data_pointer = domain_client.api.services.action.set(data_action_obj) + data_pointer = ds_client.api.services.action.set(data_action_obj) # Function @sy.syft_function( @@ -286,7 +301,7 @@ def custom_worker_func(x): job.wait() assert job.status.value == "completed" - job = domain_client.jobs[-1] + job = domain_client.jobs.get_by_user_code_id(job.user_code_id)[-1] assert job.job_worker_id == worker.id # Validate the result received from the syft function