Getting online domains using peers health check #8795

Closed
Changes from 12 commits
41 commits
355e513
[syft/network] getting online networks for python gateway node
khoaguin May 9, 2024
9316f69
[syft/network] when checking for online domains, if not able to creat…
khoaguin May 9, 2024
7fe68f9
[syft/network] on getting online domains by checking `domain.ping_sta…
khoaguin May 9, 2024
25eaaee
[syft/network] add total line to show number of online networks / all…
khoaguin May 10, 2024
eddc66f
[syft/network] - update ping url for networks without frontend
khoaguin May 10, 2024
8182a14
[syft/network] integrating `background_tasks` into `Orchestra.launch`
khoaguin May 10, 2024
8268c29
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 13, 2024
da596ff
[syft/network] pass `background_tasks=True` to `worker_class`
khoaguin May 13, 2024
71e4a34
[test/integration] add removing peers to the beginning of some tests
khoaguin May 13, 2024
0f89b8b
[syft/network] revert back to use `ping_status` instead of `ping_stat…
khoaguin May 14, 2024
744a289
[test/integration] add remove existing peers before testing for `test…
khoaguin May 14, 2024
5fd4c31
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 14, 2024
c3e0e62
[test/integration] trimming down `network/gateway_test`
khoaguin May 14, 2024
c89d671
[test/integration] add some waiting times for local gateway tests bef…
khoaguin May 14, 2024
243ca68
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 14, 2024
3782033
[test/integration] update gateway tests
khoaguin May 14, 2024
1545ec3
[test/integration] update gateway k8s tests
khoaguin May 14, 2024
b2b057f
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 14, 2024
2c45fba
[syft/network] - pick the highest priority route to be the oldest by …
khoaguin May 15, 2024
eaf59b5
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 15, 2024
a6cd9c9
[syft/network] `PeerHealthCheckTask.peer_route_heathcheck` now only
khoaguin May 15, 2024
e20ff1f
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 15, 2024
231bcbf
fix linting
khoaguin May 15, 2024
095a235
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 15, 2024
342439f
[syft/network] return Err if the returned node peer is None
khoaguin May 15, 2024
1384e0e
[test/integration] allow running `gateway_local_test.py` in `syft.tes…
khoaguin May 15, 2024
2dd8873
[test/integration] add `@pytest.mark.network` for tests in `gateway_t…
khoaguin May 15, 2024
821efef
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 15, 2024
abee20d
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 16, 2024
7507fff
add a try catch block to catch invalid api registry entries
rasswanth-s May 16, 2024
4619473
[test/integration] trimming down `gateway_local_test.py`
khoaguin May 16, 2024
3e69bbc
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 16, 2024
3e42aad
[test/integration] small fix
khoaguin May 16, 2024
208f7cc
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 19, 2024
a1d9dab
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 20, 2024
63e4648
Merge branch 'dev' into peer-health-check-online-domains
shubham3121 May 21, 2024
a138151
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 22, 2024
96a3253
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 23, 2024
afd1231
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 24, 2024
039a76f
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 24, 2024
dd01b16
Merge branch 'dev' into peer-health-check-online-domains
khoaguin May 27, 2024
1 change: 1 addition & 0 deletions packages/grid/backend/grid/core/node.py
Original file line number Diff line number Diff line change
@@ -105,4 +105,5 @@ def seaweedfs_config() -> SeaweedFSConfig:
smtp_port=settings.SMTP_PORT,
smtp_host=settings.SMTP_HOST,
association_request_auto_approval=settings.ASSOCIATION_REQUEST_AUTO_APPROVAL,
background_tasks=True,
)
2 changes: 1 addition & 1 deletion packages/hagrid/.bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.3.119
current_version = 0.3.121
tag = False
tag_name = {new_version}
commit = True
4 changes: 2 additions & 2 deletions packages/hagrid/hagrid/manifest_template.yml
@@ -1,9 +1,9 @@
manifestVersion: 0.1
hagrid_version: 0.3.119
hagrid_version: 0.3.121
syft_version: 0.8.7-beta.7
dockerTag: 0.8.7-beta.7
baseUrl: https://raw.githubusercontent.com/OpenMined/PySyft/
hash: 90713c314a1ac09cb604d0efa7d414e9811f2691
hash: 4333433d5bec7bb9bcd52db59029d3bcb23c74c2
target_dir: ~/.hagrid/PySyft/
files:
grid:
5 changes: 4 additions & 1 deletion packages/hagrid/hagrid/orchestra.py
@@ -238,6 +238,7 @@ def deploy_to_python(
create_producer: bool = False,
queue_port: int | None = None,
association_request_auto_approval: bool = False,
background_tasks: bool = False,
) -> NodeHandle | None:
stage_protocol_changes = ImportFromSyft.import_stage_protocol_changes()
NodeType = ImportFromSyft.import_node_type()
@@ -272,7 +273,7 @@ def deploy_to_python(
"n_consumers": n_consumers,
"create_producer": create_producer,
"association_request_auto_approval": association_request_auto_approval,
"background_tasks": True,
"background_tasks": background_tasks,
}

if port:
@@ -493,6 +494,7 @@ def launch(
queue_port: int | None = None,
in_memory_workers: bool = True,
association_request_auto_approval: bool = False,
background_tasks: bool = False,
) -> NodeHandle | None:
NodeType = ImportFromSyft.import_node_type()
os.environ["DEV_MODE"] = str(dev_mode)
@@ -540,6 +542,7 @@ def launch(
create_producer=create_producer,
queue_port=queue_port,
association_request_auto_approval=association_request_auto_approval,
background_tasks=background_tasks,
)

elif deployment_type_enum == DeploymentType.K8S:
2 changes: 1 addition & 1 deletion packages/hagrid/hagrid/version.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# HAGrid Version
__version__ = "0.3.119"
__version__ = "0.3.121"

if __name__ == "__main__":
print(__version__)
2 changes: 1 addition & 1 deletion packages/hagrid/setup.py
@@ -5,7 +5,7 @@
from setuptools import find_packages
from setuptools import setup

__version__ = "0.3.119"
__version__ = "0.3.121"

DATA_FILES = {"img": ["hagrid/img/*.png"], "hagrid": ["*.yml"]}

105 changes: 73 additions & 32 deletions packages/syft/src/syft/client/registry.py
@@ -13,7 +13,8 @@

# relative
from ..service.metadata.node_metadata import NodeMetadataJSON
from ..service.network.network_service import NodePeer
from ..service.network.node_peer import NodePeer
from ..service.network.node_peer import NodePeerConnectionStatus
from ..service.response import SyftException
from ..types.grid_url import GridURL
from ..util.constants import DEFAULT_TIMEOUT
@@ -80,10 +81,10 @@ def check_network(network: dict) -> dict[Any, Any] | None:
except Exception:
online = False

# networks without frontend have a /ping route in 0.7.0
# networks without frontend
if not online:
try:
ping_url = url + "ping"
ping_url = url + "api/v2/"
res = requests.get(ping_url, timeout=DEFAULT_TIMEOUT) # nosec
online = res.status_code == 200
except Exception:
@@ -120,13 +121,40 @@ def _repr_html_(self) -> str:
on = self.online_networks
if len(on) == 0:
return "(no gateways online - try syft.gateways.all_networks to see offline gateways)"
return pd.DataFrame(on)._repr_html_() # type: ignore
df = pd.DataFrame(on)
total_df = pd.DataFrame(
[
[
f"{len(on)} / {len(self.all_networks)} (online networks / all networks)"
]
+ [""] * (len(df.columns) - 1)
],
columns=df.columns,
index=["Total"],
)
df = pd.concat([df, total_df])
return df._repr_html_() # type: ignore

def __repr__(self) -> str:
on = self.online_networks
if len(on) == 0:
return "(no gateways online - try syft.gateways.all_networks to see offline gateways)"
return pd.DataFrame(on).to_string()
df = pd.DataFrame(on)
total_df = pd.DataFrame(
[
[
f"{len(on)} / {len(self.all_networks)} (online networks / all networks)"
]
+ [""] * (len(df.columns) - 1)
],
columns=df.columns,
index=["Total"],
)
df = pd.concat([df, total_df])
return df.to_string()

def __len__(self) -> int:
return len(self.all_networks)

@staticmethod
def create_client(network: dict[str, Any]) -> Client:
@@ -189,10 +217,10 @@ def check_network(network: dict) -> dict[Any, Any] | None:
except Exception:
online = False

# networks without frontend have a /ping route in 0.7.0
# networks without frontend
if not online:
try:
ping_url = url + "ping"
ping_url = url + "api/v2/"
res = requests.get(ping_url, timeout=DEFAULT_TIMEOUT)
online = res.status_code == 200
except Exception:
@@ -228,32 +256,25 @@ def check_network(network: dict) -> dict[Any, Any] | None:

@property
def online_domains(self) -> list[tuple[NodePeer, NodeMetadataJSON | None]]:
def check_domain(
peer: NodePeer,
) -> tuple[NodePeer, NodeMetadataJSON | None] | None:
try:
guest_client = peer.guest_client
metadata = guest_client.metadata
return peer, metadata
except Exception as e: # nosec
print(f"Error in checking domain with exception {e}")
return None

networks = self.online_networks

# We can use a with statement to ensure threads are cleaned up promptly
with futures.ThreadPoolExecutor(max_workers=20) as executor:
# map
_all_online_domains = []
for network in networks:
_all_online_domains = []
for network in networks:
try:
network_client = NetworkRegistry.create_client(network)
domains: list[NodePeer] = network_client.domains.retrieve_nodes()
for domain in domains:
self.all_domains[str(domain.id)] = domain
_online_domains = list(
executor.map(lambda domain: check_domain(domain), domains)
)
_all_online_domains += _online_domains
except Exception as e:
print(f"Error in creating network client with exception {e}")
continue

domains: list[NodePeer] = network_client.domains.retrieve_nodes()
for domain in domains:
self.all_domains[str(domain.id)] = domain

_all_online_domains += [
(domain, domain.guest_client.metadata)
for domain in domains
if domain.ping_status == NodePeerConnectionStatus.ACTIVE
]

return [domain for domain in _all_online_domains if domain is not None]

@@ -281,13 +302,33 @@ def _repr_html_(self) -> str:
on: list[dict[str, Any]] = self.__make_dict__()
if len(on) == 0:
return "(no domains online - try syft.domains.all_domains to see offline domains)"
return pd.DataFrame(on)._repr_html_() # type: ignore
df = pd.DataFrame(on)
total_df = pd.DataFrame(
[
[f"{len(on)} / {len(self.all_domains)} (online domains / all domains)"]
+ [""] * (len(df.columns) - 1)
],
columns=df.columns,
index=["Total"],
)
df = pd.concat([df, total_df])
return df._repr_html_() # type: ignore

def __repr__(self) -> str:
on: list[dict[str, Any]] = self.__make_dict__()
if len(on) == 0:
return "(no domains online - try syft.domains.all_domains to see offline domains)"
return pd.DataFrame(on).to_string()
df = pd.DataFrame(on)
total_df = pd.DataFrame(
[
[f"{len(on)} / {len(self.all_domains)} (online domains / all domains)"]
+ [""] * (len(df.columns) - 1)
],
columns=df.columns,
index=["Total"],
)
df = pd.concat([df, total_df])
return df.to_string()

def create_client(self, peer: NodePeer) -> Client:
try:
Expand Down
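The `online_domains` change above drops the per-peer thread-pool probe and instead keeps only the peers whose stored `ping_status` equals `NodePeerConnectionStatus.ACTIVE`. A minimal sketch of that filter, using stand-in types rather than the real syft classes: `PeerStatus`, `Peer`, and `filter_online` are hypothetical names for illustration, and only the `ACTIVE` member is taken from the diff.

```python
from dataclasses import dataclass
from enum import Enum


class PeerStatus(Enum):
    # Stand-in for NodePeerConnectionStatus; only ACTIVE appears in the diff.
    ACTIVE = "ACTIVE"
    INACTIVE = "INACTIVE"  # assumed member, for illustration only


@dataclass
class Peer:
    # Stand-in for NodePeer with just the fields this sketch needs.
    name: str
    ping_status: PeerStatus


def filter_online(peers: list[Peer]) -> list[Peer]:
    """Keep peers whose last recorded health check marked them ACTIVE."""
    return [peer for peer in peers if peer.ping_status == PeerStatus.ACTIVE]


peers = [
    Peer("domain-a", PeerStatus.ACTIVE),
    Peer("domain-b", PeerStatus.INACTIVE),
    Peer("domain-c", PeerStatus.ACTIVE),
]
online = filter_online(peers)

# Same wording as the "Total" row added to the registry reprs.
summary = f"{len(online)} / {len(peers)} (online domains / all domains)"
print(summary)
```

Because the filter reads a stored status, the result is only as fresh as the last background health check on the gateway.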
5 changes: 5 additions & 0 deletions packages/syft/src/syft/node/server.py
@@ -79,6 +79,7 @@ def run_uvicorn(
create_producer: bool,
association_request_auto_approval: bool,
n_consumers: int,
background_tasks: bool,
) -> None:
async def _run_uvicorn(
name: str,
@@ -112,6 +113,7 @@ async def _run_uvicorn(
create_producer=create_producer,
n_consumers=n_consumers,
association_request_auto_approval=association_request_auto_approval,
background_tasks=background_tasks,
)
else:
worker = worker_class(
@@ -127,6 +129,7 @@ async def _run_uvicorn(
create_producer=create_producer,
n_consumers=n_consumers,
association_request_auto_approval=association_request_auto_approval,
background_tasks=background_tasks,
)
router = make_routes(worker=worker)
app = make_app(worker.name, router=router)
@@ -186,6 +189,7 @@ def serve_node(
create_producer: bool = False,
n_consumers: int = 0,
association_request_auto_approval: bool = False,
background_tasks: bool = False,
) -> tuple[Callable, Callable]:
server_process = multiprocessing.Process(
target=run_uvicorn,
@@ -204,6 +208,7 @@ def serve_node(
"create_producer": create_producer,
"n_consumers": n_consumers,
"association_request_auto_approval": association_request_auto_approval,
"background_tasks": background_tasks,
},
)

2 changes: 1 addition & 1 deletion packages/syft/src/syft/service/network/node_peer.py
@@ -71,7 +71,7 @@ class NodePeer(SyftObject):
"name",
"node_type",
"admin_email",
"ping_status.value",
"ping_status",
"ping_status_message",
"pinged_timestamp",
]
2 changes: 1 addition & 1 deletion packages/syft/src/syft/service/network/utils.py
@@ -19,7 +19,7 @@

@serializable(without=["thread"])
class PeerHealthCheckTask:
repeat_time = 60 # in seconds
repeat_time = 10 # in seconds

def __init__(self) -> None:
self.thread: threading.Thread | None = None
Expand Down
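`repeat_time` sets how often the background peer health check runs; this hunk lowers it from 60 to 10 seconds. A rough sketch of a repeating background task of this kind, assuming a plain `threading.Thread` plus a `threading.Event` for shutdown. The `PeriodicTask` class and its `check` callback are hypothetical and are not the `PeerHealthCheckTask` implementation.

```python
import threading
from collections.abc import Callable


class PeriodicTask:
    """Run `check` every `repeat_time` seconds until stopped (illustrative only)."""

    def __init__(self, check: Callable[[], None], repeat_time: float = 10.0) -> None:
        self.check = check
        self.repeat_time = repeat_time
        self.thread: threading.Thread | None = None
        self._stop = threading.Event()

    def _loop(self) -> None:
        # Event.wait returns False on timeout and True once stop() sets the event.
        while not self._stop.wait(self.repeat_time):
            self.check()

    def start(self) -> None:
        if self.thread is None:
            self._stop.clear()
            self.thread = threading.Thread(target=self._loop, daemon=True)
            self.thread.start()

    def stop(self) -> None:
        self._stop.set()
        if self.thread is not None:
            self.thread.join()
            self.thread = None


# Usage with a short interval so the example finishes quickly.
ticks: list[int] = []
done = threading.Event()


def record_tick() -> None:
    ticks.append(len(ticks))
    if len(ticks) >= 3:
        done.set()


task = PeriodicTask(record_tick, repeat_time=0.01)
task.start()
done.wait(timeout=5)
task.stop()
print(f"health check ran {len(ticks)} times")
```

Waiting on the event instead of calling `time.sleep` lets `stop()` interrupt the pause immediately, which matters more as the interval grows.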
6 changes: 3 additions & 3 deletions packages/syft/src/syft/store/sqlite_document_store.py
@@ -351,9 +351,9 @@ def __iter__(self) -> Any:
def __del__(self) -> None:
try:
self._close()
except BaseException:
print("Could not close connection")
pass
except Exception as e:
print(f"Could not close connection. Error: {e}")
raise e


@serializable()
2 changes: 1 addition & 1 deletion scripts/hagrid_hash
@@ -1 +1 @@
bcc1cc0354932a7d2cf4f35f781fde47
56f89d45a711a6bf79a460fc8cd4ae20