Skip to content

Commit

Permalink
Add reaction pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Simmerl committed Jan 31, 2017
1 parent b8cfc72 commit aae776d
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 0 deletions.
120 changes: 120 additions & 0 deletions core/pipieline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"text/template"
"time"

"github.com/tapglue/snaas/service/reaction"

"golang.org/x/text/language"

serr "github.com/tapglue/snaas/error"
Expand Down Expand Up @@ -295,6 +297,90 @@ func PipelineObject(
}
}

// PipelineReactionFunc constructs a Pipeline that by applying the provided
// rules outputs Messages.
type PipelineReactionFunc func(
*app.App,
*reaction.StateChange,
...*rule.Rule,
) (Messages, error)

// PipelineReaction constructs a Pipeline that by applying the provided rules
// outputs Messages.
func PipelineReaction(
objects object.Service,
users user.Service,
) PipelineReactionFunc {
return func(
currentApp *app.App,
change *reaction.StateChange,
rules ...*rule.Rule,
) (Messages, error) {
var (
ms = Messages{}
r = change.New

context *contextReaction
owner *user.User
parent *object.Object
parentOwner *user.User
)

owner, err := UserFetch(users)(currentApp, r.OwnerID)
if err != nil {
return nil, err
}

if r.ObjectID != 0 {
parent, err = objectFetch(objects)(currentApp, r.ObjectID)
if err != nil {
return nil, err
}

parentOwner, err = UserFetch(users)(currentApp, parent.OwnerID)
if err != nil {
return nil, err
}
}

context = &contextReaction{
Owner: owner,
Parent: parent,
ParentOwner: parentOwner,
Reaction: r,
}

for _, r := range rules {
if !r.Criteria.Match(change) {
continue
}

for _, recipient := range r.Recipients {
rs, err := recipientsReaction()(currentApp, context, recipient.Query)
if err != nil {
return nil, err
}

for _, r := range rs {
urn, err := compileTemplate(context, recipient.URN)
if err != nil {
return nil, err
}

msg, err := compileTemplate(context, recipient.Templates[language.English.String()])
if err != nil {
return nil, err
}

ms = append(ms, &Message{Message: msg, Recipient: r.ID, URN: urn})
}
}
}

return ms, nil
}
}

