Skip to content

Commit

Permalink
Merge pull request #8802 from kiendang/prebuilt-image-worker
Browse files Browse the repository at this point in the history
Add support for submitting a prebuilt image
  • Loading branch information
shubham3121 authored May 27, 2024
2 parents 4d5aee0 + 19d627f commit 54cad0a
Show file tree
Hide file tree
Showing 14 changed files with 285 additions and 164 deletions.
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ repos:
# files: "^notebooks/(api|tutorials|admin)"
hooks:
- id: nbqa-isort
- id: nbqa-black

- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
Expand Down
4 changes: 2 additions & 2 deletions notebooks/admin/Custom API + Custom Worker.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@
"metadata": {},
"outputs": [],
"source": [
"submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n",
" docker_config=docker_config\n",
"submit_result = domain_client.api.services.worker_image.submit(\n",
" worker_config=docker_config\n",
")\n",
"submit_result"
]
Expand Down
8 changes: 4 additions & 4 deletions notebooks/api/0.8/10-container-images.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@
"metadata": {},
"outputs": [],
"source": [
"submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n",
" docker_config=docker_config\n",
"submit_result = domain_client.api.services.worker_image.submit(\n",
" worker_config=docker_config\n",
")"
]
},
Expand Down Expand Up @@ -1095,8 +1095,8 @@
"metadata": {},
"outputs": [],
"source": [
"submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n",
" docker_config=docker_config_2\n",
"submit_result = domain_client.api.services.worker_image.submit(\n",
" worker_config=docker_config_2\n",
")\n",
"submit_result"
]
Expand Down
8 changes: 4 additions & 4 deletions notebooks/api/0.8/11-container-images-k8s.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@
"metadata": {},
"outputs": [],
"source": [
"submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n",
" docker_config=docker_config\n",
"submit_result = domain_client.api.services.worker_image.submit(\n",
" worker_config=docker_config\n",
")\n",
"submit_result"
]
Expand Down Expand Up @@ -935,8 +935,8 @@
"outputs": [],
"source": [
"submit_result = None\n",
"submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n",
" docker_config=docker_config_opendp\n",
"submit_result = domain_client.api.services.worker_image.submit(\n",
" worker_config=docker_config_opendp\n",
")\n",
"submit_result"
]
Expand Down
1 change: 1 addition & 0 deletions packages/syft/src/syft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .client.user_settings import UserSettings # noqa: F401
from .client.user_settings import settings # noqa: F401
from .custom_worker.config import DockerWorkerConfig # noqa: F401
from .custom_worker.config import PrebuiltWorkerConfig # noqa: F401
from .node.credentials import SyftSigningKey # noqa: F401
from .node.domain import Domain # noqa: F401
from .node.enclave import Enclave # noqa: F401
Expand Down
4 changes: 1 addition & 3 deletions packages/syft/src/syft/custom_worker/runner_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .k8s import get_kr8s_client

JSONPATH_AVAILABLE_REPLICAS = "{.status.availableReplicas}"
CREATE_POOL_TIMEOUT_SEC = 60
CREATE_POOL_TIMEOUT_SEC = 180
SCALE_POOL_TIMEOUT_SEC = 60


Expand Down Expand Up @@ -60,8 +60,6 @@ def create_pool(
f"jsonpath='{JSONPATH_AVAILABLE_REPLICAS}'={replicas}",
timeout=CREATE_POOL_TIMEOUT_SEC,
)
except Exception:
raise
finally:
if pull_secret:
pull_secret.delete(propagation_policy="Foreground")
Expand Down
67 changes: 49 additions & 18 deletions packages/syft/src/syft/service/request/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any

# third party
from pydantic import model_validator
from result import Err
from result import Ok
from result import Result
Expand All @@ -15,6 +16,7 @@
from ...abstract_node import NodeSideType
from ...client.api import APIRegistry
from ...client.client import SyftClient
from ...custom_worker.config import DockerWorkerConfig
from ...custom_worker.config import WorkerConfig
from ...custom_worker.k8s import IN_KUBERNETES
from ...node.credentials import SyftVerifyKey
Expand Down Expand Up @@ -187,7 +189,7 @@ def __repr_syft_nested__(self) -> str:


