Skip to content
This repository has been archived by the owner on Jan 17, 2025. It is now read-only.

Commit

Permalink
Merge branch 'main' into DVX-481
Browse files Browse the repository at this point in the history
  • Loading branch information
Aryamanz29 authored May 30, 2024
2 parents 38d5715 + 8a62944 commit b082c50
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 132 deletions.
59 changes: 34 additions & 25 deletions pyatlan/cache/atlan_tag_cache.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 Atlan Pte. Ltd.
from threading import Lock
from typing import Dict, Optional, Set

from pyatlan.client.typedef import TypeDefClient
from pyatlan.errors import ErrorCode
from pyatlan.model.enums import AtlanTypeCategory
from pyatlan.model.typedef import AtlanTagDef

lock: Lock = Lock()


class AtlanTagCache:
"""
Expand All @@ -20,11 +23,12 @@ class AtlanTagCache:
def get_cache(cls) -> "AtlanTagCache":
from pyatlan.client.atlan import AtlanClient

client = AtlanClient.get_default_client()
cache_key = client.cache_key
if cache_key not in cls.caches:
cls.caches[cache_key] = AtlanTagCache(typedef_client=client.typedef)
return cls.caches[cache_key]
with lock:
client = AtlanClient.get_default_client()
cache_key = client.cache_key
if cache_key not in cls.caches:
cls.caches[cache_key] = AtlanTagCache(typedef_client=client.typedef)
return cls.caches[cache_key]

@classmethod
def refresh_cache(cls) -> None:
Expand Down Expand Up @@ -72,31 +76,36 @@ def __init__(self, typedef_client: TypeDefClient):
self.deleted_ids: Set[str] = set()
self.deleted_names: Set[str] = set()
self.map_id_to_source_tags_attr_id: Dict[str, str] = {}
self.lock: Lock = Lock()

def _refresh_cache(self) -> None:
"""
Refreshes the cache of Atlan tags by requesting the full set of Atlan tags from Atlan.
"""
response = self.typdef_client.get(
type_category=[AtlanTypeCategory.CLASSIFICATION, AtlanTypeCategory.STRUCT]
)
if not response or not response.struct_defs:
raise ErrorCode.EXPIRED_API_TOKEN.exception_with_parameters()
if response is not None:
self.cache_by_id = {}
self.map_id_to_name = {}
self.map_name_to_id = {}
for atlan_tag in response.atlan_tag_defs:
atlan_tag_id = atlan_tag.name
atlan_tag_name = atlan_tag.display_name
self.cache_by_id[atlan_tag_id] = atlan_tag
self.map_id_to_name[atlan_tag_id] = atlan_tag_name
self.map_name_to_id[atlan_tag_name] = atlan_tag_id
sourceTagsId = ""
for attr_def in atlan_tag.attribute_defs or []:
if attr_def.display_name == "sourceTagAttachment":
sourceTagsId = attr_def.name or ""
self.map_id_to_source_tags_attr_id[atlan_tag_id] = sourceTagsId
with self.lock:
response = self.typdef_client.get(
type_category=[
AtlanTypeCategory.CLASSIFICATION,
AtlanTypeCategory.STRUCT,
]
)
if not response or not response.struct_defs:
raise ErrorCode.EXPIRED_API_TOKEN.exception_with_parameters()
if response is not None:
self.cache_by_id = {}
self.map_id_to_name = {}
self.map_name_to_id = {}
for atlan_tag in response.atlan_tag_defs:
atlan_tag_id = atlan_tag.name
atlan_tag_name = atlan_tag.display_name
self.cache_by_id[atlan_tag_id] = atlan_tag
self.map_id_to_name[atlan_tag_id] = atlan_tag_name
self.map_name_to_id[atlan_tag_name] = atlan_tag_id
sourceTagsId = ""
for attr_def in atlan_tag.attribute_defs or []:
if attr_def.display_name == "sourceTagAttachment":
sourceTagsId = attr_def.name or ""
self.map_id_to_source_tags_attr_id[atlan_tag_id] = sourceTagsId

def _get_id_for_name(self, name: str) -> Optional[str]:
"""
Expand Down
92 changes: 52 additions & 40 deletions pyatlan/cache/custom_metadata_cache.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 Atlan Pte. Ltd.
from threading import Lock
from typing import Dict, List, Optional, Set

from pyatlan.client.typedef import TypeDefClient
from pyatlan.errors import ErrorCode
from pyatlan.model.enums import AtlanTypeCategory
from pyatlan.model.typedef import AttributeDef, CustomMetadataDef

lock = Lock()


