From 8b322fc6f5d284cbb421d2384535af3120deb93b Mon Sep 17 00:00:00 2001 From: balamg Date: Wed, 20 Nov 2019 11:27:54 +0530 Subject: [PATCH 1/7] synchronize on assertlock --- go.mod | 5 +++++ rete/network.go | 18 +++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index d457e61..f36faf1 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,10 @@ require ( github.com/aws/aws-sdk-go v1.18.3 github.com/gorilla/websocket v1.4.0 github.com/oklog/ulid v1.3.1 + github.com/project-flogo/contrib/trigger/kafka v0.9.0 + github.com/project-flogo/contrib/trigger/rest v0.9.0 github.com/project-flogo/core v0.9.3 + github.com/stretchr/testify v1.3.0 ) + +go 1.13 diff --git a/rete/network.go b/rete/network.go index 52dc416..4ef28e0 100644 --- a/rete/network.go +++ b/rete/network.go @@ -63,7 +63,7 @@ type reteNetworkImpl struct { currentId int assertLock sync.Mutex - crudLock sync.Mutex + //crudLock sync.Mutex txnHandler model.RtcTransactionHandler txnContext interface{} } @@ -85,8 +85,8 @@ func (nw *reteNetworkImpl) initReteNetwork() { func (nw *reteNetworkImpl) AddRule(rule model.Rule) (err error) { - nw.crudLock.Lock() - defer nw.crudLock.Unlock() + nw.assertLock.Lock() + defer nw.assertLock.Unlock() if nw.allRules[rule.GetName()] != nil { return fmt.Errorf("Rule already exists.." + rule.GetName()) @@ -146,8 +146,8 @@ func (nw *reteNetworkImpl) setClassNodeAndLinkJoinTables(nodesOfRule *list.List, func (nw *reteNetworkImpl) RemoveRule(ruleName string) model.Rule { - nw.crudLock.Lock() - defer nw.crudLock.Unlock() + nw.assertLock.Lock() + defer nw.assertLock.Unlock() rule := nw.allRules[ruleName] delete(nw.allRules, ruleName) @@ -536,8 +536,8 @@ func (nw *reteNetworkImpl) Assert(ctx context.Context, rs model.RuleSession, tup reteCtxVar, isRecursive, newCtx := getOrSetReteCtx(ctx, nw, rs) if !isRecursive { - nw.crudLock.Lock() - defer nw.crudLock.Unlock() + nw.assertLock.Lock() + defer nw.assertLock.Unlock() nw.assertInternal(newCtx, tuple, changedProps, mode) reteCtxVar.getConflictResolver().resolveConflict(newCtx) //if Timeout is 0, remove it from rete @@ -575,8 +575,8 @@ func (nw *reteNetworkImpl) Retract(ctx context.Context, tuple model.Tuple, chang } reteCtxVar, isRecursive, _ := getOrSetReteCtx(ctx, nw, nil) if !isRecursive { - nw.crudLock.Lock() - defer nw.crudLock.Unlock() + nw.assertLock.Lock() + defer nw.assertLock.Unlock() nw.retractInternal(ctx, tuple, changedProps, mode) if nw.txnHandler != nil && mode == DELETE { rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted()) From 1cf4c2bf11b3d9558dd2b2a3b2f21538a585354b Mon Sep 17 00:00:00 2001 From: balamg Date: Tue, 3 Dec 2019 11:21:44 +0530 Subject: [PATCH 2/7] added debug statements --- rete/network.go | 5 ++++- rete/rulenode.go | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/rete/network.go b/rete/network.go index 4ef28e0..4a396a5 100644 --- a/rete/network.go +++ b/rete/network.go @@ -137,6 +137,9 @@ func (nw *reteNetworkImpl) AddRule(rule model.Rule) (err error) { //Add NodeLinks nw.ruleNameClassNodeLinksOfRule[rule.GetName()] = classNodeLinksOfRule + + rstr := nw.String() + fmt.Printf(rstr) return nil } @@ -262,7 +265,7 @@ func (nw *reteNetworkImpl) buildNetwork(rule model.Rule, nodesOfRule *list.List, lastNode = fNode } //Yoohoo! We have a Rule!! - ruleNode := newRuleNode(rule) + ruleNode := newRuleNode(nw, rule) newNodeLink(nw, lastNode, ruleNode, false) nodesOfRule.PushBack(ruleNode) } else { diff --git a/rete/rulenode.go b/rete/rulenode.go index 01ce4e9..68d5603 100644 --- a/rete/rulenode.go +++ b/rete/rulenode.go @@ -18,8 +18,9 @@ type ruleNodeImpl struct { rule model.Rule } -func newRuleNode(rule model.Rule) ruleNode { +func newRuleNode(nw Network, rule model.Rule) ruleNode { rn := ruleNodeImpl{} + rn.nodeImpl.initNodeImpl(nw, rule, rule.GetIdentifiers()) rn.identifiers = rule.GetIdentifiers() rn.rule = rule return &rn From ddf54bc2fba5ad2d61ab391530bdf4167eab49d7 Mon Sep 17 00:00:00 2001 From: balamg Date: Tue, 3 Dec 2019 11:27:44 +0530 Subject: [PATCH 3/7] added debug statements --- rete/network.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rete/network.go b/rete/network.go index 4a396a5..62d94a0 100644 --- a/rete/network.go +++ b/rete/network.go @@ -183,7 +183,8 @@ func (nw *reteNetworkImpl) RemoveRule(ruleName string) model.Rule { } } } - + rstr := nw.String() + fmt.Printf(rstr) return rule } From ddba76a96199372c46851389dc7f144bdc10aba9 Mon Sep 17 00:00:00 2001 From: balamg Date: Wed, 4 Dec 2019 11:18:33 +0530 Subject: [PATCH 4/7] add rule and assert tuples --- rete/classnode.go | 12 ++++- rete/network.go | 98 +++++++++++++++++++++-------------- rete/opsList.go | 2 +- ruleapi/rulesession.go | 5 +- ruleapi/tests/rules_3_test.go | 5 +- 5 files changed, 77 insertions(+), 45 deletions(-) diff --git a/rete/classnode.go b/rete/classnode.go index 71b4916..2bfe352 100644 --- a/rete/classnode.go +++ b/rete/classnode.go @@ -14,7 +14,7 @@ type classNode interface { addClassNodeLink(classNodeLink) removeClassNodeLink(classNodeLink) getClassNodeLinks() *list.List - assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool) + assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, forRule string) } type classNodeImpl struct { @@ -74,13 +74,18 @@ func (cn *classNodeImpl) String() string { return ret } -func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool) { +func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, forRule string) { handle := getOrCreateHandle(ctx, tuple) handles := make([]reteHandle, 1) handles[0] = handle propagate := false for e := cn.getClassNodeLinks().Front(); e != nil; e = e.Next() { classNodeLinkVar := e.Value.(classNodeLink) + if forRule != "" { + if classNodeLinkVar.getRule().GetName() != forRule { + continue + } + } if changedProps != nil { depProps, found := classNodeLinkVar.getRule().GetDeps()[model.TupleType(cn.name)] if found { // rule depends on this type @@ -98,5 +103,8 @@ func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedP if propagate { classNodeLinkVar.propagateObjects(ctx, handles) } + if forRule != "" { + break + } } } diff --git a/rete/network.go b/rete/network.go index 62d94a0..d821768 100644 --- a/rete/network.go +++ b/rete/network.go @@ -4,12 +4,12 @@ import ( "context" "fmt" "math" + "time" "github.com/project-flogo/rules/common/model" "container/list" "sync" - "time" ) type RtcOprn int @@ -23,7 +23,8 @@ const ( //Network ... the rete network type Network interface { - AddRule(model.Rule) error + AddRule(rule model.Rule, rs model.RuleSession) error + AddRuleAndAssert(rule model.Rule, rs model.RuleSession) (err error) String() string RemoveRule(string) model.Rule GetRules() []model.Rule @@ -34,7 +35,7 @@ type Network interface { retractInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) - assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) + assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) getOrCreateHandle(ctx context.Context, tuple model.Tuple) reteHandle getHandle(tuple model.Tuple) reteHandle @@ -82,9 +83,14 @@ func (nw *reteNetworkImpl) initReteNetwork() { nw.ruleNameClassNodeLinksOfRule = make(map[string]*list.List) nw.allHandles = make(map[string]reteHandle) } +func (nw *reteNetworkImpl) AddRuleAndAssert(rule model.Rule, rs model.RuleSession) (err error) { + return nw.addRuleWithAssert(rule, rs, true) +} +func (nw *reteNetworkImpl) AddRule(rule model.Rule, rs model.RuleSession) (err error) { + return nw.addRuleWithAssert(rule, rs, false) +} -func (nw *reteNetworkImpl) AddRule(rule model.Rule) (err error) { - +func (nw *reteNetworkImpl) addRuleWithAssert(rule model.Rule, rs model.RuleSession, shouldAssert bool) error { nw.assertLock.Lock() defer nw.assertLock.Unlock() @@ -138,8 +144,18 @@ func (nw *reteNetworkImpl) AddRule(rule model.Rule) (err error) { //Add NodeLinks nw.ruleNameClassNodeLinksOfRule[rule.GetName()] = classNodeLinksOfRule - rstr := nw.String() - fmt.Printf(rstr) + //rstr := nw.String() + //fmt.Printf(rstr) + + if shouldAssert { + for _, h := range nw.allHandles { + tt := h.getTuple().GetTupleType() + if ContainedByFirst(rule.GetIdentifiers(), []model.TupleType{tt}) { + //assert it but only for this rule. + nw.AssertForRule(nil, rs, h.getTuple(), nil, ADD, rule.GetName()) + } + } + } return nil } @@ -532,36 +548,7 @@ func (nw *reteNetworkImpl) printClassNode(ruleName string, classNodeImpl *classN } func (nw *reteNetworkImpl) Assert(ctx context.Context, rs model.RuleSession, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) { - - if ctx == nil { - ctx = context.Background() - } - - reteCtxVar, isRecursive, newCtx := getOrSetReteCtx(ctx, nw, rs) - - if !isRecursive { - nw.assertLock.Lock() - defer nw.assertLock.Unlock() - nw.assertInternal(newCtx, tuple, changedProps, mode) - reteCtxVar.getConflictResolver().resolveConflict(newCtx) - //if Timeout is 0, remove it from rete - td := model.GetTupleDescriptor(tuple.GetTupleType()) - if td != nil { - if td.TTLInSeconds == 0 { //remove immediately. - nw.removeTupleFromRete(tuple) - } else if td.TTLInSeconds > 0 { // TTL for the tuple type, after that, remove it from RETE - go time.AfterFunc(time.Second*time.Duration(td.TTLInSeconds), func() { - nw.removeTupleFromRete(tuple) - }) - } //else, its -ve and means, never expire - } - if nw.txnHandler != nil { - rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted()) - nw.txnHandler(ctx, rs, rtcTxn, nw.txnContext) - } - } else { - reteCtxVar.getOpsList().PushBack(newAssertEntry(tuple, changedProps, mode)) - } + nw.AssertForRule(ctx, rs, tuple, changedProps, mode, "") } func (nw *reteNetworkImpl) removeTupleFromRete(tuple model.Tuple) { @@ -626,12 +613,12 @@ func (nw *reteNetworkImpl) GetAssertedTupleByStringKey(key string) model.Tuple { return nil } -func (nw *reteNetworkImpl) assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) { +func (nw *reteNetworkImpl) assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) { tupleType := tuple.GetTupleType() listItem := nw.allClassNodes[string(tupleType)] if listItem != nil { classNodeVar := listItem.(classNode) - classNodeVar.assert(ctx, tuple, changedProps) + classNodeVar.assert(ctx, tuple, changedProps, forRule) } td := model.GetTupleDescriptor(tuple.GetTupleType()) if td != nil { @@ -671,3 +658,36 @@ func (nw *reteNetworkImpl) RegisterRtcTransactionHandler(txnHandler model.RtcTra nw.txnHandler = txnHandler nw.txnContext = txnContext } + +func (nw *reteNetworkImpl) AssertForRule(ctx context.Context, rs model.RuleSession, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) { + + if ctx == nil { + ctx = context.Background() + } + + reteCtxVar, isRecursive, newCtx := getOrSetReteCtx(ctx, nw, rs) + + if !isRecursive { + nw.assertLock.Lock() + defer nw.assertLock.Unlock() + nw.assertInternal(newCtx, tuple, changedProps, mode, forRule) + reteCtxVar.getConflictResolver().resolveConflict(newCtx) + //if Timeout is 0, remove it from rete + td := model.GetTupleDescriptor(tuple.GetTupleType()) + if td != nil { + if td.TTLInSeconds == 0 { //remove immediately. + nw.removeTupleFromRete(tuple) + } else if td.TTLInSeconds > 0 { // TTL for the tuple type, after that, remove it from RETE + go time.AfterFunc(time.Second*time.Duration(td.TTLInSeconds), func() { + nw.removeTupleFromRete(tuple) + }) + } //else, its -ve and means, never expire + } + if nw.txnHandler != nil { + rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted()) + nw.txnHandler(ctx, rs, rtcTxn, nw.txnContext) + } + } else { + reteCtxVar.getOpsList().PushBack(newAssertEntry(tuple, changedProps, mode)) + } +} diff --git a/rete/opsList.go b/rete/opsList.go index 8679bb6..722b630 100644 --- a/rete/opsList.go +++ b/rete/opsList.go @@ -36,7 +36,7 @@ func newAssertEntry(tuple model.Tuple, changeProps map[string]bool, mode RtcOprn func (ai *assertEntryImpl) execute(ctx context.Context) { reteCtx := getReteCtx(ctx) - reteCtx.getNetwork().assertInternal(ctx, ai.tuple, ai.changeProps, ai.mode) + reteCtx.getNetwork().assertInternal(ctx, ai.tuple, ai.changeProps, ai.mode, "") } //Modify Entry diff --git a/ruleapi/rulesession.go b/ruleapi/rulesession.go index 6df0fc9..f745f03 100644 --- a/ruleapi/rulesession.go +++ b/ruleapi/rulesession.go @@ -75,7 +75,10 @@ func (rs *rulesessionImpl) initRuleSession(name string) { } func (rs *rulesessionImpl) AddRule(rule model.Rule) (err error) { - return rs.reteNetwork.AddRule(rule) + return rs.reteNetwork.AddRule(rule, rs) +} +func (rs *rulesessionImpl) AddRuleAndAssert(rule model.Rule) (err error) { + return rs.reteNetwork.AddRuleAndAssert(rule, rs) } func (rs *rulesessionImpl) DeleteRule(ruleName string) { diff --git a/ruleapi/tests/rules_3_test.go b/ruleapi/tests/rules_3_test.go index 0b54fe6..6344ec7 100644 --- a/ruleapi/tests/rules_3_test.go +++ b/ruleapi/tests/rules_3_test.go @@ -2,6 +2,7 @@ package tests import ( "context" + "fmt" "testing" "github.com/project-flogo/rules/common/model" @@ -63,7 +64,7 @@ func r3Condition(ruleName string, condName string, tuples map[model.TupleType]mo } func r3action(ctx context.Context, rs model.RuleSession, ruleName string, tuples map[model.TupleType]model.Tuple, ruleCtx model.RuleContext) { - //fmt.Println("r13_action triggered") + fmt.Println("r13_action triggered") t1 := tuples[model.TupleType("t1")].(model.MutableTuple) id, _ := t1.GetString("id") @@ -75,7 +76,7 @@ func r3action(ctx context.Context, rs model.RuleSession, ruleName string, tuples } func r32action(ctx context.Context, rs model.RuleSession, ruleName string, tuples map[model.TupleType]model.Tuple, ruleCtx model.RuleContext) { - //fmt.Println("r132_action triggered") + fmt.Println("r132_action triggered") actCnt++ tk, _ := model.NewTupleKeyWithKeyValues("t1", "t10") From 18d7beb61d29c259f4a06e56ea841f031331b8cc Mon Sep 17 00:00:00 2001 From: balamg Date: Wed, 4 Dec 2019 11:43:34 +0530 Subject: [PATCH 5/7] ReplayTuplesForRule: a new method to replay tuples into a rule --- common/model/types.go | 8 ++++---- rete/network.go | 27 ++++++++++++--------------- ruleapi/rulesession.go | 9 +++++---- 3 files changed, 21 insertions(+), 23 deletions(-) diff --git a/common/model/types.go b/common/model/types.go index 2258a93..d9fb129 100644 --- a/common/model/types.go +++ b/common/model/types.go @@ -72,6 +72,8 @@ type RuleSession interface { //RtcTransactionHandler RegisterRtcTransactionHandler(txnHandler RtcTransactionHandler, handlerCtx interface{}) + //replay existing tuples into a rule + ReplayTuplesForRule(ruleName string) (err error) } //ConditionEvaluator is a function pointer for handling condition evaluations on the server side @@ -92,10 +94,9 @@ type ValueChangeListener interface { type RtcTxn interface { //map of type and map of key/tuple - GetRtcAdded () map[string]map[string]Tuple + GetRtcAdded() map[string]map[string]Tuple GetRtcModified() map[string]map[string]RtcModified GetRtcDeleted() map[string]map[string]Tuple - } type RtcModified interface { @@ -103,5 +104,4 @@ type RtcModified interface { GetModifiedProps() map[string]bool } -type RtcTransactionHandler func (ctx context.Context, rs RuleSession, txn RtcTxn, txnContext interface{}) - +type RtcTransactionHandler func(ctx context.Context, rs RuleSession, txn RtcTxn, txnContext interface{}) diff --git a/rete/network.go b/rete/network.go index d821768..273a6eb 100644 --- a/rete/network.go +++ b/rete/network.go @@ -23,8 +23,7 @@ const ( //Network ... the rete network type Network interface { - AddRule(rule model.Rule, rs model.RuleSession) error - AddRuleAndAssert(rule model.Rule, rs model.RuleSession) (err error) + AddRule(rule model.Rule) error String() string RemoveRule(string) model.Rule GetRules() []model.Rule @@ -44,6 +43,7 @@ type Network interface { GetAssertedTupleByStringKey(key string) model.Tuple //RtcTransactionHandler RegisterRtcTransactionHandler(txnHandler model.RtcTransactionHandler, txnContext interface{}) + ReplayTuplesForRule(ruleName string, rs model.RuleSession) (err error) } type reteNetworkImpl struct { @@ -83,14 +83,8 @@ func (nw *reteNetworkImpl) initReteNetwork() { nw.ruleNameClassNodeLinksOfRule = make(map[string]*list.List) nw.allHandles = make(map[string]reteHandle) } -func (nw *reteNetworkImpl) AddRuleAndAssert(rule model.Rule, rs model.RuleSession) (err error) { - return nw.addRuleWithAssert(rule, rs, true) -} -func (nw *reteNetworkImpl) AddRule(rule model.Rule, rs model.RuleSession) (err error) { - return nw.addRuleWithAssert(rule, rs, false) -} -func (nw *reteNetworkImpl) addRuleWithAssert(rule model.Rule, rs model.RuleSession, shouldAssert bool) error { +func (nw *reteNetworkImpl) AddRule(rule model.Rule) (err error) { nw.assertLock.Lock() defer nw.assertLock.Unlock() @@ -144,15 +138,18 @@ func (nw *reteNetworkImpl) addRuleWithAssert(rule model.Rule, rs model.RuleSessi //Add NodeLinks nw.ruleNameClassNodeLinksOfRule[rule.GetName()] = classNodeLinksOfRule - //rstr := nw.String() - //fmt.Printf(rstr) + return nil +} - if shouldAssert { +func (nw *reteNetworkImpl) ReplayTuplesForRule(ruleName string, rs model.RuleSession) error { + if rule, exists := nw.allRules[ruleName]; !exists { + return fmt.Errorf("Rule not found [%s]", ruleName) + } else { for _, h := range nw.allHandles { tt := h.getTuple().GetTupleType() if ContainedByFirst(rule.GetIdentifiers(), []model.TupleType{tt}) { //assert it but only for this rule. - nw.AssertForRule(nil, rs, h.getTuple(), nil, ADD, rule.GetName()) + nw.assert(nil, rs, h.getTuple(), nil, ADD, ruleName) } } } @@ -548,7 +545,7 @@ func (nw *reteNetworkImpl) printClassNode(ruleName string, classNodeImpl *classN } func (nw *reteNetworkImpl) Assert(ctx context.Context, rs model.RuleSession, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) { - nw.AssertForRule(ctx, rs, tuple, changedProps, mode, "") + nw.assert(ctx, rs, tuple, changedProps, mode, "") } func (nw *reteNetworkImpl) removeTupleFromRete(tuple model.Tuple) { @@ -659,7 +656,7 @@ func (nw *reteNetworkImpl) RegisterRtcTransactionHandler(txnHandler model.RtcTra nw.txnContext = txnContext } -func (nw *reteNetworkImpl) AssertForRule(ctx context.Context, rs model.RuleSession, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) { +func (nw *reteNetworkImpl) assert(ctx context.Context, rs model.RuleSession, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) { if ctx == nil { ctx = context.Background() diff --git a/ruleapi/rulesession.go b/ruleapi/rulesession.go index f745f03..0a5ec42 100644 --- a/ruleapi/rulesession.go +++ b/ruleapi/rulesession.go @@ -75,10 +75,7 @@ func (rs *rulesessionImpl) initRuleSession(name string) { } func (rs *rulesessionImpl) AddRule(rule model.Rule) (err error) { - return rs.reteNetwork.AddRule(rule, rs) -} -func (rs *rulesessionImpl) AddRuleAndAssert(rule model.Rule) (err error) { - return rs.reteNetwork.AddRuleAndAssert(rule, rs) + return rs.reteNetwork.AddRule(rule) } func (rs *rulesessionImpl) DeleteRule(ruleName string) { @@ -177,3 +174,7 @@ func (rs *rulesessionImpl) GetAssertedTuple(key model.TupleKey) model.Tuple { func (rs *rulesessionImpl) RegisterRtcTransactionHandler(txnHandler model.RtcTransactionHandler, txnContext interface{}) { rs.reteNetwork.RegisterRtcTransactionHandler(txnHandler, txnContext) } + +func (rs *rulesessionImpl) ReplayTuplesForRule(ruleName string) (err error) { + return rs.reteNetwork.ReplayTuplesForRule(ruleName, rs) +} From 470b21581e5f9f6144623e5f9052fb4c1b14781a Mon Sep 17 00:00:00 2001 From: balamg Date: Wed, 4 Dec 2019 11:48:26 +0530 Subject: [PATCH 6/7] ReplayTuplesForRule: a new method to replay tuples into a rule --- ruleapi/tests/rules_3_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ruleapi/tests/rules_3_test.go b/ruleapi/tests/rules_3_test.go index 6344ec7..0b54fe6 100644 --- a/ruleapi/tests/rules_3_test.go +++ b/ruleapi/tests/rules_3_test.go @@ -2,7 +2,6 @@ package tests import ( "context" - "fmt" "testing" "github.com/project-flogo/rules/common/model" @@ -64,7 +63,7 @@ func r3Condition(ruleName string, condName string, tuples map[model.TupleType]mo } func r3action(ctx context.Context, rs model.RuleSession, ruleName string, tuples map[model.TupleType]model.Tuple, ruleCtx model.RuleContext) { - fmt.Println("r13_action triggered") + //fmt.Println("r13_action triggered") t1 := tuples[model.TupleType("t1")].(model.MutableTuple) id, _ := t1.GetString("id") @@ -76,7 +75,7 @@ func r3action(ctx context.Context, rs model.RuleSession, ruleName string, tuples } func r32action(ctx context.Context, rs model.RuleSession, ruleName string, tuples map[model.TupleType]model.Tuple, ruleCtx model.RuleContext) { - fmt.Println("r132_action triggered") + //fmt.Println("r132_action triggered") actCnt++ tk, _ := model.NewTupleKeyWithKeyValues("t1", "t10") From fb499542208b40554eb334a6a16b978669fd675e Mon Sep 17 00:00:00 2001 From: Ramesh Polishetti Date: Thu, 5 Dec 2019 22:56:52 +0530 Subject: [PATCH 7/7] Add test case - 'adding previously removed rule scenario' --- ruleapi/tests/rules_4_test.go | 98 +++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 ruleapi/tests/rules_4_test.go diff --git a/ruleapi/tests/rules_4_test.go b/ruleapi/tests/rules_4_test.go new file mode 100644 index 0000000..fc1eeb2 --- /dev/null +++ b/ruleapi/tests/rules_4_test.go @@ -0,0 +1,98 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/project-flogo/rules/common/model" + "github.com/project-flogo/rules/ruleapi" +) + +// Add previously removed rule +func Test_Four(t *testing.T) { + rs, _ := createRuleSession() + + // create rule + r1 := ruleapi.NewRule("Rule1") + r1.AddCondition("r1c1", []string{"t1.none", "t2.none"}, trueCondition, nil) + r1.SetAction(r1Action) + // create tuples + t1, _ := model.NewTupleWithKeyValues("t1", "one") // No TTL + t2, _ := model.NewTupleWithKeyValues("t2", "two") // TTL is 0 + + // start rule session + rs.Start(nil) + + assertCtxValues := make(map[string]interface{}) + assertCtxValues["test"] = t + assertCtxValues["actionFired"] = "NOTFIRED" + assertCtx := context.WithValue(context.TODO(), "values", assertCtxValues) + + // Test1: add r1 and assert t1 & t2; rule action SHOULD be fired + addRule(t, rs, r1) + assert(assertCtx, rs, t1) + assert(assertCtx, rs, t2) + actionFired, _ := assertCtxValues["actionFired"].(string) + if actionFired != "FIRED" { + t.Log("rule action SHOULD be fired") + t.FailNow() + } + + // Test2: remove r1 and assert t2; rule action SHOULD NOT be fired + deleteRule(t, rs, r1) + assertCtxValues["actionFired"] = "NOTFIRED" + assert(assertCtx, rs, t2) + actionFired, _ = assertCtxValues["actionFired"].(string) + if actionFired != "NOTFIRED" { + t.Log("rule action SHOULD NOT be fired") + t.FailNow() + } + + // Test3: add r1 again and assert t2; rule action SHOULD be fired + addRule(t, rs, r1) + rs.ReplayTuplesForRule(r1.GetName()) + assertCtxValues["actionFired"] = "NOTFIRED" + assert(assertCtx, rs, t2) + actionFired, _ = assertCtxValues["actionFired"].(string) + if actionFired != "FIRED" { + t.Log("rule action SHOULD be fired") + t.FailNow() + } + + rs.Unregister() +} + +func addRule(t *testing.T, rs model.RuleSession, rule model.Rule) { + err := rs.AddRule(rule) + if err != nil { + t.Logf("[%s] error while adding rule[%s]", time.Now().Format("15:04:05.999999"), rule.GetName()) + return + } + t.Logf("[%s] Rule[%s] added. \n", time.Now().Format("15:04:05.999999"), rule.GetName()) +} + +func deleteRule(t *testing.T, rs model.RuleSession, rule model.Rule) { + rs.DeleteRule(rule.GetName()) + t.Logf("[%s] Rule[%s] deleted. \n", time.Now().Format("15:04:05.999999"), rule.GetName()) +} + +func assert(ctx context.Context, rs model.RuleSession, tuple model.Tuple) { + assertCtxValues := ctx.Value("values").(map[string]interface{}) + t, _ := assertCtxValues["test"].(*testing.T) + err := rs.Assert(ctx, tuple) + if err != nil { + t.Logf("[%s] assert error %s", time.Now().Format("15:04:05.999999"), err) + } +} + +func r1Action(ctx context.Context, rs model.RuleSession, ruleName string, tuples map[model.TupleType]model.Tuple, ruleCtx model.RuleContext) { + assertCtxValues := ctx.Value("values").(map[string]interface{}) + test, _ := assertCtxValues["test"].(*testing.T) + + t := tuples["t1"] + tSrt, _ := t.GetString("id") + test.Logf("[%s] r1Action called with the tuple[%s] \n", time.Now().Format("15:04:05.999999"), tSrt) + + assertCtxValues["actionFired"] = "FIRED" +}