@serializable()
class CreateCustomImageChange(Change):
class CreateCustomImageChangeV2(Change):
__canonical_name__ = "CreateCustomImageChange"
__version__ = SYFT_OBJECT_VERSION_2

Expand All @@ -198,41 +200,70 @@ class CreateCustomImageChange(Change):

__repr_attrs__ = ["config", "tag"]


@serializable()
class CreateCustomImageChange(Change):
__canonical_name__ = "CreateCustomImageChange"
__version__ = SYFT_OBJECT_VERSION_3

config: WorkerConfig
tag: str | None = None
registry_uid: UID | None = None
pull_image: bool = True

__repr_attrs__ = ["config", "tag"]

@model_validator(mode="after")
def _tag_required_for_dockerworkerconfig(self) -> Self:
if isinstance(self.config, DockerWorkerConfig) and self.tag is None:
raise ValueError("`tag` is required for `DockerWorkerConfig`.")
return self

def _run(
self, context: ChangeContext, apply: bool
) -> Result[SyftSuccess, SyftError]:
try:
worker_image_service = context.node.get_service("SyftWorkerImageService")

service_context = context.to_service_ctx()
result = worker_image_service.submit_dockerfile(
service_context, docker_config=self.config
result = worker_image_service.submit(
service_context, worker_config=self.config
)

if isinstance(result, SyftError):
return Err(result)

result = worker_image_service.stash.get_by_docker_config(
result = worker_image_service.stash.get_by_worker_config(
service_context.credentials, config=self.config
)

if result.is_err():
return Err(SyftError(message=f"{result.err()}"))

worker_image = result.ok()
if (worker_image := result.ok()) is None:
return Err(SyftError(message="The worker image does not exist."))

build_result = worker_image_service.build(
service_context,
image_uid=worker_image.id,
tag=self.tag,
registry_uid=self.registry_uid,
pull=self.pull_image,
)
build_success_message = "Image was pre-built."

if not worker_image.is_prebuilt:
build_result = worker_image_service.build(
service_context,
image_uid=worker_image.id,
tag=self.tag,
registry_uid=self.registry_uid,
pull=self.pull_image,
)

if isinstance(build_result, SyftError):
return Err(build_result)

if isinstance(build_result, SyftError):
return Err(build_result)
build_success_message = build_result.message

build_success = SyftSuccess(
message=f"Build result: {build_success_message}"
)

if IN_KUBERNETES:
if IN_KUBERNETES and not worker_image.is_prebuilt:
push_result = worker_image_service.push(
service_context,
image=worker_image.id,
Expand All @@ -245,11 +276,11 @@ def _run(

return Ok(
SyftSuccess(
message=f"Build Result: {build_result.message} \n Push Result: {push_result.message}"
message=f"{build_success}\nPush result: {push_result.message}"
)
)

return Ok(build_result)
return Ok(build_success)

except Exception as e:
return Err(SyftError(message=f"Failed to create/build image: {e}"))
Expand Down Expand Up @@ -291,7 +322,7 @@ def _run(
service_context: AuthedServiceContext = context.to_service_ctx()

if self.config is not None:
result = worker_pool_service.image_stash.get_by_docker_config(
result = worker_pool_service.image_stash.get_by_worker_config(
service_context.credentials, self.config
)
if result.is_err():
Expand Down
14 changes: 8 additions & 6 deletions packages/syft/src/syft/service/worker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,6 @@ def create_kubernetes_pool(
**kwargs: Any,
) -> list[Pod] | SyftError:
pool = None
error = False

try:
print(
Expand Down Expand Up @@ -366,11 +365,14 @@ def create_kubernetes_pool(
reg_url=reg_url,
)
except Exception as e:
error = True
return SyftError(message=f"Failed to start workers {e}")
finally:
if error and pool:
if pool:
pool.delete()
# stdlib
import traceback

return SyftError(
message=f"Failed to start workers {e} {e.__class__} {e.args} {traceback.format_exc()}."
)

return runner.get_pool_pods(pool_name=pool_name)

Expand Down Expand Up @@ -564,7 +566,7 @@ def create_default_image(
image_identifier=SyftWorkerImageIdentifier.from_str(tag),
)

result = image_stash.get_by_docker_config(
result = image_stash.get_by_worker_config(
credentials=credentials,
config=worker_config,
)
Expand Down
32 changes: 22 additions & 10 deletions packages/syft/src/syft/service/worker/worker_image_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import pydantic

# relative
from ...custom_worker.config import DockerWorkerConfig
from ...custom_worker.config import PrebuiltWorkerConfig
from ...custom_worker.config import WorkerConfig
from ...custom_worker.k8s import IN_KUBERNETES
from ...serde.serializable import serializable
from ...store.document_store import DocumentStore
Expand Down Expand Up @@ -39,16 +40,27 @@ def __init__(self, store: DocumentStore) -> None:
self.stash = SyftWorkerImageStash(store=store)

@service_method(
path="worker_image.submit_dockerfile",
name="submit_dockerfile",
path="worker_image.submit",
name="submit",
roles=DATA_OWNER_ROLE_LEVEL,
)
def submit_dockerfile(
self, context: AuthedServiceContext, docker_config: DockerWorkerConfig
def submit(
self, context: AuthedServiceContext, worker_config: WorkerConfig
) -> SyftSuccess | SyftError:
image_identifier: SyftWorkerImageIdentifier | None = None
if isinstance(worker_config, PrebuiltWorkerConfig):
try:
image_identifier = SyftWorkerImageIdentifier.from_str(worker_config.tag)
except Exception:
return SyftError(
f"Invalid Docker image name: {worker_config.tag}.\n"
+ "Please specify the image name in this format <registry>/<repo>:<tag>."
)

worker_image = SyftWorkerImage(
config=docker_config,
config=worker_config,
created_by=context.credentials,
image_identifier=image_identifier,
)
res = self.stash.set(context.credentials, worker_image)

Expand Down Expand Up @@ -278,14 +290,14 @@ def get_by_uid(
roles=DATA_SCIENTIST_ROLE_LEVEL,
)
def get_by_config(
self, context: AuthedServiceContext, docker_config: DockerWorkerConfig
self, context: AuthedServiceContext, worker_config: WorkerConfig
) -> SyftWorkerImage | SyftError:
res = self.stash.get_by_docker_config(
credentials=context.credentials, config=docker_config
res = self.stash.get_by_worker_config(
credentials=context.credentials, config=worker_config
)
if res.is_err():
return SyftError(
message=f"Failed to get image with docker config {docker_config}. Error: {res.err()}"
message=f"Failed to get image with docker config {worker_config}. Error: {res.err()}"
)
image: SyftWorkerImage = res.ok()
return image
6 changes: 3 additions & 3 deletions packages/syft/src/syft/service/worker/worker_image_stash.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def set(
)

if isinstance(obj.config, DockerWorkerConfig):
result = self.get_by_docker_config(
result = self.get_by_worker_config(
credentials=credentials, config=obj.config
)
if result.is_ok() and result.ok() is not None:
Expand All @@ -62,8 +62,8 @@ def set(
ignore_duplicates=ignore_duplicates,
)

def get_by_docker_config(
self, credentials: SyftVerifyKey, config: DockerWorkerConfig
def get_by_worker_config(
self, credentials: SyftVerifyKey, config: WorkerConfig
) -> Result[SyftWorkerImage | None, str]:
qks = QueryKeys(qks=[WorkerConfigPK.with_obj(config)])
return self.query_one(credentials=credentials, qks=qks)
Loading

0 comments on commit 54cad0a

Please sign in to comment.