class CustomMetadataCache:
"""
Expand All @@ -20,11 +23,15 @@ class CustomMetadataCache:
def get_cache(cls) -> "CustomMetadataCache":
from pyatlan.client.atlan import AtlanClient

client = AtlanClient.get_default_client()
cache_key = client.cache_key
if cache_key not in cls.caches:
cls.caches[cache_key] = CustomMetadataCache(typedef_client=client.typedef)
return cls.caches[cache_key]
with lock:
client = AtlanClient.get_default_client()
cache_key = client.cache_key
if cache_key not in cls.caches:
cls.caches[cache_key] = CustomMetadataCache(
typedef_client=client.typedef
)
cache = cls.caches[cache_key]
return cache

@classmethod
def refresh_cache(cls) -> None:
Expand Down Expand Up @@ -176,48 +183,53 @@ def __init__(self, typedef_client: TypeDefClient):
self.map_attr_name_to_id: Dict[str, Dict[str, str]] = {}
self.archived_attr_ids: Dict[str, str] = {}
self.types_by_asset: Dict[str, Set[type]] = {}
self.lock: Lock = Lock()

def _refresh_cache(self) -> None:
"""
Refreshes the cache of custom metadata structures by requesting the full set of custom metadata
structures from Atlan.
:raises LogicError: if duplicate custom attributes are detected
"""
response = self.typedef_client.get(
type_category=[AtlanTypeCategory.CUSTOM_METADATA, AtlanTypeCategory.STRUCT]
)
if not response or not response.struct_defs:
raise ErrorCode.EXPIRED_API_TOKEN.exception_with_parameters()
if response is not None:
self.map_id_to_name = {}
self.map_name_to_id = {}
self.map_attr_id_to_name = {}
self.map_attr_name_to_id = {}
self.archived_attr_ids = {}
self.cache_by_id = {}
self.attr_cache_by_id = {}
for cm in response.custom_metadata_defs:
type_id = cm.name
type_name = cm.display_name
self.cache_by_id[type_id] = cm
self.map_id_to_name[type_id] = type_name
self.map_name_to_id[type_name] = type_id
self.map_attr_id_to_name[type_id] = {}
self.map_attr_name_to_id[type_id] = {}
if cm.attribute_defs:
for attr in cm.attribute_defs:
attr_id = str(attr.name)
attr_name = str(attr.display_name)
self.map_attr_id_to_name[type_id][attr_id] = attr_name
self.attr_cache_by_id[attr_id] = attr
if attr.options and attr.options.is_archived:
self.archived_attr_ids[attr_id] = attr_name
elif attr_name in self.map_attr_name_to_id[type_id]:
raise ErrorCode.DUPLICATE_CUSTOM_ATTRIBUTES.exception_with_parameters(
attr_name, type_name
)
else:
self.map_attr_name_to_id[type_id][attr_name] = attr_id
with self.lock:
response = self.typedef_client.get(
type_category=[
AtlanTypeCategory.CUSTOM_METADATA,
AtlanTypeCategory.STRUCT,
]
)
if not response or not response.struct_defs:
raise ErrorCode.EXPIRED_API_TOKEN.exception_with_parameters()
if response is not None:
self.map_id_to_name = {}
self.map_name_to_id = {}
self.map_attr_id_to_name = {}
self.map_attr_name_to_id = {}
self.archived_attr_ids = {}
self.cache_by_id = {}
self.attr_cache_by_id = {}
for cm in response.custom_metadata_defs:
type_id = cm.name
type_name = cm.display_name
self.cache_by_id[type_id] = cm
self.map_id_to_name[type_id] = type_name
self.map_name_to_id[type_name] = type_id
self.map_attr_id_to_name[type_id] = {}
self.map_attr_name_to_id[type_id] = {}
if cm.attribute_defs:
for attr in cm.attribute_defs:
attr_id = str(attr.name)
attr_name = str(attr.display_name)
self.map_attr_id_to_name[type_id][attr_id] = attr_name
self.attr_cache_by_id[attr_id] = attr
if attr.options and attr.options.is_archived:
self.archived_attr_ids[attr_id] = attr_name
elif attr_name in self.map_attr_name_to_id[type_id]:
raise ErrorCode.DUPLICATE_CUSTOM_ATTRIBUTES.exception_with_parameters(
attr_name, type_name
)
else:
self.map_attr_name_to_id[type_id][attr_name] = attr_id

def _get_id_for_name(self, name: str) -> str:
"""
Expand Down
32 changes: 19 additions & 13 deletions pyatlan/cache/enum_cache.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2023 Atlan Pte. Ltd.
from threading import Lock
from typing import Dict, Optional

