diff --git a/docs/requirements.txt b/docs/requirements.txt index c27a16b6031..d9247032aa1 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -4,9 +4,9 @@ markupsafe==2.0.1 pydata-sphinx-theme==0.7.2 pygments>=2.15.0 # not directly required, pinned by Snyk to avoid a vulnerability requests>=2.31.0 # not directly required, pinned by Snyk to avoid a vulnerability +setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability sphinx==4.3.0 sphinx-autoapi==1.8.4 sphinx-code-include==1.1.1 sphinx-copybutton==0.4.0 sphinx-panels==0.6.0 -setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index 4a24c7cbfd8..d99a3f41375 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -22,7 +22,7 @@ "import os\n", "sy.requires(SYFT_VERSION)\n", "from syft.service.worker.worker_image import SyftWorkerImage\n", - "from syft.service.worker.worker_pool import WorkerStatus, SyftWorker\n", + "from syft.service.worker.image_registry import SyftImageRegistry\n", "from syft.custom_worker.config import DockerWorkerConfig" ] }, @@ -57,6 +57,14 @@ "domain_client = domain.login(email=\"info@openmined.org\", password=\"changethis\")" ] }, + { + "cell_type": "markdown", + "id": "3c7a124a", + "metadata": {}, + "source": [ + "#### Submit Dockerfile" + ] + }, { "cell_type": "code", "execution_count": null, @@ -160,6 +168,76 @@ "assert isinstance(workerimage, SyftWorkerImage)" ] }, + { + "cell_type": "markdown", + "id": "91a66871", + "metadata": {}, + "source": [ + "#### Create Registry" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cde8bfff", + "metadata": {}, + "outputs": [], + "source": [ + "image_add_res = domain_client.api.services.image_registry.add(\"localhost:5678\")\n", + "image_add_res" + ] + }, + { + "cell_type": "code", + "execution_count": 
null, + "id": "82321b35", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(image_add_res, sy.SyftSuccess), str(image_add_res)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3d4a4c33", + "metadata": {}, + "outputs": [], + "source": [ + "images = domain_client.api.services.image_registry.get_all()\n", + "assert len(images) == 1\n", + "images" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22f6e2f6", + "metadata": {}, + "outputs": [], + "source": [ + "local_registry = images[0]\n", + "local_registry" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cb9664ca", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(local_registry, SyftImageRegistry)" + ] + }, + { + "cell_type": "markdown", + "id": "637a9596", + "metadata": {}, + "source": [ + "#### Build Image" + ] + }, { "cell_type": "code", "execution_count": null, @@ -168,7 +246,7 @@ "outputs": [], "source": [ "docker_tag = \"openmined/test-nginx:0.7.8\"\n", - "docker_build_res = domain_client.api.services.worker_image.build(uid=workerimage.id, tag=docker_tag)" + "docker_build_res = domain_client.api.services.worker_image.build(image_uid=workerimage.id, tag=docker_tag)" ] }, { @@ -188,7 +266,7 @@ "metadata": {}, "outputs": [], "source": [ - "assert isinstance(docker_build_res, sy.SyftSuccess)" + "assert isinstance(docker_build_res, sy.SyftSuccess), str(docker_build_res)" ] }, { @@ -292,6 +370,14 @@ " return None" ] }, + { + "cell_type": "markdown", + "id": "f5007073", + "metadata": {}, + "source": [ + "#### Create Worker Pool From Image" + ] + }, { "cell_type": "code", "execution_count": null, @@ -542,7 +628,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.13" + "version": "3.11.2" } }, "nbformat": 4, diff --git a/packages/syft/src/syft/custom_worker/builder.py b/packages/syft/src/syft/custom_worker/builder.py index b4acda50a22..07db5b26057 100644 --- 
a/packages/syft/src/syft/custom_worker/builder.py +++ b/packages/syft/src/syft/custom_worker/builder.py @@ -1,43 +1,82 @@ # stdlib import contextlib +import io import os.path from pathlib import Path +from typing import Any +from typing import Iterable +from typing import Optional from typing import Tuple # third party import docker +from docker.models.images import Image # relative from .config import CustomWorkerConfig +from .config import DockerWorkerConfig +from .config import WorkerConfig class CustomWorkerBuilder: TYPE_CPU = "cpu" TYPE_GPU = "gpu" - DOCKERFILE_PROD_PATH = os.path.expandvars("$APPDIR/grid/") - DOCKERFILE_DEV_PATH = "../../../../../grid/backend/" + TEMPLATE_DIR_PROD = os.path.expandvars("$APPDIR/grid/") + TEMPLATE_DIR_DEV = "../../../../../grid/backend/" CUSTOM_IMAGE_PREFIX = "custom-worker" BUILD_MAX_WAIT = 30 * 60 - def build_image(self, config: CustomWorkerConfig) -> None: + def build_image( + self, + config: WorkerConfig, + tag: str = None, + **kwargs: Any, + ) -> Tuple[Image, Iterable[str]]: """ - Builds a Docker image for the custom worker based on the provided configuration. + Builds a Docker image from the given configuration. Args: - config (CustomImageConfig): The configuration for building the Docker image. - Returns: - bool: True if the image was built successfully, raises Exception otherwise. + config (WorkerConfig): The configuration for building the Docker image. + tag (str): The tag to use for the image. + """ + + if isinstance(config, DockerWorkerConfig): + return self._build_dockerfile(config, tag, **kwargs) + elif isinstance(config, CustomWorkerConfig): + return self._build_template(config, **kwargs) + else: + raise TypeError("Unknown worker config type") + + def push_image(self, tag: str, **kwargs: Any) -> str: + """ + Pushes a Docker image to the given repo. + Args: + tag (str): The full image name and tag to push. + **kwargs: Forwarded to _push_image (registry_url, username, password).
""" + return self._push_image(tag, **kwargs) + + def _build_dockerfile(self, config: DockerWorkerConfig, tag: str, **kwargs): + print("Building with provided dockerfile") + + # convert string to file-like object + file_obj = io.BytesIO(config.dockerfile.encode("utf-8")) + return self._build_image(fileobj=file_obj, tag=tag, **kwargs) + + def _build_template(self, config: CustomWorkerConfig, **kwargs: Any): + # Builds a Docker pre-made CPU/GPU image template using a CustomWorkerConfig + print("Building with dockerfile template") + # remove once GPU is supported if config.build.gpu: raise Exception("GPU custom worker is not supported yet") type = self.TYPE_GPU if config.build.gpu else self.TYPE_CPU - contextdir, dockerfile = self.find_worker_ctx(type) + contextdir, dockerfile = self._find_template_dir(type) imgtag = config.get_signature()[:8] @@ -54,27 +93,41 @@ def build_image(self, config: CustomWorkerConfig) -> None: f"with args={build_args}" ) - try: - # TODO: Push logs to mongo/seaweed? - with contextlib.closing(docker.from_env()) as client: - client.images.build( - path=str(contextdir), - dockerfile=dockerfile, - pull=True, - tag=f"{self.CUSTOM_IMAGE_PREFIX}-{type}:{imgtag}", - timeout=self.BUILD_MAX_WAIT, - buildargs=build_args, + return self._build_image( + tag=f"{self.CUSTOM_IMAGE_PREFIX}-{type}:{imgtag}", + path=str(contextdir), + dockerfile=dockerfile, + buildargs=build_args, + ) + + def _build_image(self, tag: str, **build_opts) -> Tuple[Image, Iterable]: + # Core docker build call.
Func signature should match with Docker SDK's BuildApiMixin + with contextlib.closing(docker.from_env()) as client: + image = client.images.build( + tag=tag, + pull=True, + timeout=self.BUILD_MAX_WAIT, + **build_opts, + ) + return image + + def _push_image( + self, + tag: str, + registry_url: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + ) -> str: + with contextlib.closing(docker.from_env()) as client: + if registry_url and username and password: + client.login( + username=username, password=password, registry=registry_url ) - return - except docker.errors.BuildError as e: - raise e - except docker.errors.APIError as e: - raise e - except Exception as e: - raise e + result = client.images.push(repository=tag) + return result - def find_worker_ctx(self, type: str) -> Tuple[Path, str]: + def _find_template_dir(self, type: str) -> Tuple[Path, str]: """ Find the Worker Dockerfile and it's context path - PROD will be in `$APPDIR/grid/` @@ -88,8 +141,8 @@ def find_worker_ctx(self, type: str) -> Tuple[Path, str]: """ filename = f"worker_{type}.dockerfile" lookup_paths = [ - Path(self.DOCKERFILE_PROD_PATH, filename).resolve(), - Path(__file__, self.DOCKERFILE_DEV_PATH, filename).resolve(), + Path(self.TEMPLATE_DIR_PROD, filename).resolve(), + Path(__file__, self.TEMPLATE_DIR_DEV, filename).resolve(), ] for path in lookup_paths: if path.exists(): diff --git a/packages/syft/src/syft/custom_worker/config.py b/packages/syft/src/syft/custom_worker/config.py index 65fa8dabc30..40ef66606c7 100644 --- a/packages/syft/src/syft/custom_worker/config.py +++ b/packages/syft/src/syft/custom_worker/config.py @@ -121,3 +121,6 @@ def __eq__(self, __value: object) -> bool: def __hash__(self) -> int: return hash(self.dockerfile) + + def __str__(self) -> str: + return self.dockerfile diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 322cc7904a3..240dc31ba1a 100644 --- a/packages/syft/src/syft/node/node.py 
+++ b/packages/syft/src/syft/node/node.py @@ -95,6 +95,7 @@ from ..service.user.user_roles import ServiceRole from ..service.user.user_service import UserService from ..service.user.user_stash import UserStash +from ..service.worker.image_registry_service import SyftImageRegistryService from ..service.worker.utils import DEFAULT_WORKER_IMAGE_TAG from ..service.worker.utils import DEFAULT_WORKER_POOL_NAME from ..service.worker.utils import create_default_image @@ -317,6 +318,7 @@ def __init__( MigrateStateService, SyftWorkerImageService, SyftWorkerPoolService, + SyftImageRegistryService, ] if services is None else services @@ -870,6 +872,7 @@ def _construct_services(self): MigrateStateService, SyftWorkerImageService, SyftWorkerPoolService, + SyftImageRegistryService, ] if OBLV: @@ -1434,7 +1437,7 @@ def create_default_worker_pool(node: Node) -> Optional[SyftError]: # Build the Image for given tag result = image_build_method( - context, uid=default_image.id, tag=DEFAULT_WORKER_IMAGE_TAG + context, image_uid=default_image.id, tag=DEFAULT_WORKER_IMAGE_TAG ) if isinstance(result, SyftError): diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 7c455db1371..fa31a1419b6 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -957,7 +957,7 @@ "SyftWorkerImage": { "1": { "version": 1, - "hash": "abfca84237a47aab399970a0a1ad747bb7c0f325944b6647be38050e1e0ac697", + "hash": "2a9585b6a286e24f1a9f3f943d0128730cf853edc549184dc1809d19e1eec54b", "action": "add" } }, @@ -1061,6 +1061,13 @@ "hash": "3644c2caf4c01499e23b4e94d6a41fed0e39ce63c7fc5318c4e205cae9555308", "action": "add" } + }, + "SyftImageRegistry": { + "1": { + "version": 1, + "hash": "dc83910c91947e3d9eaa3e6f8592237448f0408668c7cca80450b5fcd54722e1", + "action": "add" + } } } } diff --git a/packages/syft/src/syft/service/worker/image_identifier.py 
b/packages/syft/src/syft/service/worker/image_identifier.py new file mode 100644 index 00000000000..fc8e750367e --- /dev/null +++ b/packages/syft/src/syft/service/worker/image_identifier.py @@ -0,0 +1,95 @@ +# stdlib +from typing import Optional +from typing import Tuple + +# third party +from typing_extensions import Self + +# relative +from ...serde.serializable import serializable +from ...types.base import SyftBaseModel +from .image_registry import SyftImageRegistry + + +@serializable() +class SyftWorkerImageIdentifier(SyftBaseModel): + """ + Class to identify syft worker images. + If a user provides an image's identifier with + "docker.io/openmined/test-nginx:0.7.8", the convention we use for + image name, tag and repo for now is + tag = 0.7.8 + repo = openmined/test-nginx + repo_with_tag = openmined/test-nginx:0.7.8 + full_name = docker.io/openmined/test-nginx + full_name_with_tag = docker.io/openmined/test-nginx:0.7.8 + + References: + https://docs.docker.com/engine/reference/commandline/tag/#tag-an-image-referenced-by-name-and-tag + """ + + registry: Optional[SyftImageRegistry] + repo: str + tag: str + + __repr_attrs__ = ["registry", "repo", "tag"] + + @classmethod + def with_registry(cls, tag: str, registry: SyftImageRegistry) -> Self: + """Build a SyftWorkerImageTag from Docker tag & a previously created SyftImageRegistry object.""" + registry_str, repo, tag = SyftWorkerImageIdentifier.parse_str(tag) + + # if we parsed a registry string, make sure it matches the registry object + if registry_str and registry_str != registry.url: + raise ValueError(f"Registry URL mismatch: {registry_str} != {registry.url}") + + return cls(repo=repo, tag=tag, registry=registry) + + @classmethod + def from_str(cls, tag: str) -> Self: + """Build a SyftWorkerImageTag from a pure-string standard Docker tag.""" + registry, repo, tag = SyftWorkerImageIdentifier.parse_str(tag) + return cls(repo=repo, registry=registry, tag=tag) + + @staticmethod + def parse_str(tag: str) -> 
Tuple[Optional[str], str, str]: + url, tag = tag.rsplit(":", 1) + args = url.rsplit("/", 2) + + if len(args) == 3: + registry = args[0] + repo = "/".join(args[1:]) + else: + registry = None + repo = "/".join(args) + + return registry, repo, tag + + @property + def repo_with_tag(self) -> str: + if self.repo or self.tag: + return f"{self.repo}:{self.tag}" + return None + + @property + def full_name_with_tag(self) -> str: + if self.registry: + return f"{self.registry.url}/{self.repo}:{self.tag}" + else: + # default registry is always docker.io + return f"docker.io/{self.repo}:{self.tag}" + + @property + def registry_host(self) -> str: + if self.registry is None: + return "" + elif isinstance(self.registry, str): + return self.registry + else: + return self.registry.url + + def __hash__(self) -> int: + return hash(self.repo + self.tag + str(hash(self.registry))) + + def __str__(self) -> str: + return f"registry: {str(self.registry)}, repo: {self.repo}, tag: {self.tag}" diff --git a/packages/syft/src/syft/service/worker/image_registry.py b/packages/syft/src/syft/service/worker/image_registry.py new file mode 100644 index 00000000000..2395e4b764a --- /dev/null +++ b/packages/syft/src/syft/service/worker/image_registry.py @@ -0,0 +1,33 @@ +# relative +from ...serde.serializable import serializable +from ...types.syft_object import SYFT_OBJECT_VERSION_1 +from ...types.syft_object import SyftObject +from ...types.uid import UID + + +@serializable() +class SyftImageRegistry(SyftObject): + __canonical_name__ = "SyftImageRegistry" + __version__ = SYFT_OBJECT_VERSION_1 + + __attr_searchable__ = ["url"] + __attr_unique__ = ["url"] + + __repr_attrs__ = ["url"] + + id: UID + url: str + + @classmethod + def from_url(cls, full_str: str): + return cls(id=UID(), url=full_str) + + @property + def tls_enabled(self) -> bool: + return self.url.startswith("https") + + def __hash__(self) -> int: + return hash(self.url + str(self.tls_enabled)) + + def __str__(self) -> str: + return self.url 
diff --git a/packages/syft/src/syft/service/worker/image_registry_service.py b/packages/syft/src/syft/service/worker/image_registry_service.py new file mode 100644 index 00000000000..48837d4ee9c --- /dev/null +++ b/packages/syft/src/syft/service/worker/image_registry_service.py @@ -0,0 +1,114 @@ +# stdlib +from typing import List +from typing import Union + +# relative +from ...serde.serializable import serializable +from ...store.document_store import DocumentStore +from ...types.uid import UID +from ..context import AuthedServiceContext +from ..response import SyftError +from ..response import SyftSuccess +from ..service import AbstractService +from ..service import SERVICE_TO_TYPES +from ..service import TYPE_TO_SERVICE +from ..service import service_method +from ..user.user_roles import DATA_OWNER_ROLE_LEVEL +from .image_registry import SyftImageRegistry +from .image_registry_stash import SyftImageRegistryStash + +__all__ = ["SyftImageRegistryService"] + + +@serializable() +class SyftImageRegistryService(AbstractService): + store: DocumentStore + stash: SyftImageRegistryStash + + def __init__(self, store: DocumentStore) -> None: + self.store = store + self.stash = SyftImageRegistryStash(store=store) + + @service_method( + path="image_registry.add", + name="add", + roles=DATA_OWNER_ROLE_LEVEL, + ) + def add( + self, + context: AuthedServiceContext, + url: str, + ) -> Union[SyftSuccess, SyftError]: + registry = SyftImageRegistry.from_url(url) + res = self.stash.set(context.credentials, registry) + + if res.is_err(): + return SyftError(message=res.err()) + + return SyftSuccess( + message=f"Image registry created successfully" + ) + + @service_method( + path="image_registry.delete", + name="delete", + roles=DATA_OWNER_ROLE_LEVEL, + ) + def delete( + self, + context: AuthedServiceContext, + uid: UID = None, + url: str = None, + ) -> Union[SyftSuccess, SyftError]: + # TODO - we need to make sure that there are no workers running an image bound to this registry + + # 
if url is provided, get uid from url + if url: + res = self.stash.delete_by_url(context.credentials, url) + if res.is_err(): + return SyftError(message=res.err()) + return SyftSuccess( + message=f"Image registry successfully deleted." + ) + + # if uid is provided, delete by uid + if uid: + res = self.stash.delete_by_uid(context.credentials, uid) + if res.is_err(): + return SyftError(message=res.err()) + return SyftSuccess( + message=f"Image registry successfully deleted." + ) + else: + return SyftError(message="Either UID or URL must be provided.") + + @service_method( + path="image_registry.get_all", + name="get_all", + roles=DATA_OWNER_ROLE_LEVEL, + ) + def get_all( + self, + context: AuthedServiceContext, + ) -> Union[List[SyftImageRegistry], SyftError]: + result = self.stash.get_all(context.credentials) + if result.is_err(): + return SyftError(message=result.err()) + return result + + @service_method( + path="image_registry.get_by_id", + name="get_by_id", + roles=DATA_OWNER_ROLE_LEVEL, + ) + def get_by_id( + self, context: AuthedServiceContext, uid: UID + ) -> Union[SyftImageRegistry, SyftError]: + result = self.stash.get_by_uid(context.credentials, uid) + if result.is_err(): + return SyftError(message=result.err()) + return result + + +TYPE_TO_SERVICE[SyftImageRegistry] = SyftImageRegistryService +SERVICE_TO_TYPES[SyftImageRegistryService].update({SyftImageRegistry}) diff --git a/packages/syft/src/syft/service/worker/image_registry_stash.py b/packages/syft/src/syft/service/worker/image_registry_stash.py new file mode 100644 index 00000000000..37f71877fc1 --- /dev/null +++ b/packages/syft/src/syft/service/worker/image_registry_stash.py @@ -0,0 +1,51 @@ +# stdlib +from typing import Optional + +# third party +from result import Ok +from result import Result + +# relative +from ...node.credentials import SyftVerifyKey +from ...serde.serializable import serializable +from ...store.document_store import BaseUIDStoreStash +from ...store.document_store import 
DocumentStore +from ...store.document_store import PartitionKey +from ...store.document_store import PartitionSettings +from ...store.document_store import QueryKeys +from ..response import SyftSuccess +from .image_registry import SyftImageRegistry + +__all__ = ["SyftImageRegistryStash"] + + +URLPartitionKey = PartitionKey(key="url", type_=str) + + +@serializable() +class SyftImageRegistryStash(BaseUIDStoreStash): + object_type = SyftImageRegistry + settings: PartitionSettings = PartitionSettings( + name=SyftImageRegistry.__canonical_name__, + object_type=SyftImageRegistry, + ) + + def __init__(self, store: DocumentStore) -> None: + super().__init__(store=store) + + def get_by_url( + self, + credentials: SyftVerifyKey, + url: str, + ) -> Result[Optional[SyftImageRegistry], str]: + qks = QueryKeys(qks=[URLPartitionKey.with_obj(url)]) + return self.query_one(credentials=credentials, qks=qks) + + def delete_by_url( + self, credentials: SyftVerifyKey, url: str + ) -> Result[SyftSuccess, str]: + qk = URLPartitionKey.with_obj(url) + result = super().delete(credentials=credentials, qk=qk) + if result.is_ok(): + return Ok(SyftSuccess(message=f"URL: {url} deleted")) + return result diff --git a/packages/syft/src/syft/service/worker/utils.py b/packages/syft/src/syft/service/worker/utils.py index e5cb9dcbccd..5caf700bd0e 100644 --- a/packages/syft/src/syft/service/worker/utils.py +++ b/packages/syft/src/syft/service/worker/utils.py @@ -1,16 +1,22 @@ # stdlib import contextlib +import json +import os import socket import socketserver import sys +from typing import Iterable from typing import List +from typing import Optional from typing import Union # third party import docker +from pydantic import BaseModel # relative from ...abstract_node import AbstractNode +from ...custom_worker.builder import CustomWorkerBuilder from ...custom_worker.config import DockerWorkerConfig from ...node.credentials import SyftVerifyKey from ...types.uid import UID @@ -24,6 +30,14 @@ from 
.worker_pool import WorkerOrchestrationType from .worker_pool import WorkerStatus +DEFAULT_WORKER_IMAGE_TAG = "openmined/default-worker-image-cpu:0.0.1" +DEFAULT_WORKER_POOL_NAME = "default-pool" + + +class ImageBuildResult(BaseModel): + image_hash: str + logs: Iterable[str] + def backend_container_name() -> str: hostname = socket.gethostname() @@ -91,6 +105,9 @@ def run_container_using_docker( pool_name: str, queue_port: int, debug: bool = False, + username: Optional[str] = None, + password: Optional[str] = None, + registry_url: Optional[str] = None, ) -> ContainerSpawnStatus: # Get hostname hostname = socket.gethostname() @@ -106,6 +123,15 @@ def run_container_using_docker( error_message = None worker = None try: + # login to the registry through the client + # so that the subsequent pull/run commands work + if registry_url and username and password: + docker_client.login( + username=username, + password=password, + registry=registry_url, + ) + # If container with given name already exists then stop it # and recreate it. 
existing_container = get_container( @@ -232,6 +258,9 @@ def run_containers( queue_port: int, dev_mode: bool = False, start_idx: int = 0, + username: Optional[str] = None, + password: Optional[str] = None, + registry_url: Optional[str] = None, ) -> List[ContainerSpawnStatus]: results = [] @@ -249,6 +278,9 @@ def run_containers( pool_name=pool_name, queue_port=queue_port, debug=dev_mode, + username=username, + password=password, + registry_url=registry_url, ) results.append(spawn_result) @@ -285,7 +317,10 @@ def create_default_image( """ worker_config = DockerWorkerConfig(dockerfile=default_cpu_dockerfile) - result = image_stash.get_by_docker_config(credentials, worker_config) + result = image_stash.get_by_docker_config( + credentials=credentials, + config=worker_config, + ) if result.ok() is None: default_syft_image = SyftWorkerImage( @@ -309,5 +344,76 @@ def _get_healthcheck_based_on_status(status: WorkerStatus) -> WorkerHealth: return WorkerHealth.UNHEALTHY -DEFAULT_WORKER_IMAGE_TAG = "openmined/default-worker-image-cpu:0.0.1" -DEFAULT_WORKER_POOL_NAME = "default-pool" +def docker_build( + image: SyftWorkerImage, **kwargs +) -> Union[ImageBuildResult, SyftError]: + try: + builder = CustomWorkerBuilder() + (built_image, logs) = builder.build_image( + config=image.config, + tag=image.image_identifier.full_name_with_tag, + rm=True, + forcerm=True, + **kwargs, + ) + return ImageBuildResult(image_hash=built_image.id, logs=parse_output(logs)) + except docker.errors.APIError as e: + return SyftError( + message=f"Docker API error when building {image.image_identifier}. Reason - {e}" + ) + except docker.errors.DockerException as e: + return SyftError( + message=f"Docker exception when building {image.image_identifier}. Reason - {e}" + ) + except Exception as e: + return SyftError( + message=f"Unknown exception when building {image.image_identifier}.
Reason - {e}" + ) + + +def docker_push( + image: SyftWorkerImage, + username: Optional[str] = None, + password: Optional[str] = None, +) -> Union[List[str], SyftError]: + try: + builder = CustomWorkerBuilder() + result = builder.push_image( + # this should be consistent with docker build command + tag=image.image_identifier.full_name_with_tag, + registry_url=image.image_identifier.registry_host, + username=username, + password=password, + ) + + if "error" in result: + return SyftError( + message=f"Failed to push {image.image_identifier}. Logs - {result}" + ) + + return result.split(os.linesep) + except docker.errors.APIError as e: + return SyftError( + message=f"Docker API error when pushing {image.image_identifier}. {e}" + ) + except docker.errors.DockerException as e: + return SyftError( + message=f"Docker exception when pushing {image.image_identifier}. Reason - {e}" + ) + except Exception as e: + return SyftError( + message=f"Unknown exception when pushing {image.image_identifier}. Reason - {e}" + ) + + +def parse_output(log_iterator: Iterable) -> str: + log = "" + for line in log_iterator: + for item in line.values(): + if isinstance(item, str): + log += item + elif isinstance(item, dict): + log += json.dumps(item) + else: + log += str(item) + return log diff --git a/packages/syft/src/syft/service/worker/worker_image.py b/packages/syft/src/syft/service/worker/worker_image.py index febdab14a5c..805802a6200 100644 --- a/packages/syft/src/syft/service/worker/worker_image.py +++ b/packages/syft/src/syft/service/worker/worker_image.py @@ -1,112 +1,15 @@ # stdlib -import io -import json -from typing import Iterator from typing import Optional -# third party -import docker -from typing_extensions import Self - # relative -from ...custom_worker.config import DockerWorkerConfig from ...custom_worker.config import WorkerConfig from ...node.credentials import SyftVerifyKey from ...serde.serializable import serializable -from ...types.base import SyftBaseModel from
...types.datetime import DateTime from ...types.syft_object import SYFT_OBJECT_VERSION_1 from ...types.syft_object import SyftObject from ...types.uid import UID -from ..response import SyftError -from ..response import SyftSuccess - - -def parse_output(log_iterator: Iterator) -> str: - log = "" - for line in log_iterator: - for item in line.values(): - if isinstance(item, str): - log += item - elif isinstance(item, dict): - log += json.dumps(item) - else: - log += str(item) - return log - - -@serializable() -class ContainerImageRegistry(SyftBaseModel): - url: str - tls_enabled: bool - - __repr_attrs__ = ["url"] - - @classmethod - def from_url(cls, full_str: str): - return cls(url=full_str, tls_enabled=full_str.startswith("https")) - - def __hash__(self) -> int: - return hash(self.url + str(self.tls_enabled)) - - def __str__(self) -> str: - return self.url - - -@serializable() -class SyftWorkerImageIdentifier(SyftBaseModel): - """ - Class to identify syft worker images. - If a user provides an image's identifier with - "docker.io/openmined/test-nginx:0.7.8", the convention we use for - image name, tag and repo for now is - tag = 0.7.8 - repo = openmined/test-nginx - repo_with_tag = openmined/test-nginx:0.7.8 - full_name = docker.io/openmined/test-nginx - full_name_with_tag = docker.io/openmined/test-nginx:0.7.8 - - References: - https://docs.docker.com/engine/reference/commandline/tag/#tag-an-image-referenced-by-name-and-tag - """ - - registry: Optional[ContainerImageRegistry] - repo: str - tag: str - - __repr_attrs__ = ["registry", "repo", "tag"] - - @classmethod - def from_str(cls, full_str: str) -> Self: - repo_url, tag = full_str.rsplit(":", 1) - args = repo_url.rsplit("/", 2) - if len(args) == 3: - registry = ContainerImageRegistry.from_url(args[0]) - repo = "/".join(args[1:]) - else: - registry = None - repo = "/".join(args) - return cls(repo=repo, registry=registry, tag=tag) - - @property - def repo_with_tag(self) -> str: - if self.repo or self.tag: - return 
f"{self.repo}:{self.tag}" - return None - - @property - def full_name_with_tag(self) -> str: - if self.registry: - return f"{self.registry.url}/{self.repo}:{self.tag}" - else: - # default registry is always docker.io - return f"docker.io/{self.repo}:{self.tag}" - - def __hash__(self) -> int: - return hash(self.repo + self.tag + str(hash(self.registry))) - - def __str__(self) -> str: - return f"registry: {str(self.registry)}, repo: {self.repo}, tag: {self.tag}" +from .image_identifier import SyftWorkerImageIdentifier @serializable() @@ -116,7 +19,7 @@ class SyftWorkerImage(SyftObject): __attr_unique__ = ["config"] __attr_searchable__ = ["config", "image_hash", "created_by"] - __repr_attrs__ = ["image_identifier", "image_hash", "created_at"] + __repr_attrs__ = ["image_identifier", "image_hash", "created_at", "built_at"] id: UID config: WorkerConfig @@ -124,35 +27,4 @@ class SyftWorkerImage(SyftObject): image_hash: Optional[str] created_at: DateTime = DateTime.now() created_by: SyftVerifyKey - - -def build_using_docker( - client: docker.DockerClient, - worker_image: SyftWorkerImage, - push: bool = True, - dev_mode: bool = False, -): - if not isinstance(worker_image.config, DockerWorkerConfig): - # Handle this to worker with CustomWorkerConfig later - return SyftError("We only support DockerWorkerConfig") - - try: - file_obj = io.BytesIO(worker_image.config.dockerfile.encode("utf-8")) - - # docker build -f - - # Enable this once we're able to copy worker_cpu.dockerfile in backend - # buildargs = {"SYFT_VERSION_TAG": "local-dev"} if dev_mode else {} - result = client.images.build( - fileobj=file_obj, - rm=True, - tag=worker_image.image_identifier.repo_with_tag, - forcerm=True, - ) - worker_image.image_hash = result[0].id - log = parse_output(result[1]) - return worker_image, SyftSuccess( - message=f"Build {worker_image} succeeded.\n{log}" - ) - except docker.errors.BuildError as e: - return worker_image, SyftError(message=f"Failed to build {worker_image}. 
{e}") + built_at: Optional[DateTime] diff --git a/packages/syft/src/syft/service/worker/worker_image_service.py b/packages/syft/src/syft/service/worker/worker_image_service.py index 00fc7de9868..d02dc6d45a2 100644 --- a/packages/syft/src/syft/service/worker/worker_image_service.py +++ b/packages/syft/src/syft/service/worker/worker_image_service.py @@ -13,6 +13,7 @@ from ...custom_worker.config import DockerWorkerConfig from ...serde.serializable import serializable from ...store.document_store import DocumentStore +from ...types.datetime import DateTime from ...types.dicttuple import DictTuple from ...types.uid import UID from ..context import AuthedServiceContext @@ -21,9 +22,12 @@ from ..service import AbstractService from ..service import service_method from ..user.user_roles import DATA_OWNER_ROLE_LEVEL +from .image_registry import SyftImageRegistry +from .image_registry_service import SyftImageRegistryService +from .utils import docker_build +from .utils import docker_push from .worker_image import SyftWorkerImage from .worker_image import SyftWorkerImageIdentifier -from .worker_image import build_using_docker from .worker_image_stash import SyftWorkerImageStash @@ -67,39 +71,62 @@ def submit_dockerfile( def build( self, context: AuthedServiceContext, - uid: UID, + image_uid: UID, tag: str, - push: bool = False, - container_registry: Optional[str] = None, + registry_uid: Optional[UID] = None, ) -> Union[SyftSuccess, SyftError]: - result = self.stash.get_by_uid(credentials=context.credentials, uid=uid) + registry: SyftImageRegistry = None + + result = self.stash.get_by_uid(credentials=context.credentials, uid=image_uid) if result.is_err(): return SyftError( - message=f"Failed to get image for uid: {uid}. Error: {result.err()}" + message=f"Failed to get image for uid: {image_uid}. 
Error: {result.err()}" ) worker_image: SyftWorkerImage = result.ok() + if registry_uid: + # get registry from image registry service + image_registry_service: SyftImageRegistryService = context.node.get_service( + SyftImageRegistryService + ) + result = image_registry_service.get_by_id(context, registry_uid) + if result.is_err(): + return result + registry: SyftImageRegistry = result.ok() + try: - image_identifier: ( - SyftWorkerImageIdentifier - ) = SyftWorkerImageIdentifier.from_str(full_str=tag) + if registry: + image_identifier = SyftWorkerImageIdentifier.with_registry( + tag=tag, registry=registry + ) + else: + image_identifier = SyftWorkerImageIdentifier.from_str(tag=tag) except pydantic.ValidationError as e: return SyftError(message=f"Failed to create tag: {e}") + # if image is already built and identifier is unchanged, return an error + if ( + worker_image.built_at + and worker_image.image_identifier + and worker_image.image_identifier.full_name_with_tag + == image_identifier.full_name_with_tag + ): + return SyftError(message=f"Image<{image_uid}> is already built") + worker_image.image_identifier = image_identifier if not context.node.in_memory_workers: - with contextlib.closing(docker.from_env()) as client: - worker_image, result = build_using_docker( - client=client, - worker_image=worker_image, - push=push, - dev_mode=context.node.dev_mode, - ) - + result = docker_build(worker_image) if isinstance(result, SyftError): return result + + worker_image.image_hash = result.image_hash + worker_image.built_at = DateTime.now() + + result = SyftSuccess( + message=f"Build {worker_image} succeeded.\n{result.logs}" + ) else: result = SyftSuccess( message="Image building skipped, since using InMemory workers." 
@@ -114,6 +141,55 @@ def build( return result + @service_method( + path="worker_image.push", + name="push", + roles=DATA_OWNER_ROLE_LEVEL, + ) + def push( + self, + context: AuthedServiceContext, + image: UID, + username: Optional[str] = None, + password: Optional[str] = None, + ) -> Union[SyftSuccess, SyftError]: + if context.node.in_memory_workers: + return SyftSuccess( + message="Skipped pushing image, since using InMemory workers." + ) + + result = self.stash.get_by_uid(credentials=context.credentials, uid=image) + if result.is_err(): + return SyftError( + message=f"Failed to get image for uid: {image}. Error: {result.err()}" + ) + worker_image: SyftWorkerImage = result.ok() + + if ( + worker_image.image_identifier is None + or worker_image.image_identifier.registry_host == "" + ): + return SyftError( + message=f"Image {worker_image} does not have a valid registry host." + ) + elif worker_image.built_at is None: + return SyftError( + message=f"Image {worker_image} is not built yet. Please build it first." + ) + + result = docker_push( + image=worker_image, + username=username, + password=password, + ) + + if isinstance(result, SyftError): + return result + + return SyftSuccess( + message=f'The image was successfully pushed to "{worker_image.image_identifier.full_name_with_tag}"' + ) + @service_method( path="worker_image.get_all", name="get_all", diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index d8f26933936..53577f5d4b1 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -64,6 +64,8 @@ def create_pool( name: str, image_uid: Optional[UID], number: int, + reg_username: Optional[str] = None, + reg_password: Optional[str] = None, ) -> Union[List[ContainerSpawnStatus], SyftError]: """Creates a pool of workers from the given SyftWorkerImage. 
@@ -111,6 +113,8 @@ def create_pool( worker_cnt=number, worker_image=worker_image, worker_stash=self.worker_stash, + reg_username=reg_username, + reg_password=reg_password, ) worker_pool = WorkerPool( @@ -483,6 +487,8 @@ def _create_workers_in_pool( worker_cnt: int, worker_image: SyftWorkerImage, worker_stash: WorkerStash, + reg_username: Optional[str] = None, + reg_password: Optional[str] = None, ) -> Tuple[List[LinkedObject], List[ContainerSpawnStatus]]: queue_port = context.node.queue_config.client_config.queue_port @@ -506,6 +512,9 @@ def _create_workers_in_pool( orchestration=WorkerOrchestrationType.DOCKER, queue_port=queue_port, dev_mode=context.node.dev_mode, + username=reg_username, + password=reg_password, + registry_url=worker_image.image_identifier.registry_host, ) linked_worker_list = []