diff --git a/.gitignore b/.gitignore index 058638a..c085867 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ __pycache__ *.egg-info* env +/.idea/ +/build/ diff --git a/README.md b/README.md index 64adb86..b04d5e2 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # pysat_metamodel -This repository will host the pysat metamode and its operation implementation +This repository will host the pysat metamodel and its operation implementation ## Install for development diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/models/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/models/__init__.py new file mode 100644 index 0000000..c0602b2 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/models/__init__.py @@ -0,0 +1,5 @@ +from .pysat_diagnosis_model import DiagnosisModel + +__all__ = [ + 'DiagnosisModel', +] diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/models/pysat_diagnosis_model.py b/flamapy/metamodels/pysat_diagnosis_metamodel/models/pysat_diagnosis_model.py new file mode 100644 index 0000000..742ba77 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/models/pysat_diagnosis_model.py @@ -0,0 +1,196 @@ +from typing import List, Dict, Tuple + +from flamapy.metamodels.configuration_metamodel.models import Configuration +from flamapy.metamodels.pysat_metamodel.models import PySATModel + + +class DiagnosisModel(PySATModel): + """ + This is a new version of the PySATModel class to support the following tasks: + 1. Diagnosis Task + If a configuration is given: + C = configuration + B = {f0 = true} + CF (i.e., = PySATModel) + else (no configuration is given): + a. Diagnosis the feature model + C = CF (i.e., = PySATModel - {f0 = true}) + B = {f0 = true} + b. Diagnosis the error + C = CF (i.e., = PySATModel - {f0 = true}) + B = {f0 = true} + test_case + where test_case is the following: + + Dead feature: test_case = {fi = true} + + False optional feature: test_case = {f_parent = true} & {f_child = false} + 2. Redundancy Detection Task (need negative constraints) + C = CF (i.e., = PySATModel - {f0 = true}) + B = {} + """ + + @staticmethod + def get_extension() -> str: + return 'pysat_diagnosis' + + def __init__(self) -> None: + super().__init__() + self.C = None # set of constraints which could be faulty + self.B = None # background knowledge (i.e., the knowledge that is known to be true) + + self.KB = None # set of all CNF with added assumptions + self.constraint_map: list[(str, list[list[int]])] = [] # map clauses to relationships/constraint + + self.constraint_assumption_map = None + + def add_clause_toMap(self, description: str, clauses: list[list[int]]) -> None: + self.constraint_map.append((description, clauses)) + + def get_C(self) -> list: + return self.C + + def get_B(self) -> list: + return self.B + + def get_KB(self) -> list: + return self.KB + + def get_pretty_diagnoses(self, assumptions: list[list[int]]) -> str: + diagnoses = [] + for ass in assumptions: + diag = [] + for item in ass: + if self.constraint_assumption_map[item]: + diag.append(self.constraint_assumption_map[item]) + diagnoses.append(f"[{', '.join(diag)}]") + + return ','.join(diagnoses) + + def prepare_diagnosis_task(self, configuration: Configuration = None, test_case: Configuration = None) -> None: + """ + Execute this method after the model is built. + If a configuration is given: + C = configuration + B = {f0 = true} + CF (i.e., = PySATModel) + else (no configuration is given): + a. Diagnosis the feature model + C = CF (i.e., = PySATModel - {f0 = true}) + B = {f0 = true} + b. Diagnosis the error + C = CF (i.e., = PySATModel - {f0 = true}) + B = {f0 = true} + test_case + where test_case is the following: + + Dead feature: test_case = {fi = true} + + False optional feature: test_case = {f_parent = true} & {f_child = false} + """ + if configuration is not None: + # C = configuration + # B = {f0 = true} + CF (i.e., = PySATModel) + self.C, self.B, self.KB, self.constraint_assumption_map = \ + self.prepare_assumptions(configuration=configuration) + else: + if test_case is None: + # Diagnosis the feature model + # C = CF (i.e., = PySATModel - {f0 = true}) + # B = {f0 = true} + self.C, self.B, self.KB, self.constraint_assumption_map = self.prepare_assumptions() + else: + # Diagnosis the error + # C = CF (i.e., = PySATModel - {f0 = true}) + # B = {f0 = true} + test_case + self.C, self.B, self.KB, self.constraint_assumption_map = self.prepare_assumptions(test_case=test_case) + + def prepare_redundancy_detection_task(self) -> None: + """ + This function prepares the model for WipeOutR algorithm. + Execute this method after the model is built. + C = CF (i.e., = PySATModel - {f0 = true}) + B = {} + """ + # C = CF (i.e., = PySATModel - {f0 = true}) + self.C = self.prepare_assumptions + self.B = [] # B = {} + # ToDo: TBD + + def prepare_assumptions(self, configuration: Configuration = None, test_case: Configuration = None) \ + -> Tuple[List, List, List, Dict]: + assumption = [] + KB = [] + constraint_assumption_map = {} + + id_assumption = len(self.variables) + 1 + id_assumption = self.prepare_assumptions_for_KB(KB, assumption, constraint_assumption_map, id_assumption) + + start_id_configuration = len(assumption) + if configuration is not None: + constraint_assumption_map = {} # reset the map + id_assumption = self.prepare_assumptions_for_configuration(KB, assumption, configuration, + constraint_assumption_map, + id_assumption) + + start_id_test_case = len(assumption) + if test_case is not None: + self.prepare_assumptions_for_configuration(KB, assumption, test_case, + constraint_assumption_map, + id_assumption) + + if configuration is not None: + B = assumption[:start_id_configuration] + C = assumption[start_id_configuration:] + else: + if test_case is not None: + B = [assumption[0]] + assumption[start_id_test_case:] + C = assumption[1:start_id_test_case] + else: + B = [assumption[0]] + C = assumption[1:] + + return C, B, KB, constraint_assumption_map + + def prepare_assumptions_for_KB(self, KB, assumption, constraint_assumption_map, id_assumption): + c_map = self.constraint_map + # loop through all tuples in the constraint map + for i in range(len(c_map)): + # get description + desc = c_map[i][0] + # get clauses + clauses = c_map[i][1] + # loop through all variables in the constraint + for j in range(len(clauses)): + # get each clause + clause = clauses[j] + # add the assumption variable to the clause + # assumption => clause + # i.e., -assumption v clause + clause.append(-1 * id_assumption) + + assumption.append(id_assumption) + KB.extend(clauses) + constraint_assumption_map[id_assumption] = desc + + id_assumption += 1 + + return id_assumption + + def prepare_assumptions_for_configuration(self, KB, assumption, configuration, constraint_assumption_map, + id_assumption): + config = [feat.name for feat in configuration.elements] + for feat in config: + if feat not in self.variables.keys(): + raise Exception(f'Feature {feat} is not in the model.') + + for feat in configuration.elements.items(): + desc = '' + clause = [] + + if feat[1]: + desc = f'{feat[0].name} = true' + clause = [self.variables[feat[0].name], -1 * id_assumption] + elif not feat[1]: + desc = f'{feat[0].name} = false' + clause = [-1 * self.variables[feat[0].name], -1 * id_assumption] + + assumption.append(id_assumption) + KB.append(clause) + constraint_assumption_map[id_assumption] = desc + + id_assumption += 1 + + return id_assumption diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/__init__.py new file mode 100644 index 0000000..e6b6196 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/__init__.py @@ -0,0 +1,8 @@ +from .glucose3_conflict import Glucose3Conflict +from .glucose3_diagnosis import Glucose3Diagnosis + + +__all__ = [ + 'Glucose3Diagnosis', + 'Glucose3Conflict' +] diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/checker.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/checker.py new file mode 100644 index 0000000..1f97196 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/checker.py @@ -0,0 +1,30 @@ +""" +A Java version of this implementation is available at: +https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/checker/ChocoConsistencyChecker.java +""" + +from pysat.solvers import Solver + + +class ConsistencyChecker: + + def __init__(self, solverName: str, KB: list) -> None: + self.solver = None + self.result = False + + self.solver = Solver(solverName, bootstrap_with=KB) + + def is_consistent(self, C: list, Δ: list) -> bool: + """ + Check if the given CNF formula is consistent using a solver. + :param C: a list of assumptions should be added to the CNF formula + :param Δ: a list of assumptions should not be added to the CNF formula + :return: a boolean value indicating whether the given CNF formula is consistent + """ + assumptions = C + [-1 * item for item in Δ] + self.result = self.solver.solve(assumptions=assumptions) + # print(f"assumptions: {assumptions} - result: {self.result}") + return self.result + + def delete(self): + self.solver.delete() diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/fastdiag.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/fastdiag.py new file mode 100644 index 0000000..40749d7 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/fastdiag.py @@ -0,0 +1,93 @@ +""" +A Java version of this implementation is available at: +https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/algorithms/FastDiagV3.java +""" + +import logging + +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.utils import split, diff +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker + + +class FastDiag: + """ + Implementation of MSS-based FastDiag algorithm. + Le, V. M., Silva, C. V., Felfernig, A., Benavides, D., Galindo, J., & Tran, T. N. T. (2023). + FastDiagP: An Algorithm for Parallelized Direct Diagnosis. + arXiv preprint arXiv:2305.06951. + """ + + def __init__(self, checker: ConsistencyChecker) -> None: + self.checker = checker + + def findDiagnosis(self, C: list, B: list) -> list: + """ + Activate FastDiag algorithm if there exists at least one constraint, + which induces an inconsistency in B. Otherwise, it returns an empty set. + + // Func FastDiag(C, B) : Δ + // if isEmpty(C) or consistent(B U C) return Φ + // else return C \\ FD(Φ, C, B) + :param C: a consideration set of constraints + :param B: a background knowledge + :return: a diagnosis or an empty set + """ + logging.debug(f'fastDiag [C={C}, B={B}]') + # print(f'fastDiag [C={C}, B={B}]') + + # if isEmpty(C) or consistent(B U C) return Φ + if len(C) == 0 or self.checker.is_consistent(B + C, []): + logging.debug('return Φ') + # print('return Φ') + return [] + else: # return C \ FD(C, B, Φ) + mss = self.fd([], C, B) + diag = diff(C, mss) + + logging.debug(f'return {diag}') + # print(f'return {diag}') + return diag + + def fd(self, Δ: list, C: list, B: list) -> list: + """ + The implementation of MSS-based FastDiag algorithm. + The algorithm determines a maximal satisfiable subset MSS (Γ) of C U B. + + // Func FD(Δ, C = {c1..cn}, B) : MSS + // if Δ != Φ and consistent(B U C) return C; + // if singleton(C) return Φ; + // k = n/2; + // C1 = {c1..ck}; C2 = {ck+1..cn}; + // Δ1 = FD(C2, C1, B); + // Δ2 = FD(C1 - Δ1, C2, B U Δ1); + // return Δ1 ∪ Δ2; + :param Δ: check to skip redundant consistency checks + :param C: a consideration set of constraints + :param B: a background knowledge + :return: a maximal satisfiable subset MSS of C U B + """ + logging.debug(f'>>> FD [Δ={Δ}, C={C}, B={B}]') + + # if Δ != Φ and consistent(B U C) return C; + if len(Δ) != 0 and self.checker.is_consistent(B + C, Δ): + logging.debug(f'<<< return {C}') + return C + + # if singleton(C) return Φ; + if len(C) == 1: + logging.debug('<<< return Φ') + return [] + + # C1 = {c1..ck}; C2 = {ck+1..cn}; + C1, C2 = split(C) + + # Δ1 = FD(C2, C1, B); + Δ1 = self.fd(C2, C1, B) + # Δ2 = FD(C1 - Δ1, C2, B U Δ1); + C1withoutΔ1 = diff(C1, Δ1) + Δ2 = self.fd(C1withoutΔ1, C2, B + Δ1) + + logging.debug('<<< return [Δ1={Δ1} ∪ Δ2={Δ2}]') + + # return Δ1 + Δ2 + return Δ1 + Δ2 diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/hsdag.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/hsdag.py new file mode 100644 index 0000000..07d5861 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/hsdag.py @@ -0,0 +1,289 @@ +""" +A Java version of this implementation is available at: +https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/algorithms/hs/HSDAG.java +""" + +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis import utils +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.labeler import IHSLabelable, LabelerType, \ + AbstractHSParameters +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.node import Node, NodeStatus +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.utils import diff, contains + + +class HSDAG: + """ + Implementation of the HS-dag algorithm. + IHSLabeler algorithms could return labels (conflict or diagnosis) which are not minimal. + """ + + def __init__(self, labeler: IHSLabelable) -> None: + self.labeler = labeler # could be FastDiag or QuickXPlain + + self.max_number_diagnoses = -1 # maximum number of diagnoses to be found + self.max_number_conflicts = -1 # maximum number of conflicts to be found + self.max_depth = 0 # maximum depth of the HS-dag + + self.node_labels = [] # list of diagnoses or conflicts found + self.path_labels = [] # list of diagnoses or conflicts found + + self.root = None # root node of the HS-dag + self.open_nodes = [] # list of open nodes + self.label_nodes_map = {} # Map of + self.nodes_lookup = {} # Map of + + def get_conflicts(self): + """ + Returns the list of conflicts found. + """ + if self.labeler.get_type() == LabelerType.CONFLICT: + return self.node_labels + else: + return self.path_labels + + def get_diagnoses(self): + """ + Returns the list of diagnoses found. + """ + if self.labeler.get_type() == LabelerType.CONFLICT: + return self.path_labels + else: + return self.node_labels + + def should_stop_construction(self) -> bool: + condition1 = self.max_number_diagnoses != -1 and self.max_number_diagnoses <= len(self.get_diagnoses()) + condition2 = self.max_number_conflicts != -1 and self.max_number_conflicts <= len(self.get_conflicts()) + return condition1 or condition2 + + def construct(self): + """ + Constructs the HS-dag. + """ + param = self.labeler.get_initial_parameters() + + has_root_label = self.create_root(param) + + if not self.should_stop_construction() and has_root_label: + self.create_nodes() + + def create_root(self, param: AbstractHSParameters) -> bool: + """ + Creates the root node of the HS-dag. + """ + has_root_label = True + + if not self.has_root(): + labels = self.compute_label(self.labeler, param) + + if len(labels) == 0: + has_root_label = False + else: # create root node + label = self.select_label(labels) + self.root = Node.create_root(label, param) + + self.open_nodes.append(self.root) + + self.add_node_labels(labels) # to reuse labels + self.add_item_to_label_nodes_map(label, self.root) + + return has_root_label + + def create_nodes(self): + """ + Creates nodes of the HS-dag. + """ + while self.has_nodes_to_expand(): + node = self.get_next_node() + + if not node.is_root(): + if self.skip_node(node): + continue + + self.label(node) + + if self.should_stop_construction(): + break + + if node.status == NodeStatus.OPEN: + self.expand(node) + + def label(self, node: Node): + """ + Labels a node - identify new conflict or diagnosis. + """ + # Reusing labels - H(node) ∩ S = {}, then label node by S + labels = self.get_reusable_labels(node) + + # compute labels if there are none to reuse + if len(labels) == 0: + labels = self.compute_label_from_node(self.labeler, node) + + self.process_labels(labels) + + if len(labels) > 0: + label = self.select_label(labels) + + node.label = label + self.add_item_to_label_nodes_map(label, node) + + else: # found a path label + self.found_a_path_label_at_node(node) + + def expand(self, nodeToExpand: Node): + """ + Creates children of a node. + """ + for arcLabel in nodeToExpand.label: + param_parent_node = nodeToExpand.parameters + new_param = self.labeler.identify_new_node_parameters(param_parent_node, arcLabel) + + # rule 1.a - reuse node + node = self.get_reusable_node(nodeToExpand.path_label, arcLabel) + if node is not None: + node.add_parent(nodeToExpand) + else: # rule 1.b - generate a new node + node = Node(parent=nodeToExpand, arc_label=arcLabel, parameters=new_param) + hashcode = sum(node.path_label) + self.nodes_lookup[hashcode] = node + + if not self.can_prune(node): + self.open_nodes.append(node) + + def add_item_to_label_nodes_map(self, label, node): + """ + Adds a node to the label_nodes_map. + """ + hashcode = sum(label) + if hashcode in self.label_nodes_map.keys(): + self.label_nodes_map[hashcode].append(node) + else: + self.label_nodes_map[hashcode] = node + + @staticmethod + def compute_label(labeler: IHSLabelable, param: AbstractHSParameters): + return labeler.get_label(param) + + @staticmethod + def compute_label_from_node(labeler: IHSLabelable, node: Node): + param = node.parameters + return HSDAG.compute_label(labeler, param) + + def add_node_labels(self, labels: list[list[int]]): + for label in labels: + self.node_labels.append(label) + + def found_a_path_label_at_node(self, node: Node): + node.status = NodeStatus.CHECKED + pathLabel = node.path_label.copy() + + self.path_labels.append(pathLabel) + + @staticmethod + def select_label(labels): + return labels[0] + + def has_nodes_to_expand(self): + return len(self.open_nodes) > 0 + + def get_next_node(self): + return self.open_nodes.pop(0) + + def has_root(self): + return self.root is not None + + # Pruning engine + def skip_node(self, node: Node): + condition1 = self.max_depth != 0 and self.max_depth < node.level + return node.status != NodeStatus.OPEN or condition1 or self.can_prune(node) + + def can_prune(self, node: Node): + # 3.i - if n is checked, and n'' is such that H(n) ⊆ H(n'), then close the node n'' + # n is a diagnosis + for path_label in self.path_labels: + # if node.path_label.containsAll(path_label): + if all(elem in path_label for elem in node.path_label): + node.status = NodeStatus.CLOSED + return True + + # 3.ii - if n has been generated and node n'' is such that H(n') = H(n), then close node n'' + for n in self.open_nodes: + if len(n.path_label) == len(node.path_label) \ + and len(diff(n.path_label, node.path_label)) == 0: + node.status = NodeStatus.CLOSED + return True + return False + + def get_reusable_labels(self, node: Node): + labels = [] + for label in self.node_labels: + # H(node) ∩ S = {} + if not utils.hasIntersection(node.path_label, label): + labels.append(label) + return labels + + def get_reusable_node(self, path_labels, arc_label): + if path_labels is None: + h = [arc_label] + else: + h = path_labels.copy() + h.append(arc_label) + hashcode = sum(h) + return self.nodes_lookup.get(hashcode) + + def process_labels(self, labels): + if len(labels) > 0: + # check existing and obtained labels for subset-relations + non_min_labels = [] + + for fs in self.node_labels: + if contains(non_min_labels, fs): + continue + + for cs in labels: + if contains(non_min_labels, cs): + continue + + greater = fs if len(fs) > len(cs) else cs + smaller = cs if len(fs) > len(cs) else fs + + if utils.contains_all(greater, smaller): + non_min_labels.append(greater) + + if len(greater) > len(smaller): + # update the DAG + nodes = self.label_nodes_map.get(greater) + + if nodes is not None: + for nd in nodes: + if nd.get_status() == NodeStatus.OPEN: + nd.set_label(smaller) # relabel the node with smaller + self.add_item_to_label_nodes_map(smaller, nd) # add new label to the map + + delete = diff(greater, smaller) + for label in delete: + child = nd.get_children().get(label) + + if child is not None: + child.parents.remove(nd) + nd.get_children().remove(label) + self.clean_up_nodes(child) + + # remove the known non - minimal conflicts + for label in non_min_labels: + labels.remove(label) # labels.removeAll(non_min_labels) + del self.label_nodes_map[label] # non_min_labels.forEach(label_nodesMap::remove) + + # add new labels to the list of labels + self.add_node_labels(labels) + # hsdag.addNodeLabels(labels) + + def clean_up_nodes(self, node: Node): + del self.nodes_lookup[node.path_label] + + if node.status == NodeStatus.OPEN: + node.status = NodeStatus.PRUNED + + # downward clean up + for arcLabel in node.children.keys(): + child = node.children.get(arcLabel) + if child is not None: + self.clean_up_nodes(child) diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/fastdiag_labeler.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/fastdiag_labeler.py new file mode 100644 index 0000000..559da19 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/fastdiag_labeler.py @@ -0,0 +1,68 @@ +""" +A Java version of this implementation is available at: +https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/algorithms/hs/labeler/FastDiagV3Labeler.java +""" + +from dataclasses import dataclass + +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.fastdiag import FastDiag +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.labeler import IHSLabelable, LabelerType, AbstractHSParameters + + +@dataclass +class FastDiagParameters(AbstractHSParameters): + B: list[int] + + def __str__(self): + return f"FastDiagParameters{{C={self.C}, B={self.B}}}" + + +class FastDiagLabeler(FastDiag, IHSLabelable): + """ + HSLabeler for FastDiag algorithm + """ + + def __init__(self, checker: ConsistencyChecker, parameters: FastDiagParameters): + super().__init__(checker) + self.initial_parameters = parameters + + def get_type(self) -> LabelerType: + return LabelerType.DIAGNOSIS + + def get_initial_parameters(self) -> AbstractHSParameters: + return self.initial_parameters + + def get_label(self, parameters: AbstractHSParameters) -> list: + """ + Identifies a diagnosis + """ + assert isinstance(parameters, FastDiagParameters), "parameter must be an instance of FastDiagParameters" + neg_C = [-1 * item for item in parameters.C] + if len(parameters.C) >= 1 \ + and (len(parameters.B) == 0 or self.checker.is_consistent(parameters.B + neg_C, [])): + + diag = self.findDiagnosis(parameters.C, parameters.B) + if len(diag) != 0: + return [diag] + return [] + + def identify_new_node_parameters(self, param_parent_node: AbstractHSParameters, arcLabel: int) \ + -> AbstractHSParameters: + """ + Identifies the new node's parameters on the basis of the parent node's parameters. + """ + assert isinstance(param_parent_node, + FastDiagParameters), "parameter must be an instance of FastDiagParameters" + + C = param_parent_node.C.copy() + C.remove(arcLabel) + B = param_parent_node.B.copy() + B.append(arcLabel) + # D = param_parent_node.D.copy() + # D.append(arcLabel) + + return FastDiagParameters(C, [], B) + + def get_instance(self, checker: ConsistencyChecker): + return FastDiagLabeler(checker, self.initial_parameters) diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/labeler.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/labeler.py new file mode 100644 index 0000000..559c5a9 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/labeler.py @@ -0,0 +1,48 @@ +""" +A Java version of this implementation is available at: +https://github.com/HiConfiT/hiconfit-core/tree/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/algorithms/hs +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from enum import Enum + +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker + + +class LabelerType(Enum): + CONFLICT = 1 + DIAGNOSIS = 2 + + +@dataclass +class AbstractHSParameters: + C: list[int] + D: list[int] + + +class IHSLabelable(ABC): + """ + Interface for the HSDAG's labeler + """ + + @abstractmethod + def get_type(self) -> LabelerType: + pass + + @abstractmethod + def get_initial_parameters(self) -> AbstractHSParameters: + pass + + @abstractmethod + def get_label(self, parameters: AbstractHSParameters) -> list: + pass + + @abstractmethod + def identify_new_node_parameters(self, param_parent_node: AbstractHSParameters, arcLabel: int) \ + -> AbstractHSParameters: + pass + + @abstractmethod + def get_instance(self, checker: ConsistencyChecker): + pass diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/quickxplain_labeler.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/quickxplain_labeler.py new file mode 100644 index 0000000..f41f3f5 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/labeler/quickxplain_labeler.py @@ -0,0 +1,68 @@ +""" +A Java version of this implementation is available at: +https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/algorithms/hs/labeler/QuickXPlainLabeler.java +""" + +from dataclasses import dataclass + +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.labeler import IHSLabelable, LabelerType, \ + AbstractHSParameters +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.quickxplain import QuickXPlain + + +@dataclass +class QuickXPlainParameters(AbstractHSParameters): + B: list[int] + + def __str__(self): + return f"QuickXPlainParameters{{C={self.C}, B={self.B}}}" + + +class QuickXPlainLabeler(QuickXPlain, IHSLabelable): + """ + HSLabeler for QuickXPlain algorithm + """ + + def __init__(self, checker: ConsistencyChecker, parameters: QuickXPlainParameters): + super().__init__(checker) + self.initial_parameters = parameters + + def get_type(self) -> LabelerType: + return LabelerType.CONFLICT + + def get_initial_parameters(self) -> AbstractHSParameters: + return self.initial_parameters + + def get_label(self, parameters: AbstractHSParameters) -> list: + """ + Identifies a conflict + """ + assert isinstance(parameters, QuickXPlainParameters), "parameter must be an instance of QuickXPlainParameters" + + cs = self.findConflictSet(parameters.C, parameters.B + parameters.D) + + if len(cs) != 0: + # reverse the order of the conflict set + cs.reverse() + return [cs] + return [] + + def identify_new_node_parameters(self, param_parent_node: AbstractHSParameters, arcLabel: int) \ + -> AbstractHSParameters: + """ + Identifies the new node's parameters on the basis of the parent node's parameters. + """ + assert isinstance(param_parent_node, + QuickXPlainParameters), "parameter must be an instance of QuickXPlainParameters" + + C = param_parent_node.C.copy() + C.remove(arcLabel) + B = param_parent_node.B.copy() + D = param_parent_node.D.copy() + D.append(-1 * arcLabel) + + return QuickXPlainParameters(C, D, B) + + def get_instance(self, checker: ConsistencyChecker): + return QuickXPlainLabeler(checker, self.initial_parameters) diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/node.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/node.py new file mode 100644 index 0000000..19b8f7b --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/hsdag/node.py @@ -0,0 +1,72 @@ +""" +A Java version of this implementation is available at: +https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/algorithms/hs/Node.java +""" +from enum import Enum + +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.labeler import AbstractHSParameters + + +class NodeStatus(Enum): + OPEN = 'Open' + CLOSED = 'Closed' + PRUNED = 'Pruned' + CHECKED = 'Checked' # Checked - the label of this node is a Conflict or a Diagnosis + + +class Node(object): + """ + A data structure representing a node of an HS-dag. + """ + generating_node_id = -1 + + def __init__(self, parent: 'Node' = None, arc_label: int = None, parameters: AbstractHSParameters = None): + self.id = Node.generating_node_id = Node.generating_node_id + 1 + self.status = NodeStatus.OPEN # node status + self.label = [] # label of the node - it can be a minimal conflict or a diagnosis or null + self.arc_label = arc_label # the constraint associated to the arch which comes to this node + self.parameters = parameters + self.path_label = None # labels of the path to here + self.parents = None # the node's parents. Can be null for the root node. + self.children = {} # the node's children + + if parent is None: + self.level = 0 # tree level + else: + self.level = parent.level + 1 # tree level + self.parents = [] + self.parents.append(parent) + if parent.path_label is not None: + self.path_label = parent.path_label.copy() + else: + self.path_label = [] + self.path_label.append(arc_label) + + parent.children[arc_label] = self + + @staticmethod + def create_root(label: list[int], parameters: AbstractHSParameters) -> 'Node': + Node.generating_node_id = -1 + + root = Node() + root.label = label + root.parameters = parameters + + return root + + def add_parent(self, parent: 'Node'): + if self.is_root(): + raise ValueError("The root node cannot have parents.") + else: + self.parents.append(parent) + + def add_child(self, arcLabel: int, child: 'Node'): + self.children[arcLabel] = child + child.add_parent(self) + + def is_root(self) -> bool: + return self.parents is None + + def __str__(self): + return f"Node{{id={self.id}, level={self.level}, status={self.status}, label={self.label}, " \ + f"parameter={self.parameters}, arcLabel={self.arc_label}, pathLabels={self.path_label}}}" diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/quickxplain.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/quickxplain.py new file mode 100644 index 0000000..c689189 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/quickxplain.py @@ -0,0 +1,85 @@ +""" +A Java version of this implementation is available at: +https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/algorithms/QuickXPlain.java +""" + +import logging + +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.utils import split + + +class QuickXPlain: + """ + Implementation of QuickXPlain algorithm + Junker, Ulrich. "Quickxplain: Conflict detection for arbitrary constraint propagation algorithms." + IJCAI’01 Workshop on Modelling and Solving problems with constraints. Vol. 4. 2001. + """ + + def __init__(self, checker: ConsistencyChecker) -> None: + self.checker = checker + + def findConflictSet(self, C: list, B: list) -> list: + """ + // Func QuickXPlain(C={c1,c2,…, cm}, B): CS + // IF consistent(B∪C) return "No conflict"; + // IF isEmpty(C) return Φ; + // ELSE return QX(Φ, C, B); + :param C: a consideration set + :param B: a background knowledge + :return: a conflict set or an empty set + """ + logging.debug(f'quickXPlain [C={C}, B={B}]') + # print(f'quickXPlain [C={C}, B={B}]') + + # if C is empty or consistent(B U C) then return empty set + if len(C) == 0 or self.checker.is_consistent(B + C, []): + logging.debug('return Φ') + # print('return Φ') + return [] + else: # return QX(Φ, C, B) + cs = self.qx([], C, B) + + logging.debug(f'return {cs}') + # print(f'return {cs}') + return cs + + def qx(self, D: list, C: list, B: list) -> list: + """ + // func QX(Δ, C={c1,c2, …, cq}, B): CS + // IF (Δ != Φ AND inconsistent(B)) return Φ; + // IF singleton(C) return C; + // k = q/2; + // C1 <-- {c1, …, ck}; C2 <-- {ck+1, …, cq}; + // CS1 <-- QX(C2, C1, B ∪ C2); + // CS2 <-- QX(CS1, C2, B ∪ CS1); + // return (CS1 ∪ CS2) + :param D: check to skip redundant consistency checks + :param C: a consideration set of constraints + :param B: a background knowledge + :return: a conflict set or an empty set + """ + logging.debug(f'>>> QX [D={D}, C={C}, B={B}]') + + # if D != Φ and inconsistent(B) then return Φ + if len(D) != 0 and not self.checker.is_consistent(B, C): + logging.debug('<<< return Φ') + return [] + + # if C is singleton then return C + if len(C) == 1: + logging.debug(f'<<< return {C}') + return C + + # C1 = {c1..ck}; C2 = {ck+1..cn}; + C1, C2 = split(C) + + # CS1 = QX(C2, C1, B U C2) + CS1 = self.qx(C2, C1, (B + C2)) + # CS2 = QX(CS1, C2, B U CS1) + CS2 = self.qx(CS1, C2, (B + CS1)) + + logging.debug(f'<<< return [CS1={CS1} U CS2={CS2}]') + + # return CS1 U CS2 + return CS1 + CS2 diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/utils.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/utils.py new file mode 100644 index 0000000..07b1389 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/diagnosis/utils.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +"""Provides utility functions.""" + + +def split(C: list) -> (list, list): + """ + Splits the given list of constraints/clauses into two parts. + :param C: a list of clauses + :return: a tuple of two lists + """ + half_size = len(C) // 2 + return C[:half_size], C[half_size:] + + +def diff(x: list, y: list) -> list: + """ + Returns the difference of two lists. + :param x: list + :param y: list + :return: list + """ + return [item for item in x if item not in y] + + +def get_hashcode(C: list) -> str: + """ + Returns the hashcode of the given CNF formula. + :param C: a list of clauses + :return: the hashcode of the given CNF formula + """ + C = sorted(C, key=lambda x: x[0]) + return str(C) + + +def hasIntersection(list1: list, list2: list) -> bool: + return any(i in list1 for i in list2) + + +def contains(listofList: list, aList: list) -> bool: + return any(aList == x for x in listofList) + + +def contains_all(greater: list, smaller: list) -> bool: + return all(i in smaller for i in greater) diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_conflict.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_conflict.py new file mode 100644 index 0000000..430c086 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_conflict.py @@ -0,0 +1,97 @@ +from typing import Any + +from flamapy.core.operations import Operation +from flamapy.metamodels.configuration_metamodel.models import Configuration +from flamapy.core.models import VariabilityModel + +from flamapy.metamodels.pysat_diagnosis_metamodel.models.pysat_diagnosis_model import DiagnosisModel +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.hsdag import HSDAG +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.quickxplain_labeler import \ + QuickXPlainParameters, QuickXPlainLabeler +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker + + +class Glucose3Conflict(Operation): + """ + An operation that computes conflicts and diagnoses using the combination of HSDAG and QuickXPlain algorithms. + Four optional inputs: + - configuration - a configuration to be diagnosed + - test_case - a test case to be used for diagnosis + - max_conflicts - specify the maximum number of conflicts to be computed + - max_depth - specify the maximum depth of the HSDAG to be computed + """ + + def __init__(self) -> None: + self.result = False + self.configuration = None + self.test_case = None + self.solverName = 'glucose3' + self.diagnosis_messages: list[str] = [] + + self.checker = None + self.max_conflicts = -1 # -1 means no limit + self.max_depth = 0 # 0 means no limit + + def set_max_conflicts(self, max_conflicts: int) -> None: + self.max_conflicts = max_conflicts + + def set_max_depth(self, max_depth: int) -> None: + self.max_depth = max_depth + + def get_diagnosis_messages(self) -> list[Any]: + return self.get_result() + + def set_configuration(self, configuration: Configuration) -> None: + self.configuration = configuration + + def set_test_case(self, test_case: Configuration) -> None: + self.test_case = test_case + + def get_result(self) -> list[str]: + return self.diagnosis_messages + + def execute(self, model: VariabilityModel) -> 'Glucose3Conflict': + # transform model to diagnosis model + model.prepare_diagnosis_task(configuration=self.configuration, test_case=self.test_case) + + # print(f'C: {diag_model.get_C()}') + # print(f'B: {diag_model.get_B()}') + + C = model.get_C() + # if self.configuration is None: + # C.reverse() # reverse the list to get the correct order of constraints in the diagnosis messages + + checker = ConsistencyChecker(self.solverName, model.get_KB()) + parameters = QuickXPlainParameters(C, [], model.get_B()) + quickxplain = QuickXPlainLabeler(checker, parameters) + hsdag = HSDAG(quickxplain) + hsdag.max_number_conflicts = self.max_conflicts + hsdag.max_depth = self.max_depth + + hsdag.construct() + + diagnoses = hsdag.get_diagnoses() + conflicts = hsdag.get_conflicts() + + if len(diagnoses) == 0: + diag_mess = 'No diagnosis found' + elif len(diagnoses) == 1: + diag_mess = f'Diagnosis: ' + diag_mess += model.get_pretty_diagnoses(diagnoses) + else: + diag_mess = f'Diagnoses: ' + diag_mess += model.get_pretty_diagnoses(diagnoses) + + if len(conflicts) == 0: + cs_mess = 'No conflicts found' + elif len(conflicts) == 1: + cs_mess = f'Conflict: ' + cs_mess += model.get_pretty_diagnoses(conflicts) + else: + cs_mess = f'Conflicts: ' + cs_mess += model.get_pretty_diagnoses(conflicts) + + self.diagnosis_messages.append(cs_mess) + self.diagnosis_messages.append(diag_mess) + checker.delete() + return self diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_diagnosis.py b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_diagnosis.py new file mode 100644 index 0000000..244c308 --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/operations/glucose3_diagnosis.py @@ -0,0 +1,101 @@ +from typing import Any + +from flamapy.core.operations import Operation +from flamapy.metamodels.configuration_metamodel.models import Configuration +from flamapy.core.models import VariabilityModel + +from flamapy.metamodels.pysat_diagnosis_metamodel.models.pysat_diagnosis_model import DiagnosisModel +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker + +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.hsdag import HSDAG +from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.hsdag.labeler.fastdiag_labeler import FastDiagParameters, \ + FastDiagLabeler + + +class Glucose3Diagnosis(Operation): + """ + An operation that computes diagnoses and conflict sets using the combination of HSDAG and FastDiag algorithms. + Four optional inputs: + - configuration - a configuration to be diagnosed + - test_case - a test case to be used for diagnosis + - max_diagnoses - specify the maximum number of diagnoses to be computed + - max_depth - specify the maximum depth of the HSDAG to be computed + """ + + def __init__(self) -> None: + self.result = False + self.configuration = None + self.test_case = None + self.solverName = 'glucose3' + self.diagnosis_messages: list[str] = [] + + self.checker = None + self.max_diagnoses = -1 # -1 means no limit + self.max_depth = 0 # 0 means no limit + + def set_max_diagnoses(self, max_diagnoses: int) -> None: + self.max_diagnoses = max_diagnoses + + def set_max_depth(self, max_depth: int) -> None: + self.max_depth = max_depth + + def get_diagnosis_messages(self) -> list[Any]: + return self.get_result() + + def set_configuration(self, configuration: Configuration) -> None: + self.configuration = configuration + + def set_test_case(self, test_case: Configuration) -> None: + self.test_case = test_case + + def is_valid(self) -> bool: + pass + + def get_result(self) -> list[str]: + return self.diagnosis_messages + + def execute(self, model: VariabilityModel) -> 'Glucose3Diagnosis': + # transform model to diagnosis model + model.prepare_diagnosis_task(configuration=self.configuration, test_case=self.test_case) + + print(f'C: {model.get_C()}') + print(f'B: {model.get_B()}') + + C = model.get_C() + # if self.configuration is None: + # C.reverse() # reverse the list to get the correct order of diagnosis + + checker = ConsistencyChecker(self.solverName, model.get_KB()) + parameters = FastDiagParameters(C, [], model.get_B()) + fastdiag = FastDiagLabeler(checker, parameters) + hsdag = HSDAG(fastdiag) + hsdag.max_number_diagnoses = self.max_diagnoses + hsdag.max_depth = self.max_depth + + hsdag.construct() + + diagnoses = hsdag.get_diagnoses() + conflicts = hsdag.get_conflicts() + + if len(diagnoses) == 0: + diag_mess = 'No diagnosis found' + elif len(diagnoses) == 1: + diag_mess = f'Diagnosis: ' + diag_mess += model.get_pretty_diagnoses(diagnoses) + else: + diag_mess = f'Diagnoses: ' + diag_mess += model.get_pretty_diagnoses(diagnoses) + + if len(conflicts) == 0: + cs_mess = 'No conflicts found' + elif len(conflicts) == 1: + cs_mess = f'Conflict: ' + cs_mess += model.get_pretty_diagnoses(conflicts) + else: + cs_mess = f'Conflicts: ' + cs_mess += model.get_pretty_diagnoses(conflicts) + + self.diagnosis_messages.append(diag_mess) + self.diagnosis_messages.append(cs_mess) + checker.delete() + return self diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/__init__.py b/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/__init__.py new file mode 100644 index 0000000..55ce70b --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/__init__.py @@ -0,0 +1,9 @@ +from .fm_to_diag_pysat import FmToDiagPysat + + +__all__ = [ + 'FmToPysat', + 'CNFReader', + 'DimacsReader', + 'DimacsWriter' +] diff --git a/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/fm_to_diag_pysat.py b/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/fm_to_diag_pysat.py new file mode 100644 index 0000000..3ed224e --- /dev/null +++ b/flamapy/metamodels/pysat_diagnosis_metamodel/transformations/fm_to_diag_pysat.py @@ -0,0 +1,39 @@ +import itertools +from typing import Any, List + +from flamapy.core.transformations import ModelToModel +from flamapy.metamodels.fm_metamodel.models.feature_model import ( + FeatureModel, + Constraint, + Feature, + Relation, +) +from flamapy.metamodels.pysat_diagnosis_metamodel.models.pysat_diagnosis_model import DiagnosisModel +from flamapy.metamodels.pysat_metamodel.transformations.fm_to_pysat import FmToPysat + +class FmToDiagPysat(FmToPysat): + @staticmethod + def get_source_extension() -> str: + return 'fm' + + @staticmethod + def get_destination_extension() -> str: + return 'pysat_diagnosis' + + def __init__(self, source_model: FeatureModel) -> None: + self.source_model = source_model + self.counter = 1 + self.destination_model = DiagnosisModel() + # self.r_cnf = self.destination_model.r_cnf + # self.ctc_cnf = self.destination_model.ctc_cnf + + def add_root(self, feature: Feature) -> None: + #self.r_cnf.append([self.destination_model.variables.get(feature.name)]) + self.destination_model.add_clause([self.destination_model.variables.get(feature.name)]) + print(self.destination_model.__class__) + self.destination_model.add_clause_toMap(str(feature), [[self.destination_model.variables.get(feature.name)]]) + + def _store_constraint_relation(self, relation: Relation, clauses:List[List[int]]) -> None: + for clause in clauses: + self.destination_model.add_clause(clause) + self.destination_model.add_clause_toMap(str(relation), clauses) \ No newline at end of file diff --git a/flamapy/metamodels/pysat_metamodel/models/txtcnf_model.py b/flamapy/metamodels/pysat_metamodel/models/txtcnf_model.py index c396a13..0ec2dc9 100644 --- a/flamapy/metamodels/pysat_metamodel/models/txtcnf_model.py +++ b/flamapy/metamodels/pysat_metamodel/models/txtcnf_model.py @@ -201,7 +201,7 @@ def extract_variables(cnf_formula: str) -> list[str]: clauses[len(clauses) - 1] = clauses[len(clauses) - 1][:-1] for clause in clauses: - tokens = clause.split(' ') + tokens = flamapy.metamodels.pysat_metamodel.operations.diagnosis.utils.split(' ') tokens = list(filter(lambda t: t != cnf_notation.value[CNFLogicConnective.OR], tokens)) for feature in tokens: if feature == cnf_notation.value[CNFLogicConnective.NOT]: diff --git a/flamapy/metamodels/pysat_metamodel/transformations/cnf_to_pysat.py b/flamapy/metamodels/pysat_metamodel/transformations/cnf_to_pysat.py index 090828b..b4ab634 100644 --- a/flamapy/metamodels/pysat_metamodel/transformations/cnf_to_pysat.py +++ b/flamapy/metamodels/pysat_metamodel/transformations/cnf_to_pysat.py @@ -61,7 +61,7 @@ def _extract_clauses(self, cnf_formula: str, cnf_notation: TextCNFNotation) -> N clauses[len(clauses) - 1] = clauses[len(clauses) - 1][:-1] for _c in clauses: - tokens = _c.split(' ') + tokens = flamapy.metamodels.pysat_metamodel.operations.diagnosis.utils.split(' ') tokens = list(filter(lambda t: t != cnf_notation.value[CNFLogicConnective.OR], tokens)) logic_not = False cnf_clause = [] diff --git a/flamapy/metamodels/pysat_metamodel/transformations/fm_to_pysat.py b/flamapy/metamodels/pysat_metamodel/transformations/fm_to_pysat.py index fef9357..b2e8a07 100644 --- a/flamapy/metamodels/pysat_metamodel/transformations/fm_to_pysat.py +++ b/flamapy/metamodels/pysat_metamodel/transformations/fm_to_pysat.py @@ -1,5 +1,5 @@ import itertools -from typing import Any +from typing import Any, List from flamapy.core.transformations import ModelToModel from flamapy.metamodels.fm_metamodel.models.feature_model import ( @@ -37,84 +37,117 @@ def add_root(self, feature: Feature) -> None: # self.r_cnf.append([self.destination_model.variables.get(feature.name)]) value = self.destination_model.get_variable(feature.name) self.destination_model.add_clause([value]) - - def add_relation(self, relation: Relation) -> None: # noqa: MC0001 + + def _add_mandatory_relation(self, relation: Relation) -> List[List[int]]: + value_parent = self.destination_model.get_variable(relation.parent.name) + value_child = self.destination_model.get_variable(relation.children[0].name) + clauses = [[-1 * value_parent, value_child], [-1 * value_child, value_parent]] + return clauses + + def _add_optional_relation(self, relation: Relation) -> List[List[int]]: value_parent = self.destination_model.get_variable(relation.parent.name) value_children = self.destination_model.get_variable(relation.children[0].name) - if relation.is_mandatory(): - self.destination_model.add_clause([-1 * value_parent, value_children]) - self.destination_model.add_clause([-1 * value_children, value_parent]) + clauses =[[-1 * value_children, value_parent]] + return clauses - elif relation.is_optional(): - self.destination_model.add_clause([-1 * value_children, value_parent]) - - elif relation.is_or(): # this is a 1 to n relatinship with multiple childs - # add the first cnf child1 or child2 or ... or childN or no parent) - # first elem of the constraint - alt_cnf = [-1 * value_parent] - for child in relation.children: - alt_cnf.append(self.destination_model.get_variable(child.name)) - self.destination_model.add_clause(alt_cnf) - - for child in relation.children: - self.destination_model.add_clause([ - -1 * self.destination_model.get_variable(child.name), - value_parent - ]) - - # TODO: fix too many nested blocks - elif relation.is_alternative(): # pylint: disable=too-many-nested-blocks - # this is a 1 to 1 relatinship with multiple childs - # add the first cnf child1 or child2 or ... or childN or no parent) - - # first elem of the constraint - alt_cnf = [-1 * value_parent] - for child in relation.children: - alt_cnf.append(self.destination_model.get_variable(child.name)) - self.destination_model.add_clause(alt_cnf) - - for i, _ in enumerate(relation.children): - for j in range(i + 1, len(relation.children)): - if i != j: - self.destination_model.add_clause([ - -1 * self.destination_model.get_variable(relation.children[i].name), - -1 * self.destination_model.get_variable(relation.children[j].name) - ]) - self.destination_model.add_clause([ - -1 * self.destination_model.get_variable(relation.children[i].name), - value_parent - ]) + def _add_or_relation(self, relation: Relation) -> List[List[int]]: + # this is a 1 to n relatinship with multiple childs + # add the first cnf child1 or child2 or ... or childN or no parent) + # first elem of the constraint + value_parent = self.destination_model.get_variable(relation.parent.name) - else: - # This is a _min to _max relationship - _min = relation.card_min - _max = relation.card_max - for val in range(len(relation.children) + 1): - if val < _min or val > _max: - # combinations of val elements - for combination in itertools.combinations(relation.children, val): - cnf = [-1 * value_parent] - for feat in relation.children: - if feat in combination: - cnf.append(-1 * self.destination_model.get_variable(feat.name)) - else: - cnf.append(self.destination_model.get_variable(feat.name)) - self.destination_model.add_clause(cnf) - - # there is a special case when coping with the upper part of the thru table - # In the case of allowing 0 childs, you cannot exclude the option in that - # no feature in this relation is activated - for val in range(1, len(relation.children) + 1): + alt_cnf = [-1 * value_parent] + for child in relation.children: + alt_cnf.append(self.destination_model.get_variable(child.name)) + clauses=[alt_cnf] + + for child in relation.children: + clauses.append([ + -1 * self.destination_model.get_variable(child.name), + value_parent + ]) + + return clauses + + def _add_alternative_relation(self, relation: Relation) -> List[List[int]]: + # pylint: disable=too-many-nested-blocks + # this is a 1 to 1 relatinship with multiple childs + # add the first cnf child1 or child2 or ... or childN or no parent) + + value_parent = self.destination_model.get_variable(relation.parent.name) + # first elem of the constraint + alt_cnf = [-1 * value_parent] + for child in relation.children: + alt_cnf.append(self.destination_model.get_variable(child.name)) + clauses=[alt_cnf] + + for i, _ in enumerate(relation.children): + for j in range(i + 1, len(relation.children)): + if i != j: + clauses.append([ + -1 * self.destination_model.get_variable(relation.children[i].name), + -1 * self.destination_model.get_variable(relation.children[j].name) + ]) + clauses.append([ + -1 * self.destination_model.get_variable(relation.children[i].name), + value_parent + ]) + return clauses + + def _add_constraint_relation(self, relation: Relation) -> List[List[int]]: + value_parent = self.destination_model.get_variable(relation.parent.name) + + # This is a _min to _max relationship + _min = relation.card_min + _max = relation.card_max + clauses = [] + + for val in range(len(relation.children) + 1): + if val < _min or val > _max: + # combinations of val elements for combination in itertools.combinations(relation.children, val): - cnf = [value_parent] + cnf = [-1 * value_parent] for feat in relation.children: if feat in combination: cnf.append(-1 * self.destination_model.get_variable(feat.name)) else: cnf.append(self.destination_model.get_variable(feat.name)) - self.destination_model.add_clause(cnf) + clauses.append(cnf) + + # there is a special case when coping with the upper part of the thru table + # In the case of allowing 0 childs, you cannot exclude the option in that + # no feature in this relation is activated + for val in range(1, len(relation.children) + 1): + + for combination in itertools.combinations(relation.children, val): + cnf = [value_parent] + for feat in relation.children: + if feat in combination: + cnf.append(-1 * self.destination_model.get_variable(feat.name)) + else: + cnf.append(self.destination_model.get_variable(feat.name)) + clauses.append(cnf) + + def _store_constraint_relation(self, relation: Relation, clauses:List[List[int]]) -> None: + for clause in clauses: + self.destination_model.add_clause(clause) + + def add_relation(self, relation: Relation) -> None: # noqa: MC0001 + + if relation.is_mandatory(): + clauses = self._add_mandatory_relation(relation) + elif relation.is_optional(): + clauses = self._add_optional_relation(relation) + elif relation.is_or(): + clauses = self._add_or_relation(relation) + elif relation.is_alternative(): + clauses = self._add_alternative_relation(relation) + else: + clauses = self._add_constraint_relation(relation) + + self._store_constraint_relation(relation,clauses) def add_constraint(self, ctc: Constraint) -> None: def get_term_variable(term: Any) -> int: diff --git a/tests/resources/pizzas.fide b/tests/resources/pizzas.fide new file mode 100644 index 0000000..bb2d5cd --- /dev/null +++ b/tests/resources/pizzas.fide @@ -0,0 +1,29 @@ + + + + + + + + + + + + + + + + + + + + + + + + CheesyCrust + Big + + + + diff --git a/tests/resources/pizzas_nonvalid.conf b/tests/resources/pizzas_nonvalid.conf new file mode 100644 index 0000000..460580f --- /dev/null +++ b/tests/resources/pizzas_nonvalid.conf @@ -0,0 +1,2 @@ +Big,True +Normal,True diff --git a/tests/resources/pizzas_nonvalid.csvconf b/tests/resources/pizzas_nonvalid.csvconf new file mode 100644 index 0000000..460580f --- /dev/null +++ b/tests/resources/pizzas_nonvalid.csvconf @@ -0,0 +1,2 @@ +Big,True +Normal,True diff --git a/tests/resources/smartwatch_consistent.fide b/tests/resources/smartwatch_consistent.fide new file mode 100644 index 0000000..f846acd --- /dev/null +++ b/tests/resources/smartwatch_consistent.fide @@ -0,0 +1,44 @@ + + + + + + + + + + + + + + + + + + + + + + + Camera + High Resolution + + + + + Compass + GPS + + + + + + Analog + + + Cellular + + + + + diff --git a/tests/resources/smartwatch_deadfeature.fide b/tests/resources/smartwatch_deadfeature.fide new file mode 100644 index 0000000..e17d91e --- /dev/null +++ b/tests/resources/smartwatch_deadfeature.fide @@ -0,0 +1,50 @@ + + + + + + + + + + + + + + + + + + + + + + + Camera + High Resolution + + + + + Compass + GPS + + + + + + Analog + + + Cellular + + + + + + Smartwatch + Analog + + + + diff --git a/tests/resources/smartwatch_inconsistent.fide b/tests/resources/smartwatch_inconsistent.fide new file mode 100644 index 0000000..20f5d4d --- /dev/null +++ b/tests/resources/smartwatch_inconsistent.fide @@ -0,0 +1,56 @@ + + + + + + + + + + + + + + + + + + + + + + + Camera + High Resolution + + + + + Compass + GPS + + + + + + Analog + + + Cellular + + + + + + Smartwatch + Cellular + + + + + Smartwatch + Analog + + + + diff --git a/tests/resources/smartwatch_nonvalid.csvconf b/tests/resources/smartwatch_nonvalid.csvconf new file mode 100644 index 0000000..35c1044 --- /dev/null +++ b/tests/resources/smartwatch_nonvalid.csvconf @@ -0,0 +1,2 @@ +Analog,True +E-ink,True diff --git a/tests/resources/smartwatch_testcase.csvconf b/tests/resources/smartwatch_testcase.csvconf new file mode 100644 index 0000000..4bc2f0a --- /dev/null +++ b/tests/resources/smartwatch_testcase.csvconf @@ -0,0 +1 @@ +E-ink,True diff --git a/tests/unit_tests/diagnosis_test.py b/tests/unit_tests/diagnosis_test.py new file mode 100644 index 0000000..68f6f06 --- /dev/null +++ b/tests/unit_tests/diagnosis_test.py @@ -0,0 +1,187 @@ +import unittest + +from flamapy.metamodels.configuration_metamodel.transformations import ConfigurationBasicReader +from flamapy.metamodels.fm_metamodel.transformations import FeatureIDEReader + +from flamapy.metamodels.pysat_metamodel.transformations import FmToPysat + +from flamapy.metamodels.pysat_metamodel.operations import Glucose3Diagnosis, Glucose3Conflict + + +# def test_with_DiscoverMetamodels(): +# """ +# Won't work because the configuration is not set +# We want an optional configuration +# In fact, Glucose3FastDiag implements ValidConfiguration, so it requires a configuration. +# """ +# dm = DiscoverMetamodels() +# result = dm.use_operation_from_file("Glucose3FastDiag", "../resources/smartwatch_inconsistent.fide") +# print(result) +# result = dm.use_operation_from_file("Glucose3FastDiag", "../resources/smartwatch_consistent.fide") +# print(result) +# assert result == ['Diagnosis: [[-8, -4]]'] + + +def test_fastdiag_all(): + """ + Identify all diagnoses + """ + feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() + model = FmToPysat(feature_model).transform() + + hsdag_fastdiag = Glucose3Diagnosis() + hsdag_fastdiag.execute(model) + result = hsdag_fastdiag.get_result() + + print(result) + assert result == ['Diagnoses: [(5) IMPLIES[Smartwatch][Analog]],[(4) IMPLIES[Smartwatch][Cellular]],[(3) OR[NOT[Analog][]][NOT[Cellular][]]]', + 'Conflict: [(5) IMPLIES[Smartwatch][Analog], (4) IMPLIES[Smartwatch][Cellular], (3) OR[NOT[Analog][]][NOT[Cellular][]]]'] + + +def test_fastdiag_one(): + """ + Identify one diagnosis + """ + feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() + model = FmToPysat(feature_model).transform() + + hsdag_fastdiag = Glucose3Diagnosis() + hsdag_fastdiag.max_diagnoses = 1 + hsdag_fastdiag.execute(model) + result = hsdag_fastdiag.get_result() + + print(result) + assert result == ['Diagnosis: [(5) IMPLIES[Smartwatch][Analog]]', + 'No conflicts found'] + + +def test_fastdiag_two(): + """ + Identify two diagnoses + """ + feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() + model = FmToPysat(feature_model).transform() + + hsdag_fastdiag = Glucose3Diagnosis() + hsdag_fastdiag.max_diagnoses = 2 + hsdag_fastdiag.execute(model) + result = hsdag_fastdiag.get_result() + + print(result) + assert result == ['Diagnoses: [(5) IMPLIES[Smartwatch][Analog]],[(4) IMPLIES[Smartwatch][Cellular]]', + 'No conflicts found'] + + +def test_quickxplain_all(): + feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() + model = FmToPysat(feature_model).transform() + + hsdag_quickxplain = Glucose3Conflict() + hsdag_quickxplain.execute(model) + result = hsdag_quickxplain.get_result() + + print(result) + assert result == [ + 'Conflict: [(5) IMPLIES[Smartwatch][Analog], (4) IMPLIES[Smartwatch][Cellular], (3) OR[NOT[Analog][]][NOT[Cellular][]]]', + 'Diagnoses: [(5) IMPLIES[Smartwatch][Analog]],[(4) IMPLIES[Smartwatch][Cellular]],[(3) OR[NOT[Analog][]][NOT[Cellular][]]]'] + + +def test_quickxplain_one(): + feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() + model = FmToPysat(feature_model).transform() + + hsdag_quickxplain = Glucose3Conflict() + hsdag_quickxplain.max_conflicts = 1 + hsdag_quickxplain.execute(model) + result = hsdag_quickxplain.get_result() + + print(result) + assert result == [ + 'Conflict: [(5) IMPLIES[Smartwatch][Analog], (4) IMPLIES[Smartwatch][Cellular], (3) OR[NOT[Analog][]][NOT[Cellular][]]]', + 'No diagnosis found'] + + +def test_quickxplain_two(): + feature_model = FeatureIDEReader("../resources/smartwatch_inconsistent.fide").transform() + model = FmToPysat(feature_model).transform() + + hsdag_quickxplain = Glucose3Conflict() + hsdag_quickxplain.max_conflicts = 2 + hsdag_quickxplain.execute(model) + result = hsdag_quickxplain.get_result() + + print(result) + assert result == [ + 'Conflict: [(5) IMPLIES[Smartwatch][Analog], (4) IMPLIES[Smartwatch][Cellular], (3) OR[NOT[Analog][]][NOT[Cellular][]]]', + 'Diagnoses: [(5) IMPLIES[Smartwatch][Analog]],[(4) IMPLIES[Smartwatch][Cellular]],[(3) OR[NOT[Analog][]][NOT[Cellular][]]]'] + + +def test_fastdiag_with_configuration(): + feature_model = FeatureIDEReader("../resources/smartwatch_consistent.fide").transform() + model = FmToPysat(feature_model).transform() + + configuration = ConfigurationBasicReader("../resources/smartwatch_nonvalid.csvconf").transform() + + hsdag_fastdiag = Glucose3Diagnosis() + hsdag_fastdiag.set_configuration(configuration) + hsdag_fastdiag.execute(model) + result = hsdag_fastdiag.get_result() + + print(result) + assert result == ['Diagnoses: [E-ink = true],[Analog = true]', + 'Conflict: [E-ink = true, Analog = true]'] + + +def test_quickxplain_with_configuration(): + feature_model = FeatureIDEReader("../resources/smartwatch_consistent.fide").transform() + model = FmToPysat(feature_model).transform() + + configuration = ConfigurationBasicReader("../resources/smartwatch_nonvalid.csvconf").transform() + + hsdag_quickxplain = Glucose3Conflict() + hsdag_quickxplain.set_configuration(configuration) + hsdag_quickxplain.execute(model) + result = hsdag_quickxplain.get_result() + + print(result) + assert result == ['Conflict: [E-ink = true, Analog = true]', 'Diagnoses: [E-ink = true],[Analog = true]'] + + +def test_fastdiag_with_test_case(): + feature_model = FeatureIDEReader("../resources/smartwatch_deadfeature.fide").transform() + model = FmToPysat(feature_model).transform() + + test_case = ConfigurationBasicReader("../resources/smartwatch_testcase.csvconf").transform() + + hsdag_fastdiag = Glucose3Diagnosis() + hsdag_fastdiag.set_test_case(test_case) + hsdag_fastdiag.execute(model) + result = hsdag_fastdiag.get_result() + + print(result) + assert result == ['Diagnoses: [(4) IMPLIES[Smartwatch][Analog]],' + '[(alternative) Screen[1,1]Analog High Resolution E-ink ]', + 'Conflict: [(4) IMPLIES[Smartwatch][Analog], ' + '(alternative) Screen[1,1]Analog High Resolution E-ink ]'] + + +def test_quickxplain_with_testcase(): + feature_model = FeatureIDEReader("../resources/smartwatch_deadfeature.fide").transform() + model = FmToPysat(feature_model).transform() + + test_case = ConfigurationBasicReader("../resources/smartwatch_testcase.csvconf").transform() + + hsdag_quickxplain = Glucose3Conflict() + hsdag_quickxplain.set_test_case(test_case) + hsdag_quickxplain.execute(model) + result = hsdag_quickxplain.get_result() + + print(result) + assert result == ['Conflict: [(4) IMPLIES[Smartwatch][Analog], ' + '(alternative) Screen[1,1]Analog High Resolution E-ink ]', + 'Diagnoses: [(4) IMPLIES[Smartwatch][Analog]],' + '[(alternative) Screen[1,1]Analog High Resolution E-ink ]'] + + +if __name__ == '__main__': + unittest.main()