diff --git a/common/model/types.go b/common/model/types.go index 2258a93..d9fb129 100644 --- a/common/model/types.go +++ b/common/model/types.go @@ -72,6 +72,8 @@ type RuleSession interface { //RtcTransactionHandler RegisterRtcTransactionHandler(txnHandler RtcTransactionHandler, handlerCtx interface{}) + //replay existing tuples into a rule + ReplayTuplesForRule(ruleName string) (err error) } //ConditionEvaluator is a function pointer for handling condition evaluations on the server side @@ -92,10 +94,9 @@ type ValueChangeListener interface { type RtcTxn interface { //map of type and map of key/tuple - GetRtcAdded () map[string]map[string]Tuple + GetRtcAdded() map[string]map[string]Tuple GetRtcModified() map[string]map[string]RtcModified GetRtcDeleted() map[string]map[string]Tuple - } type RtcModified interface { @@ -103,5 +104,4 @@ type RtcModified interface { GetModifiedProps() map[string]bool } -type RtcTransactionHandler func (ctx context.Context, rs RuleSession, txn RtcTxn, txnContext interface{}) - +type RtcTransactionHandler func(ctx context.Context, rs RuleSession, txn RtcTxn, txnContext interface{}) diff --git a/go.mod b/go.mod index d457e61..f36faf1 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,10 @@ require ( github.com/aws/aws-sdk-go v1.18.3 github.com/gorilla/websocket v1.4.0 github.com/oklog/ulid v1.3.1 + github.com/project-flogo/contrib/trigger/kafka v0.9.0 + github.com/project-flogo/contrib/trigger/rest v0.9.0 github.com/project-flogo/core v0.9.3 + github.com/stretchr/testify v1.3.0 ) + +go 1.13 diff --git a/rete/classnode.go b/rete/classnode.go index 71b4916..2bfe352 100644 --- a/rete/classnode.go +++ b/rete/classnode.go @@ -14,7 +14,7 @@ type classNode interface { addClassNodeLink(classNodeLink) removeClassNodeLink(classNodeLink) getClassNodeLinks() *list.List - assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool) + assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, forRule string) } type classNodeImpl struct { @@ -74,13 +74,18 @@ func (cn *classNodeImpl) String() string { return ret } -func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool) { +func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, forRule string) { handle := getOrCreateHandle(ctx, tuple) handles := make([]reteHandle, 1) handles[0] = handle propagate := false for e := cn.getClassNodeLinks().Front(); e != nil; e = e.Next() { classNodeLinkVar := e.Value.(classNodeLink) + if forRule != "" { + if classNodeLinkVar.getRule().GetName() != forRule { + continue + } + } if changedProps != nil { depProps, found := classNodeLinkVar.getRule().GetDeps()[model.TupleType(cn.name)] if found { // rule depends on this type @@ -98,5 +103,8 @@ func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedP if propagate { classNodeLinkVar.propagateObjects(ctx, handles) } + if forRule != "" { + break + } } } diff --git a/rete/network.go b/rete/network.go index 52dc416..273a6eb 100644 --- a/rete/network.go +++ b/rete/network.go @@ -4,12 +4,12 @@ import ( "context" "fmt" "math" + "time" "github.com/project-flogo/rules/common/model" "container/list" "sync" - "time" ) type RtcOprn int @@ -23,7 +23,7 @@ const ( //Network ... the rete network type Network interface { - AddRule(model.Rule) error + AddRule(rule model.Rule) error String() string RemoveRule(string) model.Rule GetRules() []model.Rule @@ -34,7 +34,7 @@ type Network interface { retractInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) - assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) + assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) getOrCreateHandle(ctx context.Context, tuple model.Tuple) reteHandle getHandle(tuple model.Tuple) reteHandle @@ -43,6 +43,7 @@ type Network interface { GetAssertedTupleByStringKey(key string) model.Tuple //RtcTransactionHandler RegisterRtcTransactionHandler(txnHandler model.RtcTransactionHandler, txnContext interface{}) + ReplayTuplesForRule(ruleName string, rs model.RuleSession) (err error) } type reteNetworkImpl struct { @@ -63,7 +64,7 @@ type reteNetworkImpl struct { currentId int assertLock sync.Mutex - crudLock sync.Mutex + //crudLock sync.Mutex txnHandler model.RtcTransactionHandler txnContext interface{} } @@ -84,9 +85,8 @@ func (nw *reteNetworkImpl) initReteNetwork() { } func (nw *reteNetworkImpl) AddRule(rule model.Rule) (err error) { - - nw.crudLock.Lock() - defer nw.crudLock.Unlock() + nw.assertLock.Lock() + defer nw.assertLock.Unlock() if nw.allRules[rule.GetName()] != nil { return fmt.Errorf("Rule already exists.." + rule.GetName()) @@ -137,6 +137,22 @@ func (nw *reteNetworkImpl) AddRule(rule model.Rule) (err error) { //Add NodeLinks nw.ruleNameClassNodeLinksOfRule[rule.GetName()] = classNodeLinksOfRule + + return nil +} + +func (nw *reteNetworkImpl) ReplayTuplesForRule(ruleName string, rs model.RuleSession) error { + if rule, exists := nw.allRules[ruleName]; !exists { + return fmt.Errorf("Rule not found [%s]", ruleName) + } else { + for _, h := range nw.allHandles { + tt := h.getTuple().GetTupleType() + if ContainedByFirst(rule.GetIdentifiers(), []model.TupleType{tt}) { + //assert it but only for this rule. + nw.assert(nil, rs, h.getTuple(), nil, ADD, ruleName) + } + } + } return nil } @@ -146,8 +162,8 @@ func (nw *reteNetworkImpl) setClassNodeAndLinkJoinTables(nodesOfRule *list.List, func (nw *reteNetworkImpl) RemoveRule(ruleName string) model.Rule { - nw.crudLock.Lock() - defer nw.crudLock.Unlock() + nw.assertLock.Lock() + defer nw.assertLock.Unlock() rule := nw.allRules[ruleName] delete(nw.allRules, ruleName) @@ -180,7 +196,8 @@ func (nw *reteNetworkImpl) RemoveRule(ruleName string) model.Rule { } } } - + rstr := nw.String() + fmt.Printf(rstr) return rule } @@ -262,7 +279,7 @@ func (nw *reteNetworkImpl) buildNetwork(rule model.Rule, nodesOfRule *list.List, lastNode = fNode } //Yoohoo! We have a Rule!! - ruleNode := newRuleNode(rule) + ruleNode := newRuleNode(nw, rule) newNodeLink(nw, lastNode, ruleNode, false) nodesOfRule.PushBack(ruleNode) } else { @@ -528,36 +545,7 @@ func (nw *reteNetworkImpl) printClassNode(ruleName string, classNodeImpl *classN } func (nw *reteNetworkImpl) Assert(ctx context.Context, rs model.RuleSession, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) { - - if ctx == nil { - ctx = context.Background() - } - - reteCtxVar, isRecursive, newCtx := getOrSetReteCtx(ctx, nw, rs) - - if !isRecursive { - nw.crudLock.Lock() - defer nw.crudLock.Unlock() - nw.assertInternal(newCtx, tuple, changedProps, mode) - reteCtxVar.getConflictResolver().resolveConflict(newCtx) - //if Timeout is 0, remove it from rete - td := model.GetTupleDescriptor(tuple.GetTupleType()) - if td != nil { - if td.TTLInSeconds == 0 { //remove immediately. - nw.removeTupleFromRete(tuple) - } else if td.TTLInSeconds > 0 { // TTL for the tuple type, after that, remove it from RETE - go time.AfterFunc(time.Second*time.Duration(td.TTLInSeconds), func() { - nw.removeTupleFromRete(tuple) - }) - } //else, its -ve and means, never expire - } - if nw.txnHandler != nil { - rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted()) - nw.txnHandler(ctx, rs, rtcTxn, nw.txnContext) - } - } else { - reteCtxVar.getOpsList().PushBack(newAssertEntry(tuple, changedProps, mode)) - } + nw.assert(ctx, rs, tuple, changedProps, mode, "") } func (nw *reteNetworkImpl) removeTupleFromRete(tuple model.Tuple) { @@ -575,8 +563,8 @@ func (nw *reteNetworkImpl) Retract(ctx context.Context, tuple model.Tuple, chang } reteCtxVar, isRecursive, _ := getOrSetReteCtx(ctx, nw, nil) if !isRecursive { - nw.crudLock.Lock() - defer nw.crudLock.Unlock() + nw.assertLock.Lock() + defer nw.assertLock.Unlock() nw.retractInternal(ctx, tuple, changedProps, mode) if nw.txnHandler != nil && mode == DELETE { rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted()) @@ -622,12 +610,12 @@ func (nw *reteNetworkImpl) GetAssertedTupleByStringKey(key string) model.Tuple { return nil } -func (nw *reteNetworkImpl) assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) { +func (nw *reteNetworkImpl) assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) { tupleType := tuple.GetTupleType() listItem := nw.allClassNodes[string(tupleType)] if listItem != nil { classNodeVar := listItem.(classNode) - classNodeVar.assert(ctx, tuple, changedProps) + classNodeVar.assert(ctx, tuple, changedProps, forRule) } td := model.GetTupleDescriptor(tuple.GetTupleType()) if td != nil { @@ -667,3 +655,36 @@ func (nw *reteNetworkImpl) RegisterRtcTransactionHandler(txnHandler model.RtcTra nw.txnHandler = txnHandler nw.txnContext = txnContext } + +func (nw *reteNetworkImpl) assert(ctx context.Context, rs model.RuleSession, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) { + + if ctx == nil { + ctx = context.Background() + } + + reteCtxVar, isRecursive, newCtx := getOrSetReteCtx(ctx, nw, rs) + + if !isRecursive { + nw.assertLock.Lock() + defer nw.assertLock.Unlock() + nw.assertInternal(newCtx, tuple, changedProps, mode, forRule) + reteCtxVar.getConflictResolver().resolveConflict(newCtx) + //if Timeout is 0, remove it from rete + td := model.GetTupleDescriptor(tuple.GetTupleType()) + if td != nil { + if td.TTLInSeconds == 0 { //remove immediately. + nw.removeTupleFromRete(tuple) + } else if td.TTLInSeconds > 0 { // TTL for the tuple type, after that, remove it from RETE + go time.AfterFunc(time.Second*time.Duration(td.TTLInSeconds), func() { + nw.removeTupleFromRete(tuple) + }) + } //else, its -ve and means, never expire + } + if nw.txnHandler != nil { + rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted()) + nw.txnHandler(ctx, rs, rtcTxn, nw.txnContext) + } + } else { + reteCtxVar.getOpsList().PushBack(newAssertEntry(tuple, changedProps, mode)) + } +} diff --git a/rete/opsList.go b/rete/opsList.go index 8679bb6..722b630 100644 --- a/rete/opsList.go +++ b/rete/opsList.go @@ -36,7 +36,7 @@ func newAssertEntry(tuple model.Tuple, changeProps map[string]bool, mode RtcOprn func (ai *assertEntryImpl) execute(ctx context.Context) { reteCtx := getReteCtx(ctx) - reteCtx.getNetwork().assertInternal(ctx, ai.tuple, ai.changeProps, ai.mode) + reteCtx.getNetwork().assertInternal(ctx, ai.tuple, ai.changeProps, ai.mode, "") } //Modify Entry diff --git a/rete/rulenode.go b/rete/rulenode.go index 01ce4e9..68d5603 100644 --- a/rete/rulenode.go +++ b/rete/rulenode.go @@ -18,8 +18,9 @@ type ruleNodeImpl struct { rule model.Rule } -func newRuleNode(rule model.Rule) ruleNode { +func newRuleNode(nw Network, rule model.Rule) ruleNode { rn := ruleNodeImpl{} + rn.nodeImpl.initNodeImpl(nw, rule, rule.GetIdentifiers()) rn.identifiers = rule.GetIdentifiers() rn.rule = rule return &rn diff --git a/ruleapi/rulesession.go b/ruleapi/rulesession.go index 6df0fc9..0a5ec42 100644 --- a/ruleapi/rulesession.go +++ b/ruleapi/rulesession.go @@ -174,3 +174,7 @@ func (rs *rulesessionImpl) GetAssertedTuple(key model.TupleKey) model.Tuple { func (rs *rulesessionImpl) RegisterRtcTransactionHandler(txnHandler model.RtcTransactionHandler, txnContext interface{}) { rs.reteNetwork.RegisterRtcTransactionHandler(txnHandler, txnContext) } + +func (rs *rulesessionImpl) ReplayTuplesForRule(ruleName string) (err error) { + return rs.reteNetwork.ReplayTuplesForRule(ruleName, rs) +} diff --git a/ruleapi/tests/rules_4_test.go b/ruleapi/tests/rules_4_test.go new file mode 100644 index 0000000..fc1eeb2 --- /dev/null +++ b/ruleapi/tests/rules_4_test.go @@ -0,0 +1,98 @@ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/project-flogo/rules/common/model" + "github.com/project-flogo/rules/ruleapi" +) + +// Add previously removed rule +func Test_Four(t *testing.T) { + rs, _ := createRuleSession() + + // create rule + r1 := ruleapi.NewRule("Rule1") + r1.AddCondition("r1c1", []string{"t1.none", "t2.none"}, trueCondition, nil) + r1.SetAction(r1Action) + // create tuples + t1, _ := model.NewTupleWithKeyValues("t1", "one") // No TTL + t2, _ := model.NewTupleWithKeyValues("t2", "two") // TTL is 0 + + // start rule session + rs.Start(nil) + + assertCtxValues := make(map[string]interface{}) + assertCtxValues["test"] = t + assertCtxValues["actionFired"] = "NOTFIRED" + assertCtx := context.WithValue(context.TODO(), "values", assertCtxValues) + + // Test1: add r1 and assert t1 & t2; rule action SHOULD be fired + addRule(t, rs, r1) + assert(assertCtx, rs, t1) + assert(assertCtx, rs, t2) + actionFired, _ := assertCtxValues["actionFired"].(string) + if actionFired != "FIRED" { + t.Log("rule action SHOULD be fired") + t.FailNow() + } + + // Test2: remove r1 and assert t2; rule action SHOULD NOT be fired + deleteRule(t, rs, r1) + assertCtxValues["actionFired"] = "NOTFIRED" + assert(assertCtx, rs, t2) + actionFired, _ = assertCtxValues["actionFired"].(string) + if actionFired != "NOTFIRED" { + t.Log("rule action SHOULD NOT be fired") + t.FailNow() + } + + // Test3: add r1 again and assert t2; rule action SHOULD be fired + addRule(t, rs, r1) + rs.ReplayTuplesForRule(r1.GetName()) + assertCtxValues["actionFired"] = "NOTFIRED" + assert(assertCtx, rs, t2) + actionFired, _ = assertCtxValues["actionFired"].(string) + if actionFired != "FIRED" { + t.Log("rule action SHOULD be fired") + t.FailNow() + } + + rs.Unregister() +} + +func addRule(t *testing.T, rs model.RuleSession, rule model.Rule) { + err := rs.AddRule(rule) + if err != nil { + t.Logf("[%s] error while adding rule[%s]", time.Now().Format("15:04:05.999999"), rule.GetName()) + return + } + t.Logf("[%s] Rule[%s] added. \n", time.Now().Format("15:04:05.999999"), rule.GetName()) +} + +func deleteRule(t *testing.T, rs model.RuleSession, rule model.Rule) { + rs.DeleteRule(rule.GetName()) + t.Logf("[%s] Rule[%s] deleted. \n", time.Now().Format("15:04:05.999999"), rule.GetName()) +} + +func assert(ctx context.Context, rs model.RuleSession, tuple model.Tuple) { + assertCtxValues := ctx.Value("values").(map[string]interface{}) + t, _ := assertCtxValues["test"].(*testing.T) + err := rs.Assert(ctx, tuple) + if err != nil { + t.Logf("[%s] assert error %s", time.Now().Format("15:04:05.999999"), err) + } +} + +func r1Action(ctx context.Context, rs model.RuleSession, ruleName string, tuples map[model.TupleType]model.Tuple, ruleCtx model.RuleContext) { + assertCtxValues := ctx.Value("values").(map[string]interface{}) + test, _ := assertCtxValues["test"].(*testing.T) + + t := tuples["t1"] + tSrt, _ := t.GetString("id") + test.Logf("[%s] r1Action called with the tuple[%s] \n", time.Now().Format("15:04:05.999999"), tSrt) + + assertCtxValues["actionFired"] = "FIRED" +}