Skip to content

Commit

Permalink
feature: Adding a new metamodel to perform feature model diagnosis
Browse files Browse the repository at this point in the history
* Fix typo

* Ignore .idea

* Add test feature models

* Add ConsistencyChecker and new version of PySATModel supporting diagnosis tasks

* Functions return a list instead of a CNF

* Add FastDiag and QuickXPlain

* Add documentation

* Migrate to new package

* Support Configuration

* Support Configuration

* Clean up

* improve diagnosis messages

* use assumptions in solver.solve()

* done diagnosis with assumptions and better result messages

* Add HSDAG

* Remove D

* Add documentation

* Fix bug

* Fix typo

* fix: refactoring to optimize code in execution. Only save names when needed for diagnosis

* fix: finishing the separation, still need to workout the method call

* fix: decouping diagnosis task from basic pysat operations

---------

Co-authored-by: José A. Galindo <[email protected]>
  • Loading branch information
manleviet and jagalindo authored Aug 23, 2023
1 parent 35a2167 commit 7996e2d
Show file tree
Hide file tree
Showing 34 changed files with 1,730 additions and 70 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
__pycache__
*.egg-info*
env
/.idea/
/build/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# pysat_metamodel

This repository will host the pysat metamode and its operation implementation
This repository will host the pysat metamodel and its operation implementation


## Install for development
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .pysat_diagnosis_model import DiagnosisModel

