Skip to content

Commit

Permalink
[syft/network] update NodePeer objects using a get and update with …
Browse files Browse the repository at this point in the history
…`SyftLock` in network stash
  • Loading branch information
khoaguin committed May 17, 2024
1 parent 3e42aad commit 376810a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
32 changes: 28 additions & 4 deletions packages/syft/src/syft/service/network/network_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from ...store.document_store import PartitionKey
from ...store.document_store import PartitionSettings
from ...store.document_store import QueryKeys
from ...store.locks import NoLockingConfig
from ...store.locks import SyftLock
from ...types.grid_url import GridURL
from ...types.transforms import TransformContext
from ...types.transforms import keep
Expand Down Expand Up @@ -77,6 +79,7 @@ class NetworkStash(BaseUIDStoreStash):

def __init__(self, store: DocumentStore) -> None:
super().__init__(store=store)
self.lock = SyftLock(NoLockingConfig())

def get_by_name(
self, credentials: SyftVerifyKey, name: str
Expand All @@ -95,6 +98,25 @@ def update(
return Err(SyftError(message=valid.err()))
return super().update(credentials, peer, has_permission=has_permission)

def get_and_update_with_lock(
self,
credentials: SyftVerifyKey,
peer: NodePeer,
has_permission: bool = False,
) -> Result[NodePeer, str]:
with self.lock:
result = self.get_by_uid(credentials=credentials, uid=peer.id)
if result.is_err():
return Err(
f"Failed to query peer {peer.id} with name '{peer.name}' from stash. Err: {result.err()}"
)
existing = result.ok()
if existing is None:
return Err(
f"Failed to query peer {peer.id} with name '{peer.name}' from stash: peer is None"
)
return self.update(credentials, peer, has_permission=has_permission)

def create_or_update_peer(
self, credentials: SyftVerifyKey, peer: NodePeer
) -> Result[NodePeer, str]:
Expand Down Expand Up @@ -136,7 +158,7 @@ def update_peer_ping_status(
result = self.get_by_uid(credentials=credentials, uid=peer.id)
if result.is_err():
return Err(
f"Failed to query peer peer {peer.id} with name '{peer.name}' from stash. Err: {result.err()}"
f"Failed to query peer {peer.id} with name '{peer.name}' from stash. Err: {result.err()}"
)
existing = result.ok()
if existing is None:
Expand Down Expand Up @@ -612,7 +634,7 @@ def add_route(
f"peer '{remote_node_peer.name}' with id '{existed_route.id}'."
)
# update the peer in the store with the updated routes
result = self.stash.update(
result = self.stash.get_and_update_with_lock(
credentials=context.node.verify_key,
peer=remote_node_peer,
)
Expand Down Expand Up @@ -770,7 +792,7 @@ def delete_route(
)
else:
# update the peer with the route removed
result = self.stash.update(
result = self.stash.get_and_update_with_lock(
credentials=context.node.verify_key, peer=remote_node_peer
)
if result.is_err():
Expand Down Expand Up @@ -869,7 +891,9 @@ def update_route_priority(
return updated_node_route
new_priority: int = updated_node_route.priority
# update the peer in the store
result = self.stash.update(context.node.verify_key, remote_node_peer)
result = self.stash.get_and_update_with_lock(
context.node.verify_key, remote_node_peer
)
if result.is_err():
return SyftError(message=str(result.err()))

Expand Down
2 changes: 1 addition & 1 deletion packages/syft/src/syft/service/network/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def peer_route_heathcheck(self, context: AuthedServiceContext) -> SyftError | No
else:
peer.ping_status_message = f"Peer '{peer.name}''s ping status: {peer.ping_status.value.lower()}"

result = network_stash.update_peer_ping_status(
result = network_stash.get_and_update_with_lock(
credentials=context.node.verify_key,
peer=peer,
has_permission=True,
Expand Down

0 comments on commit 376810a

Please sign in to comment.