diff --git a/.github/workflows/pr-tests-stack.yml b/.github/workflows/pr-tests-stack.yml index b626a0cfa0b..67f0d4854b1 100644 --- a/.github/workflows/pr-tests-stack.yml +++ b/.github/workflows/pr-tests-stack.yml @@ -542,6 +542,10 @@ jobs: with: python-version: ${{ matrix.python-version }} + - name: Add K3d Registry + run: | + sudo python ./scripts/patch_hosts.py --add-k3d-registry + # free 10GB of space - name: Remove unnecessary files if: matrix.os == 'ubuntu-latest' @@ -614,14 +618,14 @@ jobs: GITHUB_CI: true shell: bash run: | - # install k3d K3D_VERSION=v5.6.0 + DEVSPACE_VERSION=v6.3.10 + # install k3d wget https://github.com/k3d-io/k3d/releases/download/${K3D_VERSION}/k3d-linux-amd64 mv k3d-linux-amd64 k3d chmod +x k3d export PATH=`pwd`:$PATH k3d version - DEVSPACE_VERSION=v6.3.3 curl -sSL https://github.com/loft-sh/devspace/releases/download/${DEVSPACE_VERSION}/devspace-linux-amd64 -o ./devspace chmod +x devspace devspace version @@ -629,3 +633,36 @@ jobs: tox -e syft.build.helm tox -e syft.package.helm # tox -e syft.test.helm + + - name: Get current timestamp + id: date + if: failure() + shell: bash + run: echo "date=$(date +%s)" >> $GITHUB_OUTPUT + + - name: Collect logs from k3d + if: steps.changes.outputs.stack == 'true' && failure() + shell: bash + run: | + mkdir -p ./k8s-logs + kubectl describe all -A --context k3d-testgateway1 --namespace testgateway1 > ./k8s-logs/testgateway1-desc-${{ steps.date.outputs.date }}.txt + kubectl describe all -A --context k3d-testdomain1 --namespace testdomain1 > ./k8s-logs/testdomain1-desc-${{ steps.date.outputs.date }}.txt + kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-testgateway1 --namespace testgateway1 > ./k8s-logs/testgateway1-logs-${{ steps.date.outputs.date }}.txt + kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-testdomain1 --namespace testdomain1 > ./k8s-logs/testdomain1-logs-${{ steps.date.outputs.date }}.txt + ls -la ./k8s-logs + + - name: Upload logs to GitHub + uses: actions/upload-artifact@master + if: steps.changes.outputs.stack == 'true' && failure() + with: + name: k8s-logs-${{ matrix.os }}-${{ steps.date.outputs.date }} + path: ./k8s-logs/ + + - name: Cleanup k3d + if: steps.changes.outputs.stack == 'true' && failure() + shell: bash + run: | + export PATH=`pwd`:$PATH + k3d cluster delete testgateway1 || true + k3d cluster delete testdomain1 || true + k3d registry delete k3d-registry.localhost || true diff --git a/notebooks/api/0.8/11-container-images-k8s.ipynb b/notebooks/api/0.8/11-container-images-k8s.ipynb new file mode 100644 index 00000000000..fe3fd0ed66d --- /dev/null +++ b/notebooks/api/0.8/11-container-images-k8s.ipynb @@ -0,0 +1,1161 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "ab8aca22-8bd7-4764-8f2d-27dd5f33d8c6", + "metadata": {}, + "outputs": [], + "source": [ + "SYFT_VERSION = \">=0.8.2.b0,<0.9\"\n", + "package_string = f'\"syft{SYFT_VERSION}\"'" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2cb8c995-c806-4b8e-a892-9bc461c61935", + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "import os\n", + "\n", + "# third party\n", + "import numpy as np\n", + "import requests\n", + "\n", + "# syft absolute\n", + "import syft as sy\n", + "\n", + "sy.requires(SYFT_VERSION)\n", + "\n", + "# syft absolute\n", + "from syft.service.worker.image_registry import SyftImageRegistry\n", + "from syft.service.worker.worker_image import SyftWorkerImage" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "id": "e4079d39-b88f-4709-87da-95f79f1d47ee", + "metadata": {}, + "outputs": [], + "source": [ + "os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"k8s\"\n", + "os.environ[\"DEV_MODE\"] = \"True\"\n", + "\n", + "# Uncomment this to add custom values\n", + "# os.environ[\"NODE_URL\"] = \"http://localhost\"\n", + "# os.environ[\"NODE_PORT\"] = \"8080\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0bc7b5dc-1565-4261-ac98-db2602c5877b", + "metadata": {}, + "outputs": [], + "source": [ + "domain = sy.orchestra.launch(\n", + " name=\"test-domain-1\",\n", + " dev_mode=True,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "91f1988a-daa3-42f0-9bfe-f9fdd9597fdc", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client = domain.login(email=\"info@openmined.org\", password=\"changethis\")\n", + "domain_client" + ] + }, + { + "cell_type": "markdown", + "id": "55439eb5-1e92-46a6-a45a-471917a86265", + "metadata": {}, + "source": [ + "We should see a default worker pool" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c5c841af-c423-4d8f-9d16-c7b982f27128", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client.worker_pools" + ] + }, + { + "cell_type": "markdown", + "id": "3c7a124a", + "metadata": {}, + "source": [ + "#### Submit Dockerfile" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8ca6bd49", + "metadata": {}, + "outputs": [], + "source": [ + "registry = \"k3d-registry.localhost:5000\"\n", + "repo = \"openmined/grid-backend\"\n", + "\n", + "res = requests.get(url=f\"http://{registry}/v2/{repo}/tags/list\")\n", + "tag = res.json()[\"tags\"][0]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "75193f9f-3622-4071-9aba-d42a5dc5b301", + "metadata": {}, + "outputs": [], + "source": [ + "custom_dockerfile_str = f\"\"\"\n", + "FROM {registry}/{repo}:{tag}\n", + "\n", + "RUN pip install pydicom\n", + "\n", + "\"\"\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b6bfe92a-e873-4dc3-b3a0-6715f8843785", + "metadata": {}, + "outputs": [], + "source": [ + "docker_config = sy.DockerWorkerConfig(dockerfile=custom_dockerfile_str)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "941cf5e2-4ba8-488f-880b-de908d23a4c3", + "metadata": {}, + "outputs": [], + "source": [ + "assert docker_config.dockerfile == custom_dockerfile_str" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d4a60bf8-22d3-4052-b9cc-f6dcf68b2dd8", + "metadata": {}, + "outputs": [], + "source": [ + "submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n", + " docker_config=docker_config\n", + ")\n", + "submit_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ebb3b7e9-c7a4-4c99-866b-13c6a75d04e8", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(submit_result, sy.SyftSuccess), str(submit_result)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d9cc2eb9-9f28-454f-96bc-fbb722f78bb5", + "metadata": {}, + "outputs": [], + "source": [ + "dockerfile_list = domain_client.images.get_all()\n", + "dockerfile_list" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8e56f9e8-5cf3-418b-9774-75a47c8ef276", + "metadata": {}, + "outputs": [], + "source": [ + "assert not isinstance(dockerfile_list, sy.SyftError), str(dockerfile_list)\n", + "assert len(dockerfile_list) == 2" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "id": "133dacbe-4d2e-458e-830b-2c18bce018e4", + "metadata": {}, + "outputs": [], + "source": [ + "workerimage = next(\n", + " (\n", + " image\n", + " for image in dockerfile_list\n", + " if image.config.dockerfile == custom_dockerfile_str\n", + " ),\n", + " None,\n", + ")\n", + "\n", + "assert isinstance(workerimage, SyftWorkerImage), str(workerimage)\n", + "workerimage" + ] + }, + { + "cell_type": "markdown", + "id": "91a66871", + "metadata": {}, + "source": [ + "#### Add k3d Registry in Syft" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cde8bfff", + "metadata": {}, + "outputs": [], + "source": [ + "registry_add_result = domain_client.api.services.image_registry.add(registry)\n", + "registry_add_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "82321b35", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(registry_add_result, sy.SyftSuccess), str(registry_add_result)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3d4a4c33", + "metadata": {}, + "outputs": [], + "source": [ + "image_registry_list = domain_client.api.services.image_registry.get_all()\n", + "image_registry_list" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3c045549", + "metadata": {}, + "outputs": [], + "source": [ + "assert not isinstance(image_registry_list, sy.SyftError), str(image_registry_list)\n", + "assert len(image_registry_list) == 1" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22f6e2f6", + "metadata": {}, + "outputs": [], + "source": [ + "local_registry = image_registry_list[0]\n", + "local_registry" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cb9664ca", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(local_registry, SyftImageRegistry), str(local_registry)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "78f89b88", + "metadata": {}, + "outputs": [], + "source": [ + "registry_uid = local_registry.id" + ] + }, + { + "cell_type": "markdown", + "id": "637a9596", + "metadata": {}, + "source": [ + "#### Build Image" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aa6573e1-ea18-4049-b6bf-1615521d8ced", + "metadata": {}, + "outputs": [], + "source": [ + "docker_tag = \"openmined/custom-worker:0.7.8\"\n", + "\n", + "\n", + "docker_build_result = domain_client.api.services.worker_image.build(\n", + " image_uid=workerimage.id,\n", + " tag=docker_tag,\n", + " registry_uid=registry_uid,\n", + ")\n", + "docker_build_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21e3679d-ef71-44af-a2ab-91bed47472c1", + "metadata": {}, + "outputs": [], + "source": [ + "assert not isinstance(docker_build_result, sy.SyftError), str(docker_build_result)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c540043d-4485-4213-b93c-358e4c507f5a", + "metadata": {}, + "outputs": [], + "source": [ + "image_list = domain_client.images.get_all()\n", + "image_list" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7af0a33d-e1a9-4f2b-9113-d17a3730397c", + "metadata": {}, + "outputs": [], + "source": [ + "# we can also index with string using the repo_with_tag format\n", + "workerimage = next((image for image in image_list if image.id == workerimage.id), None)\n", + "workerimage" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c4242f66", + "metadata": {}, + "outputs": [], + "source": [ + 
"assert workerimage is not None, str([image.__dict__ for image in image_list])\n", + "assert workerimage.is_built is not None, str(workerimage)\n", + "assert workerimage.built_at is not None, str(workerimage)\n", + "assert workerimage.image_hash is not None, str(workerimage)\n", + "assert image_list[workerimage.built_image_tag] == workerimage" + ] + }, + { + "cell_type": "markdown", + "id": "e726428e", + "metadata": {}, + "source": [ + "#### Push Image to Local Registry" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8468ce02", + "metadata": {}, + "outputs": [], + "source": [ + "push_result = None\n", + "push_result = domain_client.api.services.worker_image.push(workerimage.id)\n", + "push_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c5ca573b", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(push_result, sy.SyftSuccess), str(push_result)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18941fce", + "metadata": {}, + "outputs": [], + "source": [ + "base_url = f\"http://{workerimage.image_identifier.registry_host}\"\n", + "expected_tag = workerimage.image_identifier.tag\n", + "\n", + "repos = requests.get(f\"{base_url}/v2/_catalog\").json()[\"repositories\"]\n", + "tags = requests.get(f\"{base_url}/v2/openmined/custom-worker/tags/list\").json()\n", + "tags = tags[\"tags\"]\n", + "\n", + "assert (\n", + " \"openmined/custom-worker\" in repos\n", + "), f\"'openmined/custom-worker' not uploaded to local registry | {repos}\"\n", + "assert (\n", + " expected_tag in tags\n", + "), f\"'openmined/custom-worker' with tag {expected_tag} not available | {tags}\"" + ] + }, + { + "cell_type": "markdown", + "id": "f5007073", + "metadata": {}, + "source": [ + "#### Create Worker Pool From Image" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f57b5443-8519-4464-89a2-37deb25f6923", + "metadata": {}, + "outputs": [], + "source": [ + "worker_pool_name = \"custom-pool\"\n", + "worker_pool_res = domain_client.api.services.worker_pool.launch(\n", + " name=worker_pool_name,\n", + " image_uid=workerimage.id,\n", + " num_workers=3,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f418fb83-4111-412c-ab11-8d4587239dc6", + "metadata": {}, + "outputs": [], + "source": [ + "assert not isinstance(worker_pool_res, sy.SyftError), str(worker_pool_res)\n", + "assert len(worker_pool_res) == 3" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "64b5d651-3dd6-45e6-b189-c7e278a7ddd1", + "metadata": {}, + "outputs": [], + "source": [ + "for status in worker_pool_res:\n", + " assert status.error is None" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "977ff49b-0975-4e75-bd36-7ed124be52b8", + "metadata": {}, + "outputs": [], + "source": [ + "worker_pool_list = domain_client.worker_pools.get_all()\n", + "worker_pool_list" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "62f20239", + "metadata": {}, + "outputs": [], + "source": [ + "assert not isinstance(worker_pool_list, sy.SyftError), str(worker_pool_res)\n", + "assert len(worker_pool_list) == 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ce6bd8c3-bc0a-4cdd-b594-4fccdd2097d4", + "metadata": {}, + "outputs": [], + "source": [ + "worker_pool = next(\n", + " (pool for pool in worker_pool_list if pool.name == worker_pool_name),\n", + " None,\n", + ")\n", + "\n", + "assert worker_pool is not None, str(\n", + " [worker_pool.__dict__ 
for worker_pool in worker_pool_list]\n", + ")\n", + "assert len(worker_pool.workers) == 3" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14aeb0f5-673b-44f7-974c-203e18fa1c79", + "metadata": {}, + "outputs": [], + "source": [ + "# We can filter pools based on the image id upon which the pools were built\n", + "filtered_result = domain_client.api.services.worker_pool.filter_by_image_id(\n", + " image_uid=workerimage.id\n", + ")\n", + "filtered_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "87d1f356", + "metadata": {}, + "outputs": [], + "source": [ + "assert not isinstance(filtered_result, sy.SyftError), str(filtered_result)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fe5900fe-057e-4be2-b3c6-c69ec07bacb4", + "metadata": {}, + "outputs": [], + "source": [ + "second_worker = worker_pool.workers[1]\n", + "second_worker" + ] + }, + { + "cell_type": "markdown", + "id": "1c3166b0", + "metadata": {}, + "source": [ + "#### Get Worker Logs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "187cb1ee", + "metadata": {}, + "outputs": [], + "source": [ + "worker_logs = domain_client.api.services.worker.logs(\n", + " uid=second_worker.id,\n", + ")\n", + "worker_logs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f08fc155", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(worker_logs, str)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "400d545a-a912-423f-aeb8-aadfba7a3848", + "metadata": {}, + "outputs": [], + "source": [ + "worker_pool" + ] + }, + { + "cell_type": "markdown", + "id": "88971463-6991-448e-9c6d-51beb0c1b553", + "metadata": {}, + "source": [ + "### Syft function" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5561d74b-4610-4279-bb09-abf287732aa0", + "metadata": {}, + "outputs": [], + "source": [ + "data = np.array([1, 2, 3])\n", + "data_action_obj = sy.ActionObject.from_obj(data)\n", + "\n", + "data_pointer = domain_client.api.services.action.set(data_action_obj)\n", + "data_pointer" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dc174d96-b4b1-4d65-aa76-921439507ba7", + "metadata": {}, + "outputs": [], + "source": [ + "@sy.syft_function(\n", + " input_policy=sy.ExactMatch(x=data_pointer),\n", + " output_policy=sy.SingleExecutionExactOutput(),\n", + " worker_pool_name=worker_pool_name,\n", + ")\n", + "def custom_worker_func(x):\n", + " return {\"y\": x + 1}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4ce5de72-4e50-46ff-8a7c-9f9eb7e0f018", + "metadata": {}, + "outputs": [], + "source": [ + "custom_worker_func" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "771b0ec6-267a-439e-9eff-34ea80a81137", + "metadata": {}, + "outputs": [], + "source": [ + "assert custom_worker_func.worker_pool_name == worker_pool.name" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c0f3c93e-1610-406e-b93d-1ba5421017a2", + "metadata": {}, + "outputs": [], + "source": [ + "request = domain_client.code.request_code_execution(custom_worker_func)\n", + "request" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "db820de6-f6b2-446d-a6d5-f07f217de97b", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client.requests[-1].approve(approve_nested=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fd8a8734-4c22-4dd5-9835-f48dc6ebade9", + "metadata": {}, + "outputs": [], 
+ "source": [ + "job = domain_client.code.custom_worker_func(x=data_pointer, blocking=False)\n", + "job" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "01bff2ed-d4f4-4607-b750-3f935eb85d17", + "metadata": {}, + "outputs": [], + "source": [ + "worker_pool = domain_client.worker_pools[worker_pool_name]\n", + "worker_pool" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2cd24b35-94f5-4f39-aae8-92046136137b", + "metadata": {}, + "outputs": [], + "source": [ + "job.wait()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0daeddfd-731a-49f5-90f5-a974af49bb02", + "metadata": {}, + "outputs": [], + "source": [ + "assert job.status.value == \"completed\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e9be648a-ead9-4cd5-b857-a10a9410c937", + "metadata": {}, + "outputs": [], + "source": [ + "job_list = domain_client.jobs.get_by_user_code_id(job.user_code_id)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3d828222-68d6-4010-9e62-141ea59c47b6", + "metadata": {}, + "outputs": [], + "source": [ + "assert not isinstance(job_list, sy.SyftError), job_list" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c6b9d5a8-9e91-451a-91b5-e0455e2c2246", + "metadata": {}, + "outputs": [], + "source": [ + "job_refresh = job_list[0]\n", + "assert job_refresh.job_worker_id is not None, str([job.to_dict() for job in job_list])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8c8da391-50c2-44c5-9f24-2853b0f5852f", + "metadata": {}, + "outputs": [], + "source": [ + "# Validate the result received from the syft function\n", + "result = job.wait().get()\n", + "result_matches = result[\"y\"] == data + 1\n", + "assert result_matches.all()" + ] + }, + { + "cell_type": "markdown", + "id": "f20a29df-2e63-484f-8b67-d6a397722e66", + "metadata": {}, + "source": [ + "#### Worker Pool and Image Creation Request/Approval" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2b8cd7a0-ba17-4ad0-b3de-5af1282a6dc6", + "metadata": {}, + "outputs": [], + "source": [ + "dockerfile_opendp = f\"\"\"\n", + "FROM {registry}/{repo}:{tag}\n", + "\n", + "RUN pip install opendp\n", + "\"\"\"\n", + "\n", + "docker_config_opendp = sy.DockerWorkerConfig(dockerfile=dockerfile_opendp)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "48a7a9b5-266d-4f22-9b99-061dbb3c83ab", + "metadata": {}, + "outputs": [], + "source": [ + "submit_result = None\n", + "submit_result = domain_client.api.services.worker_image.submit_dockerfile(\n", + " docker_config=docker_config_opendp\n", + ")\n", + "submit_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6dc3afe6", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(submit_result, sy.SyftSuccess), str(submit_result)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8b91474e", + "metadata": {}, + "outputs": [], + "source": [ + "_images = domain_client.images\n", + "assert not isinstance(_images, sy.SyftError), str(_images)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b62871bc-6c32-4fac-95af-5b062bc65992", + "metadata": {}, + "outputs": [], + "source": [ + "workerimage_opendp = next(\n", + " (im for im in _images if im.config == docker_config_opendp),\n", + " None,\n", + ")\n", + "assert workerimage_opendp is not None, str([im.__dict__ for im in _images])" + ] + }, + { + "cell_type": "markdown", + "id": 
"35f8e35f-91f3-4d2b-8e70-386021e9a692", + "metadata": {}, + "source": [ + "##### Build image first then create pool" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f5a773e7-4dc1-4325-bc26-eb3c7d88969a", + "metadata": {}, + "outputs": [], + "source": [ + "docker_tag_opendp = \"openmined/custom-worker-opendp:latest\"\n", + "\n", + "docker_build_result = domain_client.api.services.worker_image.build(\n", + " image_uid=workerimage_opendp.id,\n", + " tag=docker_tag_opendp,\n", + " registry_uid=registry_uid,\n", + ")\n", + "\n", + "docker_build_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cb59b64c", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(docker_build_result, sy.SyftSuccess), str(docker_build_result)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a39ab3e0", + "metadata": {}, + "outputs": [], + "source": [ + "_images = domain_client.images\n", + "assert not isinstance(_images, sy.SyftError), str(_images)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "30f77d3f", + "metadata": {}, + "outputs": [], + "source": [ + "workerimage_opendp = next(\n", + " (image for image in _images if image.id == workerimage_opendp.id),\n", + " None,\n", + ")\n", + "assert workerimage_opendp is not None, str([image.__dict__ for image in _images])\n", + "assert workerimage_opendp.is_built is not None, str(workerimage_opendp.__dict__)\n", + "assert workerimage_opendp.built_at is not None, str(workerimage_opendp.__dict__)\n", + "assert workerimage_opendp.image_hash is not None, str(workerimage_opendp.__dict__)\n", + "\n", + "assert _images[workerimage_opendp.built_image_tag] == workerimage_opendp, str(\n", + " workerimage_opendp\n", + ")\n", + "\n", + "workerimage_opendp" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "79211b85", + "metadata": {}, + "outputs": [], + "source": [ + "# Push OpenDP Image to registry\n", + "push_result = None\n", + "push_result = domain_client.api.services.worker_image.push(workerimage_opendp.id)\n", + "assert isinstance(push_result, sy.SyftSuccess), str(push_result)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7b0b2bb2-5612-463f-af88-f74e4f31719a", + "metadata": {}, + "outputs": [], + "source": [ + "pool_name_opendp = \"opendp-pool\"\n", + "pool_create_request = domain_client.api.services.worker_pool.pool_creation_request(\n", + " pool_name=pool_name_opendp,\n", + " num_workers=3,\n", + " image_uid=workerimage_opendp.id,\n", + ")\n", + "pool_create_request" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2b337373-9486-426a-a282-b0b179139ba7", + "metadata": {}, + "outputs": [], + "source": [ + "assert not isinstance(pool_create_request, sy.SyftError), str(pool_create_request)\n", + "assert len(pool_create_request.changes) == 1" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0b59e175-76ba-46b8-a7cd-796a872969e4", + "metadata": {}, + "outputs": [], + "source": [ + "# get the pending request and approve it\n", + "req_result = pool_create_request.approve()\n", + "req_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4ce90111-11bd-4ebd-bb4a-4217a57c7d8d", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(req_result, sy.SyftSuccess), str(req_result)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2ea69b17-eb3c-4f01-9a47-4895dd286e5e", + "metadata": {}, + "outputs": [], + "source": [ + 
"pool_opendp = domain_client.worker_pools[pool_name_opendp]\n", + "assert not isinstance(pool_opendp, sy.SyftError), str(pool_opendp)\n", + "assert len(pool_opendp.worker_list) == 3" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b0f8e4cb-6ccf-4c9f-866e-6e63fa67427c", + "metadata": {}, + "outputs": [], + "source": [ + "worker_pool_list = domain_client.worker_pools.get_all()\n", + "\n", + "assert not isinstance(worker_pool_list, sy.SyftError), str(worker_pool_list)\n", + "assert len(worker_pool_list) == 3" + ] + }, + { + "cell_type": "markdown", + "id": "6e671e1e", + "metadata": {}, + "source": [ + "Request to build the image and create the pool at the same time" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7c69e8bf", + "metadata": {}, + "outputs": [], + "source": [ + "dockerfile_recordlinkage = f\"\"\"\n", + "FROM {registry}/{repo}:{tag}\n", + "\n", + "RUN pip install recordlinkage\n", + "\"\"\"\n", + "\n", + "docker_config_recordlinkage = sy.DockerWorkerConfig(dockerfile=dockerfile_recordlinkage)\n", + "\n", + "docker_tag_recordlinkage = \"openmined/custom-worker-recordlinkage:latest\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "81689b96", + "metadata": {}, + "outputs": [], + "source": [ + "pool_name_recordlinkage = \"recordlinkage-pool\"\n", + "\n", + "pool_image_create_request = (\n", + " domain_client.api.services.worker_pool.create_image_and_pool_request(\n", + " pool_name=pool_name_recordlinkage,\n", + " num_workers=2,\n", + " tag=docker_tag_recordlinkage,\n", + " config=docker_config_recordlinkage,\n", + " registry_uid=registry_uid,\n", + " reason=\"I want to do some more cool data science with PySyft and OpenDP\",\n", + " )\n", + ")\n", + "pool_image_create_request" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6efd9eaa", + "metadata": {}, + "outputs": [], + "source": [ + "assert not isinstance(pool_image_create_request, sy.SyftError), str(\n", + " pool_image_create_request\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ea55e617", + "metadata": {}, + "outputs": [], + "source": [ + "assert len(pool_image_create_request.changes) == 2\n", + "assert pool_image_create_request.changes[0].config == docker_config_recordlinkage\n", + "assert pool_image_create_request.changes[1].num_workers == 2\n", + "assert pool_image_create_request.changes[1].pool_name == pool_name_recordlinkage" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1cc6f12a", + "metadata": {}, + "outputs": [], + "source": [ + "req_result = pool_image_create_request.approve()\n", + "req_result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "76b52e2c", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(req_result, sy.SyftSuccess), str(req_result)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ca4ab4f1", + "metadata": {}, + "outputs": [], + "source": [ + "_requests = domain_client.requests\n", + "assert not isinstance(_requests, sy.SyftError), str(_requests)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e79ef5cd", + "metadata": {}, + "outputs": [], + "source": [ + "pool_image_create_request = next(\n", + " (req for req in _requests if req.id == pool_image_create_request.id),\n", + " None,\n", + ")\n", + "assert pool_image_create_request is not None, str([req.__dict__ for req in _requests])\n", + "assert pool_image_create_request.status.value == 2, 
str(pool_image_create_request)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5518a574", + "metadata": {}, + "outputs": [], + "source": [ + "domain_client.images" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bb6b48b1", + "metadata": {}, + "outputs": [], + "source": [ + "image_exists = False\n", + "for im in domain_client.images.get_all():\n", + " if (\n", + " im.image_identifier\n", + " and im.image_identifier.repo_with_tag == docker_tag_recordlinkage\n", + " ):\n", + " image_exists = True\n", + "\n", + "assert image_exists, str([im.__dict__ for im in _images])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a47b8580", + "metadata": {}, + "outputs": [], + "source": [ + "assert domain_client.worker_pools[pool_name_recordlinkage]\n", + "assert len(domain_client.worker_pools[pool_name_recordlinkage].worker_list) == 2" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/packages/grid/backend/grid/logger/config.py b/packages/grid/backend/grid/logger/config.py index 0e0b6844003..7c5ea9ddb41 100644 --- a/packages/grid/backend/grid/logger/config.py +++ b/packages/grid/backend/grid/logger/config.py @@ -38,7 +38,7 @@ class LogConfig(BaseSettings): "{message}" ) - LOGURU_LEVEL: str = LogLevel.DEBUG.value + LOGURU_LEVEL: str = LogLevel.INFO.value LOGURU_SINK: Optional[str] = "/var/log/pygrid/grid.log" LOGURU_COMPRESSION: Optional[str] LOGURU_ROTATION: Union[ diff --git a/packages/grid/devspace.yaml b/packages/grid/devspace.yaml index 85f97c85d78..7fd681991c0 100644 --- a/packages/grid/devspace.yaml +++ b/packages/grid/devspace.yaml @@ -72,7 +72,7 @@ deployments: settings: nodeName: ${NODE_NAME} nodeType: "domain" - inMemoryWorkers: true + defaultWorkerPoolCount: 1 configuration: devmode: True diff --git a/packages/grid/helm/syft/templates/backend-service-account.yaml b/packages/grid/helm/syft/templates/backend-service-account.yaml index d31e01f0160..35be3230bd5 100644 --- a/packages/grid/helm/syft/templates/backend-service-account.yaml +++ b/packages/grid/helm/syft/templates/backend-service-account.yaml @@ -37,14 +37,16 @@ metadata: rules: - apiGroups: [""] resources: ["pods", "configmaps"] - verbs: ["list", "create", "update", "patch", "delete", "get"] + verbs: ["create", "get", "list", "watch", "update", "patch", "delete"] - apiGroups: [""] resources: ["pods/log"] verbs: ["get", "list", "watch"] - apiGroups: ["batch"] resources: ["jobs"] - verbs: ["create", "delete", "get", "list", "watch", "update"] - + verbs: ["create", "get", "list", "watch", "update", "patch", "delete"] + - apiGroups: ["apps"] + resources: ["statefulsets"] + verbs: ["create", "get", "list", "watch", "update", "patch", "delete"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/packages/grid/helm/syft/templates/backend-statefulset.yaml b/packages/grid/helm/syft/templates/backend-statefulset.yaml index aa52513a181..55cde6647b7 100644 --- a/packages/grid/helm/syft/templates/backend-statefulset.yaml +++ b/packages/grid/helm/syft/templates/backend-statefulset.yaml @@ -117,6 +117,10 @@ spec: value: "{{ .Values.node.settings.inMemoryWorkers }}" - name: 
LOG_LEVEL value: {{ .Values.node.settings.logLevel }} + - name: DEFAULT_WORKER_POOL_IMAGE + value: {{ .Values.syft.registry }}/openmined/grid-backend:{{ .Values.syft.version }} + - name: DEFAULT_WORKER_POOL_COUNT + value: "{{ .Values.node.settings.defaultWorkerPoolCount }}" envFrom: null image: {{ .Values.syft.registry }}/openmined/grid-backend:{{ .Values.syft.version }} lifecycle: null diff --git a/packages/syft/src/syft/custom_worker/builder.py b/packages/syft/src/syft/custom_worker/builder.py index 754c3c9418d..5e479cee71c 100644 --- a/packages/syft/src/syft/custom_worker/builder.py +++ b/packages/syft/src/syft/custom_worker/builder.py @@ -13,11 +13,10 @@ from .config import CustomWorkerConfig from .config import DockerWorkerConfig from .config import WorkerConfig +from .k8s import IN_KUBERNETES __all__ = ["CustomWorkerBuilder"] -IN_KUBERNETES = os.getenv("CONTAINER_HOST") == "k8s" - class CustomWorkerBuilder: TYPE_CPU = "cpu" @@ -73,7 +72,7 @@ def push_image( """ return self.builder.push_image( - tag, + tag=tag, username=username, password=password, registry_url=registry_url, diff --git a/packages/syft/src/syft/custom_worker/builder_docker.py b/packages/syft/src/syft/custom_worker/builder_docker.py index 973ac569d3c..e1f24520c25 100644 --- a/packages/syft/src/syft/custom_worker/builder_docker.py +++ b/packages/syft/src/syft/custom_worker/builder_docker.py @@ -67,7 +67,7 @@ def push_image( ) result = client.images.push(repository=tag) - return ImagePushResult(logs=result) + return ImagePushResult(logs=result, exit_code=0) def _parse_output(self, log_iterator: Iterable) -> str: log = "" diff --git a/packages/syft/src/syft/custom_worker/builder_k8s.py b/packages/syft/src/syft/custom_worker/builder_k8s.py index ff38439abe4..3c3558ea75b 100644 --- a/packages/syft/src/syft/custom_worker/builder_k8s.py +++ b/packages/syft/src/syft/custom_worker/builder_k8s.py @@ -3,6 +3,7 @@ import os from pathlib import Path from typing import Dict +from typing import List from typing import Optional # third party @@ -15,16 +16,14 @@ from .builder_types import BuilderBase from .builder_types import ImageBuildResult from .builder_types import ImagePushResult +from .k8s import BUILD_OUTPUT_PVC +from .k8s import JOB_COMPLETION_TTL +from .k8s import KUBERNETES_NAMESPACE __all__ = ["KubernetesBuilder"] -JOB_COMPLETION_TTL = 60 -BUILD_OUTPUT_PVC = "worker-builds" -KUBERNETES_NAMESPACE = os.getenv("K8S_NAMESPACE", "syft") - - -class InvalidImageDigest(Exception): +class BuildFailed(Exception): pass @@ -63,14 +62,18 @@ def build_image( # wait for job to complete/fail job.wait(["condition=Complete", "condition=Failed"]) - # TODO: check job status, raise with logs + # get logs + logs = self._get_logs(job) image_digest = self._get_image_digest(job) if not image_digest: - raise InvalidImageDigest("Did not get any image digest from the job") + exit_code = self._get_container_exit_code(job) + raise BuildFailed( + "Failed to build the image. " + f"Kaniko exit code={exit_code}. 
" + f"Logs={logs}" + ) - # get logs - logs = self._get_logs(job) except Exception: raise finally: @@ -100,7 +103,8 @@ def push_image( registry_url=registry_url, ) job.wait(["condition=Complete", "condition=Failed"]) - return ImagePushResult(logs=self._get_logs(job)) + exit_code = self._get_container_exit_code(job)[0] + return ImagePushResult(logs=self._get_logs(job), exit_code=exit_code) def _new_job_id(self, tag: str) -> str: return self._get_tag_hash(tag)[:16] @@ -118,6 +122,15 @@ def _get_image_digest(self, job: Job) -> Optional[str]: return container_status.state.terminated.message return None + def _get_container_exit_code(self, job: Job) -> List[int]: + selector = {"batch.kubernetes.io/job-name": job.metadata.name} + pods = self.client.get("pods", label_selector=selector) + exit_codes = [] + for pod in pods: + for container_status in pod.status.containerStatuses: + exit_codes.append(container_status.state.terminated.exitCode) + return exit_codes + def _get_logs(self, job: Job) -> str: selector = {"batch.kubernetes.io/job-name": job.metadata.name} pods = self.client.get("pods", label_selector=selector) @@ -167,7 +180,7 @@ def _create_kaniko_build_job( }, }, "spec": { - "backoffLimit": 2, + "backoffLimit": 0, "ttlSecondsAfterFinished": JOB_COMPLETION_TTL, "template": { "spec": { @@ -236,6 +249,22 @@ def _create_push_job( tag_hash = self._get_tag_hash(tag) registry_url = registry_url or tag.split("/")[0] + extra_flags = "" + if os.getenv("DEV_MODE") == "True": + extra_flags = "--insecure" + + run_cmds = [ + "echo Logging in to $REG_URL with user $REG_USERNAME...", + # login to registry + "crane auth login $REG_URL -u $REG_USERNAME -p $REG_PASSWORD", + # push with credentials + "echo Pushing image....", + f"crane push --image-refs /dev/termination-log {extra_flags} /output/{tag_hash}.tar {tag}", + # cleanup built tarfile + "echo Cleaning up tar....", + f"rm /output/{tag_hash}.tar", + ] + job = Job( { "metadata": { @@ -273,19 +302,7 @@ def _create_push_job( }, ], "command": ["sh"], - "args": [ - "-c", - " && ".join( - [ - "crane auth login $REG_URL -u $REG_USERNAME -p $REG_PASSWORD", - # push with credentials - f"crane push --image-refs /dev/termination-log /output/{tag_hash}.tar {tag}", # noqa: E501 - # cleanup built tarfile - f"rm /output/{tag_hash}.tar", - # for retagging use crane cp {tag} {new_tag} - ] - ), - ], + "args": ["-c", " && ".join(run_cmds)], "volumeMounts": [ { "name": "build-output", diff --git a/packages/syft/src/syft/custom_worker/builder_types.py b/packages/syft/src/syft/custom_worker/builder_types.py index 2b3cff4cad9..53c27788791 100644 --- a/packages/syft/src/syft/custom_worker/builder_types.py +++ b/packages/syft/src/syft/custom_worker/builder_types.py @@ -17,6 +17,7 @@ class ImageBuildResult(BaseModel): class ImagePushResult(BaseModel): logs: str + exit_code: int class BuilderBase(ABC): diff --git a/packages/syft/src/syft/custom_worker/k8s.py b/packages/syft/src/syft/custom_worker/k8s.py new file mode 100644 index 00000000000..491d7333b38 --- /dev/null +++ b/packages/syft/src/syft/custom_worker/k8s.py @@ -0,0 +1,84 @@ +# stdlib +from enum import Enum +import os +from typing import Optional + +# third party +from kr8s._data_utils import list_dict_unpack +from pydantic import BaseModel + +# Time after which Job will be deleted +JOB_COMPLETION_TTL = 60 + +# Persistent volume claim for storing build output +BUILD_OUTPUT_PVC = "worker-builds" + +# Kubernetes namespace +KUBERNETES_NAMESPACE = os.getenv("K8S_NAMESPACE", "syft") + +# Kubernetes runtime flag +IN_KUBERNETES 
= os.getenv("CONTAINER_HOST") == "k8s" + + +class PodPhase(Enum): + Pending = "Pending" + Running = "Running" + Succeeded = "Succeeded" + Failed = "Failed" + Unknown = "Unknown" + + +class PodCondition(BaseModel): + pod_scheduled: bool + containers_ready: bool + initialized: bool + ready: bool + + @classmethod + def from_conditions(cls, conditions: list): + pod_cond = list_dict_unpack(conditions, key="type", value="status") + pod_cond_flags = {k: v == "True" for k, v in pod_cond.items()} + return cls( + pod_scheduled=pod_cond_flags.get("PodScheduled", False), + containers_ready=pod_cond_flags.get("ContainersReady", False), + initialized=pod_cond_flags.get("Initialized", False), + ready=pod_cond_flags.get("Ready", False), + ) + + +class ContainerStatus(BaseModel): + ready: bool + running: bool + waiting: bool + reason: Optional[str] # when waiting=True + message: Optional[str] # when waiting=True + startedAt: Optional[str] # when running=True + + @classmethod + def from_status(cls, cstatus: dict): + cstate = cstatus.get("state", {}) + + return cls( + ready=cstatus.get("ready", False), + running="running" in cstate, + waiting="waiting" in cstate, + reason=cstate.get("waiting", {}).get("reason", None), + message=cstate.get("waiting", {}).get("message", None), + startedAt=cstate.get("running", {}).get("startedAt", None), + ) + + +class PodStatus(BaseModel): + phase: PodPhase + condition: PodCondition + container: ContainerStatus + + @classmethod + def from_status_dict(cls, status: dict): + return cls( + phase=PodPhase(status.get("phase", "Unknown")), + condition=PodCondition.from_conditions(status.get("conditions", [])), + container=ContainerStatus.from_status( + status.get("containerStatuses", [{}])[0] + ), + ) diff --git a/packages/syft/src/syft/custom_worker/runner_k8s.py b/packages/syft/src/syft/custom_worker/runner_k8s.py new file mode 100644 index 00000000000..3f0824182fb --- /dev/null +++ b/packages/syft/src/syft/custom_worker/runner_k8s.py @@ -0,0 +1,219 @@ +# stdlib +import copy +import os +from time import sleep +from typing import List +from typing import Optional +from typing import Union + +# third party +import kr8s +from kr8s.objects import APIObject +from kr8s.objects import Pod +from kr8s.objects import StatefulSet + +# relative +from .k8s import KUBERNETES_NAMESPACE +from .k8s import PodStatus + + +class KubernetesRunner: + def __init__(self): + self.client = kr8s.api(namespace=KUBERNETES_NAMESPACE) + + def create_pool( + self, + pool_name: str, + tag: str, + replicas: int = 1, + env_vars: Optional[dict] = None, + **kwargs, + ) -> StatefulSet: + deployment = self._create_stateful_set( + pool_name, + tag, + replicas, + env_vars, + **kwargs, + ) + self.wait(deployment, available_replicas=replicas) + return deployment + + def scale_pool(self, pool_name: str, replicas: int) -> Optional[StatefulSet]: + deployment = self.get_pool(pool_name) + if not deployment: + return None + deployment.scale(replicas) + self.wait(deployment, available_replicas=replicas) + return deployment + + def get_pool(self, pool_name: str) -> Optional[StatefulSet]: + selector = {"app.kubernetes.io/component": pool_name} + for _set in self.client.get("statefulsets", label_selector=selector): + return _set + return None + + def delete_pool(self, pool_name: str) -> bool: + selector = {"app.kubernetes.io/component": pool_name} + for _set in self.client.get("statefulsets", label_selector=selector): + _set.delete() + return True + return False + + def delete_pod(self, pod_name: str) -> bool: + pods = 
self.client.get("pods", pod_name) + for pod in pods: + pod.delete() + return True + return False + + def get_pods(self, pool_name: str) -> List[Pod]: + selector = {"app.kubernetes.io/component": pool_name} + pods = self.client.get("pods", label_selector=selector) + if len(pods) > 0: + pods.sort(key=lambda pod: pod.name) + return pods + + def get_pod_logs(self, pod_name: str) -> str: + pods = self.client.get("pods", pod_name) + logs = [] + for pod in pods: + logs.append(f"----------Logs for pod={pod.metadata.name}----------") + for log in pod.logs(): + logs.append(log) + + return "\n".join(logs) + + def get_pod_status(self, pod: Union[str, Pod]) -> Optional[PodStatus]: + if isinstance(pod, str): + pods = self.client.get("pods", pod) + if len(pods) == 0: + return None + pod = pods[0] + else: + pod.refresh() + + return PodStatus.from_status_dict(pod.status) + + def wait( + self, + deployment: StatefulSet, + available_replicas: int, + timeout: int = 60, + ) -> None: + # TODO: Report wait('jsonpath=') bug to kr8s + # Until then this is the substitute implementation + + if available_replicas <= 0: + return + + while True: + if timeout == 0: + raise TimeoutError("Timeout waiting for replicas") + + deployment.refresh() + if deployment.status.availableReplicas == available_replicas: + break + + timeout -= 1 + sleep(1) + + def _current_pod_name(self) -> str: + env_val = os.getenv("K8S_POD_NAME") + if env_val: + return env_val + + selector = {"app.kubernetes.io/component": "backend"} + for pod in self.client.get("pods", label_selector=selector): + return pod.name + + def _get_obj_from_list(self, objs: List[dict], name: str) -> dict: + """Helper function extract kubernetes object from list by name""" + for obj in objs: + if obj.name == name: + return obj + + def _create_stateful_set( + self, + pool_name: str, + tag: str, + replicas=1, + env_vars: Optional[dict] = None, + **kwargs, + ) -> StatefulSet: + """Create a stateful set for a pool""" + + env_vars = env_vars or {} + + _pod = Pod.get(self._current_pod_name()) + + creds_volume = self._get_obj_from_list( + objs=_pod.spec.volumes, + name="credentials-data", + ) + creds_volume_mount = self._get_obj_from_list( + objs=_pod.spec.containers[0].volumeMounts, + name="credentials-data", + ) + + env = _pod.spec.containers[0].env.to_list() + env_clone = copy.deepcopy(env) + + # update existing + for item in env_clone: + k = item["name"] + if k in env_vars: + v = env_vars.pop(k) + item["value"] = v + + # append remaining + for k, v in env_vars.items(): + env_clone.append({"name": k, "value": v}) + + stateful_set = StatefulSet( + { + "metadata": { + "name": pool_name, + "labels": { + "app.kubernetes.io/name": KUBERNETES_NAMESPACE, + "app.kubernetes.io/component": pool_name, + "app.kubernetes.io/managed-by": "kr8s", + }, + }, + "spec": { + "replicas": replicas, + "selector": { + "matchLabels": { + "app.kubernetes.io/component": pool_name, + } + }, + "template": { + "metadata": { + "labels": { + "app.kubernetes.io/name": KUBERNETES_NAMESPACE, + "app.kubernetes.io/component": pool_name, + } + }, + "spec": { + "containers": [ + { + "name": pool_name, + "image": tag, + "env": env_clone, + "volumeMounts": [creds_volume_mount], + } + ], + "volumes": [creds_volume], + }, + }, + }, + } + ) + return self._create_or_get(stateful_set) + + def _create_or_get(self, obj: APIObject) -> APIObject: + if not obj.exists(): + obj.create() + else: + obj.refresh() + return obj diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 
1896c3e981e..2bdb32445b1 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -188,6 +188,18 @@ def get_container_host() -> Optional[str]: return get_env("CONTAINER_HOST") +def get_default_worker_image() -> str: + return get_env("DEFAULT_WORKER_POOL_IMAGE") + + +def get_default_worker_pool_name() -> str: + return get_env("DEFAULT_WORKER_POOL_NAME", DEFAULT_WORKER_POOL_NAME) + + +def get_default_worker_pool_count() -> int: + return int(get_env("DEFAULT_WORKER_POOL_COUNT", 1)) + + def in_kubernetes() -> Optional[str]: return get_container_host() == "k8s" @@ -199,8 +211,22 @@ def get_venv_packages() -> str: return res +def get_syft_worker() -> bool: + return str_to_bool(get_env("SYFT_WORKER", "false")) + + +def get_k8s_pod_name() -> Optional[str]: + return get_env("K8S_POD_NAME") + + def get_syft_worker_uid() -> Optional[str]: - return get_env("SYFT_WORKER_UID", None) + is_worker = get_syft_worker() + pod_name = get_k8s_pod_name() + uid = get_env("SYFT_WORKER_UID") + # if uid is empty and this is a K8S worker, generate a uid from the pod name + if (not uid) and is_worker and pod_name: + uid = str(UID.with_seed(pod_name)) + return uid signing_key_env = get_private_key_env() @@ -1362,7 +1388,8 @@ def user_code_stash(self) -> UserCodeStash: def get_default_worker_pool(self): result = self.pool_stash.get_by_name( - credentials=self.verify_key, pool_name=DEFAULT_WORKER_POOL_NAME + credentials=self.verify_key, + pool_name=get_default_worker_pool_name(), ) if result.is_err(): return SyftError(message=f"{result.err()}") @@ -1512,36 +1539,43 @@ def get_all_nodes(cls) -> List[Node]: return list(cls.__node_registry__.values()) -def create_default_worker_pool(node: Node) -> Optional[SyftError]: - if node.in_memory_workers: - print("Creating default worker pool with in memory workers") +def get_default_worker_tag_by_env(dev_mode=False): + if in_kubernetes(): + return get_default_worker_image() + elif dev_mode: + return "local-dev" + else: + return __version__ - credentials = node.verify_key +def create_default_worker_pool(node: Node) -> Optional[SyftError]: + credentials = node.verify_key + pull_image = not node.dev_mode image_stash = node.get_service(SyftWorkerImageService).stash - + default_pool_name = get_default_worker_pool_name() + default_worker_pool = node.get_default_worker_pool() + default_worker_tag = get_default_worker_tag_by_env(node.dev_mode) + worker_count = get_default_worker_pool_count() context = AuthedServiceContext( node=node, credentials=credentials, role=ServiceRole.ADMIN, ) - print("Creating Default Worker Image") + print(f"Creating default worker image with tag='{default_worker_tag}'") # Get/Create a default worker SyftWorkerImage default_image = create_default_image( credentials=credentials, image_stash=image_stash, - dev_mode=node.dev_mode, - syft_version_tag="local-dev" if node.dev_mode else __version__, + tag=default_worker_tag, + in_kubernetes=in_kubernetes(), ) - - # Skip pulling image if using locally built image - pull_image = not node.dev_mode if isinstance(default_image, SyftError): + print("Failed to create default worker image: ", default_image.message) return default_image if not default_image.is_built: - print("Building Default Worker Image") + print(f"Building default worker image with tag={default_worker_tag}") image_build_method = node.get_service_method(SyftWorkerImageService.build) # Build the Image for given tag result = image_build_method( @@ -1555,21 +1589,23 @@ def create_default_worker_pool(node: Node) -> Optional[SyftError]: 
print("Failed to build default worker image: ", result.message) return - default_worker_pool = node.get_default_worker_pool() - worker_count = node.queue_config.client_config.n_consumers - # Create worker pool if it doesn't exists + print( + "Setting up worker pool" + f"name={default_pool_name} " + f"workers={worker_count} " + f"image_uid={default_image.id} " + f"in_memory={node.in_memory_workers}" + ) if default_worker_pool is None: worker_to_add_ = worker_count create_pool_method = node.get_service_method(SyftWorkerPoolService.launch) - print("Creating default Worker Pool") result = create_pool_method( context, - name=DEFAULT_WORKER_POOL_NAME, + name=default_pool_name, image_uid=default_image.id, - num_workers=worker_to_add_, + num_workers=worker_count, ) - else: # Else add a worker to existing worker pool worker_to_add_ = max(default_worker_pool.max_count, worker_count) - len( @@ -1577,11 +1613,13 @@ def create_default_worker_pool(node: Node) -> Optional[SyftError]: ) add_worker_method = node.get_service_method(SyftWorkerPoolService.add_workers) result = add_worker_method( - context=context, number=worker_to_add_, pool_name=DEFAULT_WORKER_POOL_NAME + context=context, + number=worker_to_add_, + pool_name=default_pool_name, ) if isinstance(result, SyftError): - print(f"Failed to create Worker for Default workers. Error: {result.message}") + print(f"Default worker pool error. {result.message}") return for n in range(worker_to_add_): diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index d394d8f29bd..f5a76efa9ce 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -1086,7 +1086,7 @@ "CreateCustomImageChange": { "1": { "version": 1, - "hash": "197b765666d605cf601bd69bff21b4a31896b6acd14af38739a5047ca29ff015", + "hash": "bc09dca7995938f3b3a2bd9c8b3c2feffc8484df466144a425cb69cadb2ab635", "action": "add" } }, diff --git a/packages/syft/src/syft/service/queue/zmq_queue.py b/packages/syft/src/syft/service/queue/zmq_queue.py index dd8b2740b6d..a6a346b6d11 100644 --- a/packages/syft/src/syft/service/queue/zmq_queue.py +++ b/packages/syft/src/syft/service/queue/zmq_queue.py @@ -370,8 +370,9 @@ def purge_workers(self): for worker in list(self.waiting): if worker.has_expired(): logger.info( - "Deleting expired worker={} expiry={} now={}", + "Deleting expired Worker id={} uid={} expiry={} now={}", worker.identity, + worker.syft_worker_id, worker.get_expiry(), Timeout.now(), ) @@ -521,9 +522,10 @@ def process_worker(self, address: bytes, msg: List[bytes]): worker.service = service worker.syft_worker_id = UID(syft_worker_id) logger.info( - "New Worker id={} service={}", - worker.identity, + "New Worker service={} id={} uid={}", service.name, + worker.identity, + worker.syft_worker_id, ) self.worker_waiting(worker) diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index 46f156d9ad2..b606512780a 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -20,6 +20,7 @@ from ...abstract_node import NodeSideType from ...client.api import APIRegistry from ...custom_worker.config import WorkerConfig +from ...custom_worker.k8s import IN_KUBERNETES from ...node.credentials import SyftVerifyKey from ...serde.serializable import serializable from ...serde.serialize import _serialize @@ -193,6 +194,7 @@ class CreateCustomImageChange(Change): 
config: WorkerConfig tag: str + registry_uid: Optional[UID] __repr_attrs__ = ["config", "tag"] @@ -220,8 +222,30 @@ def _run( worker_image = result.ok() build_result = worker_image_service.build( - service_context, image_uid=worker_image.id, tag=self.tag + service_context, + image_uid=worker_image.id, + tag=self.tag, + registry_uid=self.registry_uid, ) + + if isinstance(build_result, SyftError): + return Err(build_result) + + if IN_KUBERNETES: + push_result = worker_image_service.push( + service_context, + image=worker_image.id, + ) + + if isinstance(push_result, SyftError): + return Err(push_result) + + return Ok( + SyftSuccess( + message=f"Build Result: {build_result.message} \n Push Result: {push_result.message}" + ) + ) + + return Ok(build_result) except Exception as e: diff --git a/packages/syft/src/syft/service/worker/utils.py b/packages/syft/src/syft/service/worker/utils.py index 3bb7f6ef662..a06ab53ef15 100644 --- a/packages/syft/src/syft/service/worker/utils.py +++ b/packages/syft/src/syft/service/worker/utils.py @@ -6,6 +6,7 @@ import sys from typing import List from typing import Optional +from typing import Tuple from typing import Union # third party @@ -17,10 +18,14 @@ from ...custom_worker.builder_types import ImageBuildResult from ...custom_worker.builder_types import ImagePushResult from ...custom_worker.config import DockerWorkerConfig +from ...custom_worker.k8s import PodStatus +from ...custom_worker.runner_k8s import KubernetesRunner from ...node.credentials import SyftVerifyKey +from ...types.datetime import DateTime from ...types.uid import UID from ...util.util import get_queue_address from ..response import SyftError +from .image_identifier import SyftWorkerImageIdentifier from .worker_image import SyftWorkerImage from .worker_image_stash import SyftWorkerImageStash from .worker_pool import ContainerSpawnStatus @@ -232,7 +237,8 @@ def run_workers_in_threads( ) except Exception as e: print( - f"Failed to start consumer for Pool Name: {pool_name}, Worker Name: {worker_name}. Error: {e}" + "Failed to start consumer for " + f"pool={pool_name} worker={worker_name}. Error: {e}" ) worker.status = WorkerStatus.STOPPED error = str(e) @@ -248,6 +254,156 @@ return results + + +def create_kubernetes_pool( + runner: KubernetesRunner, + worker_image: SyftWorkerImage, + pool_name: str, + replicas: int, + queue_port: int, + debug: bool, +): + pool = None + error = False + + try: + print( + "Creating new pool " + f"name={pool_name} " + f"tag={worker_image.image_identifier.full_name_with_tag} " + f"replicas={replicas}" + ) + pool = runner.create_pool( + pool_name=pool_name, + tag=worker_image.image_identifier.full_name_with_tag, + replicas=replicas, + env_vars={ + "SYFT_WORKER": "True", + "DEV_MODE": f"{debug}", + "QUEUE_PORT": f"{queue_port}", + "CONSUMER_SERVICE_NAME": pool_name, + "N_CONSUMERS": "1", + "CREATE_PRODUCER": "False", + "INMEMORY_WORKERS": "False", + }, + ) + except Exception as e: + error = True + return SyftError(message=f"Failed to start workers {e}") + finally: + if error and pool: + pool.delete() + + return runner.get_pods(pool_name=pool_name) + + +def scale_kubernetes_pool( + runner: KubernetesRunner, + pool_name: str, + replicas: int, +): + pool = runner.get_pool(pool_name) + if not pool: + return SyftError(message=f"Pool does not exist. 
name={pool_name}") + + try: + print(f"Scaling pool name={pool_name} to replicas={replicas}") + runner.scale_pool(pool_name=pool_name, replicas=replicas) + except Exception as e: + return SyftError(message=f"Failed to scale workers {e}") + + return runner.get_pods(pool_name=pool_name) + + +def run_workers_in_kubernetes( + worker_image: SyftWorkerImage, + worker_count: int, + pool_name: str, + queue_port: int, + start_idx=0, + debug: bool = False, + username: Optional[str] = None, + password: Optional[str] = None, + registry_url: Optional[str] = None, + **kwargs, +) -> Union[List[ContainerSpawnStatus], SyftError]: + spawn_status = [] + runner = KubernetesRunner() + + if start_idx == 0: + pool_pods = create_kubernetes_pool( + runner, + worker_image, + pool_name, + worker_count, + queue_port, + debug, + ) + else: + pool_pods = scale_kubernetes_pool(runner, pool_name, worker_count) + + if isinstance(pool_pods, list) and len(pool_pods) > 0: + # slice only those pods that we're interested in + pool_pods = pool_pods[start_idx:] + + if isinstance(pool_pods, SyftError): + return pool_pods + + # create worker object + for pod in pool_pods: + status = runner.get_pod_status(pod) + status, healthcheck, error = map_pod_to_worker_status(status) + + # this worker id will be the same as the one in the worker + syft_worker_uid = UID.with_seed(pod.metadata.name) + + worker = SyftWorker( + id=syft_worker_uid, + name=pod.metadata.name, + container_id=None, + status=status, + healthcheck=healthcheck, + image=worker_image, + worker_pool_name=pool_name, + ) + + spawn_status.append( + ContainerSpawnStatus( + worker_name=pod.metadata.name, + worker=worker, + error=error, + ) + ) + + return spawn_status + + +def map_pod_to_worker_status( + status: PodStatus, +) -> Tuple[WorkerStatus, WorkerHealth, Optional[str]]: + worker_status = None + worker_healthcheck = None + worker_error = None + + # check if pod is ready through pod.status.condition.Ready & pod.status.condition.ContainersReady + pod_ready = status.condition.ready and status.condition.containers_ready + + if not pod_ready: + # extract error if not ready + worker_error = f"{status.container.reason}: {status.container.message}" + + # map readiness to status - it's either running or pending. 
+ # closely relates to pod.status.phase, but avoiding as it is not as detailed as pod.status.conditions + worker_status = WorkerStatus.RUNNING if pod_ready else WorkerStatus.PENDING + + # TODO: update these values based on actual runtime probes instead of kube pod statuses + # if there are any errors, then healthcheck is unhealthy + worker_healthcheck = ( + WorkerHealth.UNHEALTHY if worker_error else WorkerHealth.HEALTHY + ) + + return worker_status, worker_healthcheck, worker_error + + def run_containers( pool_name: str, worker_image: SyftWorkerImage, @@ -259,31 +415,40 @@ def run_containers( username: Optional[str] = None, password: Optional[str] = None, registry_url: Optional[str] = None, -) -> List[ContainerSpawnStatus]: +) -> Union[List[ContainerSpawnStatus], SyftError]: results = [] - if orchestration not in [WorkerOrchestrationType.DOCKER]: - return SyftError(message="Only Orchestration via Docker is supported.") - if not worker_image.is_built: return SyftError(message="Image must be built before running it.") - with contextlib.closing(docker.from_env()) as client: - for worker_count in range(start_idx + 1, number + 1): - worker_name = f"{pool_name}-{worker_count}" - spawn_result = run_container_using_docker( - docker_client=client, - worker_name=worker_name, - worker_count=worker_count, - worker_image=worker_image, - pool_name=pool_name, - queue_port=queue_port, - debug=dev_mode, - username=username, - password=password, - registry_url=registry_url, - ) - results.append(spawn_result) + print(f"Starting workers with start_idx={start_idx} count={number}") + + if orchestration == WorkerOrchestrationType.DOCKER: + with contextlib.closing(docker.from_env()) as client: + for worker_count in range(start_idx + 1, number + 1): + worker_name = f"{pool_name}-{worker_count}" + spawn_result = run_container_using_docker( + docker_client=client, + worker_name=worker_name, + worker_count=worker_count, + worker_image=worker_image, + pool_name=pool_name, + queue_port=queue_port, + debug=dev_mode, + username=username, + password=password, + registry_url=registry_url, + ) + results.append(spawn_result) + elif orchestration == WorkerOrchestrationType.KUBERNETES: + return run_workers_in_kubernetes( + worker_image=worker_image, + worker_count=number, + pool_name=pool_name, + queue_port=queue_port, + debug=dev_mode, + start_idx=start_idx, + ) return results @@ -291,8 +456,8 @@ def run_containers( def create_default_image( credentials: SyftVerifyKey, image_stash: SyftWorkerImageStash, - dev_mode: bool, - syft_version_tag: str, + tag: str, + in_kubernetes: bool = False, ) -> Union[SyftError, SyftWorkerImage]: # TODO: Hardcode worker dockerfile since not able to COPY # worker_cpu.dockerfile to backend in backend.dockerfile. 
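The dispatch in run_containers above is driven by get_orchestration_type(), defined later in this diff, which reads the CONTAINER_HOST environment variable. A minimal, self-contained sketch of that selection logic follows; WorkerOrchestrationType and get_orchestration_type mirror this diff, while run_containers_sketch is an illustrative stand-in for the real run_containers, not a Syft API:

import os
from enum import Enum


class WorkerOrchestrationType(Enum):
    DOCKER = "docker"
    KUBERNETES = "k8s"
    PYTHON = "python"


def get_orchestration_type() -> WorkerOrchestrationType:
    # CONTAINER_HOST is set to "docker" or "k8s" by the deployment;
    # when unset, workers run in-process (Python threads).
    value = os.getenv("CONTAINER_HOST")
    return WorkerOrchestrationType(value.lower()) if value else WorkerOrchestrationType.PYTHON


def run_containers_sketch(number: int, start_idx: int = 0) -> str:
    # simplified stand-in for the Docker/Kubernetes branches above
    orchestration = get_orchestration_type()
    if orchestration == WorkerOrchestrationType.DOCKER:
        return f"docker: spawn workers {start_idx + 1}..{number}"
    if orchestration == WorkerOrchestrationType.KUBERNETES:
        return f"k8s: create or scale the pool to {number} replicas"
    return "python: run workers in threads"


os.environ["CONTAINER_HOST"] = "k8s"  # e.g. as set in a Kubernetes deployment
assert run_containers_sketch(number=3) == "k8s: create or scale the pool to 3 replicas"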
@@ -300,23 +465,41 @@ def create_default_image(
     # default_cpu_dockerfile = get_syft_cpu_dockerfile()
     # DockerWorkerConfig.from_path(default_cpu_dockerfile)

-    default_cpu_dockerfile = f"""ARG SYFT_VERSION_TAG='{syft_version_tag}' \n"""
-    default_cpu_dockerfile += """FROM openmined/grid-backend:${SYFT_VERSION_TAG}
-    ARG PYTHON_VERSION="3.11"
-    ARG SYSTEM_PACKAGES=""
-    ARG PIP_PACKAGES="pip --dry-run"
-    ARG CUSTOM_CMD='echo "No custom commands passed"'
+    if not in_kubernetes:
+        default_cpu_dockerfile = f"""ARG SYFT_VERSION_TAG='{tag}' \n"""
+        default_cpu_dockerfile += """FROM openmined/grid-backend:${SYFT_VERSION_TAG}
+        ARG PYTHON_VERSION="3.11"
+        ARG SYSTEM_PACKAGES=""
+        ARG PIP_PACKAGES="pip --dry-run"
+        ARG CUSTOM_CMD='echo "No custom commands passed"'
+
+        # Worker specific environment variables go here
+        ENV SYFT_WORKER="true"
+        ENV DOCKER_TAG=${SYFT_VERSION_TAG}
+
+        RUN apk update && \
+            apk add ${SYSTEM_PACKAGES} && \
+            pip install --user ${PIP_PACKAGES} && \
+            bash -c "$CUSTOM_CMD"
+        """
+        worker_config = DockerWorkerConfig(dockerfile=default_cpu_dockerfile)
+        _new_image = SyftWorkerImage(
+            config=worker_config,
+            created_by=credentials,
+        )
+    else:
+        # in k8s we don't need to build the image; the backend tag is enough

-    # Worker specific environment variables go here
-    ENV SYFT_WORKER="true"
-    ENV DOCKER_TAG=${SYFT_VERSION_TAG}
+        # a hacky way to keep the Stash's unique `config` requirement happy
+        worker_config = DockerWorkerConfig(dockerfile=tag)

-    RUN apk update && \
-        apk add ${SYSTEM_PACKAGES} && \
-        pip install --user ${PIP_PACKAGES} && \
-        bash -c "$CUSTOM_CMD"
-    """
-    worker_config = DockerWorkerConfig(dockerfile=default_cpu_dockerfile)
+        # create a SyftWorkerImage from a pre-built image
+        _new_image = SyftWorkerImage(
+            config=worker_config,
+            created_by=credentials,
+            image_identifier=SyftWorkerImageIdentifier.from_str(tag),
+            built_at=DateTime.now(),
+        )

     result = image_stash.get_by_docker_config(
         credentials=credentials,
@@ -324,12 +507,7 @@
     )

     if result.ok() is None:
-        default_syft_image = SyftWorkerImage(
-            config=worker_config,
-            created_by=credentials,
-        )
-        result = image_stash.set(credentials, default_syft_image)
-
+        result = image_stash.set(credentials, _new_image)
         if result.is_err():
             return SyftError(message=f"Failed to save image stash: {result.err()}")
@@ -345,9 +523,7 @@
         return WorkerHealth.UNHEALTHY


-def docker_build(
-    image: SyftWorkerImage, **kwargs
-) -> Union[ImageBuildResult, SyftError]:
+def image_build(image: SyftWorkerImage, **kwargs) -> Union[ImageBuildResult, SyftError]:
     full_tag = image.image_identifier.full_name_with_tag
     try:
         builder = CustomWorkerBuilder()
@@ -372,11 +548,12 @@
     )


-def docker_push(
+def image_push(
     image: SyftWorkerImage,
     username: Optional[str] = None,
     password: Optional[str] = None,
 ) -> Union[ImagePushResult, SyftError]:
+    full_tag = image.image_identifier.full_name_with_tag
     try:
         builder = CustomWorkerBuilder()
         result = builder.push_image(
@@ -387,21 +564,32 @@
             password=password,
         )

-        if "error" in result.logs:
+        if "error" in result.logs.lower() or result.exit_code:
             return SyftError(
-                message=f"Failed to push {image.image_identifier}. Logs:\n{result.logs}"
+                message=f"Failed to push {full_tag}. "
+                f"Exit code: {result.exit_code}. "
+                f"Logs:\n{result.logs}"
             )

         return result
     except docker.errors.APIError as e:
-        return SyftError(
-            message=f"Docker API error when pushing {image.image_identifier}. {e}"
-        )
+        return SyftError(message=f"Docker API error when pushing {full_tag}. {e}")
     except docker.errors.DockerException as e:
         return SyftError(
-            message=f"Docker exception when pushing {image.image_identifier}. Reason - {e}"
+            message=f"Docker exception when pushing {full_tag}. Reason - {e}"
         )
     except Exception as e:
         return SyftError(
             message=f"Unknown exception when pushing {image.image_identifier}. Reason - {e}"
         )
+
+
+def get_orchestration_type() -> WorkerOrchestrationType:
+    """Returns the orchestration type from the environment. Defaults to Python."""
+
+    orchestration_type_ = os.getenv("CONTAINER_HOST")
+    return (
+        WorkerOrchestrationType(orchestration_type_.lower())
+        if orchestration_type_
+        else WorkerOrchestrationType.PYTHON
+    )
diff --git a/packages/syft/src/syft/service/worker/worker_image_service.py b/packages/syft/src/syft/service/worker/worker_image_service.py
index 80456c52cbb..13e2883d402 100644
--- a/packages/syft/src/syft/service/worker/worker_image_service.py
+++ b/packages/syft/src/syft/service/worker/worker_image_service.py
@@ -11,6 +11,7 @@
 # relative
 from ...custom_worker.config import DockerWorkerConfig
+from ...custom_worker.k8s import IN_KUBERNETES
 from ...serde.serializable import serializable
 from ...store.document_store import DocumentStore
 from ...types.datetime import DateTime
@@ -25,8 +26,8 @@
 from ..user.user_roles import DATA_SCIENTIST_ROLE_LEVEL
 from .image_registry import SyftImageRegistry
 from .image_registry_service import SyftImageRegistryService
-from .utils import docker_build
-from .utils import docker_push
+from .utils import image_build
+from .utils import image_push
 from .worker_image import SyftWorkerImage
 from .worker_image import SyftWorkerImageIdentifier
 from .worker_image_stash import SyftWorkerImageStash
@@ -77,6 +78,9 @@ def build(
     ) -> Union[SyftSuccess, SyftError]:
         registry: SyftImageRegistry = None

+        if IN_KUBERNETES and registry_uid is None:
+            return SyftError(message="Registry UID is required in Kubernetes mode.")
+
         result = self.stash.get_by_uid(credentials=context.credentials, uid=image_uid)
         if result.is_err():
             return SyftError(
@@ -118,7 +122,7 @@
         result = None

         if not context.node.in_memory_workers:
-            build_result = docker_build(worker_image, pull=pull)
+            build_result = image_build(worker_image, pull=pull)

             if isinstance(build_result, SyftError):
                 return build_result
@@ -171,7 +175,7 @@
                 message=f"Image ID: {worker_image.id} does not have a valid registry host."
             )

-        result = docker_push(
+        result = image_push(
             image=worker_image,
             username=username,
             password=password,
@@ -205,6 +209,8 @@ def get_all(
             if im.image_identifier is not None:
                 res.append((im.image_identifier.full_name_with_tag, im))
             else:
+                # FIXME: Syft deployments in Kubernetes result in a new image per version.
+                # This gives the "default-worker-image" key multiple values, and DictTuple() raises an exception.
                 res.append(("default-worker-image", im))

         return DictTuple(res)
@@ -223,7 +229,14 @@ def remove(
             return SyftError(message=f"{res.err()}")

         image: SyftWorkerImage = res.ok()
-        if not context.node.in_memory_workers and image and image.image_identifier:
+        if context.node.in_memory_workers:
+            pass
+        elif IN_KUBERNETES:
+            # TODO: implement image deletion in Kubernetes
+            return SyftError(
+                message="Image deletion is not yet implemented in Kubernetes."
+ ) + elif image and image.image_identifier: try: full_tag: str = image.image_identifier.full_name_with_tag with contextlib.closing(docker.from_env()) as client: diff --git a/packages/syft/src/syft/service/worker/worker_image_stash.py b/packages/syft/src/syft/service/worker/worker_image_stash.py index c2e3a5a1d5d..49eb6fa1802 100644 --- a/packages/syft/src/syft/service/worker/worker_image_stash.py +++ b/packages/syft/src/syft/service/worker/worker_image_stash.py @@ -53,7 +53,7 @@ def set( credentials=credentials, config=obj.config ) if result.is_ok() and result.ok() is not None: - return Err(f"Image already exists for given DockerConfig: {obj.config}") + return Err(f"Image already exists for: {obj.config}") return super().set(credentials, obj, add_permissions, ignore_duplicates) diff --git a/packages/syft/src/syft/service/worker/worker_pool.py b/packages/syft/src/syft/service/worker/worker_pool.py index f1b3fd384f6..377ee118b55 100644 --- a/packages/syft/src/syft/service/worker/worker_pool.py +++ b/packages/syft/src/syft/service/worker/worker_pool.py @@ -244,9 +244,9 @@ def workers(self) -> List[SyftWorker]: @serializable() -class WorkerOrchestrationType: +class WorkerOrchestrationType(Enum): DOCKER = "docker" - K8s = "k8s" + KUBERNETES = "k8s" PYTHON = "python" diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index d2799bea74d..3787ab62e95 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -10,6 +10,7 @@ # relative from ...custom_worker.config import CustomWorkerConfig from ...custom_worker.config import WorkerConfig +from ...custom_worker.k8s import IN_KUBERNETES from ...serde.serializable import serializable from ...store.document_store import DocumentStore from ...store.linked_obj import LinkedObject @@ -32,12 +33,12 @@ from ..user.user_roles import DATA_SCIENTIST_ROLE_LEVEL from .image_identifier import SyftWorkerImageIdentifier from .utils import DEFAULT_WORKER_POOL_NAME +from .utils import get_orchestration_type from .utils import run_containers from .utils import run_workers_in_threads from .worker_image import SyftWorkerImage from .worker_image_stash import SyftWorkerImageStash from .worker_pool import ContainerSpawnStatus -from .worker_pool import WorkerOrchestrationType from .worker_pool import WorkerPool from .worker_pool_stash import SyftWorkerPoolStash from .worker_service import WorkerService @@ -115,7 +116,7 @@ def launch( # Create worker pool from given image, with the given worker pool # and with the desired number of workers - worker_list, container_statuses = _create_workers_in_pool( + result = _create_workers_in_pool( context=context, pool_name=name, existing_worker_cnt=0, @@ -126,6 +127,11 @@ def launch( reg_password=reg_password, ) + if isinstance(result, SyftError): + return result + + worker_list, container_statuses = result + # Update the Database with the pool information worker_pool = WorkerPool( name=name, @@ -227,6 +233,7 @@ def create_image_and_pool_request( num_workers: int, tag: str, config: WorkerConfig, + registry_uid: Optional[UID] = None, reason: Optional[str] = "", ) -> Union[SyftError, SyftSuccess]: """ @@ -244,6 +251,9 @@ def create_image_and_pool_request( if isinstance(config, CustomWorkerConfig): return SyftError(message="We only support DockerWorkerConfig.") + if IN_KUBERNETES and registry_uid is None: + return SyftError(message="Registry UID is required in Kubernetes 
mode.") + # Check if an image already exists for given docker config search_result = self.image_stash.get_by_docker_config( credentials=context.credentials, config=config @@ -262,7 +272,7 @@ def create_image_and_pool_request( # Validate Image Tag try: - image_identifier = SyftWorkerImageIdentifier.from_str(tag=tag) + SyftWorkerImageIdentifier.from_str(tag=tag) except pydantic.ValidationError as e: return SyftError(message=f"Failed to create tag: {e}") @@ -274,7 +284,8 @@ def create_image_and_pool_request( # If this change is approved, then build an image using the config create_custom_image_change = CreateCustomImageChange( config=config, - tag=image_identifier.full_name_with_tag, + tag=tag, + registry_uid=registry_uid, ) # Check if a pool already exists for given pool name @@ -353,6 +364,9 @@ def add_workers( Union[List[ContainerSpawnStatus], SyftError]: List of spawned workers with their status and error if any. """ + if number <= 0: + return SyftError(message=f"Invalid number of workers: {number}") + # Extract pool using either using pool id or pool name if pool_id: result = self.stash.get_by_uid(credentials=context.credentials, uid=pool_id) @@ -385,7 +399,7 @@ def add_workers( worker_stash = worker_service.stash # Add workers to given pool from the given image - worker_list, container_statuses = _create_workers_in_pool( + result = _create_workers_in_pool( context=context, pool_name=worker_pool.name, existing_worker_cnt=existing_worker_cnt, @@ -394,6 +408,11 @@ def add_workers( worker_stash=worker_stash, ) + if isinstance(result, SyftError): + return result + + worker_list, container_statuses = result + worker_pool.worker_list += worker_list worker_pool.max_count = existing_worker_cnt + number @@ -528,7 +547,7 @@ def _create_workers_in_pool( worker_stash: WorkerStash, reg_username: Optional[str] = None, reg_password: Optional[str] = None, -) -> Tuple[List[LinkedObject], List[ContainerSpawnStatus]]: +) -> Union[Tuple[List[LinkedObject], List[ContainerSpawnStatus]], SyftError]: queue_port = context.node.queue_config.client_config.queue_port # Check if workers needs to be run in memory or as containers @@ -543,18 +562,21 @@ def _create_workers_in_pool( number=worker_cnt + existing_worker_cnt, ) else: - container_statuses: List[ContainerSpawnStatus] = run_containers( + result = run_containers( pool_name=pool_name, worker_image=worker_image, start_idx=existing_worker_cnt, number=worker_cnt + existing_worker_cnt, - orchestration=WorkerOrchestrationType.DOCKER, + orchestration=get_orchestration_type(), queue_port=queue_port, dev_mode=context.node.dev_mode, username=reg_username, password=reg_password, registry_url=worker_image.image_identifier.registry_host, ) + if isinstance(result, SyftError): + return result + container_statuses = result linked_worker_list = [] diff --git a/packages/syft/src/syft/service/worker/worker_service.py b/packages/syft/src/syft/service/worker/worker_service.py index d16c054b935..9871d7fa18a 100644 --- a/packages/syft/src/syft/service/worker/worker_service.py +++ b/packages/syft/src/syft/service/worker/worker_service.py @@ -12,6 +12,8 @@ from docker.models.containers import Container # relative +from ...custom_worker.k8s import IN_KUBERNETES +from ...custom_worker.runner_k8s import KubernetesRunner from ...node.credentials import SyftVerifyKey from ...serde.serializable import serializable from ...store.document_store import DocumentStore @@ -27,6 +29,7 @@ from ..user.user_roles import DATA_SCIENTIST_ROLE_LEVEL from .utils import DEFAULT_WORKER_POOL_NAME from .utils 
import _get_healthcheck_based_on_status +from .utils import map_pod_to_worker_status from .worker_pool import ContainerSpawnStatus from .worker_pool import SyftWorker from .worker_pool import WorkerHealth @@ -71,25 +74,13 @@ def list(self, context: AuthedServiceContext) -> Union[SyftSuccess, SyftError]: if result.is_err(): return SyftError(message=f"Failed to fetch workers. {result.err()}") - workers = result.ok() + workers: List[SyftWorker] = result.ok() if context.node.in_memory_workers: return workers - - # If container workers, check their statuses - with contextlib.closing(docker.from_env()) as client: - for idx, worker in enumerate(workers): - worker_ = _check_and_update_status_for_worker( - client=client, - worker=worker, - worker_stash=self.stash, - credentials=context.credentials, - ) - - if not isinstance(worker_, SyftWorker): - return worker_ - - workers[idx] = worker_ + else: + # If container workers, check their statuses + workers = refresh_worker_status(workers, self.stash, context.credentials) return workers @@ -101,12 +92,12 @@ def status( context: AuthedServiceContext, uid: UID, ) -> Union[Tuple[WorkerStatus, WorkerHealth], SyftError]: - worker = self.get(context=context, uid=uid) + result = self.get(context=context, uid=uid) - if not isinstance(worker, SyftWorker): - return worker + if isinstance(result, SyftError): + return result - return worker.status, worker.healthcheck + return result.status, result.healthcheck @service_method( path="worker.get", @@ -122,14 +113,8 @@ def get( if context.node.in_memory_workers: return worker - - with contextlib.closing(docker.from_env()) as client: - return _check_and_update_status_for_worker( - client=client, - worker=worker, - worker_stash=self.stash, - credentials=context.credentials, - ) + else: + return refresh_worker_status([worker], self.stash, context.credentials)[0] @service_method( path="worker.logs", @@ -148,6 +133,9 @@ def logs( if context.node.in_memory_workers: logs = b"Logs not implemented for In Memory Workers" + elif IN_KUBERNETES: + runner = KubernetesRunner() + return runner.get_pod_logs(pod_name=worker.name) else: with contextlib.closing(docker.from_env()) as client: docker_container = _get_worker_container(client, worker) @@ -204,7 +192,18 @@ def delete( f"associated with SyftWorker {uid}" ) - if not context.node.in_memory_workers: + if IN_KUBERNETES: + # Kubernetes will only restart the worker NOT REMOVE IT + runner = KubernetesRunner() + runner.delete_pod(pod_name=worker.name) + return SyftSuccess( + # pod deletion is not supported in Kubernetes, removing and recreating the pod. + message=( + "Worker deletion is not supported in Kubernetes. 
" + f"Removing and re-creating worker id={worker.id}" + ) + ) + elif not context.node.in_memory_workers: # delete the worker using docker client sdk with contextlib.closing(docker.from_env()) as client: docker_container = _get_worker_container(client, worker) @@ -252,33 +251,60 @@ def _get_worker( return worker -def _check_and_update_status_for_worker( - client: docker.DockerClient, - worker: SyftWorker, +def refresh_worker_status( + workers: List[SyftWorker], worker_stash: WorkerStash, credentials: SyftVerifyKey, -) -> Union[SyftWorker, SyftError]: - worker_status = _get_worker_container_status(client, worker) +): + if IN_KUBERNETES: + result = refresh_status_kubernetes(workers) + else: + result = refresh_status_docker(workers) + + if isinstance(result, SyftError): + return result + + for worker in result: + stash_result = worker_stash.update( + credentials=credentials, + obj=worker, + ) + if stash_result.is_err(): + return SyftError( + message=f"Failed to update status for worker: {worker.id}. Error: {stash_result.err()}" + ) - if isinstance(worker_status, SyftError): - return worker_status + return result - worker.status = worker_status - worker.healthcheck = _get_healthcheck_based_on_status(status=worker_status) +def refresh_status_kubernetes(workers: List[SyftWorker]): + updated_workers = [] + runner = KubernetesRunner() + for worker in workers: + status = runner.get_pod_status(pod_name=worker.name) + if not status: + return SyftError(message=f"Pod does not exist. name={worker.name}") + status, health, _ = map_pod_to_worker_status(status) + worker.status = status + worker.healthcheck = health + updated_workers.append(worker) - result = worker_stash.update( - credentials=credentials, - obj=worker, - ) + return updated_workers - return ( - SyftError( - message=f"Failed to update status for worker: {worker.id}. Error: {result.err()}" - ) - if result.is_err() - else result.ok() - ) + +def refresh_status_docker(workers: List[SyftWorker]): + updated_workers = [] + + with contextlib.closing(docker.from_env()) as client: + for worker in workers: + status = _get_worker_container_status(client, worker) + if isinstance(status, SyftError): + return status + worker.status = status + worker.healthcheck = _get_healthcheck_based_on_status(status=status) + updated_workers.append(worker) + + return updated_workers def _stop_worker_container( diff --git a/packages/syft/src/syft/types/uid.py b/packages/syft/src/syft/types/uid.py index 7adec049f8e..68f68e8a639 100644 --- a/packages/syft/src/syft/types/uid.py +++ b/packages/syft/src/syft/types/uid.py @@ -1,4 +1,5 @@ # stdlib +import hashlib from typing import Any from typing import Callable from typing import Dict @@ -64,9 +65,9 @@ def __init__(self, value: Optional[Union[uuid_type, str, bytes]] = None): # if value is not set - create a novel and unique ID. if isinstance(value, str): - value = uuid.UUID(value) + value = uuid.UUID(value, version=4) elif isinstance(value, bytes): - value = uuid.UUID(bytes=value) + value = uuid.UUID(bytes=value, version=4) elif isinstance(value, UID): value = value.value @@ -80,6 +81,11 @@ def from_string(value: str) -> "UID": critical(f"Unable to convert {value} to UUID. 
{e}") traceback_and_raise(e) + @staticmethod + def with_seed(value: str) -> "UID": + md5 = hashlib.md5(value.encode("utf-8"), usedforsecurity=False) + return UID(md5.hexdigest()) + def to_string(self) -> str: return self.no_dash diff --git a/tests/integration/container_workload/pool_image_test.py b/tests/integration/container_workload/pool_image_test.py index d059c86390d..a4486865cb2 100644 --- a/tests/integration/container_workload/pool_image_test.py +++ b/tests/integration/container_workload/pool_image_test.py @@ -273,5 +273,6 @@ def custom_worker_func(x): # TODO: delete the launched pool # Clean the build images + sleep(10) delete_result = domain_client.api.services.worker_image.remove(uid=built_image.id) assert isinstance(delete_result, sy.SyftSuccess) diff --git a/tox.ini b/tox.ini index 76ea80664a6..838d8fe35e5 100644 --- a/tox.ini +++ b/tox.ini @@ -490,7 +490,7 @@ commands = pytest --nbmake "$subfolder" -p no:randomly --ignore=tutorials/model-training -n $(python -c 'import multiprocessing; print(multiprocessing.cpu_count())') -vvvv && \ pytest --nbmake tutorials/model-training -p no:randomly -vvvv; \ else \ - pytest --nbmake "$subfolder" -p no:randomly -vvvv; \ + pytest --nbmake "$subfolder" -p no:randomly -k 'not 11-container-images-k8s.ipynb' -vvvv; \ fi \ done" ; pytest --nbmake api/0.8 -p no:randomly -vvvv @@ -526,7 +526,7 @@ commands = bash -c "echo Running with ORCHESTRA_DEPLOYMENT_TYPE=$ORCHESTRA_DEPLOYMENT_TYPE DEV_MODE=$DEV_MODE TEST_NOTEBOOK_PATHS=$TEST_NOTEBOOK_PATHS; date" bash -c "for subfolder in $(echo ${TEST_NOTEBOOK_PATHS} | tr ',' ' ');\ do \ - pytest --nbmake "$subfolder" -p no:randomly -vvvv --nbmake-timeout=1000;\ + pytest --nbmake "$subfolder" -p no:randomly -vvvv -k 'not 11-container-images-k8s.ipynb' --nbmake-timeout=1000;\ done" ; pytest --nbmake api/0.8 -p no:randomly -vvvv @@ -673,6 +673,7 @@ setenv = ORCHESTRA_DEPLOYMENT_TYPE = {env:ORCHESTRA_DEPLOYMENT_TYPE:k8s} NODE_PORT = {env:NODE_PORT:9082} GITHUB_CI = {env:GITHUB_CI:false} + PYTEST_MODULES = {env:PYTEST_MODULES:frontend container_workload} commands = bash -c "echo Running with GITHUB_CI=$GITHUB_CI; date" python -c 'import syft as sy; sy.stage_protocol_changes()' @@ -692,7 +693,7 @@ commands = # Creating registory - bash -c 'k3d registry create registry.localhost --port 12345 -v `pwd`/k3d-registry:/var/lib/registry || true' + bash -c 'k3d registry create registry.localhost --port 5000 -v `pwd`/k3d-registry:/var/lib/registry || true' # Creating testgateway1 cluster on port 9081 bash -c 'NODE_NAME=testgateway1 NODE_PORT=9081 && \ @@ -706,7 +707,7 @@ commands = -p gateway \ --var NODE_NAME=$NODE_NAME \ --var TEST_MODE=1 \ - --var CONTAINER_REGISTRY=k3d-registry.localhost:12345 \ + --var CONTAINER_REGISTRY=k3d-registry.localhost:5000 \ --var NODE_TYPE=gateway \ deploy -b; \ do ((--r))||exit;echo "retrying" && sleep 20;done)' @@ -722,11 +723,10 @@ commands = devspace --no-warn --kube-context "k3d-$NODE_NAME" --namespace $NODE_NAME \ --var NODE_NAME=$NODE_NAME \ --var TEST_MODE=1 \ - --var CONTAINER_REGISTRY=k3d-registry.localhost:12345 \ + --var CONTAINER_REGISTRY=k3d-registry.localhost:5000 \ deploy -b; \ do ((--r))||exit;echo "retrying" && sleep 20;done)' - # free up build cache after build of images bash -c 'if [[ "$GITHUB_CI" != "false" ]]; then \ docker image prune --all --force; \ @@ -750,8 +750,6 @@ commands = bash packages/grid/scripts/wait_for.sh service proxy --context k3d-testdomain1 --namespace testdomain1 bash packages/grid/scripts/wait_for.sh service seaweedfs --context k3d-testdomain1 
--namespace testdomain1
-
-
 # Checking logs generated & startup of test-domain 1
 bash -c '(kubectl logs service/backend --context k3d-testdomain1 --namespace testdomain1 -f &) | grep -q "Application startup complete" || true'
 # Checking logs generated & startup of testgateway1
@@ -768,6 +766,16 @@
 exit $return; \
 fi'

+    ; ; container workload
+    ; bash -c 'if [[ "$PYTEST_MODULES" == *"container_workload"* ]]; then \
+    ; echo "Starting Container Workload test"; date; \
+    ; pytest tests/integration -m container_workload -p no:randomly --co; \
+    ; pytest tests/integration -m container_workload -vvvv -p no:randomly -p no:benchmark -o log_cli=True --capture=no; \
+    ; return=$?; \
+    ; echo "Finished container workload"; date; \
+    ; exit $return; \
+    ; fi'
+
 # Since we randomize the password, we retrieve them and store them as environment variables,
 # which are then used by the notebooks

@@ -775,12 +783,24 @@
 # ignore 06 because of opendp on arm64
 # Run 0.8 notebooks
-    bash -c " source ./scripts/get_k8s_secret_ci.sh; pytest --nbmake notebooks/api/0.8 -p no:randomly -k 'not 10-container-images.ipynb' -vvvv"
+    bash -c 'echo Gateway Cluster Info; kubectl describe all -A --context k3d-testgateway1 --namespace testgateway1'
+    bash -c 'echo Gateway Logs; kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-testgateway1 --namespace testgateway1'
+    bash -c 'echo Domain Cluster Info; kubectl describe all -A --context k3d-testdomain1 --namespace testdomain1'
+    bash -c 'echo Domain Logs; kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-testdomain1 --namespace testdomain1'
+
+    bash -c " source ./scripts/get_k8s_secret_ci.sh; \
+    pytest --nbmake notebooks/api/0.8 -p no:randomly -k 'not 10-container-images.ipynb' -vvvv --nbmake-timeout=1000"
+
+    bash -c 'echo Gateway Cluster Info; kubectl describe all -A --context k3d-testgateway1 --namespace testgateway1'
+    bash -c 'echo Gateway Logs; kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-testgateway1 --namespace testgateway1'
+    bash -c 'echo Domain Cluster Info; kubectl describe all -A --context k3d-testdomain1 --namespace testdomain1'
+    bash -c 'echo Domain Logs; kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-testdomain1 --namespace testdomain1'

 # Integration + Gateway Connection Tests
 # Gateway tests are not run in Kubernetes, as currently it does not have a way to configure
 # the high/low side warning flag.
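 # (get_k8s_secret_ci.sh is sourced again below so the network tests can
 # authenticate with the randomized credentials)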
- bash -c " source ./scripts/get_k8s_secret_ci.sh;pytest tests/integration/network -k 'not test_domain_gateway_user_code' -p no:randomly -vvvv" + bash -c " source ./scripts/get_k8s_secret_ci.sh; \ + pytest tests/integration/network -k 'not test_domain_gateway_user_code' -p no:randomly -vvvv" # deleting clusters created bash -c "k3d cluster delete testgateway1 || true" @@ -859,14 +879,14 @@ commands = bash -c "k3d registry delete k3d-registry.localhost || true" - bash -c 'k3d registry create registry.localhost --port 12345 -v `pwd`/k3d-registry:/var/lib/registry || true' + bash -c 'k3d registry create registry.localhost --port 5000 -v `pwd`/k3d-registry:/var/lib/registry || true' bash -c 'NODE_NAME=syft NODE_PORT=8080 && \ k3d cluster create syft -p "$NODE_PORT:80@loadbalancer" --registry-use k3d-registry.localhost || true \ k3d cluster start syft' - ; skopeo list-tags --tls-verify=false docker://k3d-registry.localhost:12345/openmined/grid-backend - ; skopeo inspect --tls-verify=false docker://k3d-registry.localhost:12345/openmined/grid-backend:f1725f + ; skopeo list-tags --tls-verify=false docker://k3d-registry.localhost:5000/openmined/grid-backend + ; skopeo inspect --tls-verify=false docker://k3d-registry.localhost:5000/openmined/grid-backend:f1725f ; helm uninstall --kube-context k3d-syft --namespace syft syft ; helm install --kube-context k3d-syft --namespace syft syft ./syft ; k3d cluster create syft -p "8080:80@loadbalancer" && k3d cluster start syft @@ -992,11 +1012,12 @@ passenv=HOME, USER allowlist_externals = bash commands = - bash -c 'devspace purge --force-purge --kube-context k3d-syft-dev --namespace syft' - bash -c 'kubectl delete job --all --namespace syft || true' - bash -c 'kubectl delete secret --all --namespace syft || true' - bash -c 'kubectl delete pvc --all --namespace syft || true' + bash -c 'devspace purge --force-purge --kube-context k3d-syft-dev --namespace syft; sleep 3' bash -c 'devspace cleanup images --kube-context k3d-syft-dev --namespace syft --var CONTAINER_REGISTRY=k3d-registry.localhost:5000 || true' + bash -c 'kubectl delete all --all --namespace syft || true' + bash -c 'kubectl delete pvc --all --namespace syft || true' + bash -c 'kubectl delete secret --all --namespace syft || true' + bash -c 'kubectl delete configmap --all --namespace syft || true' [testenv:dev.k8s.destroy] description = Destroy local Kubernetes cluster
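One detail of the diff worth illustrating: the deterministic UIDs from UID.with_seed are what keep Kubernetes workers stable across pod restarts. A small, stdlib-only sketch follows; uid_with_seed is an illustrative stand-in for UID.with_seed, not a Syft API:

import hashlib
import uuid


def uid_with_seed(value: str) -> uuid.UUID:
    # md5 of the seed string, interpreted as a UUID; the real code also passes
    # usedforsecurity=False (Python 3.9+), and UID() stamps the version 4 bits
    digest = hashlib.md5(value.encode("utf-8")).hexdigest()  # 32 hex chars
    return uuid.UUID(digest, version=4)


# The same pod name always yields the same worker UID, so when a StatefulSet
# pod such as "test-pool-0" is deleted and recreated, it maps back to the
# existing SyftWorker record instead of producing a new one.
assert uid_with_seed("test-pool-0") == uid_with_seed("test-pool-0")
assert uid_with_seed("test-pool-0") != uid_with_seed("test-pool-1")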
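Relatedly, run_workers_in_kubernetes reports only the pods added by a scale-up: it scales the pool to the new total and slices the pod list from start_idx onward. A tiny sketch of that slicing (the pod names are illustrative strings, not real objects):

from typing import List


def new_pods_after_scale(all_pods: List[str], start_idx: int) -> List[str]:
    # mirrors `pool_pods = pool_pods[start_idx:]` in the diff above
    return all_pods[start_idx:]


# a pool launched with 2 workers and then scaled to 4 reports only the new two
pods = ["test-pool-0", "test-pool-1", "test-pool-2", "test-pool-3"]
assert new_pods_after_scale(pods, start_idx=2) == ["test-pool-2", "test-pool-3"]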