Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Readonly cache #57

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions cache/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

## Caching

Tuples can be categorized into two different types depending on their nature of durability. The event tuples, due to their transient nature, are to be consumed as it is processed by the rule engine. The data tuples, due to their persistent nature, are expected to be available within the rule network for evaluations and actions across the event tuples entered in the fule network.

The data tuples also are expected to be stored in a persistent storage - i.e. database,distributed cache, files on disk, etc.

For example, a data tuple would be a user's credit score while event tuple is a credit card application. A rule can be writtten to approve a credit card by joining user's credit tuple and credit card application.

### readonly tuple cache

A readonly tuple cache is supported with the following characteristics:
- readonly tuples are available in the cache and to be loaded before a rule session receives an event tuple.
- persistMode attribute value of "ReadOnlyCache" in the tuple descriptor indicates that the tuple is to be loaded from the cache into the rule network.
- TTL of the readonly cache is -1.
- A distributed cache would enable scale rule engine instances as needed from a single tuple data source as shown below.



<p align="center">
<img src ="readonlycache.png" />
</p>

13 changes: 13 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package rulecache

import (
"context"

"github.com/project-flogo/rules/common/model"
"github.com/project-flogo/rules/config"
)

type CacheManager interface {
Init(cfg config.CacheConfig)
LoadTuples(ctx context.Context, td *model.TupleDescriptor, rs model.RuleSession) error
}
Binary file added cache/readonlycache.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
69 changes: 69 additions & 0 deletions cache/rulecache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package rulecache

import (
"context"

"github.com/go-redis/redis"
"github.com/project-flogo/rules/common/model"
"github.com/project-flogo/rules/config"
)

type RedisCacheManager struct {
client *redis.Client
cfg config.CacheConfig
}

//LoadTuples loads tuples of a given tuple type from redis cache into a rulesession
func (rcm *RedisCacheManager) LoadTuples(ctx context.Context, td *model.TupleDescriptor, rs model.RuleSession) error {

keys, err := rcm.client.SMembers(td.Name).Result()
if err != nil {
return err
}

for _, key := range keys {

row, err := rcm.client.HGetAll(key).Result()

values := make(map[string]interface{})

for k, v := range row {
values[k] = v
}

if err != nil {
return err
}
//convert values[string]string to values[string]interface{}
t, err := model.NewTuple(model.TupleType(td.Name), values)

if err != nil {
return err
}

err = rs.Assert(ctx, t)
if err != nil {
return err
}
}

return nil
}

func (rcm *RedisCacheManager) Init(cfg config.CacheConfig) {
rcm.cfg.Address = cfg.Address
rcm.cfg.DB = cfg.DB
rcm.cfg.Name = cfg.Name
rcm.cfg.Password = cfg.Password
rcm.cfg.ServerType = cfg.ServerType

rcm.client = redis.NewClient(&redis.Options{
Addr: rcm.cfg.Address,
Password: rcm.cfg.Password,
DB: rcm.cfg.DB,
})
}

func (rcm *RedisCacheManager) GetRedisClient() *redis.Client {
return rcm.client
}
166 changes: 166 additions & 0 deletions cache/tests/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package tests

/* solar check events
if the monthly bill is greater than 200 and the house doesn't have a solar panels installed,
the house with the matching parcel id should be a candidate for solar panel installation promotion.

For now, house tuples are to be pre-loaded from data.txt to cache before running go test by
cat data.txt | redis-cli --pipe
The command above loads tuples with a tuple key as a hash key. Then updates the index named after
tuple type with the hash key.

<house data tuples>
house:parcel:0001 parcel 0001 is_solar true
house:parcel:0002 parcel 0002 is_solar false

solar events are asserted everytime a monthly electiricity bill is generated.

<solar event tuples>
solar:parcel:0001 parcel 0001 bill 300
solar:parcel:0002 parcel 0002 bill 250

*/

import (
"context"
"fmt"
"testing"

rulecache "github.com/project-flogo/rules/cache"

"github.com/project-flogo/rules/common/model"
"github.com/project-flogo/rules/config"

"github.com/project-flogo/rules/ruleapi"
)