__all__ = [
'DiagnosisModel',
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
from typing import List, Dict, Tuple

from flamapy.metamodels.configuration_metamodel.models import Configuration
from flamapy.metamodels.pysat_metamodel.models import PySATModel


class DiagnosisModel(PySATModel):
"""
This is a new version of the PySATModel class to support the following tasks:
1. Diagnosis Task
If a configuration is given:
C = configuration
B = {f0 = true} + CF (i.e., = PySATModel)
else (no configuration is given):
a. Diagnosis the feature model
C = CF (i.e., = PySATModel - {f0 = true})
B = {f0 = true}
b. Diagnosis the error
C = CF (i.e., = PySATModel - {f0 = true})
B = {f0 = true} + test_case
where test_case is the following:
+ Dead feature: test_case = {fi = true}
+ False optional feature: test_case = {f_parent = true} & {f_child = false}
2. Redundancy Detection Task (need negative constraints)
C = CF (i.e., = PySATModel - {f0 = true})
B = {}
"""

@staticmethod
def get_extension() -> str:
return 'pysat_diagnosis'

def __init__(self) -> None:
super().__init__()
self.C = None # set of constraints which could be faulty
self.B = None # background knowledge (i.e., the knowledge that is known to be true)

self.KB = None # set of all CNF with added assumptions
self.constraint_map: list[(str, list[list[int]])] = [] # map clauses to relationships/constraint

self.constraint_assumption_map = None

def add_clause_toMap(self, description: str, clauses: list[list[int]]) -> None:
self.constraint_map.append((description, clauses))

def get_C(self) -> list:
return self.C

def get_B(self) -> list:
return self.B

def get_KB(self) -> list:
return self.KB

def get_pretty_diagnoses(self, assumptions: list[list[int]]) -> str:
diagnoses = []
for ass in assumptions:
diag = []
for item in ass:
if self.constraint_assumption_map[item]:
diag.append(self.constraint_assumption_map[item])
diagnoses.append(f"[{', '.join(diag)}]")

return ','.join(diagnoses)

def prepare_diagnosis_task(self, configuration: Configuration = None, test_case: Configuration = None) -> None:
"""
Execute this method after the model is built.
If a configuration is given:
C = configuration
B = {f0 = true} + CF (i.e., = PySATModel)
else (no configuration is given):
a. Diagnosis the feature model
C = CF (i.e., = PySATModel - {f0 = true})
B = {f0 = true}
b. Diagnosis the error
C = CF (i.e., = PySATModel - {f0 = true})
B = {f0 = true} + test_case
where test_case is the following:
+ Dead feature: test_case = {fi = true}
+ False optional feature: test_case = {f_parent = true} & {f_child = false}
"""
if configuration is not None:
# C = configuration
# B = {f0 = true} + CF (i.e., = PySATModel)
self.C, self.B, self.KB, self.constraint_assumption_map = \
self.prepare_assumptions(configuration=configuration)
else:
if test_case is None:
# Diagnosis the feature model
# C = CF (i.e., = PySATModel - {f0 = true})
# B = {f0 = true}
self.C, self.B, self.KB, self.constraint_assumption_map = self.prepare_assumptions()
else:
# Diagnosis the error
# C = CF (i.e., = PySATModel - {f0 = true})
# B = {f0 = true} + test_case
self.C, self.B, self.KB, self.constraint_assumption_map = self.prepare_assumptions(test_case=test_case)

def prepare_redundancy_detection_task(self) -> None:
"""
This function prepares the model for WipeOutR algorithm.
Execute this method after the model is built.
C = CF (i.e., = PySATModel - {f0 = true})
B = {}
"""
# C = CF (i.e., = PySATModel - {f0 = true})
self.C = self.prepare_assumptions
self.B = [] # B = {}
# ToDo: TBD

def prepare_assumptions(self, configuration: Configuration = None, test_case: Configuration = None) \
-> Tuple[List, List, List, Dict]:
assumption = []
KB = []
constraint_assumption_map = {}

id_assumption = len(self.variables) + 1
id_assumption = self.prepare_assumptions_for_KB(KB, assumption, constraint_assumption_map, id_assumption)

start_id_configuration = len(assumption)
if configuration is not None:
constraint_assumption_map = {} # reset the map
id_assumption = self.prepare_assumptions_for_configuration(KB, assumption, configuration,
constraint_assumption_map,
id_assumption)

start_id_test_case = len(assumption)
if test_case is not None:
self.prepare_assumptions_for_configuration(KB, assumption, test_case,
constraint_assumption_map,
id_assumption)

if configuration is not None:
B = assumption[:start_id_configuration]
C = assumption[start_id_configuration:]
else:
if test_case is not None:
B = [assumption[0]] + assumption[start_id_test_case:]
C = assumption[1:start_id_test_case]
else:
B = [assumption[0]]
C = assumption[1:]

return C, B, KB, constraint_assumption_map

def prepare_assumptions_for_KB(self, KB, assumption, constraint_assumption_map, id_assumption):
c_map = self.constraint_map
# loop through all tuples in the constraint map
for i in range(len(c_map)):
# get description
desc = c_map[i][0]
# get clauses
clauses = c_map[i][1]
# loop through all variables in the constraint
for j in range(len(clauses)):
# get each clause
clause = clauses[j]
# add the assumption variable to the clause
# assumption => clause
# i.e., -assumption v clause
clause.append(-1 * id_assumption)

assumption.append(id_assumption)
KB.extend(clauses)
constraint_assumption_map[id_assumption] = desc

id_assumption += 1

return id_assumption

def prepare_assumptions_for_configuration(self, KB, assumption, configuration, constraint_assumption_map,
id_assumption):
config = [feat.name for feat in configuration.elements]
for feat in config:
if feat not in self.variables.keys():
raise Exception(f'Feature {feat} is not in the model.')

for feat in configuration.elements.items():
desc = ''
clause = []

if feat[1]:
desc = f'{feat[0].name} = true'
clause = [self.variables[feat[0].name], -1 * id_assumption]
elif not feat[1]:
desc = f'{feat[0].name} = false'
clause = [-1 * self.variables[feat[0].name], -1 * id_assumption]

assumption.append(id_assumption)
KB.append(clause)
constraint_assumption_map[id_assumption] = desc

id_assumption += 1

return id_assumption
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .glucose3_conflict import Glucose3Conflict
from .glucose3_diagnosis import Glucose3Diagnosis


__all__ = [
'Glucose3Diagnosis',
'Glucose3Conflict'
]
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
A Java version of this implementation is available at:
https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/checker/ChocoConsistencyChecker.java
"""

from pysat.solvers import Solver


class ConsistencyChecker:

def __init__(self, solverName: str, KB: list) -> None:
self.solver = None
self.result = False

self.solver = Solver(solverName, bootstrap_with=KB)

def is_consistent(self, C: list, Δ: list) -> bool:
"""
Check if the given CNF formula is consistent using a solver.
:param C: a list of assumptions should be added to the CNF formula
:param Δ: a list of assumptions should not be added to the CNF formula
:return: a boolean value indicating whether the given CNF formula is consistent
"""
assumptions = C + [-1 * item for item in Δ]
self.result = self.solver.solve(assumptions=assumptions)
# print(f"assumptions: {assumptions} - result: {self.result}")
return self.result

def delete(self):
self.solver.delete()
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
A Java version of this implementation is available at:
https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/algorithms/FastDiagV3.java
"""

import logging

from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.utils import split, diff
from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker


class FastDiag:
"""
Implementation of MSS-based FastDiag algorithm.
Le, V. M., Silva, C. V., Felfernig, A., Benavides, D., Galindo, J., & Tran, T. N. T. (2023).
FastDiagP: An Algorithm for Parallelized Direct Diagnosis.
arXiv preprint arXiv:2305.06951.
"""

def __init__(self, checker: ConsistencyChecker) -> None:
self.checker = checker

def findDiagnosis(self, C: list, B: list) -> list:
"""
Activate FastDiag algorithm if there exists at least one constraint,
which induces an inconsistency in B. Otherwise, it returns an empty set.
// Func FastDiag(C, B) : Δ
// if isEmpty(C) or consistent(B U C) return Φ
// else return C \\ FD(Φ, C, B)
:param C: a consideration set of constraints
:param B: a background knowledge
:return: a diagnosis or an empty set
"""
logging.debug(f'fastDiag [C={C}, B={B}]')
# print(f'fastDiag [C={C}, B={B}]')

# if isEmpty(C) or consistent(B U C) return Φ
if len(C) == 0 or self.checker.is_consistent(B + C, []):
logging.debug('return Φ')
# print('return Φ')
return []
else: # return C \ FD(C, B, Φ)
mss = self.fd([], C, B)
diag = diff(C, mss)

logging.debug(f'return {diag}')
# print(f'return {diag}')
return diag

def fd(self, Δ: list, C: list, B: list) -> list:
"""
The implementation of MSS-based FastDiag algorithm.
The algorithm determines a maximal satisfiable subset MSS (Γ) of C U B.
// Func FD(Δ, C = {c1..cn}, B) : MSS
// if Δ != Φ and consistent(B U C) return C;
// if singleton(C) return Φ;
// k = n/2;
// C1 = {c1..ck}; C2 = {ck+1..cn};
// Δ1 = FD(C2, C1, B);
// Δ2 = FD(C1 - Δ1, C2, B U Δ1);
// return Δ1 ∪ Δ2;
:param Δ: check to skip redundant consistency checks
:param C: a consideration set of constraints
:param B: a background knowledge
:return: a maximal satisfiable subset MSS of C U B
"""
logging.debug(f'>>> FD [Δ={Δ}, C={C}, B={B}]')

# if Δ != Φ and consistent(B U C) return C;
if len(Δ) != 0 and self.checker.is_consistent(B + C, Δ):
logging.debug(f'<<< return {C}')
return C

# if singleton(C) return Φ;
if len(C) == 1:
logging.debug('<<< return Φ')
return []

# C1 = {c1..ck}; C2 = {ck+1..cn};
C1, C2 = split(C)

# Δ1 = FD(C2, C1, B);
Δ1 = self.fd(C2, C1, B)
# Δ2 = FD(C1 - Δ1, C2, B U Δ1);
C1withoutΔ1 = diff(C1, Δ1)
Δ2 = self.fd(C1withoutΔ1, C2, B + Δ1)

logging.debug('<<< return [Δ1={Δ1} ∪ Δ2={Δ2}]')

# return Δ1 + Δ2
return Δ1 + Δ2
Empty file.
Loading

0 comments on commit 7996e2d

Please sign in to comment.