Skip to content

Commit

Permalink
Merge pull request #17 from ncstate-sat/cmt-120/use-acslib-in-cmt
Browse files Browse the repository at this point in the history
Use acslib in cmt
  • Loading branch information
ryan-semmler authored Sep 3, 2024
2 parents e44e2e5 + 2d68a63 commit 40d2bc0
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 63 deletions.
1 change: 1 addition & 0 deletions acslib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
__version__ = "0.1.0"

from acslib.ccure import CcureAPI
from .base.search import BooleanOperators, TermOperators
13 changes: 7 additions & 6 deletions acslib/ccure/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

class CcureAPI:
def __init__(self, connection: Optional[CcureConnection] = None):
self.personnel = CcurePersonnel(connection)
self.clearance = CcureClearance(connection)
self.credential = CcureCredential(connection)
self.clearance_item = CcureClearanceItem(connection)
self.action = CcureAction(connection)
self.ccure_object = CcureACS(connection)
self.connection = connection or CcureConnection()
self.personnel = CcurePersonnel(self.connection)
self.clearance = CcureClearance(self.connection)
self.credential = CcureCredential(self.connection)
self.clearance_item = CcureClearanceItem(self.connection)
self.action = CcureAction(self.connection)
self.ccure_object = CcureACS(self.connection)
27 changes: 15 additions & 12 deletions acslib/ccure/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ def assign_clearances(self, personnel_id: int, clearance_ids: list[int]) -> ACSR
def revoke_clearances(self, personnel_id: int, clearance_ids: list[int]) -> ACSRequestResponse:
"""
Revoke a person's clearances
Two steps: 1: Get the PersonnelClearancePair object IDs
2: Remove those PersonnelClearancePair objects
Two steps:
1: Get the PersonnelClearancePair object IDs
2: Remove those PersonnelClearancePair objects
"""

# get PersonnelClearancePair object IDs
clearance_query = " OR ".join(
f"ClearanceID = {clearance_id}" for clearance_id in clearance_ids
)
search_filter = CcureFilter(display_properties=["PersonnelID"])
search_filter = CcureFilter(display_properties=["PersonnelID", "ObjectID"])
clearance_assignments = super().search(
object_type=ObjectType.CLEARANCE_ASSIGNMENT.complete,
search_filter=search_filter,
Expand All @@ -55,18 +56,19 @@ def revoke_clearances(self, personnel_id: int, clearance_ids: list[int]) -> ACSR
)
assignment_ids = [assignment.get("ObjectID") for assignment in clearance_assignments]

# remove PersonnelClearancePair objects
return self.remove_children(
parent_type=self.type,
parent_id=personnel_id,
child_type=ObjectType.CLEARANCE_ASSIGNMENT.complete,
child_ids=assignment_ids,
)
if assignment_ids:
# remove PersonnelClearancePair objects
return self.remove_children(
parent_type=self.type,
parent_id=personnel_id,
child_type=ObjectType.CLEARANCE_ASSIGNMENT.complete,
child_ids=assignment_ids,
)

def get_assigned_clearances(
self, personnel_id: int, page_size=100, page_number=1
) -> list[dict]:
"""Get the clearance assignments associated with a person"""
"""Get personnel/clearance pairs associated with the given person"""
search_filter = CcureFilter(
lookups={"PersonnelID": NFUZZ}, display_properties=["PersonnelID", "ClearanceID"]
)
Expand Down Expand Up @@ -120,6 +122,7 @@ def __init__(self, connection: Optional[CcureConnection] = None):
self.type = ObjectType.CLEARANCE.complete

def get_assignees(self, clearance_id: int, page_size=100, page_number=1) -> list[dict]:
"""Get clearance/personnel pairs belonging to the given clearance"""
search_filter = CcureFilter(
lookups={"ClearanceID": NFUZZ}, display_properties=["PersonnelID", "ClearanceID"]
)
Expand All @@ -132,7 +135,7 @@ def get_assignees(self, clearance_id: int, page_size=100, page_number=1) -> list
)


class CcureAction(CcureACS):
class CcureAction:
def __init__(self, connection: Optional[CcureConnection] = None):
self.personnel = PersonnelAction(connection)
self.clearance = ClearanceAction(connection)
11 changes: 8 additions & 3 deletions acslib/ccure/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from numbers import Number
from typing import Optional, Any

from acslib.base import AccessControlSystem, ACSRequestData, ACSRequestResponse, ACSRequestException
Expand All @@ -8,7 +9,7 @@
class CcureACS(AccessControlSystem):
"""Base class for CCure API interactions"""