func Test_Cache_1(t *testing.T) {

rs, _ := createRuleSession()

rule := ruleapi.NewRule("Cache_Test")
err := rule.AddCondition("TC_1", []string{"house", "solar"}, checkSolarEligibleCondition, nil)
if err != nil {
t.Logf("%s", err)
t.FailNow()
}
ruleActionCtx := make(map[string]string)
rule.SetContext(ruleActionCtx)
rule.SetAction(solarEligibleAction)
rule.SetPriority(1)
err = rs.AddRule(rule)
if err != nil {
t.Logf("%s", err)
t.FailNow()
}
t.Logf("Rule added: [%s]\n", rule.GetName())

err = rs.Start(nil)
if err != nil {
t.Logf("%s", err)
t.FailNow()
}

//PreCase: Assert all house tuples from Cache. ToDo: Check if this can be done before rulesession creation.
{
var rcm rulecache.CacheManager = &rulecache.RedisCacheManager{}
cacheConfig := config.CacheConfig{"rediscache", "redis", "localhost:6379", "", 0}
rcm.Init(cacheConfig)

//Load tuples from cache
tds := model.GetAllTupleDescriptors()
for _, td := range tds {
if model.OMModeMap[td.PersistMode] == model.ReadOnlyCache {
err = rcm.LoadTuples(context.TODO(), &td, rs)
if err != nil {
t.Logf("%s", err)
t.FailNow()
}
}
}
}

// Case1: Assert an ineligible solar. It should not fire solarEligibleAction.
{
ctx := context.WithValue(context.TODO(), "key", t)
values := make(map[string]interface{})
values["parcel"] = "0001"
values["bill"] = 300
tuple, _ := model.NewTuple("solar", values)
err := rs.Assert(ctx, tuple)
if err != nil {
t.Logf("%s", err)
t.FailNow()
}
}

// Case2: Assert an eligible solar. solarEligibleAction should be fired.
{
ctx := context.WithValue(context.TODO(), "key", t)
values := make(map[string]interface{})
values["parcel"] = "0002"
values["bill"] = 250
tuple, _ := model.NewTuple("solar", values)
err := rs.Assert(ctx, tuple)
if err != nil {
t.Logf("%s", err)
t.FailNow()
}
}

// Case3: Assert the ineligible solar again. It should not fire solarEligibleAction.
{
ctx := context.WithValue(context.TODO(), "key", t)
values := make(map[string]interface{})
values["parcel"] = "0001"
values["bill"] = 300
tuple, _ := model.NewTuple("solar", values)
err := rs.Assert(ctx, tuple)
if err != nil {
t.Logf("%s", err)
t.FailNow()
}
}

// Case2: Assert an eligible solar. solarEligibleAction should be fired.
{
ctx := context.WithValue(context.TODO(), "key", t)
values := make(map[string]interface{})
values["parcel"] = "0002"
values["bill"] = 250
tuple, _ := model.NewTuple("solar", values)
err := rs.Assert(ctx, tuple)
if err != nil {
t.Logf("%s", err)
t.FailNow()
}
}

rs.Unregister()

}

func solarEligibleAction(ctx context.Context, rs model.RuleSession, ruleName string, tuples map[model.TupleType]model.Tuple, ruleCtx model.RuleContext) {
t := ctx.Value("key").(*testing.T)
tHouse := tuples["house"]
tSolar := tuples["solar"]
t.Logf("Eligible for a solar promotion! [%s], [%s]\n", tHouse.GetKey().String(), tSolar.GetKey().String())
}

func checkSolarEligibleCondition(ruleName string, condName string, tuples map[model.TupleType]model.Tuple, ctx model.RuleContext) bool {
tHouse := tuples["house"]
tSolar := tuples["solar"]
if tHouse == nil || tSolar == nil {
fmt.Println("Should not get nil tuples here in JoinCondition! This is an error")
return false
}
parcelHouse, _ := tHouse.GetString("parcel")
parcelSolar, _ := tSolar.GetString("parcel")

isSolarHouse, _ := tHouse.GetBool("is_solar")
billSolar, _ := tSolar.GetDouble("bill")

return (parcelHouse == parcelSolar) &&
(isSolarHouse == false) &&
(billSolar > 200)
}
62 changes: 62 additions & 0 deletions cache/tests/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package tests

import (
"context"
"io/ioutil"
"log"
"testing"

"github.com/project-flogo/rules/common"
"github.com/project-flogo/rules/common/model"
"github.com/project-flogo/rules/ruleapi"
)

func createRuleSession() (model.RuleSession, error) {
rs, _ := ruleapi.GetOrCreateRuleSession("test")

tupleDescFileAbsPath := common.GetAbsPathForResource("src/github.com/project-flogo/rules/cache/tests/tests.json")

dat, err := ioutil.ReadFile(tupleDescFileAbsPath)
if err != nil {
log.Fatal(err)
}
err = model.RegisterTupleDescriptors(string(dat))
if err != nil {
return nil, err
}
return rs, nil
}

//conditions and actions
func trueCondition(ruleName string, condName string, tuples map[model.TupleType]model.Tuple, ctx model.RuleContext) bool {
return true
}
func falseCondition(ruleName string, condName string, tuples map[model.TupleType]model.Tuple, ctx model.RuleContext) bool {
return false
}
func emptyAction(ctx context.Context, rs model.RuleSession, ruleName string, tuples map[model.TupleType]model.Tuple, ruleCtx model.RuleContext) {
}

func printTuples(t *testing.T, oprn string, tupleMap map[string]map[string]model.Tuple) {

for k, v := range tupleMap {
t.Logf("%s tuples for type [%s]\n", oprn, k)
for k1, _ := range v {
t.Logf(" tuples key [%s]\n", k1)
}
}
}
func printModified(t *testing.T, modified map[string]map[string]model.RtcModified) {

for k, v := range modified {
t.Logf("%s tuples for type [%s]\n", "Modified", k)
for k1, _ := range v {
t.Logf(" tuples key [%s]\n", k1)
}
}
}

type txnCtx struct {
Testing *testing.T
TxnCnt int
}
4 changes: 4 additions & 0 deletions cache/tests/data.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
HMSET house:parcel:0001 parcel 0001 is_solar true
SADD house house:parcel:0001
HMSET house:parcel:0002 parcel 0002 is_solar false
SADD house house:parcel:0002
33 changes: 33 additions & 0 deletions cache/tests/tests.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[
{
"name":"house",
"ttl" : -1,
"properties":[
{
"name":"parcel",
"type":"string",
"pk-index":0
},
{
"name":"is_solar",
"type":"bool"
}
],
"persistMode": "ReadOnlyCache"
},
{
"name":"solar",
"ttl" : 0,
"properties":[
{
"name":"parcel",
"type":"string",
"pk-index":0
},
{
"name":"bill",
"type":"double"
}
]
}
]
Loading