type contextConnection struct {
Conenction *connection.Connection
From *user.User
Expand All @@ -316,6 +402,13 @@ type contextObject struct {
ParentOwner *user.User
}

type contextReaction struct {
Owner *user.User
Parent *object.Object
ParentOwner *user.User
Reaction *reaction.Reaction
}

func compileTemplate(context interface{}, t string) (string, error) {
tmpl, err := template.New("message").Parse(t)
if err != nil {
Expand Down Expand Up @@ -518,3 +611,30 @@ func recipientsObject(
return us, nil
}
}

type recipientsReactionFunc func(
*app.App,
*contextReaction,
rule.Query,
) (user.List, error)

func recipientsReaction() recipientsReactionFunc {
return func(
currentApp *app.App,
context *contextReaction,
q rule.Query,
) (user.List, error) {
us := user.List{}

for condType := range q {
switch condType {
case queryCondParentOwner:
if context.Owner.ID != context.ParentOwner.ID {
us = append(us, context.ParentOwner)
}
}
}

return us, nil
}
}
87 changes: 87 additions & 0 deletions core/pipieline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"reflect"
"testing"

"github.com/tapglue/snaas/service/reaction"

"github.com/tapglue/snaas/service/app"
"github.com/tapglue/snaas/service/connection"
"github.com/tapglue/snaas/service/event"
Expand Down Expand Up @@ -190,6 +192,91 @@ func TestPipelineConnectionCondTo(t *testing.T) {
}
}

func TestPipelineReactionCondParentOwner(t *testing.T) {
var (
currentApp = testApp()
objects = object.MemService()
reactions = reaction.MemService()
users = user.MemService()
)

// Creat Post Owner.
postOwner, err := users.Put(currentApp.Namespace(), testUser())
if err != nil {
t.Fatal(err)
}

// Create Post.
post, err := objects.Put(currentApp.Namespace(), testPost(postOwner.ID).Object)
if err != nil {
t.Fatal(err)
}

// Create liker.
liker, err := users.Put(currentApp.Namespace(), testUser())
if err != nil {
t.Fatal(err)
}

// Create like.
like, err := reactions.Put(currentApp.Namespace(), &reaction.Reaction{
ObjectID: post.ID,
OwnerID: liker.ID,
Type: reaction.TypeLike,
})
if err != nil {
t.Fatal(err)
}

var (
deleted = false
ruleReactionParentOwner = &rule.Rule{
Criteria: &rule.CriteriaReaction{
New: &reaction.QueryOptions{
Deleted: &deleted,
Types: []reaction.Type{
reaction.TypeLike,
},
},
Old: nil,
},
Recipients: rule.Recipients{
{
Query: map[string]string{
"parentOwner": "",
},
Templates: map[string]string{
"en": "{{.Owner.Username}} liked your post",
},
URN: "tapglue/users/{{.Owner.ID}}",
},
},
}
)

want := Messages{
{
Message: fmt.Sprintf("%s liked your post", liker.Username),
Recipient: postOwner.ID,
URN: fmt.Sprintf("tapglue/users/%d", liker.ID),
},
}

have, err := PipelineReaction(objects, users)(currentApp, &reaction.StateChange{New: like}, ruleReactionParentOwner)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(have, want) {
for i, m := range have {
fmt.Printf("[%d|%s] %v\n", m.Recipient, m.URN, m.Message[i])
fmt.Printf("[%d|%s] %v\n\n", want[i].Recipient, want[i].URN, want[i].Message)
}

t.Errorf("have %#v, want %#v", have, want)
}
}

func TestPipelineEventCondParentOwner(t *testing.T) {
var (
currentApp = testApp()
Expand Down
30 changes: 30 additions & 0 deletions service/reaction/reaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
TypeAngry
)

// Consumer observes state changes.
type Consumer interface {
Consume() (*StateChange, error)
}
Expand Down Expand Up @@ -53,6 +54,7 @@ func (rs List) OwnerIDs() []uint64 {
// Map is a Reaction collection with their id as index.
type Map map[uint64]*Reaction

// ToList returns a list collection.
func (m Map) ToList() List {
rs := List{}

Expand Down Expand Up @@ -92,6 +94,34 @@ type Reaction struct {
UpdatedAt time.Time
}

// MatchOpts indicates if the Reaction matches the given QueryOptions.
func (r *Reaction) MatchOpts(opts *QueryOptions) bool {
if opts == nil {
return true
}

if opts.Deleted != nil && r.Deleted != *opts.Deleted {
return false
}

if len(opts.Types) > 0 {
discard := true

for _, t := range opts.Types {
if r.Type == t {
discard = false
break
}
}

if discard {
return false
}
}

return true
}

// Validate checks for semantic correctness.
func (r *Reaction) Validate() error {
if r.ObjectID == 0 {
Expand Down
24 changes: 24 additions & 0 deletions service/rule/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"time"

"github.com/tapglue/snaas/service/reaction"

"github.com/tapglue/snaas/platform/service"
"github.com/tapglue/snaas/platform/sns"
"github.com/tapglue/snaas/service/connection"
Expand Down Expand Up @@ -40,6 +42,28 @@ func (c *CriteriaConnection) Match(i interface{}) bool {
return s.New.MatchOpts(c.New) && s.Old.MatchOpts(c.Old)
}

type CriteriaReaction struct {
New *reaction.QueryOptions `json:"new"`
Old *reaction.QueryOptions `json:"old"`
}

func (c *CriteriaReaction) Match(i interface{}) bool {
s, ok := i.(*reaction.StateChange)
if !ok {
return false
}

if s.New == nil && s.Old == nil {
return false
}

if s.Old == nil {
return s.New.MatchOpts(c.New)
}

return s.New.MatchOpts(c.New) && s.Old.MatchOpts(c.Old)
}

type CriteriaEvent struct {
New *event.QueryOptions `json:"new"`
Old *event.QueryOptions `json:"old"`
Expand Down

0 comments on commit aae776d

Please sign in to comment.