Skip to content

Commit

Permalink
Merge pull request #84 from balamg/issue80
Browse files Browse the repository at this point in the history
Issue80
  • Loading branch information
rameshpolishetti authored Dec 5, 2019
2 parents ca6b3af + fb49954 commit 3c06490
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 53 deletions.
8 changes: 4 additions & 4 deletions common/model/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type RuleSession interface {
//RtcTransactionHandler
RegisterRtcTransactionHandler(txnHandler RtcTransactionHandler, handlerCtx interface{})

//replay existing tuples into a rule
ReplayTuplesForRule(ruleName string) (err error)
}

//ConditionEvaluator is a function pointer for handling condition evaluations on the server side
Expand All @@ -92,16 +94,14 @@ type ValueChangeListener interface {

type RtcTxn interface {
//map of type and map of key/tuple
GetRtcAdded () map[string]map[string]Tuple
GetRtcAdded() map[string]map[string]Tuple
GetRtcModified() map[string]map[string]RtcModified
GetRtcDeleted() map[string]map[string]Tuple

}

type RtcModified interface {
GetTuple() Tuple
GetModifiedProps() map[string]bool
}

type RtcTransactionHandler func (ctx context.Context, rs RuleSession, txn RtcTxn, txnContext interface{})

type RtcTransactionHandler func(ctx context.Context, rs RuleSession, txn RtcTxn, txnContext interface{})
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,10 @@ require (
github.com/aws/aws-sdk-go v1.18.3
github.com/gorilla/websocket v1.4.0
github.com/oklog/ulid v1.3.1
github.com/project-flogo/contrib/trigger/kafka v0.9.0
github.com/project-flogo/contrib/trigger/rest v0.9.0
github.com/project-flogo/core v0.9.3
github.com/stretchr/testify v1.3.0
)

go 1.13
12 changes: 10 additions & 2 deletions rete/classnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type classNode interface {
addClassNodeLink(classNodeLink)
removeClassNodeLink(classNodeLink)
getClassNodeLinks() *list.List
assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool)
assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, forRule string)
}

