Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diagnosis #56

Merged
merged 24 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
__pycache__
*.egg-info*
env
/.idea/
/build/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# pysat_metamodel

This repository will host the pysat metamode and its operation implementation
This repository will host the pysat metamodel and its operation implementation


## Install for development
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .pysat_diagnosis_model import DiagnosisModel

__all__ = [
'DiagnosisModel',
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
from typing import List, Dict, Tuple

from flamapy.metamodels.configuration_metamodel.models import Configuration
from flamapy.metamodels.pysat_metamodel.models import PySATModel


class DiagnosisModel(PySATModel):
"""
This is a new version of the PySATModel class to support the following tasks:
1. Diagnosis Task
If a configuration is given:
C = configuration
B = {f0 = true} + CF (i.e., = PySATModel)
else (no configuration is given):
a. Diagnosis the feature model
C = CF (i.e., = PySATModel - {f0 = true})
B = {f0 = true}
b. Diagnosis the error
C = CF (i.e., = PySATModel - {f0 = true})
B = {f0 = true} + test_case
where test_case is the following:
+ Dead feature: test_case = {fi = true}
+ False optional feature: test_case = {f_parent = true} & {f_child = false}
2. Redundancy Detection Task (need negative constraints)
C = CF (i.e., = PySATModel - {f0 = true})
B = {}
"""

@staticmethod
def get_extension() -> str:
return 'pysat_diagnosis'

def __init__(self) -> None:
super().__init__()
self.C = None # set of constraints which could be faulty
self.B = None # background knowledge (i.e., the knowledge that is known to be true)

self.KB = None # set of all CNF with added assumptions
self.constraint_map: list[(str, list[list[int]])] = [] # map clauses to relationships/constraint

self.constraint_assumption_map = None

def add_clause_toMap(self, description: str, clauses: list[list[int]]) -> None:
self.constraint_map.append((description, clauses))

def get_C(self) -> list:
return self.C

def get_B(self) -> list:
return self.B

def get_KB(self) -> list:
return self.KB

def get_pretty_diagnoses(self, assumptions: list[list[int]]) -> str:
diagnoses = []
for ass in assumptions:
diag = []
for item in ass:
if self.constraint_assumption_map[item]:
diag.append(self.constraint_assumption_map[item])
diagnoses.append(f"[{', '.join(diag)}]")

return ','.join(diagnoses)

def prepare_diagnosis_task(self, configuration: Configuration = None, test_case: Configuration = None) -> None:
"""
Execute this method after the model is built.
If a configuration is given:
C = configuration
B = {f0 = true} + CF (i.e., = PySATModel)
else (no configuration is given):
a. Diagnosis the feature model
C = CF (i.e., = PySATModel - {f0 = true})
B = {f0 = true}
b. Diagnosis the error
C = CF (i.e., = PySATModel - {f0 = true})
B = {f0 = true} + test_case
where test_case is the following:
+ Dead feature: test_case = {fi = true}
+ False optional feature: test_case = {f_parent = true} & {f_child = false}
"""
if configuration is not None:
# C = configuration
# B = {f0 = true} + CF (i.e., = PySATModel)
self.C, self.B, self.KB, self.constraint_assumption_map = \
self.prepare_assumptions(configuration=configuration)
else:
if test_case is None:
# Diagnosis the feature model
# C = CF (i.e., = PySATModel - {f0 = true})
# B = {f0 = true}
self.C, self.B, self.KB, self.constraint_assumption_map = self.prepare_assumptions()
else:
# Diagnosis the error
# C = CF (i.e., = PySATModel - {f0 = true})
# B = {f0 = true} + test_case
self.C, self.B, self.KB, self.constraint_assumption_map = self.prepare_assumptions(test_case=test_case)

def prepare_redundancy_detection_task(self) -> None:
"""
This function prepares the model for WipeOutR algorithm.
Execute this method after the model is built.
C = CF (i.e., = PySATModel - {f0 = true})
B = {}
"""
# C = CF (i.e., = PySATModel - {f0 = true})
self.C = self.prepare_assumptions
self.B = [] # B = {}
# ToDo: TBD

def prepare_assumptions(self, configuration: Configuration = None, test_case: Configuration = None) \
-> Tuple[List, List, List, Dict]:
assumption = []
KB = []
constraint_assumption_map = {}

id_assumption = len(self.variables) + 1
id_assumption = self.prepare_assumptions_for_KB(KB, assumption, constraint_assumption_map, id_assumption)

start_id_configuration = len(assumption)
if configuration is not None:
constraint_assumption_map = {} # reset the map
id_assumption = self.prepare_assumptions_for_configuration(KB, assumption, configuration,
constraint_assumption_map,
id_assumption)

start_id_test_case = len(assumption)
if test_case is not None:
self.prepare_assumptions_for_configuration(KB, assumption, test_case,
constraint_assumption_map,
id_assumption)

if configuration is not None:
B = assumption[:start_id_configuration]
C = assumption[start_id_configuration:]
else:
if test_case is not None:
B = [assumption[0]] + assumption[start_id_test_case:]
C = assumption[1:start_id_test_case]
else:
B = [assumption[0]]
C = assumption[1:]

return C, B, KB, constraint_assumption_map

def prepare_assumptions_for_KB(self, KB, assumption, constraint_assumption_map, id_assumption):
c_map = self.constraint_map
# loop through all tuples in the constraint map
for i in range(len(c_map)):
# get description
desc = c_map[i][0]
# get clauses
clauses = c_map[i][1]
# loop through all variables in the constraint
for j in range(len(clauses)):
# get each clause
clause = clauses[j]
# add the assumption variable to the clause
# assumption => clause
# i.e., -assumption v clause
clause.append(-1 * id_assumption)

assumption.append(id_assumption)
KB.extend(clauses)
constraint_assumption_map[id_assumption] = desc

id_assumption += 1

return id_assumption

def prepare_assumptions_for_configuration(self, KB, assumption, configuration, constraint_assumption_map,
id_assumption):
config = [feat.name for feat in configuration.elements]
for feat in config:
if feat not in self.variables.keys():
raise Exception(f'Feature {feat} is not in the model.')

for feat in configuration.elements.items():
desc = ''
clause = []

if feat[1]:
desc = f'{feat[0].name} = true'
clause = [self.variables[feat[0].name], -1 * id_assumption]
elif not feat[1]:
desc = f'{feat[0].name} = false'
clause = [-1 * self.variables[feat[0].name], -1 * id_assumption]

assumption.append(id_assumption)
KB.append(clause)
constraint_assumption_map[id_assumption] = desc

id_assumption += 1

return id_assumption
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .glucose3_conflict import Glucose3Conflict
from .glucose3_diagnosis import Glucose3Diagnosis


__all__ = [
'Glucose3Diagnosis',
'Glucose3Conflict'
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
A Java version of this implementation is available at:
https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/checker/ChocoConsistencyChecker.java
"""

from pysat.solvers import Solver


class ConsistencyChecker:

def __init__(self, solverName: str, KB: list) -> None:
self.solver = None
self.result = False

self.solver = Solver(solverName, bootstrap_with=KB)

def is_consistent(self, C: list, Δ: list) -> bool:
"""
Check if the given CNF formula is consistent using a solver.
:param C: a list of assumptions should be added to the CNF formula
:param Δ: a list of assumptions should not be added to the CNF formula
:return: a boolean value indicating whether the given CNF formula is consistent
"""
assumptions = C + [-1 * item for item in Δ]
self.result = self.solver.solve(assumptions=assumptions)
# print(f"assumptions: {assumptions} - result: {self.result}")
return self.result

def delete(self):
self.solver.delete()
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
A Java version of this implementation is available at:
https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/algorithms/FastDiagV3.java
"""

import logging

from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.utils import split, diff
from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker


class FastDiag:
"""
Implementation of MSS-based FastDiag algorithm.
Le, V. M., Silva, C. V., Felfernig, A., Benavides, D., Galindo, J., & Tran, T. N. T. (2023).
FastDiagP: An Algorithm for Parallelized Direct Diagnosis.
arXiv preprint arXiv:2305.06951.
"""

def __init__(self, checker: ConsistencyChecker) -> None:
self.checker = checker

def findDiagnosis(self, C: list, B: list) -> list:
"""
Activate FastDiag algorithm if there exists at least one constraint,
which induces an inconsistency in B. Otherwise, it returns an empty set.

// Func FastDiag(C, B) : Δ
// if isEmpty(C) or consistent(B U C) return Φ
// else return C \\ FD(Φ, C, B)
:param C: a consideration set of constraints
:param B: a background knowledge
:return: a diagnosis or an empty set
"""
logging.debug(f'fastDiag [C={C}, B={B}]')
# print(f'fastDiag [C={C}, B={B}]')

# if isEmpty(C) or consistent(B U C) return Φ
if len(C) == 0 or self.checker.is_consistent(B + C, []):
logging.debug('return Φ')
# print('return Φ')
return []
else: # return C \ FD(C, B, Φ)
mss = self.fd([], C, B)
diag = diff(C, mss)

logging.debug(f'return {diag}')
# print(f'return {diag}')
return diag

def fd(self, Δ: list, C: list, B: list) -> list:
"""
The implementation of MSS-based FastDiag algorithm.
The algorithm determines a maximal satisfiable subset MSS (Γ) of C U B.

// Func FD(Δ, C = {c1..cn}, B) : MSS
// if Δ != Φ and consistent(B U C) return C;
// if singleton(C) return Φ;
// k = n/2;
// C1 = {c1..ck}; C2 = {ck+1..cn};
// Δ1 = FD(C2, C1, B);
// Δ2 = FD(C1 - Δ1, C2, B U Δ1);
// return Δ1 ∪ Δ2;
:param Δ: check to skip redundant consistency checks
:param C: a consideration set of constraints
:param B: a background knowledge
:return: a maximal satisfiable subset MSS of C U B
"""
logging.debug(f'>>> FD [Δ={Δ}, C={C}, B={B}]')

# if Δ != Φ and consistent(B U C) return C;
if len(Δ) != 0 and self.checker.is_consistent(B + C, Δ):
logging.debug(f'<<< return {C}')
return C

# if singleton(C) return Φ;
if len(C) == 1:
logging.debug('<<< return Φ')
return []

# C1 = {c1..ck}; C2 = {ck+1..cn};
C1, C2 = split(C)

# Δ1 = FD(C2, C1, B);
Δ1 = self.fd(C2, C1, B)
# Δ2 = FD(C1 - Δ1, C2, B U Δ1);
C1withoutΔ1 = diff(C1, Δ1)
Δ2 = self.fd(C1withoutΔ1, C2, B + Δ1)

logging.debug('<<< return [Δ1={Δ1} ∪ Δ2={Δ2}]')

# return Δ1 + Δ2
return Δ1 + Δ2
Loading
Loading