diff --git a/.prospector.yaml b/.prospector.yaml index 44f52e4..ee1d68a 100644 --- a/.prospector.yaml +++ b/.prospector.yaml @@ -3,7 +3,7 @@ test-warnings: true doc-warnings: false ignore-paths: - - env + - venv - tests - build diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/models/pysat_diagnosis_model.py b/flamapy/metamodels/pysat_diagnosis_metamodel/models/pysat_diagnosis_model.py index 742ba77..2a35f58 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/models/pysat_diagnosis_model.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/models/pysat_diagnosis_model.py @@ -1,6 +1,7 @@ -from typing import List, Dict, Tuple +from typing import List, Dict from flamapy.metamodels.configuration_metamodel.models import Configuration + from flamapy.metamodels.pysat_metamodel.models import PySATModel @@ -32,27 +33,30 @@ def get_extension() -> str: def __init__(self) -> None: super().__init__() - self.C = None # set of constraints which could be faulty - self.B = None # background knowledge (i.e., the knowledge that is known to be true) - - self.KB = None # set of all CNF with added assumptions - self.constraint_map: list[(str, list[list[int]])] = [] # map clauses to relationships/constraint - - self.constraint_assumption_map = None - - def add_clause_toMap(self, description: str, clauses: list[list[int]]) -> None: - self.constraint_map.append((description, clauses)) - - def get_C(self) -> list: - return self.C - - def get_B(self) -> list: - return self.B - - def get_KB(self) -> list: - return self.KB - - def get_pretty_diagnoses(self, assumptions: list[list[int]]) -> str: + # set of constraints which could be faulty + self.set_c: List[int] = [] + # background knowledge (i.e., the knowledge that is known to be true) + self.set_b: List[int] = [] + # set of all CNF with added assumptions + self.set_kb: List[List[int]] = [] + # map clauses to relationships/constraint + self.constraint_map: Dict[str, List[List[int]]] = {} + # map id of assumptions to relationships/constraint + self.constraint_assumption_map: Dict[int, str] = {} + + def add_clause_to_map(self, description: str, clauses: List[List[int]]) -> None: + self.constraint_map[description] = clauses + + def get_c(self) -> List[int]: + return self.set_c + + def get_b(self) -> List[int]: + return self.set_b + + def get_kb(self) -> List[List[int]]: + return self.set_kb + + def get_pretty_diagnoses(self, assumptions: List[List[int]]) -> str: diagnoses = [] for ass in assumptions: diag = [] @@ -63,7 +67,8 @@ def get_pretty_diagnoses(self, assumptions: list[list[int]]) -> str: return ','.join(diagnoses) - def prepare_diagnosis_task(self, configuration: Configuration = None, test_case: Configuration = None) -> None: + def prepare_diagnosis_task(self, configuration: Configuration = None, + test_case: Configuration = None) -> None: """ Execute this method after the model is built. If a configuration is given: @@ -83,19 +88,18 @@ def prepare_diagnosis_task(self, configuration: Configuration = None, test_case: if configuration is not None: # C = configuration # B = {f0 = true} + CF (i.e., = PySATModel) - self.C, self.B, self.KB, self.constraint_assumption_map = \ - self.prepare_assumptions(configuration=configuration) + self._prepare_assumptions(configuration=configuration) else: if test_case is None: # Diagnosis the feature model # C = CF (i.e., = PySATModel - {f0 = true}) # B = {f0 = true} - self.C, self.B, self.KB, self.constraint_assumption_map = self.prepare_assumptions() + self._prepare_assumptions() else: # Diagnosis the error # C = CF (i.e., = PySATModel - {f0 = true}) # B = {f0 = true} + test_case - self.C, self.B, self.KB, self.constraint_assumption_map = self.prepare_assumptions(test_case=test_case) + self._prepare_assumptions(test_case=test_case) def prepare_redundancy_detection_task(self) -> None: """ @@ -105,76 +109,68 @@ def prepare_redundancy_detection_task(self) -> None: B = {} """ # C = CF (i.e., = PySATModel - {f0 = true}) - self.C = self.prepare_assumptions - self.B = [] # B = {} + # self.set_c = self._prepare_assumptions + # self.set_b = [] # B = {} # ToDo: TBD - def prepare_assumptions(self, configuration: Configuration = None, test_case: Configuration = None) \ - -> Tuple[List, List, List, Dict]: - assumption = [] - KB = [] - constraint_assumption_map = {} + def _prepare_assumptions(self, configuration: Configuration = None, + test_case: Configuration = None) -> None: + assumption: List[int] = [] id_assumption = len(self.variables) + 1 - id_assumption = self.prepare_assumptions_for_KB(KB, assumption, constraint_assumption_map, id_assumption) + id_assumption = self._prepare_assumptions_for_kb(assumption, id_assumption) start_id_configuration = len(assumption) if configuration is not None: - constraint_assumption_map = {} # reset the map - id_assumption = self.prepare_assumptions_for_configuration(KB, assumption, configuration, - constraint_assumption_map, - id_assumption) + self.constraint_assumption_map = {} # reset the map + id_assumption = self._prepare_assumptions_for_configuration(assumption, + configuration, + id_assumption) start_id_test_case = len(assumption) if test_case is not None: - self.prepare_assumptions_for_configuration(KB, assumption, test_case, - constraint_assumption_map, - id_assumption) + self._prepare_assumptions_for_configuration(assumption, test_case, + id_assumption) if configuration is not None: - B = assumption[:start_id_configuration] - C = assumption[start_id_configuration:] + self.set_b = assumption[:start_id_configuration] + self.set_c = assumption[start_id_configuration:] else: if test_case is not None: - B = [assumption[0]] + assumption[start_id_test_case:] - C = assumption[1:start_id_test_case] + self.set_b = [assumption[0]] + assumption[start_id_test_case:] + self.set_c = assumption[1:start_id_test_case] else: - B = [assumption[0]] - C = assumption[1:] - - return C, B, KB, constraint_assumption_map + self.set_b = [assumption[0]] + self.set_c = assumption[1:] - def prepare_assumptions_for_KB(self, KB, assumption, constraint_assumption_map, id_assumption): - c_map = self.constraint_map + def _prepare_assumptions_for_kb(self, assumption: List[int], id_assumption: int) -> int: + cstr_map = self.constraint_map # loop through all tuples in the constraint map - for i in range(len(c_map)): - # get description - desc = c_map[i][0] + for _, key in enumerate(cstr_map): # get clauses - clauses = c_map[i][1] + clauses = cstr_map[key] # loop through all variables in the constraint - for j in range(len(clauses)): - # get each clause - clause = clauses[j] + for _, clause in enumerate(clauses): # add the assumption variable to the clause # assumption => clause # i.e., -assumption v clause clause.append(-1 * id_assumption) assumption.append(id_assumption) - KB.extend(clauses) - constraint_assumption_map[id_assumption] = desc + self.set_kb.extend(clauses) + self.constraint_assumption_map[id_assumption] = key id_assumption += 1 return id_assumption - def prepare_assumptions_for_configuration(self, KB, assumption, configuration, constraint_assumption_map, - id_assumption): + def _prepare_assumptions_for_configuration(self, assumption: List[int], + configuration: Configuration, + id_assumption: int) -> int: config = [feat.name for feat in configuration.elements] for feat in config: if feat not in self.variables.keys(): - raise Exception(f'Feature {feat} is not in the model.') + raise KeyError(f'Feature {feat} is not in the model.') for feat in configuration.elements.items(): desc = '' @@ -188,8 +184,8 @@ def prepare_assumptions_for_configuration(self, KB, assumption, configuration, c clause = [-1 * self.variables[feat[0].name], -1 * id_assumption] assumption.append(id_assumption) - KB.append(clause) - constraint_assumption_map[id_assumption] = desc + self.set_kb.append(clause) + self.constraint_assumption_map[id_assumption] = desc id_assumption += 1 diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/__init__.py index e6b6196..bb7e2d0 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/__init__.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/__init__.py @@ -1,8 +1,10 @@ +from .glucose3_abstract_identifier import Glucose3AbstractIdentifier from .glucose3_conflict import Glucose3Conflict from .glucose3_diagnosis import Glucose3Diagnosis __all__ = [ + 'Glucose3AbstractIdentifier', 'Glucose3Diagnosis', 'Glucose3Conflict' ] diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/__init__.py index e69de29..1a765f1 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/__init__.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/__init__.py @@ -0,0 +1,9 @@ +from .checker import ConsistencyChecker +from .fastdiag import FastDiag +from .quickxplain import QuickXPlain + +__all__ = [ + "ConsistencyChecker", + "FastDiag", + "QuickXPlain" +] \ No newline at end of file diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/checker.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/checker.py index 1f97196..35bcc61 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/checker.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/checker.py @@ -2,29 +2,29 @@ A Java version of this implementation is available at: https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/checker/ChocoConsistencyChecker.java """ +from typing import List from pysat.solvers import Solver class ConsistencyChecker: - - def __init__(self, solverName: str, KB: list) -> None: - self.solver = None + + def __init__(self, solver_name: str, set_kb: List[List[int]]) -> None: self.result = False - self.solver = Solver(solverName, bootstrap_with=KB) + self.solver = Solver(solver_name, bootstrap_with=set_kb) - def is_consistent(self, C: list, Δ: list) -> bool: + def is_consistent(self, set_c: List[int], delta: List[int]) -> bool: """ Check if the given CNF formula is consistent using a solver. - :param C: a list of assumptions should be added to the CNF formula - :param Δ: a list of assumptions should not be added to the CNF formula + :param set_c: a list of assumptions should be added to the CNF formula + :param delta: a list of assumptions should not be added to the CNF formula :return: a boolean value indicating whether the given CNF formula is consistent """ - assumptions = C + [-1 * item for item in Δ] + assumptions = set_c + [-1 * item for item in delta] self.result = self.solver.solve(assumptions=assumptions) # print(f"assumptions: {assumptions} - result: {self.result}") return self.result - def delete(self): - self.solver.delete() + def delete(self) -> None: + self.solver.delete() \ No newline at end of file diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/fastdiag.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/fastdiag.py index 40749d7..9c2f36e 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/fastdiag.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/fastdiag.py @@ -4,9 +4,10 @@ """ import logging +from typing import List -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.utils import split, diff -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker +from .checker import ConsistencyChecker +from .utils import split, diff class FastDiag: @@ -20,7 +21,7 @@ class FastDiag: def __init__(self, checker: ConsistencyChecker) -> None: self.checker = checker - def findDiagnosis(self, C: list, B: list) -> list: + def find_diagnosis(self, set_c: List[int], set_b: List[int]) -> List[int]: """ Activate FastDiag algorithm if there exists at least one constraint, which induces an inconsistency in B. Otherwise, it returns an empty set. @@ -28,27 +29,28 @@ def findDiagnosis(self, C: list, B: list) -> list: // Func FastDiag(C, B) : Δ // if isEmpty(C) or consistent(B U C) return Φ // else return C \\ FD(Φ, C, B) - :param C: a consideration set of constraints - :param B: a background knowledge + :param set_c: a consideration set of constraints + :param set_b: a background knowledge :return: a diagnosis or an empty set """ - logging.debug(f'fastDiag [C={C}, B={B}]') + logging.debug('fastDiag [C=%s, B=%s]', set_c, set_b) # print(f'fastDiag [C={C}, B={B}]') # if isEmpty(C) or consistent(B U C) return Φ - if len(C) == 0 or self.checker.is_consistent(B + C, []): + if len(set_c) == 0 or self.checker.is_consistent(set_b + set_c, []): logging.debug('return Φ') # print('return Φ') return [] - else: # return C \ FD(C, B, Φ) - mss = self.fd([], C, B) - diag = diff(C, mss) - logging.debug(f'return {diag}') - # print(f'return {diag}') - return diag + # return C \ FD(C, B, Φ) + mss = self._fd([], set_c, set_b) + diag = diff(set_c, mss) - def fd(self, Δ: list, C: list, B: list) -> list: + logging.debug('return %s', diag) + # print(f'return {diag}') + return diag + + def _fd(self, delta: List[int], set_c: List[int], set_b: List[int]) -> List[int]: """ The implementation of MSS-based FastDiag algorithm. The algorithm determines a maximal satisfiable subset MSS (Γ) of C U B. @@ -61,33 +63,33 @@ def fd(self, Δ: list, C: list, B: list) -> list: // Δ1 = FD(C2, C1, B); // Δ2 = FD(C1 - Δ1, C2, B U Δ1); // return Δ1 ∪ Δ2; - :param Δ: check to skip redundant consistency checks - :param C: a consideration set of constraints - :param B: a background knowledge + :param delta: check to skip redundant consistency checks + :param set_c: a consideration set of constraints + :param set_b: a background knowledge :return: a maximal satisfiable subset MSS of C U B """ - logging.debug(f'>>> FD [Δ={Δ}, C={C}, B={B}]') + logging.debug('>>> FD [Δ=%s, C=%s, B=%s]', delta, set_c, set_b) # if Δ != Φ and consistent(B U C) return C; - if len(Δ) != 0 and self.checker.is_consistent(B + C, Δ): - logging.debug(f'<<< return {C}') - return C + if len(delta) != 0 and self.checker.is_consistent(set_b + set_c, delta): + logging.debug('<<< return %s', set_c) + return set_c # if singleton(C) return Φ; - if len(C) == 1: + if len(set_c) == 1: logging.debug('<<< return Φ') return [] # C1 = {c1..ck}; C2 = {ck+1..cn}; - C1, C2 = split(C) + set_c1, set_c2 = split(set_c) # Δ1 = FD(C2, C1, B); - Δ1 = self.fd(C2, C1, B) + delta1 = self._fd(set_c2, set_c1, set_b) # Δ2 = FD(C1 - Δ1, C2, B U Δ1); - C1withoutΔ1 = diff(C1, Δ1) - Δ2 = self.fd(C1withoutΔ1, C2, B + Δ1) + c1_without_delta1 = diff(set_c1, delta1) + delta2 = self._fd(c1_without_delta1, set_c2, set_b + delta1) logging.debug('<<< return [Δ1={Δ1} ∪ Δ2={Δ2}]') # return Δ1 + Δ2 - return Δ1 + Δ2 + return delta1 + delta2 \ No newline at end of file diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/__init__.py index e69de29..66e80d3 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/__init__.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/__init__.py @@ -0,0 +1,8 @@ +from .hsdag import HSDAG +from .node import Node, NodeStatus + +__all__ = [ + 'HSDAG', + 'Node', + 'NodeStatus' +] \ No newline at end of file diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/hsdag.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/hsdag.py index 07d5861..ef19822 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/hsdag.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/hsdag.py @@ -2,12 +2,18 @@ A Java version of this implementation is available at: https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/algorithms/hs/HSDAG.java """ +from typing import List, Optional, Dict -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis import utils -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.labeler import IHSLabelable, LabelerType, \ - AbstractHSParameters -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.node import Node, NodeStatus -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.utils import diff, contains +from .labeler.labeler import IHSLabelable, LabelerType, AbstractHSParameters +from .node import Node, NodeStatus +from .. import utils +from ..utils import diff, contains + + +class HSDAGException(Exception): + """ + Exception class for HSDAG + """ class HSDAG: @@ -16,6 +22,7 @@ class HSDAG: IHSLabeler algorithms could return labels (conflict or diagnosis) which are not minimal. """ + # pylint: disable=too-many-instance-attributes def __init__(self, labeler: IHSLabelable) -> None: self.labeler = labeler # could be FastDiag or QuickXPlain @@ -23,38 +30,39 @@ def __init__(self, labeler: IHSLabelable) -> None: self.max_number_conflicts = -1 # maximum number of conflicts to be found self.max_depth = 0 # maximum depth of the HS-dag - self.node_labels = [] # list of diagnoses or conflicts found - self.path_labels = [] # list of diagnoses or conflicts found + self.node_labels: List[List[int]] = [] # list of diagnoses or conflicts found + self.path_labels: List[List[int]] = [] # list of diagnoses or conflicts found - self.root = None # root node of the HS-dag - self.open_nodes = [] # list of open nodes - self.label_nodes_map = {} # Map of - self.nodes_lookup = {} # Map of + self.root: Optional[Node] = None # root node of the HS-dag + self.open_nodes: List[Node] = [] # list of open nodes + # Map of + self.label_nodes_map: Dict[int, List[Node]] = {} + self.nodes_lookup: Dict[int, Node] = {} # Map of - def get_conflicts(self): + def get_conflicts(self) -> List[List[int]]: """ Returns the list of conflicts found. """ if self.labeler.get_type() == LabelerType.CONFLICT: return self.node_labels - else: - return self.path_labels + return self.path_labels - def get_diagnoses(self): + def get_diagnoses(self) -> List[List[int]]: """ Returns the list of diagnoses found. """ if self.labeler.get_type() == LabelerType.CONFLICT: return self.path_labels - else: - return self.node_labels + return self.node_labels def should_stop_construction(self) -> bool: - condition1 = self.max_number_diagnoses != -1 and self.max_number_diagnoses <= len(self.get_diagnoses()) - condition2 = self.max_number_conflicts != -1 and self.max_number_conflicts <= len(self.get_conflicts()) + condition1 = (self.max_number_diagnoses != -1 + and self.max_number_diagnoses <= len(self.get_diagnoses())) + condition2 = (self.max_number_conflicts != -1 + and self.max_number_conflicts <= len(self.get_conflicts())) return condition1 or condition2 - def construct(self): + def construct(self) -> None: """ Constructs the HS-dag. """ @@ -87,7 +95,7 @@ def create_root(self, param: AbstractHSParameters) -> bool: return has_root_label - def create_nodes(self): + def create_nodes(self) -> None: """ Creates nodes of the HS-dag. """ @@ -106,7 +114,7 @@ def create_nodes(self): if node.status == NodeStatus.OPEN: self.expand(node) - def label(self, node: Node): + def label(self, node: Node) -> None: """ Labels a node - identify new conflict or diagnosis. """ @@ -128,162 +136,177 @@ def label(self, node: Node): else: # found a path label self.found_a_path_label_at_node(node) - def expand(self, nodeToExpand: Node): + def expand(self, node_to_expand: Node) -> None: """ Creates children of a node. """ - for arcLabel in nodeToExpand.label: - param_parent_node = nodeToExpand.parameters - new_param = self.labeler.identify_new_node_parameters(param_parent_node, arcLabel) + for arc_label in node_to_expand.label: + param_parent_node = node_to_expand.parameters + if param_parent_node is None: + raise HSDAGException("The parent node must have parameters.") + + new_param = self.labeler.identify_new_node_parameters(param_parent_node, arc_label) # rule 1.a - reuse node - node = self.get_reusable_node(nodeToExpand.path_label, arcLabel) + node = self.get_reusable_node(node_to_expand.path_label, arc_label) if node is not None: - node.add_parent(nodeToExpand) + node.add_parent(node_to_expand) else: # rule 1.b - generate a new node - node = Node(parent=nodeToExpand, arc_label=arcLabel, parameters=new_param) + node = Node(parent=node_to_expand, arc_label=arc_label, parameters=new_param) hashcode = sum(node.path_label) self.nodes_lookup[hashcode] = node if not self.can_prune(node): self.open_nodes.append(node) - def add_item_to_label_nodes_map(self, label, node): + def add_item_to_label_nodes_map(self, label: List[int], node: Node) -> None: """ Adds a node to the label_nodes_map. """ hashcode = sum(label) - if hashcode in self.label_nodes_map.keys(): + if hashcode in self.label_nodes_map: self.label_nodes_map[hashcode].append(node) else: - self.label_nodes_map[hashcode] = node + self.label_nodes_map[hashcode] = [node] @staticmethod - def compute_label(labeler: IHSLabelable, param: AbstractHSParameters): + def compute_label(labeler: IHSLabelable, param: AbstractHSParameters) -> List[List[int]]: return labeler.get_label(param) @staticmethod - def compute_label_from_node(labeler: IHSLabelable, node: Node): + def compute_label_from_node(labeler: IHSLabelable, node: Node) -> List[List[int]]: param = node.parameters + if param is None: + raise HSDAGException("The node must have parameters.") return HSDAG.compute_label(labeler, param) - def add_node_labels(self, labels: list[list[int]]): + def add_node_labels(self, labels: List[List[int]]) -> None: for label in labels: self.node_labels.append(label) - def found_a_path_label_at_node(self, node: Node): + def found_a_path_label_at_node(self, node: Node) -> None: node.status = NodeStatus.CHECKED - pathLabel = node.path_label.copy() + path_label = node.path_label.copy() - self.path_labels.append(pathLabel) + self.path_labels.append(path_label) @staticmethod - def select_label(labels): + def select_label(labels: List[List[int]]) -> List[int]: return labels[0] - def has_nodes_to_expand(self): + def has_nodes_to_expand(self) -> bool: return len(self.open_nodes) > 0 - def get_next_node(self): + def get_next_node(self) -> Node: return self.open_nodes.pop(0) - def has_root(self): + def has_root(self) -> bool: return self.root is not None # Pruning engine - def skip_node(self, node: Node): + def skip_node(self, node: Node) -> bool: condition1 = self.max_depth != 0 and self.max_depth < node.level return node.status != NodeStatus.OPEN or condition1 or self.can_prune(node) - def can_prune(self, node: Node): + def can_prune(self, node_2prime: Node) -> bool: # 3.i - if n is checked, and n'' is such that H(n) ⊆ H(n'), then close the node n'' # n is a diagnosis for path_label in self.path_labels: # if node.path_label.containsAll(path_label): - if all(elem in path_label for elem in node.path_label): - node.status = NodeStatus.CLOSED + if all(elem in path_label for elem in node_2prime.path_label): + node_2prime.status = NodeStatus.CLOSED return True - # 3.ii - if n has been generated and node n'' is such that H(n') = H(n), then close node n'' - for n in self.open_nodes: - if len(n.path_label) == len(node.path_label) \ - and len(diff(n.path_label, node.path_label)) == 0: - node.status = NodeStatus.CLOSED + # 3.ii - if n has been generated and node n'' is such that H(n') = H(n), + # then close node n'' + for node in self.open_nodes: + if len(node.path_label) == len(node_2prime.path_label) \ + and len(diff(node.path_label, node_2prime.path_label)) == 0: + node_2prime.status = NodeStatus.CLOSED return True return False - def get_reusable_labels(self, node: Node): + def get_reusable_labels(self, node: Node) -> List[List[int]]: labels = [] for label in self.node_labels: # H(node) ∩ S = {} - if not utils.hasIntersection(node.path_label, label): + if not utils.has_intersection(node.path_label, label): labels.append(label) return labels - def get_reusable_node(self, path_labels, arc_label): - if path_labels is None: - h = [arc_label] + def get_reusable_node(self, path_labels: List[int], arc_label: int) -> Node | None: + if len(path_labels) == 0: + new_path_labels = [arc_label] else: - h = path_labels.copy() - h.append(arc_label) - hashcode = sum(h) + new_path_labels = path_labels.copy() + new_path_labels.append(arc_label) + hashcode = sum(new_path_labels) return self.nodes_lookup.get(hashcode) - def process_labels(self, labels): - if len(labels) > 0: - # check existing and obtained labels for subset-relations - non_min_labels = [] + def process_labels(self, labels: List[List[int]]) -> None: + # check existing and obtained labels for subset-relations + if len(labels) <= 0: + return - for fs in self.node_labels: - if contains(non_min_labels, fs): + non_min_labels: List[List[int]] = [] + for first_label in self.node_labels: + if contains(non_min_labels, first_label): + continue + + for second_label in labels: + if contains(non_min_labels, second_label): continue - for cs in labels: - if contains(non_min_labels, cs): - continue + greater = first_label if len(first_label) > len(second_label) else second_label + smaller = second_label if len(first_label) > len(second_label) else first_label + if not utils.contains_all(greater, smaller): + continue - greater = fs if len(fs) > len(cs) else cs - smaller = cs if len(fs) > len(cs) else fs + non_min_labels.append(greater) + # update the DAG + nodes = self.label_nodes_map.get(sum(greater)) - if utils.contains_all(greater, smaller): - non_min_labels.append(greater) + if nodes is None: + continue - if len(greater) > len(smaller): - # update the DAG - nodes = self.label_nodes_map.get(greater) + # get a list of nodes which have the status OPEN + open_nodes = [n for n in nodes if n.status == NodeStatus.OPEN] - if nodes is not None: - for nd in nodes: - if nd.get_status() == NodeStatus.OPEN: - nd.set_label(smaller) # relabel the node with smaller - self.add_item_to_label_nodes_map(smaller, nd) # add new label to the map + self.update_dag(greater, open_nodes, smaller) - delete = diff(greater, smaller) - for label in delete: - child = nd.get_children().get(label) + # remove the known non - minimal conflicts + for non_min_label in non_min_labels: + labels.remove(non_min_label) # labels.removeAll(non_min_labels) + hashcode = sum(non_min_label) + del self.label_nodes_map[hashcode] # non_min_labels.forEach(label_nodesMap::remove) - if child is not None: - child.parents.remove(nd) - nd.get_children().remove(label) - self.clean_up_nodes(child) + # add new labels to the list of labels + self.add_node_labels(labels) - # remove the known non - minimal conflicts - for label in non_min_labels: - labels.remove(label) # labels.removeAll(non_min_labels) - del self.label_nodes_map[label] # non_min_labels.forEach(label_nodesMap::remove) + def update_dag(self, greater: List[int], open_nodes: List[Node], smaller: List[int]) -> None: + for node in open_nodes: + node.label = smaller # relabel the node with smaller + # add new label to the map + self.add_item_to_label_nodes_map(smaller, node) + delete = diff(greater, smaller) - # add new labels to the list of labels - self.add_node_labels(labels) - # hsdag.addNodeLabels(labels) + for label in delete: + child = node.children.get(label) + if child is None: + continue + child.parents.remove(node) + del node.children[label] + self.clean_up_nodes(child) - def clean_up_nodes(self, node: Node): - del self.nodes_lookup[node.path_label] + def clean_up_nodes(self, node: Node) -> None: + hashcode = sum(node.path_label) + del self.nodes_lookup[hashcode] if node.status == NodeStatus.OPEN: node.status = NodeStatus.PRUNED # downward clean up - for arcLabel in node.children.keys(): - child = node.children.get(arcLabel) + for arc_label in node.children.keys(): + child = node.children.get(arc_label) if child is not None: self.clean_up_nodes(child) diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/__init__.py index e69de29..4239229 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/__init__.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/__init__.py @@ -0,0 +1,13 @@ +from .fastdiag_labeler import FastDiagParameters, FastDiagLabeler +from .labeler import IHSLabelable, AbstractHSParameters, LabelerType +from .quickxplain_labeler import QuickXPlainParameters, QuickXPlainLabeler + +__all__ = [ + 'IHSLabelable', + 'AbstractHSParameters', + 'LabelerType', + 'FastDiagParameters', + 'FastDiagLabeler', + 'QuickXPlainParameters', + 'QuickXPlainLabeler' +] \ No newline at end of file diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/fastdiag_labeler.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/fastdiag_labeler.py index 559da19..6c9092b 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/fastdiag_labeler.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/fastdiag_labeler.py @@ -4,18 +4,19 @@ """ from dataclasses import dataclass +from typing import List -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.fastdiag import FastDiag -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.labeler import IHSLabelable, LabelerType, AbstractHSParameters +from .labeler import IHSLabelable, LabelerType, AbstractHSParameters +from ...checker import ConsistencyChecker +from ...fastdiag import FastDiag @dataclass class FastDiagParameters(AbstractHSParameters): - B: list[int] + set_b: List[int] - def __str__(self): - return f"FastDiagParameters{{C={self.C}, B={self.B}}}" + def __str__(self) -> str: + return f"FastDiagParameters{{C={self.set_c}, B={self.set_b}}}" class FastDiagLabeler(FastDiag, IHSLabelable): @@ -33,36 +34,38 @@ def get_type(self) -> LabelerType: def get_initial_parameters(self) -> AbstractHSParameters: return self.initial_parameters - def get_label(self, parameters: AbstractHSParameters) -> list: + def get_label(self, parameters: AbstractHSParameters) -> List[List[int]]: """ Identifies a diagnosis """ - assert isinstance(parameters, FastDiagParameters), "parameter must be an instance of FastDiagParameters" - neg_C = [-1 * item for item in parameters.C] - if len(parameters.C) >= 1 \ - and (len(parameters.B) == 0 or self.checker.is_consistent(parameters.B + neg_C, [])): - - diag = self.findDiagnosis(parameters.C, parameters.B) + assert isinstance(parameters, FastDiagParameters), \ + "parameter must be an instance of FastDiagParameters" + neg_c = [-1 * item for item in parameters.set_c] + if len(parameters.set_c) >= 1 \ + and (len(parameters.set_b) == 0 + or self.checker.is_consistent(parameters.set_b + neg_c, [])): + + diag = self.find_diagnosis(parameters.set_c, parameters.set_b) if len(diag) != 0: return [diag] return [] - def identify_new_node_parameters(self, param_parent_node: AbstractHSParameters, arcLabel: int) \ - -> AbstractHSParameters: + def identify_new_node_parameters(self, param_parent_node: AbstractHSParameters, + arc_label: int) -> AbstractHSParameters: """ Identifies the new node's parameters on the basis of the parent node's parameters. """ - assert isinstance(param_parent_node, - FastDiagParameters), "parameter must be an instance of FastDiagParameters" + assert isinstance(param_parent_node, FastDiagParameters),\ + "parameter must be an instance of FastDiagParameters" - C = param_parent_node.C.copy() - C.remove(arcLabel) - B = param_parent_node.B.copy() - B.append(arcLabel) + new_c = param_parent_node.set_c.copy() + new_c.remove(arc_label) + new_b = param_parent_node.set_b.copy() + new_b.append(arc_label) # D = param_parent_node.D.copy() # D.append(arcLabel) - return FastDiagParameters(C, [], B) + return FastDiagParameters(new_c, [], new_b) - def get_instance(self, checker: ConsistencyChecker): + def get_instance(self, checker: ConsistencyChecker) -> IHSLabelable: return FastDiagLabeler(checker, self.initial_parameters) diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/labeler.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/labeler.py index 559c5a9..ff68679 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/labeler.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/labeler.py @@ -6,8 +6,9 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum +from typing import List -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker +from ...checker import ConsistencyChecker class LabelerType(Enum): @@ -17,8 +18,8 @@ class LabelerType(Enum): @dataclass class AbstractHSParameters: - C: list[int] - D: list[int] + set_c: List[int] + set_d: List[int] class IHSLabelable(ABC): @@ -35,14 +36,14 @@ def get_initial_parameters(self) -> AbstractHSParameters: pass @abstractmethod - def get_label(self, parameters: AbstractHSParameters) -> list: + def get_label(self, parameters: AbstractHSParameters) -> List[List[int]]: pass @abstractmethod - def identify_new_node_parameters(self, param_parent_node: AbstractHSParameters, arcLabel: int) \ - -> AbstractHSParameters: + def identify_new_node_parameters(self, param_parent_node: AbstractHSParameters, + arc_label: int) -> AbstractHSParameters: pass @abstractmethod - def get_instance(self, checker: ConsistencyChecker): + def get_instance(self, checker: ConsistencyChecker) -> 'IHSLabelable': pass diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/quickxplain_labeler.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/quickxplain_labeler.py index f41f3f5..de15350 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/quickxplain_labeler.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/quickxplain_labeler.py @@ -4,19 +4,19 @@ """ from dataclasses import dataclass +from typing import List -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.labeler import IHSLabelable, LabelerType, \ - AbstractHSParameters -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.quickxplain import QuickXPlain +from .labeler import IHSLabelable, LabelerType, AbstractHSParameters +from ...checker import ConsistencyChecker +from ...quickxplain import QuickXPlain @dataclass class QuickXPlainParameters(AbstractHSParameters): - B: list[int] + set_b: List[int] - def __str__(self): - return f"QuickXPlainParameters{{C={self.C}, B={self.B}}}" + def __str__(self) -> str: + return f"QuickXPlainParameters{{C={self.set_c}, B={self.set_b}}}" class QuickXPlainLabeler(QuickXPlain, IHSLabelable): @@ -34,35 +34,36 @@ def get_type(self) -> LabelerType: def get_initial_parameters(self) -> AbstractHSParameters: return self.initial_parameters - def get_label(self, parameters: AbstractHSParameters) -> list: + def get_label(self, parameters: AbstractHSParameters) -> List[List[int]]: """ Identifies a conflict """ - assert isinstance(parameters, QuickXPlainParameters), "parameter must be an instance of QuickXPlainParameters" + assert isinstance(parameters, QuickXPlainParameters), \ + "parameter must be an instance of QuickXPlainParameters" - cs = self.findConflictSet(parameters.C, parameters.B + parameters.D) + set_cs = self.find_conflict(parameters.set_c, parameters.set_b + parameters.set_d) - if len(cs) != 0: + if len(set_cs) != 0: # reverse the order of the conflict set - cs.reverse() - return [cs] + set_cs.reverse() + return [set_cs] return [] - def identify_new_node_parameters(self, param_parent_node: AbstractHSParameters, arcLabel: int) \ - -> AbstractHSParameters: + def identify_new_node_parameters(self, param_parent_node: AbstractHSParameters, + arc_label: int) -> AbstractHSParameters: """ Identifies the new node's parameters on the basis of the parent node's parameters. """ - assert isinstance(param_parent_node, - QuickXPlainParameters), "parameter must be an instance of QuickXPlainParameters" + assert isinstance(param_parent_node, QuickXPlainParameters), \ + "parameter must be an instance of QuickXPlainParameters" - C = param_parent_node.C.copy() - C.remove(arcLabel) - B = param_parent_node.B.copy() - D = param_parent_node.D.copy() - D.append(-1 * arcLabel) + new_c = param_parent_node.set_c.copy() + new_c.remove(arc_label) + new_b = param_parent_node.set_b.copy() + new_d = param_parent_node.set_d.copy() + new_d.append(-1 * arc_label) - return QuickXPlainParameters(C, D, B) + return QuickXPlainParameters(new_c, new_d, new_b) - def get_instance(self, checker: ConsistencyChecker): + def get_instance(self, checker: ConsistencyChecker) -> 'IHSLabelable': return QuickXPlainLabeler(checker, self.initial_parameters) diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/node.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/node.py index 19b8f7b..dce3bad 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/node.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/node.py @@ -3,8 +3,9 @@ https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/algorithms/hs/Node.java """ from enum import Enum +from typing import Optional, List, Any, Dict -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.labeler import AbstractHSParameters +from .labeler.labeler import AbstractHSParameters class NodeStatus(Enum): @@ -14,21 +15,25 @@ class NodeStatus(Enum): CHECKED = 'Checked' # Checked - the label of this node is a Conflict or a Diagnosis -class Node(object): +# pylint: disable=too-many-instance-attributes +class Node: """ A data structure representing a node of an HS-dag. """ generating_node_id = -1 - def __init__(self, parent: 'Node' = None, arc_label: int = None, parameters: AbstractHSParameters = None): - self.id = Node.generating_node_id = Node.generating_node_id + 1 + def __init__(self, parent: Optional['Node'] = None, arc_label: Optional[int] = None, + parameters: Optional[AbstractHSParameters] = None): + self.node_id = Node.generating_node_id = Node.generating_node_id + 1 self.status = NodeStatus.OPEN # node status - self.label = [] # label of the node - it can be a minimal conflict or a diagnosis or null - self.arc_label = arc_label # the constraint associated to the arch which comes to this node + # label of the node - it can be a minimal conflict or a diagnosis or null + self.label: List[Any] = [] + # the constraint associated to the arch which comes to this node + self.arc_label = arc_label self.parameters = parameters - self.path_label = None # labels of the path to here - self.parents = None # the node's parents. Can be null for the root node. - self.children = {} # the node's children + self.path_label: List[int] = [] # labels of the path to here + self.parents: List[Node] = [] # the node's parents. Can be null for the root node. + self.children: Dict[Any, Any] = {} # the node's children if parent is None: self.level = 0 # tree level @@ -36,16 +41,17 @@ def __init__(self, parent: 'Node' = None, arc_label: int = None, parameters: Abs self.level = parent.level + 1 # tree level self.parents = [] self.parents.append(parent) - if parent.path_label is not None: - self.path_label = parent.path_label.copy() - else: + if len(parent.path_label) == 0: self.path_label = [] - self.path_label.append(arc_label) + else: + self.path_label = parent.path_label.copy() + if arc_label is not None: + self.path_label.append(arc_label) parent.children[arc_label] = self @staticmethod - def create_root(label: list[int], parameters: AbstractHSParameters) -> 'Node': + def create_root(label: List[int], parameters: AbstractHSParameters) -> 'Node': Node.generating_node_id = -1 root = Node() @@ -54,19 +60,19 @@ def create_root(label: list[int], parameters: AbstractHSParameters) -> 'Node': return root - def add_parent(self, parent: 'Node'): + def add_parent(self, parent: 'Node') -> None: if self.is_root(): raise ValueError("The root node cannot have parents.") - else: - self.parents.append(parent) + self.parents.append(parent) - def add_child(self, arcLabel: int, child: 'Node'): - self.children[arcLabel] = child + def add_child(self, arc_label: int, child: 'Node') -> None: + self.children[arc_label] = child child.add_parent(self) def is_root(self) -> bool: - return self.parents is None + return len(self.parents) == 0 - def __str__(self): - return f"Node{{id={self.id}, level={self.level}, status={self.status}, label={self.label}, " \ - f"parameter={self.parameters}, arcLabel={self.arc_label}, pathLabels={self.path_label}}}" + def __str__(self) -> str: + return (f"Node{{id={self.node_id}, level={self.level}, status={self.status}, " + f"label={self.label}, parameter={self.parameters}, arcLabel={self.arc_label}, " + f"pathLabels={self.path_label}}}") diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/quickxplain.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/quickxplain.py index c689189..d7fd590 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/quickxplain.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/quickxplain.py @@ -4,47 +4,50 @@ """ import logging +from typing import List -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.utils import split +from .checker import ConsistencyChecker +from .utils import split class QuickXPlain: """ Implementation of QuickXPlain algorithm - Junker, Ulrich. "Quickxplain: Conflict detection for arbitrary constraint propagation algorithms." + Junker, Ulrich. "Quickxplain: Conflict detection + for arbitrary constraint propagation algorithms." IJCAI’01 Workshop on Modelling and Solving problems with constraints. Vol. 4. 2001. """ def __init__(self, checker: ConsistencyChecker) -> None: self.checker = checker - def findConflictSet(self, C: list, B: list) -> list: + def find_conflict(self, set_c: List[int], set_b: List[int]) -> List[int]: """ // Func QuickXPlain(C={c1,c2,…, cm}, B): CS // IF consistent(B∪C) return "No conflict"; // IF isEmpty(C) return Φ; // ELSE return QX(Φ, C, B); - :param C: a consideration set - :param B: a background knowledge + :param set_c: a consideration set + :param set_b: a background knowledge :return: a conflict set or an empty set """ - logging.debug(f'quickXPlain [C={C}, B={B}]') + logging.debug('>>> QuickXPlain [C=%s, B=%s]', set_c, set_b) # print(f'quickXPlain [C={C}, B={B}]') # if C is empty or consistent(B U C) then return empty set - if len(C) == 0 or self.checker.is_consistent(B + C, []): + if len(set_c) == 0 or self.checker.is_consistent(set_b + set_c, []): logging.debug('return Φ') # print('return Φ') return [] - else: # return QX(Φ, C, B) - cs = self.qx([], C, B) - logging.debug(f'return {cs}') - # print(f'return {cs}') - return cs + # return QX(Φ, C, B) + set_cs = self._qx([], set_c, set_b) - def qx(self, D: list, C: list, B: list) -> list: + logging.debug('return %s', set_cs) + # print(f'return {cs}') + return set_cs + + def _qx(self, set_d: List[int], set_c: List[int], set_b: List[int]) -> List[int]: """ // func QX(Δ, C={c1,c2, …, cq}, B): CS // IF (Δ != Φ AND inconsistent(B)) return Φ; @@ -54,32 +57,32 @@ def qx(self, D: list, C: list, B: list) -> list: // CS1 <-- QX(C2, C1, B ∪ C2); // CS2 <-- QX(CS1, C2, B ∪ CS1); // return (CS1 ∪ CS2) - :param D: check to skip redundant consistency checks - :param C: a consideration set of constraints - :param B: a background knowledge + :param set_d: check to skip redundant consistency checks + :param set_c: a consideration set of constraints + :param set_b: a background knowledge :return: a conflict set or an empty set """ - logging.debug(f'>>> QX [D={D}, C={C}, B={B}]') + logging.debug('>>> QX [D=%s, C=%s, B=%s]', set_d, set_c, set_b) # if D != Φ and inconsistent(B) then return Φ - if len(D) != 0 and not self.checker.is_consistent(B, C): + if len(set_d) != 0 and not self.checker.is_consistent(set_b, set_c): logging.debug('<<< return Φ') return [] # if C is singleton then return C - if len(C) == 1: - logging.debug(f'<<< return {C}') - return C + if len(set_c) == 1: + logging.debug('<<< return %s', set_c) + return set_c # C1 = {c1..ck}; C2 = {ck+1..cn}; - C1, C2 = split(C) + set_c1, set_c2 = split(set_c) # CS1 = QX(C2, C1, B U C2) - CS1 = self.qx(C2, C1, (B + C2)) + cs1 = self._qx(set_c2, set_c1, (set_b + set_c2)) # CS2 = QX(CS1, C2, B U CS1) - CS2 = self.qx(CS1, C2, (B + CS1)) + cs2 = self._qx(cs1, set_c2, (set_b + cs1)) - logging.debug(f'<<< return [CS1={CS1} U CS2={CS2}]') + logging.debug('<<< return %s', (cs1 + cs2)) # return CS1 U CS2 - return CS1 + CS2 + return cs1 + cs2 diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/utils.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/utils.py index 07b1389..df0b65f 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/utils.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/utils.py @@ -1,44 +1,46 @@ #!/usr/bin/env python """Provides utility functions.""" +from typing import List, Tuple -def split(C: list) -> (list, list): +def split(clauses: List[int]) -> Tuple[List[int], List[int]]: """ Splits the given list of constraints/clauses into two parts. - :param C: a list of clauses + :param clauses: a list of clauses :return: a tuple of two lists """ - half_size = len(C) // 2 - return C[:half_size], C[half_size:] + half_size = len(clauses) // 2 + return clauses[:half_size], clauses[half_size:] -def diff(x: list, y: list) -> list: +def diff(list_x: List[int], list_y: List[int]) -> List[int]: """ Returns the difference of two lists. - :param x: list - :param y: list + :param list_x: list + :param list_y: list :return: list """ - return [item for item in x if item not in y] + return [item for item in list_x if item not in list_y] -def get_hashcode(C: list) -> str: +def get_hashcode(clauses: List[int]) -> str: """ Returns the hashcode of the given CNF formula. - :param C: a list of clauses + :param clauses: a list of clauses :return: the hashcode of the given CNF formula """ - C = sorted(C, key=lambda x: x[0]) - return str(C) + # clauses = sorted(clauses, key=lambda x: x[0]) + clauses = sorted(clauses) + return str(clauses) -def hasIntersection(list1: list, list2: list) -> bool: +def has_intersection(list1: List[int], list2: List[int]) -> bool: return any(i in list1 for i in list2) -def contains(listofList: list, aList: list) -> bool: - return any(aList == x for x in listofList) +def contains(list_of_lists: List[List[int]], a_list: List[int]) -> bool: + return any(a_list == x for x in list_of_lists) -def contains_all(greater: list, smaller: list) -> bool: +def contains_all(greater: List[int], smaller: List[int]) -> bool: return all(i in smaller for i in greater) diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_abstract_identifier.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_abstract_identifier.py new file mode 100644 index 0000000..cc797aa --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_abstract_identifier.py @@ -0,0 +1,86 @@ +from abc import abstractmethod +from typing import cast, List, Tuple + +from flamapy.core.models import VariabilityModel +from flamapy.core.operations import Operation +from flamapy.metamodels.configuration_metamodel.models import Configuration + +from .diagnosis.checker import ConsistencyChecker +from .diagnosis.hsdag.hsdag import HSDAG +from ..models.pysat_diagnosis_model import DiagnosisModel + + +def _execute_hsdag(model: DiagnosisModel, hsdag: HSDAG) -> Tuple[str, str]: + hsdag.construct() + diagnoses = hsdag.get_diagnoses() + conflicts = hsdag.get_conflicts() + + if len(diagnoses) == 0: + diag_mess = 'No diagnosis found' + elif len(diagnoses) == 1: + diag_mess = 'Diagnosis: ' + diag_mess += model.get_pretty_diagnoses(diagnoses) + else: + diag_mess = 'Diagnoses: ' + diag_mess += model.get_pretty_diagnoses(diagnoses) + if len(conflicts) == 0: + cs_mess = 'No conflicts found' + elif len(conflicts) == 1: + cs_mess = 'Conflict: ' + cs_mess += model.get_pretty_diagnoses(conflicts) + else: + cs_mess = 'Conflicts: ' + cs_mess += model.get_pretty_diagnoses(conflicts) + + return cs_mess, diag_mess + + +class Glucose3AbstractIdentifier(Operation): + """ + An abstract operation for computes conflicts or diagnoses. + Four optional inputs: + - configuration - a configuration to be diagnosed + - test_case - a test case to be used for diagnosis + - max_depth - specify the maximum depth of the HSDAG to be computed + """ + + def __init__(self) -> None: + self.result = False + self.configuration = None + self.test_case = None + self.solver_name = 'glucose3' + self.result_messages: List[str] = [] + + self.checker = None + self.max_depth = 0 # 0 means no limit + + def set_max_depth(self, max_depth: int) -> None: + self.max_depth = max_depth + + def set_configuration(self, configuration: Configuration) -> None: + self.configuration = configuration + + def set_test_case(self, test_case: Configuration) -> None: + self.test_case = test_case + + def get_result(self) -> List[str]: + return self.result_messages + + def execute(self, model: VariabilityModel) -> 'Glucose3AbstractIdentifier': + model = cast(DiagnosisModel, model) + + checker, labeler = self.prepare_hsdag(model) + + cs_mess, diag_mess = _execute_hsdag(model, labeler) + + self.set_result_messages(cs_mess, diag_mess) + checker.delete() + return self + + @abstractmethod + def prepare_hsdag(self, model: DiagnosisModel) -> Tuple[ConsistencyChecker, HSDAG]: + pass + + @abstractmethod + def set_result_messages(self, cs_mess: str, diag_mess: str) -> None: + pass \ No newline at end of file diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_conflict.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_conflict.py index 430c086..ba52ba6 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_conflict.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_conflict.py @@ -1,19 +1,16 @@ -from typing import Any +from typing import Tuple -from flamapy.core.operations import Operation -from flamapy.metamodels.configuration_metamodel.models import Configuration -from flamapy.core.models import VariabilityModel +from flamapy.metamodels.pysat_diagnosis_metamodel.models import DiagnosisModel +from . import Glucose3AbstractIdentifier +from .diagnosis.checker import ConsistencyChecker +from .diagnosis.hsdag.hsdag import HSDAG +from .diagnosis.hsdag.labeler.quickxplain_labeler import QuickXPlainParameters, QuickXPlainLabeler -from flamapy.metamodels.pysat_diagnosis_metamodel.models.pysat_diagnosis_model import DiagnosisModel -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.hsdag import HSDAG -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.quickxplain_labeler import \ - QuickXPlainParameters, QuickXPlainLabeler -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker - -class Glucose3Conflict(Operation): +class Glucose3Conflict(Glucose3AbstractIdentifier): """ - An operation that computes conflicts and diagnoses using the combination of HSDAG and QuickXPlain algorithms. + An operation that computes conflicts and diagnoses + using the combination of HSDAG and QuickXPlain algorithms. Four optional inputs: - configuration - a configuration to be diagnosed - test_case - a test case to be used for diagnosis @@ -22,76 +19,34 @@ class Glucose3Conflict(Operation): """ def __init__(self) -> None: - self.result = False - self.configuration = None - self.test_case = None - self.solverName = 'glucose3' - self.diagnosis_messages: list[str] = [] - - self.checker = None + super().__init__() self.max_conflicts = -1 # -1 means no limit - self.max_depth = 0 # 0 means no limit def set_max_conflicts(self, max_conflicts: int) -> None: self.max_conflicts = max_conflicts - def set_max_depth(self, max_depth: int) -> None: - self.max_depth = max_depth - - def get_diagnosis_messages(self) -> list[Any]: - return self.get_result() - - def set_configuration(self, configuration: Configuration) -> None: - self.configuration = configuration - - def set_test_case(self, test_case: Configuration) -> None: - self.test_case = test_case - - def get_result(self) -> list[str]: - return self.diagnosis_messages - - def execute(self, model: VariabilityModel) -> 'Glucose3Conflict': + def prepare_hsdag(self, model: DiagnosisModel) -> Tuple[ConsistencyChecker, HSDAG]: # transform model to diagnosis model model.prepare_diagnosis_task(configuration=self.configuration, test_case=self.test_case) - # print(f'C: {diag_model.get_C()}') # print(f'B: {diag_model.get_B()}') - C = model.get_C() + set_c = model.get_c() + # if self.configuration is None: - # C.reverse() # reverse the list to get the correct order of constraints in the diagnosis messages + # reverse the list to get the correct order of constraints in the diagnosis messages + # C.reverse() + + checker = ConsistencyChecker(self.solver_name, model.get_kb()) + parameters = QuickXPlainParameters(set_c, [], model.get_b()) + labeler = QuickXPlainLabeler(checker, parameters) - checker = ConsistencyChecker(self.solverName, model.get_KB()) - parameters = QuickXPlainParameters(C, [], model.get_B()) - quickxplain = QuickXPlainLabeler(checker, parameters) - hsdag = HSDAG(quickxplain) + hsdag = HSDAG(labeler) hsdag.max_number_conflicts = self.max_conflicts hsdag.max_depth = self.max_depth - hsdag.construct() - - diagnoses = hsdag.get_diagnoses() - conflicts = hsdag.get_conflicts() - - if len(diagnoses) == 0: - diag_mess = 'No diagnosis found' - elif len(diagnoses) == 1: - diag_mess = f'Diagnosis: ' - diag_mess += model.get_pretty_diagnoses(diagnoses) - else: - diag_mess = f'Diagnoses: ' - diag_mess += model.get_pretty_diagnoses(diagnoses) - - if len(conflicts) == 0: - cs_mess = 'No conflicts found' - elif len(conflicts) == 1: - cs_mess = f'Conflict: ' - cs_mess += model.get_pretty_diagnoses(conflicts) - else: - cs_mess = f'Conflicts: ' - cs_mess += model.get_pretty_diagnoses(conflicts) + return checker, hsdag - self.diagnosis_messages.append(cs_mess) - self.diagnosis_messages.append(diag_mess) - checker.delete() - return self + def set_result_messages(self, cs_mess: str, diag_mess: str) -> None: + self.result_messages.append(cs_mess) + self.result_messages.append(diag_mess) diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_diagnosis.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_diagnosis.py index 244c308..160d4bd 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_diagnosis.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_diagnosis.py @@ -1,20 +1,16 @@ -from typing import Any +from typing import Tuple -from flamapy.core.operations import Operation -from flamapy.metamodels.configuration_metamodel.models import Configuration -from flamapy.core.models import VariabilityModel +from flamapy.metamodels.pysat_diagnosis_metamodel.models import DiagnosisModel +from . import Glucose3AbstractIdentifier +from .diagnosis.checker import ConsistencyChecker +from .diagnosis.hsdag.hsdag import HSDAG +from .diagnosis.hsdag.labeler.fastdiag_labeler import FastDiagParameters, FastDiagLabeler -from flamapy.metamodels.pysat_diagnosis_metamodel.models.pysat_diagnosis_model import DiagnosisModel -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.hsdag import HSDAG -from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.fastdiag_labeler import FastDiagParameters, \ - FastDiagLabeler - - -class Glucose3Diagnosis(Operation): +class Glucose3Diagnosis(Glucose3AbstractIdentifier): """ - An operation that computes diagnoses and conflict sets using the combination of HSDAG and FastDiag algorithms. + An operation that computes diagnoses and conflict sets + using the combination of HSDAG and FastDiag algorithms. Four optional inputs: - configuration - a configuration to be diagnosed - test_case - a test case to be used for diagnosis @@ -23,79 +19,33 @@ class Glucose3Diagnosis(Operation): """ def __init__(self) -> None: - self.result = False - self.configuration = None - self.test_case = None - self.solverName = 'glucose3' - self.diagnosis_messages: list[str] = [] - - self.checker = None + super().__init__() self.max_diagnoses = -1 # -1 means no limit - self.max_depth = 0 # 0 means no limit def set_max_diagnoses(self, max_diagnoses: int) -> None: self.max_diagnoses = max_diagnoses - def set_max_depth(self, max_depth: int) -> None: - self.max_depth = max_depth - - def get_diagnosis_messages(self) -> list[Any]: - return self.get_result() - - def set_configuration(self, configuration: Configuration) -> None: - self.configuration = configuration - - def set_test_case(self, test_case: Configuration) -> None: - self.test_case = test_case - - def is_valid(self) -> bool: - pass - - def get_result(self) -> list[str]: - return self.diagnosis_messages - - def execute(self, model: VariabilityModel) -> 'Glucose3Diagnosis': + def prepare_hsdag(self, model: DiagnosisModel) -> Tuple[ConsistencyChecker, HSDAG]: # transform model to diagnosis model model.prepare_diagnosis_task(configuration=self.configuration, test_case=self.test_case) - print(f'C: {model.get_C()}') - print(f'B: {model.get_B()}') + # print(f'C: {model.get_c()}') + # print(f'B: {model.get_b()}') - C = model.get_C() + set_c = model.get_c() # if self.configuration is None: # C.reverse() # reverse the list to get the correct order of diagnosis - checker = ConsistencyChecker(self.solverName, model.get_KB()) - parameters = FastDiagParameters(C, [], model.get_B()) - fastdiag = FastDiagLabeler(checker, parameters) - hsdag = HSDAG(fastdiag) + checker = ConsistencyChecker(self.solver_name, model.get_kb()) + parameters = FastDiagParameters(set_c, [], model.get_b()) + labeler = FastDiagLabeler(checker, parameters) + + hsdag = HSDAG(labeler) hsdag.max_number_diagnoses = self.max_diagnoses hsdag.max_depth = self.max_depth - hsdag.construct() - - diagnoses = hsdag.get_diagnoses() - conflicts = hsdag.get_conflicts() - - if len(diagnoses) == 0: - diag_mess = 'No diagnosis found' - elif len(diagnoses) == 1: - diag_mess = f'Diagnosis: ' - diag_mess += model.get_pretty_diagnoses(diagnoses) - else: - diag_mess = f'Diagnoses: ' - diag_mess += model.get_pretty_diagnoses(diagnoses) - - if len(conflicts) == 0: - cs_mess = 'No conflicts found' - elif len(conflicts) == 1: - cs_mess = f'Conflict: ' - cs_mess += model.get_pretty_diagnoses(conflicts) - else: - cs_mess = f'Conflicts: ' - cs_mess += model.get_pretty_diagnoses(conflicts) + return checker, hsdag - self.diagnosis_messages.append(diag_mess) - self.diagnosis_messages.append(cs_mess) - checker.delete() - return self + def set_result_messages(self, cs_mess: str, diag_mess: str) -> None: + self.result_messages.append(diag_mess) + self.result_messages.append(cs_mess) diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/__init__.py index 55ce70b..2e673e8 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/__init__.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/__init__.py @@ -2,8 +2,5 @@ __all__ = [ - 'FmToPysat', - 'CNFReader', - 'DimacsReader', - 'DimacsWriter' + "FmToDiagPysat", ] diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/fm_to_diag_pysat.py b/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/fm_to_diag_pysat.py index 3ed224e..949d934 100644 --- a/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/fm_to_diag_pysat.py +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/fm_to_diag_pysat.py @@ -1,15 +1,15 @@ -import itertools from typing import Any, List -from flamapy.core.transformations import ModelToModel from flamapy.metamodels.fm_metamodel.models.feature_model import ( FeatureModel, Constraint, Feature, Relation, ) -from flamapy.metamodels.pysat_diagnosis_metamodel.models.pysat_diagnosis_model import DiagnosisModel + from flamapy.metamodels.pysat_metamodel.transformations.fm_to_pysat import FmToPysat +from ..models.pysat_diagnosis_model import DiagnosisModel + class FmToDiagPysat(FmToPysat): @staticmethod @@ -21,19 +21,45 @@ def get_destination_extension() -> str: return 'pysat_diagnosis' def __init__(self, source_model: FeatureModel) -> None: - self.source_model = source_model - self.counter = 1 + super().__init__(source_model) self.destination_model = DiagnosisModel() # self.r_cnf = self.destination_model.r_cnf # self.ctc_cnf = self.destination_model.ctc_cnf def add_root(self, feature: Feature) -> None: #self.r_cnf.append([self.destination_model.variables.get(feature.name)]) - self.destination_model.add_clause([self.destination_model.variables.get(feature.name)]) - print(self.destination_model.__class__) - self.destination_model.add_clause_toMap(str(feature), [[self.destination_model.variables.get(feature.name)]]) + var = self.destination_model.variables.get(feature.name) + if var is None: + raise KeyError(f'Feature {feature.name} not found in the model') + + self.destination_model.add_clause([var]) + self.destination_model.add_clause_to_map(str(feature), [[var]]) - def _store_constraint_relation(self, relation: Relation, clauses:List[List[int]]) -> None: + def _store_constraint_relation(self, relation: Relation, clauses: List[List[int]]) -> None: for clause in clauses: self.destination_model.add_clause(clause) - self.destination_model.add_clause_toMap(str(relation), clauses) \ No newline at end of file + self.destination_model.add_clause_to_map(str(relation), clauses) + + def add_constraint(self, ctc: Constraint) -> None: + def get_term_variable(term: Any) -> int: + negated = False + if term.startswith('-'): + term = term[1:] + negated = True + + var = self.destination_model.variables.get(term) + if var is None: + raise KeyError(f'Feature {term} not found in the model') + + if negated: + return -var + return var + + ctc_clauses = [] + clauses = ctc.ast.get_clauses() + for clause in clauses: + clause_variables = list(map(get_term_variable, clause)) + ctc_clauses.append(clause_variables) + self.destination_model.add_clause(clause_variables) + + self.destination_model.add_clause_to_map(str(ctc), ctc_clauses) diff --git a/tests/unit_tests/diagnosis_test.py b/tests/unit_tests/diagnosis_test.py index 68f6f06..bea1ad0 100644 --- a/tests/unit_tests/diagnosis_test.py +++ b/tests/unit_tests/diagnosis_test.py @@ -2,10 +2,9 @@ from flamapy.metamodels.configuration_metamodel.transformations import ConfigurationBasicReader from flamapy.metamodels.fm_metamodel.transformations import FeatureIDEReader +from flamapy.metamodels.pysat_diagnosis_metamodel.transformations import FmToDiagPysat -from flamapy.metamodels.pysat_metamodel.transformations import FmToPysat - -from flamapy.metamodels.pysat_metamodel.operations import Glucose3Diagnosis, Glucose3Conflict +from flamapy.metamodels.pysat_diagnosis_metamodel.operations import Glucose3Diagnosis, Glucose3Conflict # def test_with_DiscoverMetamodels(): @@ -27,7 +26,7 @@ def test_fastdiag_all(): Identify all diagnoses """ feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() - model = FmToPysat(feature_model).transform() + model = FmToDiagPysat(feature_model).transform() hsdag_fastdiag = Glucose3Diagnosis() hsdag_fastdiag.execute(model) @@ -43,7 +42,7 @@ def test_fastdiag_one(): Identify one diagnosis """ feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() - model = FmToPysat(feature_model).transform() + model = FmToDiagPysat(feature_model).transform() hsdag_fastdiag = Glucose3Diagnosis() hsdag_fastdiag.max_diagnoses = 1 @@ -60,7 +59,7 @@ def test_fastdiag_two(): Identify two diagnoses """ feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() - model = FmToPysat(feature_model).transform() + model = FmToDiagPysat(feature_model).transform() hsdag_fastdiag = Glucose3Diagnosis() hsdag_fastdiag.max_diagnoses = 2 @@ -74,7 +73,7 @@ def test_fastdiag_two(): def test_quickxplain_all(): feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() - model = FmToPysat(feature_model).transform() + model = FmToDiagPysat(feature_model).transform() hsdag_quickxplain = Glucose3Conflict() hsdag_quickxplain.execute(model) @@ -88,7 +87,7 @@ def test_quickxplain_all(): def test_quickxplain_one(): feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() - model = FmToPysat(feature_model).transform() + model = FmToDiagPysat(feature_model).transform() hsdag_quickxplain = Glucose3Conflict() hsdag_quickxplain.max_conflicts = 1 @@ -103,7 +102,7 @@ def test_quickxplain_one(): def test_quickxplain_two(): feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() - model = FmToPysat(feature_model).transform() + model = FmToDiagPysat(feature_model).transform() hsdag_quickxplain = Glucose3Conflict() hsdag_quickxplain.max_conflicts = 2 @@ -118,7 +117,7 @@ def test_quickxplain_two(): def test_fastdiag_with_configuration(): feature_model = FeatureIDEReader("../resources/smartwatch_consistent.fide").transform() - model = FmToPysat(feature_model).transform() + model = FmToDiagPysat(feature_model).transform() configuration = ConfigurationBasicReader("../resources/smartwatch_nonvalid.csvconf").transform() @@ -134,7 +133,7 @@ def test_fastdiag_with_configuration(): def test_quickxplain_with_configuration(): feature_model = FeatureIDEReader("../resources/smartwatch_consistent.fide").transform() - model = FmToPysat(feature_model).transform() + model = FmToDiagPysat(feature_model).transform() configuration = ConfigurationBasicReader("../resources/smartwatch_nonvalid.csvconf").transform() @@ -149,7 +148,7 @@ def test_quickxplain_with_configuration(): def test_fastdiag_with_test_case(): feature_model = FeatureIDEReader("../resources/smartwatch_deadfeature.fide").transform() - model = FmToPysat(feature_model).transform() + model = FmToDiagPysat(feature_model).transform() test_case = ConfigurationBasicReader("../resources/smartwatch_testcase.csvconf").transform() @@ -167,7 +166,7 @@ def test_fastdiag_with_test_case(): def test_quickxplain_with_testcase(): feature_model = FeatureIDEReader("../resources/smartwatch_deadfeature.fide").transform() - model = FmToPysat(feature_model).transform() + model = FmToDiagPysat(feature_model).transform() test_case = ConfigurationBasicReader("../resources/smartwatch_testcase.csvconf").transform()