def __init__(self, connection: Optional[CcureConnection] = None):
def __init__(self, connection: Optional[CcureConnection]):
super().__init__(connection=connection)
if not self.connection:
self.connection = CcureConnection()
Expand All @@ -23,10 +24,11 @@ def config(self):
def search(
self,
object_type: str,
search_filter: CcureFilter,
terms: Optional[list],
terms: Optional[list] = None,
search_filter: Optional[CcureFilter] = None,
page_size: Optional[int] = None,
page_number: int = 1,
timeout: Number = 0,
search_options: Optional[dict] = None,
where_clause: Optional[str] = None,
) -> int | list:
Expand All @@ -42,6 +44,8 @@ def search(
search_options: other options to include in the request_json. eg. "CountOnly"
where_clause: sql-style WHERE clause to search objects. overrides `terms` if included.
"""
if search_filter is None and not where_clause:
raise ACSRequestException(400, "A search filter or where clause is required.")
if page_size is None:
page_size = self.config.page_size
request_json = {
Expand All @@ -59,6 +63,7 @@ def search(
request_json=request_json,
headers=self.connection.base_headers,
),
timeout=timeout,
)
return response.json

Expand Down
46 changes: 23 additions & 23 deletions acslib/ccure/connection.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from numbers import Number
from typing import Optional

from acslib.base import (
ACSConnection,
Expand All @@ -19,9 +20,9 @@ def __init__(self, **kwargs):
Parameters:
:param kwargs:
"""
self.session_id = None
if con_logger := kwargs.get("logger"):
self.logger = con_logger
self._session_id = None
if conn_logger := kwargs.get("logger"):
self.logger = conn_logger
else:
self.logger = logging.getLogger(__name__)

Expand All @@ -30,11 +31,17 @@ def __init__(self, **kwargs):
self.logger.info("Initializing CCure connection")
super().__init__(**kwargs)

@property
def session_id(self) -> str:
if self._session_id:
return self._session_id
return self.login()

@property
def base_headers(self):
"""Headers required for each request to CCure"""
return {
"session-id": self.get_session_id(),
"session-id": self.session_id,
"Access-Control-Expose-Headers": "session-id",
}

Expand All @@ -52,40 +59,33 @@ def login(self):
data=self.config.connection_data,
),
)
self.session_id = response.headers["session-id"]
self.logger.debug(f"Fetched new Session ID: {self.session_id}")
self._session_id = response.headers["session-id"]
self.logger.debug(f"Fetched new Session ID: {self._session_id}")
except ACSRequestException as e:
self.logger.error(f"Error Fetching Session ID: {e}")
self.log_session_details()
self.logger.debug(f"Connection data: {self.config.connection_data}")
raise e
return self.session_id
return self._session_id

def logout(self):
"""Log out of the CCure session"""
if self.session_id:
self.logger.debug(f"Logging out of CCure session: {self.session_id}")
if self._session_id:
self.logger.debug(f"Logging out of CCure session: {self._session_id}")
try:
self.request(
ACSRequestMethod.POST,
request_data=ACSRequestData(
url=self.config.base_url + self.config.endpoints.LOGOUT,
headers={"session-id": self.session_id},
headers={"session-id": self._session_id},
),
)
except ACSRequestException as e:
self.logger.error(f"Error logging out of CCure session: {e}")
self.log_session_details()
finally:
self.logger.debug(f"Removing Session ID: {self.session_id}")
self.session_id = None

def get_session_id(self):
"""Get the ID for the current CCure session or generate a new one"""
self.logger.debug(f"Session ID: {self.session_id}")
if self.session_id is None:
return self.login()
return self.session_id
self.logger.debug(f"Removing Session ID: {self._session_id}")
self._session_id = None

def keepalive(self):
"""Prevent the CCure api session from expiring from inactivity"""
Expand All @@ -96,7 +96,7 @@ def keepalive(self):
request_data=ACSRequestData(
url=self.config.base_url + self.config.endpoints.KEEPALIVE,
headers={
"session-id": self.get_session_id(),
"session-id": self.session_id,
"Access-Control-Expose-Headers": "session-id",
},
),
Expand All @@ -111,7 +111,7 @@ def request(
self,
requests_method: ACSRequestMethod,
request_data: ACSRequestData,
timeout: Number = 0,
timeout: Optional[Number] = 0,
request_attempts: int = 2,
) -> ACSRequestResponse:
"""
Expand All @@ -137,12 +137,12 @@ def request(
raise e
request_attempts -= 1
self.logout()
request_data.headers["session-id"] = self.get_session_id()
request_data.headers["session-id"] = self.session_id

def log_session_details(self):
"""Log session ID and the api version number"""
version_url = self.config.base_url + self.config.endpoints.VERSIONS
self.logger.error(f"Session ID: {self.session_id}")
self.logger.error(f"Session ID: {self._session_id}")
try:
response = self.request(
ACSRequestMethod.POST,
Expand Down
Loading

0 comments on commit 40d2bc0

Please sign in to comment.