Skip to content

Commit

Permalink
Implement and test event pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Simmerl committed Dec 5, 2016
1 parent 0d0ca0c commit 338a1c1
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 3 deletions.
144 changes: 141 additions & 3 deletions core/pipieline.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
serr "github.com/tapglue/snaas/error"
"github.com/tapglue/snaas/service/app"
"github.com/tapglue/snaas/service/connection"
"github.com/tapglue/snaas/service/event"
"github.com/tapglue/snaas/service/object"
"github.com/tapglue/snaas/service/user"
)
Expand All @@ -19,6 +20,7 @@ const (
queryCondOwnerFriends = "ownerFriends"
queryCondObjectOwner = "objectOwner"
queryCondOwner = "owner"
queryCondParentOwner = "parentOwner"
queryCondUserFrom = "userFrom"
queryCondUserTo = "userTo"
)
Expand All @@ -29,6 +31,13 @@ type contextConnection struct {
To *user.User
}

type contextEvent struct {
Event *event.Event
Owner *user.User
Parent *object.Object
ParentOwner *user.User
}

type contextObject struct {
Object *object.Object
Owner *user.User
Expand All @@ -49,6 +58,19 @@ func (c criteriaConnection) match(s connection.StateChange) bool {
return s.New.MatchOpts(c.New) && s.Old.MatchOpts(c.Old)
}

type criteriaEvent struct {
New *event.QueryOptions
Old *event.QueryOptions
}

func (c criteriaEvent) match(s event.StateChange) bool {
if s.New == nil && s.Old == nil {
return false
}

return s.New.MatchOpts(c.New) && s.Old.MatchOpts(c.Old)
}

type criteriaObject struct {
New *object.QueryOptions
Old *object.QueryOptions
Expand Down Expand Up @@ -85,6 +107,11 @@ type ruleConnection struct {
Recipients recipients
}

type ruleEvent struct {
Criteria criteriaEvent
Recipients recipients
}

type ruleObject struct {
Criteria criteriaObject
Recipients recipients
Expand Down Expand Up @@ -165,12 +192,96 @@ func PipelineConnection(users user.Service) PipelineConnectionFunc {
}
}

// PipelineEventFunc constructs a Pipeline that by applying the provided rules
// outputs messages.
type PipelineEventFunc func(
*app.App,
event.StateChange,
...ruleEvent,
) (messages, error)

// PipelineEvent constructs a Pipeline that by applying the provided rules
// outputs messages.
func PipelineEvent(
objects object.Service,
users user.Service,
) PipelineEventFunc {
return func(
currentApp *app.App,
change event.StateChange,
rules ...ruleEvent,
) (messages, error) {
var (
ms = messages{}
e = change.New

context *contextEvent
owner *user.User
parent *object.Object
parentOwner *user.User
)

owner, err := UserFetch(users)(currentApp, e.UserID)
if err != nil {
return nil, err
}

if e.ObjectID != 0 {
parent, err = objectFetch(objects)(currentApp, e.ObjectID)
if err != nil {
return nil, err
}

parentOwner, err = UserFetch(users)(currentApp, parent.OwnerID)
if err != nil {
return nil, err
}
}

context = &contextEvent{
Event: e,
Owner: owner,
Parent: parent,
ParentOwner: parentOwner,
}

for _, r := range rules {
if !r.Criteria.match(change) {
continue
}

for _, recipient := range r.Recipients {
rs, err := recipientsEvent()(currentApp, context, recipient.Queries)
if err != nil {
return nil, err
}

for _, r := range rs {
urn, err := compileTemplate(context, recipient.URN)
if err != nil {
return nil, err
}

msg, err := compileTemplate(context, recipient.Templates[language.English.String()])
if err != nil {
return nil, err
}

ms = append(ms, &message{Message: msg, recipient: r.ID, urn: urn})
}
}
}

return ms, nil
}
}

// PipelineObjectFunc constructs a Pipeline that by appplying the provided
// rules outputs messages.
type PipelineObjectFunc func(
currentApp *app.App,
change object.StateChange,
rules ...ruleObject,
*app.App,
object.StateChange,
...ruleObject,
) (messages, error)

// PipelineObject constructs a Pipeline that by appplying the provided rules
Expand Down Expand Up @@ -432,3 +543,30 @@ func recipientsConnection() recipientsConnectionFunc {
return us, nil
}
}

type recipientsEventFunc func(
*app.App,
*contextEvent,
queries,
) (user.List, error)

func recipientsEvent() recipientsEventFunc {
return func(
currentApp *app.App,
context *contextEvent,
qs queries,
) (user.List, error) {
us := user.List{}

for _, cond := range qs {
for condType := range cond {
switch condType {
case queryCondParentOwner:
us = append(us, context.ParentOwner)
}
}
}

return us, nil
}
}
86 changes: 86 additions & 0 deletions core/pipieline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/tapglue/snaas/service/app"
"github.com/tapglue/snaas/service/connection"
"github.com/tapglue/snaas/service/event"
"github.com/tapglue/snaas/service/object"
"github.com/tapglue/snaas/service/user"
)
Expand Down Expand Up @@ -192,6 +193,91 @@ func TestPipelineConnectionCondTo(t *testing.T) {
t.Fatal(err)
}

if !reflect.DeepEqual(have, want) {
t.Errorf("have %#v, want %#v", have, want)
}
}

func TestPipelineEventCondParentOwner(t *testing.T) {
var (
currentApp = testApp()
events = event.MemService()
objects = object.MemService()
users = user.MemService()
)

// Creat Post Owner.
postOwner, err := users.Put(currentApp.Namespace(), testUser())
if err != nil {
t.Fatal(err)
}

// Create Post.
post, err := objects.Put(currentApp.Namespace(), testPost(postOwner.ID).Object)
if err != nil {
t.Fatal(err)
}

// Create liker.
liker, err := users.Put(currentApp.Namespace(), testUser())
if err != nil {
t.Fatal(err)
}

// Create like.
like, err := events.Put(currentApp.Namespace(), &event.Event{
Enabled: true,
ObjectID: post.ID,
Owned: true,
Type: TypeLike,
UserID: liker.ID,
})
if err != nil {
t.Fatal(err)
}

var (
enabled = true
ruleEventParentOwner = ruleEvent{
Criteria: criteriaEvent{
New: &event.QueryOptions{
Enabled: &enabled,
Owned: &enabled,
Types: []string{
TypeLike,
},
},
Old: nil,
},
Recipients: recipients{
{
Queries: []map[string]string{
map[string]string{
"parentOwner": "",
},
},
Templates: map[string]string{
"en": "{{.Owner.Username}} liked your post",
},
URN: "tapglue/users/{{.Owner.ID}}",
},
},
}
)

want := messages{
{
Message: fmt.Sprintf("%s liked your post", liker.Username),
recipient: postOwner.ID,
urn: fmt.Sprintf("tapglue/users/%d", liker.ID),
},
}

have, err := PipelineEvent(objects, users)(currentApp, event.StateChange{New: like}, ruleEventParentOwner)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(have, want) {
for i, m := range have {
fmt.Printf("[%d|%s] %s\n", m.recipient, m.urn, m.Message)
Expand Down

0 comments on commit 338a1c1

Please sign in to comment.