type classNodeImpl struct {
Expand Down Expand Up @@ -74,13 +74,18 @@ func (cn *classNodeImpl) String() string {
return ret
}

func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool) {
func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, forRule string) {
handle := getOrCreateHandle(ctx, tuple)
handles := make([]reteHandle, 1)
handles[0] = handle
propagate := false
for e := cn.getClassNodeLinks().Front(); e != nil; e = e.Next() {
classNodeLinkVar := e.Value.(classNodeLink)
if forRule != "" {
if classNodeLinkVar.getRule().GetName() != forRule {
continue
}
}
if changedProps != nil {
depProps, found := classNodeLinkVar.getRule().GetDeps()[model.TupleType(cn.name)]
if found { // rule depends on this type
Expand All @@ -98,5 +103,8 @@ func (cn *classNodeImpl) assert(ctx context.Context, tuple model.Tuple, changedP
if propagate {
classNodeLinkVar.propagateObjects(ctx, handles)
}
if forRule != "" {
break
}
}
}
111 changes: 66 additions & 45 deletions rete/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"context"
"fmt"
"math"
"time"

"github.com/project-flogo/rules/common/model"

"container/list"
"sync"
"time"
)

type RtcOprn int
Expand All @@ -23,7 +23,7 @@ const (

//Network ... the rete network
type Network interface {
AddRule(model.Rule) error
AddRule(rule model.Rule) error
String() string
RemoveRule(string) model.Rule
GetRules() []model.Rule
Expand All @@ -34,7 +34,7 @@ type Network interface {

retractInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn)

assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn)
assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string)
getOrCreateHandle(ctx context.Context, tuple model.Tuple) reteHandle
getHandle(tuple model.Tuple) reteHandle

Expand All @@ -43,6 +43,7 @@ type Network interface {
GetAssertedTupleByStringKey(key string) model.Tuple
//RtcTransactionHandler
RegisterRtcTransactionHandler(txnHandler model.RtcTransactionHandler, txnContext interface{})
ReplayTuplesForRule(ruleName string, rs model.RuleSession) (err error)
}

type reteNetworkImpl struct {
Expand All @@ -63,7 +64,7 @@ type reteNetworkImpl struct {
currentId int

assertLock sync.Mutex
crudLock sync.Mutex
//crudLock sync.Mutex
txnHandler model.RtcTransactionHandler
txnContext interface{}
}
Expand All @@ -84,9 +85,8 @@ func (nw *reteNetworkImpl) initReteNetwork() {
}

func (nw *reteNetworkImpl) AddRule(rule model.Rule) (err error) {

nw.crudLock.Lock()
defer nw.crudLock.Unlock()
nw.assertLock.Lock()
defer nw.assertLock.Unlock()

if nw.allRules[rule.GetName()] != nil {
return fmt.Errorf("Rule already exists.." + rule.GetName())
Expand Down Expand Up @@ -137,6 +137,22 @@ func (nw *reteNetworkImpl) AddRule(rule model.Rule) (err error) {

//Add NodeLinks
nw.ruleNameClassNodeLinksOfRule[rule.GetName()] = classNodeLinksOfRule

return nil
}

func (nw *reteNetworkImpl) ReplayTuplesForRule(ruleName string, rs model.RuleSession) error {
if rule, exists := nw.allRules[ruleName]; !exists {
return fmt.Errorf("Rule not found [%s]", ruleName)
} else {
for _, h := range nw.allHandles {
tt := h.getTuple().GetTupleType()
if ContainedByFirst(rule.GetIdentifiers(), []model.TupleType{tt}) {
//assert it but only for this rule.
nw.assert(nil, rs, h.getTuple(), nil, ADD, ruleName)
}
}
}
return nil
}

Expand All @@ -146,8 +162,8 @@ func (nw *reteNetworkImpl) setClassNodeAndLinkJoinTables(nodesOfRule *list.List,

func (nw *reteNetworkImpl) RemoveRule(ruleName string) model.Rule {

nw.crudLock.Lock()
defer nw.crudLock.Unlock()
nw.assertLock.Lock()
defer nw.assertLock.Unlock()

rule := nw.allRules[ruleName]
delete(nw.allRules, ruleName)
Expand Down Expand Up @@ -180,7 +196,8 @@ func (nw *reteNetworkImpl) RemoveRule(ruleName string) model.Rule {
}
}
}

rstr := nw.String()
fmt.Printf(rstr)
return rule
}

Expand Down Expand Up @@ -262,7 +279,7 @@ func (nw *reteNetworkImpl) buildNetwork(rule model.Rule, nodesOfRule *list.List,
lastNode = fNode
}
//Yoohoo! We have a Rule!!
ruleNode := newRuleNode(rule)
ruleNode := newRuleNode(nw, rule)
newNodeLink(nw, lastNode, ruleNode, false)
nodesOfRule.PushBack(ruleNode)
} else {
Expand Down Expand Up @@ -528,36 +545,7 @@ func (nw *reteNetworkImpl) printClassNode(ruleName string, classNodeImpl *classN
}

func (nw *reteNetworkImpl) Assert(ctx context.Context, rs model.RuleSession, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) {

if ctx == nil {
ctx = context.Background()
}

reteCtxVar, isRecursive, newCtx := getOrSetReteCtx(ctx, nw, rs)

if !isRecursive {
nw.crudLock.Lock()
defer nw.crudLock.Unlock()
nw.assertInternal(newCtx, tuple, changedProps, mode)
reteCtxVar.getConflictResolver().resolveConflict(newCtx)
//if Timeout is 0, remove it from rete
td := model.GetTupleDescriptor(tuple.GetTupleType())
if td != nil {
if td.TTLInSeconds == 0 { //remove immediately.
nw.removeTupleFromRete(tuple)
} else if td.TTLInSeconds > 0 { // TTL for the tuple type, after that, remove it from RETE
go time.AfterFunc(time.Second*time.Duration(td.TTLInSeconds), func() {
nw.removeTupleFromRete(tuple)
})
} //else, its -ve and means, never expire
}
if nw.txnHandler != nil {
rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted())
nw.txnHandler(ctx, rs, rtcTxn, nw.txnContext)
}
} else {
reteCtxVar.getOpsList().PushBack(newAssertEntry(tuple, changedProps, mode))
}
nw.assert(ctx, rs, tuple, changedProps, mode, "")
}

func (nw *reteNetworkImpl) removeTupleFromRete(tuple model.Tuple) {
Expand All @@ -575,8 +563,8 @@ func (nw *reteNetworkImpl) Retract(ctx context.Context, tuple model.Tuple, chang
}
reteCtxVar, isRecursive, _ := getOrSetReteCtx(ctx, nw, nil)
if !isRecursive {
nw.crudLock.Lock()
defer nw.crudLock.Unlock()
nw.assertLock.Lock()
defer nw.assertLock.Unlock()
nw.retractInternal(ctx, tuple, changedProps, mode)
if nw.txnHandler != nil && mode == DELETE {
rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted())
Expand Down Expand Up @@ -622,12 +610,12 @@ func (nw *reteNetworkImpl) GetAssertedTupleByStringKey(key string) model.Tuple {
return nil
}

func (nw *reteNetworkImpl) assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn) {
func (nw *reteNetworkImpl) assertInternal(ctx context.Context, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) {
tupleType := tuple.GetTupleType()
listItem := nw.allClassNodes[string(tupleType)]
if listItem != nil {
classNodeVar := listItem.(classNode)
classNodeVar.assert(ctx, tuple, changedProps)
classNodeVar.assert(ctx, tuple, changedProps, forRule)
}
td := model.GetTupleDescriptor(tuple.GetTupleType())
if td != nil {
Expand Down Expand Up @@ -667,3 +655,36 @@ func (nw *reteNetworkImpl) RegisterRtcTransactionHandler(txnHandler model.RtcTra
nw.txnHandler = txnHandler
nw.txnContext = txnContext
}

func (nw *reteNetworkImpl) assert(ctx context.Context, rs model.RuleSession, tuple model.Tuple, changedProps map[string]bool, mode RtcOprn, forRule string) {

if ctx == nil {
ctx = context.Background()
}

reteCtxVar, isRecursive, newCtx := getOrSetReteCtx(ctx, nw, rs)

if !isRecursive {
nw.assertLock.Lock()
defer nw.assertLock.Unlock()
nw.assertInternal(newCtx, tuple, changedProps, mode, forRule)
reteCtxVar.getConflictResolver().resolveConflict(newCtx)
//if Timeout is 0, remove it from rete
td := model.GetTupleDescriptor(tuple.GetTupleType())
if td != nil {
if td.TTLInSeconds == 0 { //remove immediately.
nw.removeTupleFromRete(tuple)
} else if td.TTLInSeconds > 0 { // TTL for the tuple type, after that, remove it from RETE
go time.AfterFunc(time.Second*time.Duration(td.TTLInSeconds), func() {
nw.removeTupleFromRete(tuple)
})
} //else, its -ve and means, never expire
}
if nw.txnHandler != nil {
rtcTxn := newRtcTxn(reteCtxVar.getRtcAdded(), reteCtxVar.getRtcModified(), reteCtxVar.getRtcDeleted())
nw.txnHandler(ctx, rs, rtcTxn, nw.txnContext)
}
} else {
reteCtxVar.getOpsList().PushBack(newAssertEntry(tuple, changedProps, mode))
}
}
2 changes: 1 addition & 1 deletion rete/opsList.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newAssertEntry(tuple model.Tuple, changeProps map[string]bool, mode RtcOprn

func (ai *assertEntryImpl) execute(ctx context.Context) {
reteCtx := getReteCtx(ctx)
reteCtx.getNetwork().assertInternal(ctx, ai.tuple, ai.changeProps, ai.mode)
reteCtx.getNetwork().assertInternal(ctx, ai.tuple, ai.changeProps, ai.mode, "")
}

//Modify Entry
Expand Down
3 changes: 2 additions & 1 deletion rete/rulenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ type ruleNodeImpl struct {
rule model.Rule
}

func newRuleNode(rule model.Rule) ruleNode {
func newRuleNode(nw Network, rule model.Rule) ruleNode {
rn := ruleNodeImpl{}
rn.nodeImpl.initNodeImpl(nw, rule, rule.GetIdentifiers())
rn.identifiers = rule.GetIdentifiers()
rn.rule = rule
return &rn
Expand Down
4 changes: 4 additions & 0 deletions ruleapi/rulesession.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,7 @@ func (rs *rulesessionImpl) GetAssertedTuple(key model.TupleKey) model.Tuple {
func (rs *rulesessionImpl) RegisterRtcTransactionHandler(txnHandler model.RtcTransactionHandler, txnContext interface{}) {
rs.reteNetwork.RegisterRtcTransactionHandler(txnHandler, txnContext)
}

func (rs *rulesessionImpl) ReplayTuplesForRule(ruleName string) (err error) {
return rs.reteNetwork.ReplayTuplesForRule(ruleName, rs)
}
Loading

0 comments on commit 3c06490

Please sign in to comment.