from pyatlan.client.typedef import TypeDefClient
from pyatlan.errors import ErrorCode
from pyatlan.model.enums import AtlanTypeCategory
from pyatlan.model.typedef import EnumDef

lock: Lock = Lock()


class EnumCache:
"""
Expand All @@ -19,11 +22,12 @@ class EnumCache:
def get_cache(cls) -> "EnumCache":
from pyatlan.client.atlan import AtlanClient

client = AtlanClient.get_default_client()
cache_key = client.cache_key
if cache_key not in cls.caches:
cls.caches[cache_key] = EnumCache(typedef_client=client.typedef)
return cls.caches[cache_key]
with lock:
client = AtlanClient.get_default_client()
cache_key = client.cache_key
if cache_key not in cls.caches:
cls.caches[cache_key] = EnumCache(typedef_client=client.typedef)
return cls.caches[cache_key]

@classmethod
def refresh_cache(cls) -> None:
Expand All @@ -48,19 +52,21 @@ def get_by_name(cls, name: str) -> EnumDef:
def __init__(self, typedef_client: TypeDefClient):
self.typedef_client: TypeDefClient = typedef_client
self.cache_by_name: Dict[str, EnumDef] = {}
self.lock: Lock = Lock()

def _refresh_cache(self) -> None:
"""
Refreshes the cache of enumerations by requesting the full set of enumerations from Atlan.
"""
response = self.typedef_client.get(type_category=AtlanTypeCategory.ENUM)
if not response or not response.enum_defs:
raise ErrorCode.EXPIRED_API_TOKEN.exception_with_parameters()
self.cache_by_name = {}
if response is not None:
for enum in response.enum_defs:
type_name = enum.name
self.cache_by_name[type_name] = enum
with self.lock:
response = self.typedef_client.get(type_category=AtlanTypeCategory.ENUM)
if not response or not response.enum_defs:
raise ErrorCode.EXPIRED_API_TOKEN.exception_with_parameters()
self.cache_by_name = {}
if response is not None:
for enum in response.enum_defs:
type_name = enum.name
self.cache_by_name[type_name] = enum

def _get_by_name(self, name: str) -> Optional[EnumDef]:
"""
Expand Down
40 changes: 23 additions & 17 deletions pyatlan/cache/group_cache.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 Atlan Pte. Ltd.
from threading import Lock
from typing import Dict, Iterable, Optional

from pyatlan.client.group import GroupClient

lock: Lock = Lock()


class GroupCache:
"""
Expand All @@ -16,11 +19,12 @@ class GroupCache:
def get_cache(cls) -> "GroupCache":
from pyatlan.client.atlan import AtlanClient

client = AtlanClient.get_default_client()
cache_key = client.cache_key
if cache_key not in cls.caches:
cls.caches[cache_key] = GroupCache(group_client=client.group)
return cls.caches[cache_key]
with lock:
client = AtlanClient.get_default_client()
cache_key = client.cache_key
if cache_key not in cls.caches:
cls.caches[cache_key] = GroupCache(group_client=client.group)
return cls.caches[cache_key]

@classmethod
def get_id_for_name(cls, name: str) -> Optional[str]:
Expand Down Expand Up @@ -66,20 +70,22 @@ def __init__(self, group_client: GroupClient):
self.map_id_to_name: Dict[str, str] = {}
self.map_name_to_id: Dict[str, str] = {}
self.map_alias_to_id: Dict[str, str] = {}
self.lock: Lock = Lock()

def _refresh_cache(self) -> None:
groups = self.group_client.get_all()
if groups is not None:
self.map_id_to_name = {}
self.map_name_to_id = {}
self.map_alias_to_id = {}
for group in groups:
group_id = str(group.id)
group_name = str(group.name)
group_alias = str(group.alias)
self.map_id_to_name[group_id] = group_name
self.map_name_to_id[group_name] = group_id
self.map_alias_to_id[group_alias] = group_id
with self.lock:
groups = self.group_client.get_all()
if groups is not None:
self.map_id_to_name = {}
self.map_name_to_id = {}
self.map_alias_to_id = {}
for group in groups:
group_id = str(group.id)
group_name = str(group.name)
group_alias = str(group.alias)
self.map_id_to_name[group_id] = group_name
self.map_name_to_id[group_name] = group_id
self.map_alias_to_id[group_alias] = group_id

def _get_id_for_name(self, name: str) -> Optional[str]:
"""
Expand Down
Loading

0 comments on commit b082c50

Please sign in to comment.