From 9e323fda8ba592f991758153304252f25b9fa2f0 Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Tue, 6 Dec 2016 18:48:32 +0100 Subject: [PATCH] Integrate pipelines in sims --- TODO | 18 +- cmd/sims/channel.go | 8 +- cmd/sims/consume.go | 98 +++++----- cmd/sims/rules.go | 348 ---------------------------------- cmd/sims/sims.go | 42 ++-- core/pipieline.go | 105 +++++----- core/pipieline_test.go | 69 ++++--- core/rules.go | 22 +++ service/platform/postgres.go | 45 ++--- service/rule/postgres.go | 41 ++-- service/rule/postgres_test.go | 21 +- service/rule/rule.go | 6 +- 12 files changed, 240 insertions(+), 583 deletions(-) delete mode 100644 cmd/sims/rules.go create mode 100644 core/rules.go diff --git a/TODO b/TODO index f560afd..470c808 100644 --- a/TODO +++ b/TODO @@ -12,9 +12,25 @@ infrastructure: ✔ generate pg password during setup @done (16-11-11 11:10) ☐ fix pg monitoring role ☐ setup grafana and prometheus on monitoring host + ☐ setup monitoring dashboard definition + ☐ vpc pairing template + ☐ pganalyze template + ☐ handle dependencies with gvt + ☐ create remote-exec provisioner for DB bootstrap + ☐ make binaries available with Github release + ☐ setup alertmanager and pagerduty + ☐ setup cloudwatch logging + +terraformer: + ☐ verify account + ☐ ask for cidr_block + ☐ user MFA (if possible) + ☐ sync vars and key code: ✔ move event condition code @done (16-11-11 11:10) ✔ add missing event indeces @done (16-11-12 12:36) ✔ enable TLS and load certs @done (16-12-02 15:41) - ☐ Implement instrumentation and log middleware for platform.Service \ No newline at end of file + ☐ Implement instrumentation and log middleware for platform.Service + ☐ Complete postgres rule.Service + ☐ Implement instrumentation and log middleware for rule.Service diff --git a/cmd/sims/channel.go b/cmd/sims/channel.go index ad41a97..70f5864 100644 --- a/cmd/sims/channel.go +++ b/cmd/sims/channel.go @@ -7,7 +7,7 @@ import ( "github.com/tapglue/snaas/service/app" ) -type channelFunc func(*app.App, *message) error +type channelFunc func(*app.App, *core.Message) error func channelPush( deviceListUser core.DeviceListUserFunc, @@ -15,8 +15,8 @@ func channelPush( fetchActive core.PlatformFetchActiveFunc, push sns.PushFunc, ) channelFunc { - return func(currentApp *app.App, msg *message) error { - ds, err := deviceListUser(currentApp, msg.recipient) + return func(currentApp *app.App, msg *core.Message) error { + ds, err := deviceListUser(currentApp, msg.Recipient) if err != nil { return err } @@ -38,7 +38,7 @@ func channelPush( return err } - err = push(d.Platform, d.EndpointARN, p.Scheme, msg.urn, msg.message) + err = push(d.Platform, d.EndpointARN, p.Scheme, msg.URN, msg.Message) if err != nil { if sns.IsDeliveryFailure(err) { return nil diff --git a/cmd/sims/consume.go b/cmd/sims/consume.go index 95f474d..9255126 100644 --- a/cmd/sims/consume.go +++ b/cmd/sims/consume.go @@ -14,6 +14,7 @@ import ( "github.com/tapglue/snaas/service/connection" "github.com/tapglue/snaas/service/event" "github.com/tapglue/snaas/service/object" + "github.com/tapglue/snaas/service/rule" ) type ackFunc func() error @@ -21,25 +22,18 @@ type ackFunc func() error type batch struct { ackFunc ackFunc app *app.App - messages messages + messages core.Messages } -type message struct { - message string - recipient uint64 - urn string -} - -type messages []*message - func consumeConnection( appFetch core.AppFetchFunc, conSource connection.Source, batchc chan<- batch, - ruleFns ...conRuleFunc, + pipeline core.PipelineConnectionFunc, + rules core.RuleListActiveFunc, ) error { for { - c, err := conSource.Consume() + change, err := conSource.Consume() if err != nil { if connection.IsEmptySource(err) { continue @@ -47,26 +41,23 @@ func consumeConnection( return err } - currentApp, err := appForNamespace(appFetch, c.Namespace) + currentApp, err := appForNamespace(appFetch, change.Namespace) if err != nil { return err } - ms := messages{} - - for _, rule := range ruleFns { - msgs, err := rule(currentApp, c) - if err != nil { - return err - } + rs, err := rules(currentApp, rule.TypeConnection) + if err != nil { + return err + } - for _, msg := range msgs { - ms = append(ms, msg) - } + ms, err := pipeline(currentApp, change, rs...) + if err != nil { + return err } if len(ms) == 0 { - err := conSource.Ack(c.AckID) + err := conSource.Ack(change.AckID) if err != nil { return err } @@ -74,7 +65,7 @@ func consumeConnection( continue } - batchc <- batchMessages(currentApp, conSource, c.AckID, ms) + batchc <- batchMessages(currentApp, conSource, change.AckID, ms) } } @@ -131,14 +122,16 @@ func consumeEndpointChange( } } } + func consumeEvent( appFetch core.AppFetchFunc, eventSource event.Source, batchc chan<- batch, - ruleFns ...eventRuleFunc, + pipeline core.PipelineEventFunc, + rules core.RuleListActiveFunc, ) error { for { - c, err := eventSource.Consume() + change, err := eventSource.Consume() if err != nil { if event.IsEmptySource(err) { continue @@ -146,26 +139,23 @@ func consumeEvent( return err } - currentApp, err := appForNamespace(appFetch, c.Namespace) + currentApp, err := appForNamespace(appFetch, change.Namespace) if err != nil { return err } - ms := messages{} - - for _, rule := range ruleFns { - rs, err := rule(currentApp, c) - if err != nil { - return err - } + rs, err := rules(currentApp, rule.TypeEvent) + if err != nil { + return err + } - for _, msg := range rs { - ms = append(ms, msg) - } + ms, err := pipeline(currentApp, change, rs...) + if err != nil { + return err } if len(ms) == 0 { - err = eventSource.Ack(c.AckID) + err = eventSource.Ack(change.AckID) if err != nil { return err } @@ -173,7 +163,7 @@ func consumeEvent( continue } - batchc <- batchMessages(currentApp, eventSource, c.AckID, ms) + batchc <- batchMessages(currentApp, eventSource, change.AckID, ms) } } @@ -181,10 +171,11 @@ func consumeObject( appFetch core.AppFetchFunc, objectSource object.Source, batchc chan<- batch, - ruleFns ...objectRuleFunc, + pipeline core.PipelineObjectFunc, + rules core.RuleListActiveFunc, ) error { for { - c, err := objectSource.Consume() + change, err := objectSource.Consume() if err != nil { if object.IsEmptySource(err) { continue @@ -192,26 +183,23 @@ func consumeObject( return err } - currentApp, err := appForNamespace(appFetch, c.Namespace) + currentApp, err := appForNamespace(appFetch, change.Namespace) if err != nil { return err } - ms := messages{} - - for _, rule := range ruleFns { - rs, err := rule(currentApp, c) - if err != nil { - return err - } + rs, err := rules(currentApp, rule.TypeObject) + if err != nil { + return err + } - for _, msg := range rs { - ms = append(ms, msg) - } + ms, err := pipeline(currentApp, change, rs...) + if err != nil { + return err } if len(ms) == 0 { - err := objectSource.Ack(c.AckID) + err := objectSource.Ack(change.AckID) if err != nil { return err } @@ -219,7 +207,7 @@ func consumeObject( continue } - batchc <- batchMessages(currentApp, objectSource, c.AckID, ms) + batchc <- batchMessages(currentApp, objectSource, change.AckID, ms) } } @@ -227,7 +215,7 @@ func batchMessages( currentApp *app.App, acker source.Acker, ackID string, - ms messages, + ms core.Messages, ) batch { return batch{ ackFunc: func(acked bool, ackID string) ackFunc { diff --git a/cmd/sims/rules.go b/cmd/sims/rules.go deleted file mode 100644 index 0eeecb0..0000000 --- a/cmd/sims/rules.go +++ /dev/null @@ -1,348 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/tapglue/snaas/core" - "github.com/tapglue/snaas/service/app" - "github.com/tapglue/snaas/service/connection" - "github.com/tapglue/snaas/service/event" - "github.com/tapglue/snaas/service/object" - "github.com/tapglue/snaas/service/user" -) - -// Messaging. -const ( - fmtCommentPost = `%s commented on a Post.` - fmtCommentPostOwn = `%s commented on your Post.` - fmtFollow = `%s started following you` - fmtFriendConfirmed = `%s accepted your friend request.` - fmtFriendRequest = `%s sent you a friend request.` - fmtLikePost = `%s liked a Post.` - fmtLikePostOwn = `%s liked your Post.` - fmtPostCreated = `%s created a new Post.` -) - -// Deep-linking URNs. -const ( - urnComment = `tapglue/posts/%d/comments/%d` - urnPost = `tapglue/posts/%d` - urnUser = `tapglue/users/%d` -) - -type conRuleFunc func(*app.App, *connection.StateChange) (messages, error) -type eventRuleFunc func(*app.App, *event.StateChange) (messages, error) -type objectRuleFunc func(*app.App, *object.StateChange) (messages, error) - -func conRuleFollower(userFetch core.UserFetchFunc) conRuleFunc { - return func( - currentApp *app.App, - change *connection.StateChange, - ) (messages, error) { - if change.Old != nil || - change.New.State != connection.StateConfirmed || - change.New.Type != connection.TypeFollow { - return nil, nil - } - - origin, err := userFetch(currentApp, change.New.FromID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - target, err := userFetch(currentApp, change.New.ToID) - if err != nil { - return nil, fmt.Errorf("target fetch: %s", err) - } - - return messages{ - { - message: fmtToMessage(fmtFollow, origin), - recipient: target.ID, - urn: fmt.Sprintf(urnUser, origin.ID), - }, - }, nil - } -} - -func conRuleFriendConfirmed(userFetch core.UserFetchFunc) conRuleFunc { - return func( - currentApp *app.App, - change *connection.StateChange, - ) (messages, error) { - if change.Old == nil || - change.Old.Type != connection.TypeFriend || - change.New.State != connection.StatePending || - change.New.Type != connection.TypeFriend { - return nil, nil - } - - origin, err := userFetch(currentApp, change.New.FromID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - target, err := userFetch(currentApp, change.New.ToID) - if err != nil { - return nil, fmt.Errorf("target fetch: %s", err) - } - - return messages{ - { - message: fmtToMessage(fmtFriendConfirmed, target), - recipient: origin.ID, - urn: fmt.Sprintf(urnUser, origin.ID), - }, - }, nil - } -} - -func conRuleFriendRequest(userFetch core.UserFetchFunc) conRuleFunc { - return func( - currentApp *app.App, - change *connection.StateChange, - ) (messages, error) { - if change.Old != nil || - change.New.State != connection.StatePending || - change.New.Type != connection.TypeFriend { - return nil, nil - } - - origin, err := userFetch(currentApp, change.New.FromID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - target, err := userFetch(currentApp, change.New.ToID) - if err != nil { - return nil, fmt.Errorf("target fetch: %s", err) - } - - return messages{ - { - message: fmtToMessage(fmtFriendRequest, origin), - recipient: target.ID, - urn: fmt.Sprintf(urnUser, origin.ID), - }, - }, nil - } -} - -func eventRuleLikeCreated( - followerIDs core.ConnectionFollowerIDsFunc, - friendIDs core.ConnectionFriendIDsFunc, - postFetch core.PostFetchFunc, - userFetch core.UserFetchFunc, - usersFetch core.UsersFetchFunc, -) eventRuleFunc { - return func( - currentApp *app.App, - change *event.StateChange, - ) (messages, error) { - if change.Old != nil || - change.New.Enabled == false || - !core.IsLike(change.New) { - return nil, nil - } - - post, err := postFetch(currentApp, change.New.ObjectID) - if err != nil { - return nil, fmt.Errorf("post fetch: %s", err) - } - - origin, err := userFetch(currentApp, change.New.UserID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - owner, err := userFetch(currentApp, post.OwnerID) - if err != nil { - return nil, fmt.Errorf("owner fetch: %s", err) - } - - followIDs, err := followerIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - fIDs, err := friendIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - ids := filterIDs(append(followIDs, fIDs...), owner.ID) - - rs, err := usersFetch(currentApp, ids...) - if err != nil { - return nil, err - } - - rs = append(rs, owner) - ms := messages{} - - for _, recipient := range rs { - f := fmtLikePost - - if post.OwnerID == recipient.ID { - f = fmtLikePostOwn - } - - ms = append(ms, &message{ - message: fmtToMessage(f, origin), - recipient: recipient.ID, - urn: fmt.Sprintf(urnPost, post.ID), - }) - } - - return ms, nil - } -} - -func objectRuleCommentCreated( - followerIDs core.ConnectionFollowerIDsFunc, - friendIDs core.ConnectionFriendIDsFunc, - postFetch core.PostFetchFunc, - userFetch core.UserFetchFunc, - usersFetch core.UsersFetchFunc, -) objectRuleFunc { - return func( - currentApp *app.App, - change *object.StateChange, - ) (messages, error) { - if change.Old != nil || - change.New.Deleted == true || - !core.IsComment(change.New) { - return nil, nil - } - - post, err := postFetch(currentApp, change.New.ObjectID) - if err != nil { - return nil, fmt.Errorf("post fetch: %s", err) - } - - origin, err := userFetch(currentApp, change.New.OwnerID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - owner, err := userFetch(currentApp, post.OwnerID) - if err != nil { - return nil, fmt.Errorf("owner fetch: %s", err) - } - - followIDs, err := followerIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - fIDs, err := friendIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - ids := filterIDs(append(followIDs, fIDs...), owner.ID) - - rs, err := usersFetch(currentApp, ids...) - if err != nil { - return nil, err - } - - rs = append(rs, owner) - ms := messages{} - - for _, recipient := range rs { - f := fmtCommentPost - - if post.OwnerID == recipient.ID { - f = fmtCommentPostOwn - } - - ms = append(ms, &message{ - message: fmtToMessage(f, origin), - recipient: recipient.ID, - urn: fmt.Sprintf(urnComment, post.ID, change.New.ID), - }) - } - - return ms, nil - } -} - -func objectRulePostCreated( - followerIDs core.ConnectionFollowerIDsFunc, - friendIDs core.ConnectionFriendIDsFunc, - userFetch core.UserFetchFunc, - usersFetch core.UsersFetchFunc, -) objectRuleFunc { - return func( - currentApp *app.App, - change *object.StateChange, - ) (messages, error) { - if change.Old != nil || - change.New.Deleted == true || - !core.IsPost(change.New) { - return nil, nil - } - - origin, err := userFetch(currentApp, change.New.OwnerID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - followIDs, err := followerIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - fIDs, err := friendIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - rs, err := usersFetch(currentApp, append(followIDs, fIDs...)...) - if err != nil { - return nil, err - } - - ms := messages{} - - for _, recipient := range rs { - ms = append(ms, &message{ - message: fmtToMessage(fmtPostCreated, origin), - recipient: recipient.ID, - urn: fmt.Sprintf(urnPost, change.New.ID), - }) - } - - return ms, nil - } -} - -func filterIDs(ids []uint64, fs ...uint64) []uint64 { - var ( - is = []uint64{} - seen = map[uint64]struct{}{} - ) - - for _, id := range fs { - seen[id] = struct{}{} - } - - for _, id := range ids { - if _, ok := seen[id]; ok { - continue - } - - is = append(is, id) - } - - return is -} - -func fmtToMessage(f string, u *user.User) string { - if u.Firstname != "" { - return fmt.Sprintf(f, u.Firstname) - } - - return fmt.Sprintf(f, u.Username) -} diff --git a/cmd/sims/sims.go b/cmd/sims/sims.go index 6f0dbe4..2412056 100644 --- a/cmd/sims/sims.go +++ b/cmd/sims/sims.go @@ -6,6 +6,8 @@ import ( "os" "time" + "github.com/tapglue/snaas/service/rule" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" awsSession "github.com/aws/aws-sdk-go/aws/session" @@ -194,6 +196,11 @@ func main() { // TODO: Implement instrumentaiton middleware. // TODO: Implement logging middleware. + var rules rule.Service + rules = rule.PostgresService(pgClient) + // TODO: Implement instrumentaiton middleware. + // TODO: Implement logging middleware. + var users user.Service users = user.PostgresService(pgClient) users = user.InstrumentMiddleware( @@ -312,15 +319,8 @@ func main() { core.AppFetch(apps), conSource, batchc, - conRuleFollower( - core.UserFetch(users), - ), - conRuleFriendConfirmed( - core.UserFetch(users), - ), - conRuleFriendRequest( - core.UserFetch(users), - ), + core.PipelineConnection(users), + core.RuleListActive(rules), ) if err != nil { logger.Log("err", err, "lifecycle", "abort") @@ -333,13 +333,8 @@ func main() { core.AppFetch(apps), eventSource, batchc, - eventRuleLikeCreated( - core.ConnectionFollowerIDs(connections), - core.ConnectionFriendIDs(connections), - core.PostFetch(objects), - core.UserFetch(users), - core.UsersFetch(users), - ), + core.PipelineEvent(objects, users), + core.RuleListActive(rules), ) if err != nil { logger.Log("err", err, "lifecycle", "abort") @@ -352,19 +347,8 @@ func main() { core.AppFetch(apps), objectSource, batchc, - objectRuleCommentCreated( - core.ConnectionFollowerIDs(connections), - core.ConnectionFriendIDs(connections), - core.PostFetch(objects), - core.UserFetch(users), - core.UsersFetch(users), - ), - objectRulePostCreated( - core.ConnectionFollowerIDs(connections), - core.ConnectionFriendIDs(connections), - core.UserFetch(users), - core.UsersFetch(users), - ), + core.PipelineObject(connections, objects, users), + core.RuleListActive(rules), ) if err != nil { logger.Log("err", err, "lifecycle", "abort") diff --git a/core/pipieline.go b/core/pipieline.go index 19c649d..b06bf07 100644 --- a/core/pipieline.go +++ b/core/pipieline.go @@ -26,52 +26,35 @@ const ( queryCondUserTo = "userTo" ) -type contextConnection struct { - Conenction *connection.Connection - From *user.User - To *user.User -} - -type contextEvent struct { - Event *event.Event - Owner *user.User - Parent *object.Object - ParentOwner *user.User -} - -type contextObject struct { - Object *object.Object - Owner *user.User - Parent *object.Object - ParentOwner *user.User -} - -type message struct { +// Message is the envelope which holds the templated message produced by a +// Pipeline together with the recipient and the URN to deliver with it. +type Message struct { Message string - recipient uint64 - urn string + Recipient uint64 + URN string } -type messages []*message +// Messages is a Message collection +type Messages []*Message // PipelineConnectionFunc constructs a Pipeline that by applying the provided // rules outputs messages. type PipelineConnectionFunc func( - currentApp *app.App, - change connection.StateChange, - rules ...*rule.Rule, -) (messages, error) + *app.App, + *connection.StateChange, + ...*rule.Rule, +) (Messages, error) // PipelineConnection constructs a Pipeline that by applying the provided -// rules outputs messages. +// rules outputs Messages. func PipelineConnection(users user.Service) PipelineConnectionFunc { return func( currentApp *app.App, - change connection.StateChange, + change *connection.StateChange, rules ...*rule.Rule, - ) (messages, error) { + ) (Messages, error) { var ( - ms = messages{} + ms = Messages{} c = change.New context *contextConnection @@ -79,7 +62,7 @@ func PipelineConnection(users user.Service) PipelineConnectionFunc { ) if change.New == nil { - return messages{}, nil + return Messages{}, nil } from, err := UserFetch(users)(currentApp, c.FromID) @@ -120,7 +103,7 @@ func PipelineConnection(users user.Service) PipelineConnectionFunc { return nil, err } - ms = append(ms, &message{Message: msg, recipient: r.ID, urn: urn}) + ms = append(ms, &Message{Message: msg, Recipient: r.ID, URN: urn}) } } } @@ -130,26 +113,26 @@ func PipelineConnection(users user.Service) PipelineConnectionFunc { } // PipelineEventFunc constructs a Pipeline that by applying the provided rules -// outputs messages. +// outputs Messages. type PipelineEventFunc func( *app.App, - event.StateChange, + *event.StateChange, ...*rule.Rule, -) (messages, error) +) (Messages, error) // PipelineEvent constructs a Pipeline that by applying the provided rules -// outputs messages. +// outputs Messages. func PipelineEvent( objects object.Service, users user.Service, ) PipelineEventFunc { return func( currentApp *app.App, - change event.StateChange, + change *event.StateChange, rules ...*rule.Rule, - ) (messages, error) { + ) (Messages, error) { var ( - ms = messages{} + ms = Messages{} e = change.New context *contextEvent @@ -204,7 +187,7 @@ func PipelineEvent( return nil, err } - ms = append(ms, &message{Message: msg, recipient: r.ID, urn: urn}) + ms = append(ms, &Message{Message: msg, Recipient: r.ID, URN: urn}) } } } @@ -214,15 +197,15 @@ func PipelineEvent( } // PipelineObjectFunc constructs a Pipeline that by appplying the provided -// rules outputs messages. +// rules outputs Messages. type PipelineObjectFunc func( *app.App, - object.StateChange, + *object.StateChange, ...*rule.Rule, -) (messages, error) +) (Messages, error) // PipelineObject constructs a Pipeline that by appplying the provided rules -// outputs messages. +// outputs Messages. func PipelineObject( connections connection.Service, objects object.Service, @@ -230,11 +213,11 @@ func PipelineObject( ) PipelineObjectFunc { return func( currentApp *app.App, - change object.StateChange, + change *object.StateChange, rules ...*rule.Rule, - ) (messages, error) { + ) (Messages, error) { var ( - ms = messages{} + ms = Messages{} o = change.New context *contextObject @@ -243,7 +226,7 @@ func PipelineObject( ) if change.New == nil { - return messages{}, nil + return Messages{}, nil } owner, err := UserFetch(users)(currentApp, change.New.OwnerID) @@ -296,7 +279,7 @@ func PipelineObject( return nil, err } - ms = append(ms, &message{Message: msg, recipient: r.ID, urn: urn}) + ms = append(ms, &Message{Message: msg, Recipient: r.ID, URN: urn}) } } } @@ -501,3 +484,23 @@ func recipientsEvent() recipientsEventFunc { return us, nil } } + +type contextConnection struct { + Conenction *connection.Connection + From *user.User + To *user.User +} + +type contextEvent struct { + Event *event.Event + Owner *user.User + Parent *object.Object + ParentOwner *user.User +} + +type contextObject struct { + Object *object.Object + Owner *user.User + Parent *object.Object + ParentOwner *user.User +} diff --git a/core/pipieline_test.go b/core/pipieline_test.go index 77f01ce..10630c9 100644 --- a/core/pipieline_test.go +++ b/core/pipieline_test.go @@ -94,25 +94,20 @@ func TestPipelineConnectionCondFrom(t *testing.T) { } ) - want := messages{ + want := Messages{ { Message: fmt.Sprintf("%s accepted your friend request", target.Username), - recipient: origin.ID, - urn: fmt.Sprintf("tapglue/users/%d", target.ID), + Recipient: origin.ID, + URN: fmt.Sprintf("tapglue/users/%d", target.ID), }, } - have, err := PipelineConnection(users)(currentApp, connection.StateChange{New: new, Old: old}, ruleConnectionTo) + have, err := PipelineConnection(users)(currentApp, &connection.StateChange{New: new, Old: old}, ruleConnectionTo) if err != nil { t.Fatal(err) } if !reflect.DeepEqual(have, want) { - for i, m := range have { - fmt.Printf("[%d|%s] %s\n", m.recipient, m.urn, m.Message) - fmt.Printf("[%d|%s] %s\n\n", want[i].recipient, want[i].urn, want[i].Message) - } - t.Errorf("have %#v, want %#v", have, want) } } @@ -177,15 +172,15 @@ func TestPipelineConnectionCondTo(t *testing.T) { } ) - want := messages{ + want := Messages{ { Message: fmt.Sprintf("%s sent you a friend request", origin.Username), - recipient: target.ID, - urn: fmt.Sprintf("tapglue/users/%d", origin.ID), + Recipient: target.ID, + URN: fmt.Sprintf("tapglue/users/%d", origin.ID), }, } - have, err := PipelineConnection(users)(currentApp, connection.StateChange{New: con}, ruleConnectionTo) + have, err := PipelineConnection(users)(currentApp, &connection.StateChange{New: con}, ruleConnectionTo) if err != nil { t.Fatal(err) } @@ -260,23 +255,23 @@ func TestPipelineEventCondParentOwner(t *testing.T) { } ) - want := messages{ + want := Messages{ { Message: fmt.Sprintf("%s liked your post", liker.Username), - recipient: postOwner.ID, - urn: fmt.Sprintf("tapglue/users/%d", liker.ID), + Recipient: postOwner.ID, + URN: fmt.Sprintf("tapglue/users/%d", liker.ID), }, } - have, err := PipelineEvent(objects, users)(currentApp, event.StateChange{New: like}, ruleEventParentOwner) + have, err := PipelineEvent(objects, users)(currentApp, &event.StateChange{New: like}, ruleEventParentOwner) if err != nil { t.Fatal(err) } if !reflect.DeepEqual(have, want) { for i, m := range have { - fmt.Printf("[%d|%s] %s\n", m.recipient, m.urn, m.Message) - fmt.Printf("[%d|%s] %s\n\n", want[i].recipient, want[i].urn, want[i].Message) + fmt.Printf("[%d|%s] %s\n", m.Recipient, m.URN, m.Message) + fmt.Printf("[%d|%s] %s\n\n", want[i].Recipient, want[i].URN, want[i].Message) } t.Errorf("have %#v, want %#v", have, want) @@ -367,16 +362,16 @@ func TestPipelineObjectCondFriends(t *testing.T) { }, } - want := messages{ + want := Messages{ { - recipient: friend2.ID, + Recipient: friend2.ID, Message: fmt.Sprintf("%s just added a review", postOwner.Username), - urn: fmt.Sprintf("tapglue/posts/%d", post.ID), + URN: fmt.Sprintf("tapglue/posts/%d", post.ID), }, { - recipient: friend1.ID, + Recipient: friend1.ID, Message: fmt.Sprintf("%s just added a review", postOwner.Username), - urn: fmt.Sprintf("tapglue/posts/%d", post.ID), + URN: fmt.Sprintf("tapglue/posts/%d", post.ID), }, } @@ -384,7 +379,7 @@ func TestPipelineObjectCondFriends(t *testing.T) { connections, objects, users, - )(currentApp, object.StateChange{New: post}, ruleObjectOwner) + )(currentApp, &object.StateChange{New: post}, ruleObjectOwner) if err != nil { t.Fatal(err) } @@ -471,16 +466,16 @@ func TestPipelineObjectCondObjectOwner(t *testing.T) { }, } - want := messages{ + want := Messages{ { - recipient: commenter2.ID, + Recipient: commenter2.ID, Message: fmt.Sprintf("%s also commented on %ss post", commenter3.Username, postOwner.Username), - urn: fmt.Sprintf("tapglue/posts/%d/comments/%d", post.ID, comment3.ID), + URN: fmt.Sprintf("tapglue/posts/%d/comments/%d", post.ID, comment3.ID), }, { - recipient: commenter1.ID, + Recipient: commenter1.ID, Message: fmt.Sprintf("%s also commented on %ss post", commenter3.Username, postOwner.Username), - urn: fmt.Sprintf("tapglue/posts/%d/comments/%d", post.ID, comment3.ID), + URN: fmt.Sprintf("tapglue/posts/%d/comments/%d", post.ID, comment3.ID), }, } @@ -488,7 +483,7 @@ func TestPipelineObjectCondObjectOwner(t *testing.T) { connections, objects, users, - )(currentApp, object.StateChange{New: comment3}, ruleObjectOwner) + )(currentApp, &object.StateChange{New: comment3}, ruleObjectOwner) if err != nil { t.Fatal(err) } @@ -551,11 +546,11 @@ func TestPipelineObjectCondOwner(t *testing.T) { }, } - want := messages{ + want := Messages{ { - recipient: postOwner.ID, + Recipient: postOwner.ID, Message: fmt.Sprintf("%s commented on your post", commenter.Username), - urn: fmt.Sprintf("tapglue/posts/%d/comments/%d", post.ID, comment.ID), + URN: fmt.Sprintf("tapglue/posts/%d/comments/%d", post.ID, comment.ID), }, } @@ -563,15 +558,15 @@ func TestPipelineObjectCondOwner(t *testing.T) { connections, objects, users, - )(currentApp, object.StateChange{New: comment}, ruleObjectOwner) + )(currentApp, &object.StateChange{New: comment}, ruleObjectOwner) if err != nil { t.Fatal(err) } if !reflect.DeepEqual(have, want) { for i, m := range have { - fmt.Printf("[%d|%s] %s\n", m.recipient, m.urn, m.Message) - fmt.Printf("[%d|%s] %s\n\n", want[i].recipient, want[i].urn, want[i].Message) + fmt.Printf("[%d|%s] %s\n", m.Recipient, m.URN, m.Message) + fmt.Printf("[%d|%s] %s\n\n", want[i].Recipient, want[i].URN, want[i].Message) } t.Errorf("have %#v, want %#v", have, want) diff --git a/core/rules.go b/core/rules.go new file mode 100644 index 0000000..9d33e6e --- /dev/null +++ b/core/rules.go @@ -0,0 +1,22 @@ +package core + +import ( + "github.com/tapglue/snaas/service/app" + "github.com/tapglue/snaas/service/rule" +) + +// RuleListActiveFunc returns all active rules for the current App. +type RuleListActiveFunc func(*app.App, rule.Type) (rule.List, error) + +// RuleListActive returns all active rules for the current App. +func RuleListActive(rules rule.Service) RuleListActiveFunc { + return func(currentApp *app.App, ruleType rule.Type) (rule.List, error) { + return rules.Query(currentApp.Namespace(), rule.QueryOptions{ + Active: &defaultActive, + Deleted: &defaultDeleted, + Types: []rule.Type{ + ruleType, + }, + }) + } +} diff --git a/service/platform/postgres.go b/service/platform/postgres.go index 68daca1..c20a1a8 100644 --- a/service/platform/postgres.go +++ b/service/platform/postgres.go @@ -2,7 +2,6 @@ package platform import ( "fmt" - "strings" "time" "github.com/jmoiron/sqlx" @@ -86,12 +85,12 @@ func (s *pgService) Put(ns string, p *Platform) (*Platform, error) { } func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { - clauses, params, err := convertOpts(opts) + where, params, err := convertOpts(opts) if err != nil { return nil, err } - ps, err := s.listPlatforms(ns, clauses, params...) + ps, err := s.listPlatforms(ns, where, params...) if err != nil { if pg.IsRelationNotFound(pg.WrapError(err)) { if err := s.Setup(ns); err != nil { @@ -99,7 +98,7 @@ func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { } } - ps, err = s.listPlatforms(ns, clauses, params...) + ps, err = s.listPlatforms(ns, where, params...) } return ps, err @@ -187,22 +186,10 @@ func (s *pgService) insert(ns string, p *Platform) (*Platform, error) { } func (s *pgService) listPlatforms( - ns string, - clauses []string, + ns, where string, params ...interface{}, ) (List, error) { - c := strings.Join(clauses, "\nAND") - - if len(clauses) > 0 { - c = fmt.Sprintf("WHERE %s", c) - } - - query := strings.Join([]string{ - fmt.Sprintf(pgListPlatforms, ns, c), - pgOrderCreatedAt, - }, "\n") - - query = sqlx.Rebind(sqlx.DOLLAR, query) + query := fmt.Sprintf(pgListPlatforms, ns, where) rows, err := s.db.Query(query, params...) if err != nil { @@ -281,7 +268,7 @@ func (s *pgService) update(ns string, p *Platform) (*Platform, error) { return p, err } -func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { +func convertOpts(opts QueryOptions) (string, []interface{}, error) { var ( clauses = []string{} params = []interface{}{} @@ -290,7 +277,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { if opts.Active != nil { clause, _, err := sqlx.In(pgClauseActive, []interface{}{*opts.Active}) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -306,7 +293,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { clause, _, err := sqlx.In(pgClauseAppIDs, ps) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -322,7 +309,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { clause, _, err := sqlx.In(pgClauseARNs, ps) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -332,7 +319,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { if opts.Deleted != nil { clause, _, err := sqlx.In(pgClauseDeleted, []interface{}{*opts.Deleted}) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -348,7 +335,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { clause, _, err := sqlx.In(pgClauseEcosystems, ps) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -364,12 +351,18 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { clause, _, err := sqlx.In(pgClauseIDs, ps) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) params = append(params, ps...) } - return clauses, params, nil + where := "" + + if len(clauses) > 0 { + where = sqlx.Rebind(sqlx.DOLLAR, pg.ClausesToWhere(clauses...)) + } + + return where, params, nil } diff --git a/service/rule/postgres.go b/service/rule/postgres.go index 783397b..6ca2307 100644 --- a/service/rule/postgres.go +++ b/service/rule/postgres.go @@ -3,7 +3,6 @@ package rule import ( "encoding/json" "fmt" - "strings" "time" "github.com/jmoiron/sqlx" @@ -69,12 +68,12 @@ func (s *pgService) Put(ns string, r *Rule) (*Rule, error) { } func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { - clauses, params, err := convertOpts(opts) + where, params, err := convertOpts(opts) if err != nil { return nil, err } - rs, err := s.listRules(ns, clauses, params...) + rs, err := s.listRules(ns, where, params...) if err != nil { if pg.IsRelationNotFound(pg.WrapError(err)) { if err := s.Setup(ns); err != nil { @@ -82,7 +81,7 @@ func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { } } - rs, err = s.listRules(ns, clauses, params...) + rs, err = s.listRules(ns, where, params...) } return rs, err @@ -182,22 +181,10 @@ func (s *pgService) insert(ns string, r *Rule) (*Rule, error) { } func (s *pgService) listRules( - ns string, - clauses []string, + ns, where string, params ...interface{}, ) (List, error) { - c := strings.Join(clauses, "\nAND") - - if len(clauses) > 0 { - c = fmt.Sprintf("WHERE %s", c) - } - - query := strings.Join([]string{ - fmt.Sprintf(pgListRules, ns, c), - pgOrderCreatedAt, - }, "\n") - - query = sqlx.Rebind(sqlx.DOLLAR, query) + query := fmt.Sprintf(pgListRules, ns, where) rows, err := s.db.Query(query, params...) if err != nil { @@ -262,7 +249,7 @@ func (s *pgService) listRules( return rs, nil } -func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { +func convertOpts(opts QueryOptions) (string, []interface{}, error) { var ( clauses = []string{} params = []interface{}{} @@ -271,7 +258,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { if opts.Active != nil { clause, _, err := sqlx.In(pgClauseActive, []interface{}{*opts.Active}) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -281,7 +268,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { if opts.Deleted != nil { clause, _, err := sqlx.In(pgClauseDeleted, []interface{}{*opts.Deleted}) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -297,7 +284,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { clause, _, err := sqlx.In(pgClauseIDs, ps) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -313,12 +300,18 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { clause, _, err := sqlx.In(pgClauseTypes, ps) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) params = append(params, ps...) } - return clauses, params, nil + where := "" + + if len(clauses) > 0 { + where = sqlx.Rebind(sqlx.DOLLAR, pg.ClausesToWhere(clauses...)) + } + + return where, params, nil } diff --git a/service/rule/postgres_test.go b/service/rule/postgres_test.go index 9a1f868..d7e011b 100644 --- a/service/rule/postgres_test.go +++ b/service/rule/postgres_test.go @@ -9,19 +9,18 @@ import ( "reflect" "testing" - "github.com/tapglue/snaas/service/connection" - "github.com/tapglue/snaas/service/event" - "github.com/tapglue/snaas/service/object" - "github.com/jmoiron/sqlx" "github.com/tapglue/snaas/platform/pg" "github.com/tapglue/snaas/platform/sns" + "github.com/tapglue/snaas/service/connection" + "github.com/tapglue/snaas/service/event" + "github.com/tapglue/snaas/service/object" ) var pgTestURL string -func TestPostgresPutConnection(t *testing.T) { +func TestPostgresPut(t *testing.T) { var ( disabled = false enabled = true @@ -145,7 +144,10 @@ func TestPostgresPutConnection(t *testing.T) { t.Fatal(err) } + deleted := false + list, err := service.Query(namespace, QueryOptions{ + Deleted: &deleted, IDs: []uint64{ created.ID, }, @@ -163,6 +165,15 @@ func TestPostgresPutConnection(t *testing.T) { } } +func TestPostgresQuery(t *testing.T) { + // var ( + // activate = true + // deleted = true + // namespace = "service_query" + // service = preparePostgres(t, namespace) + // ) +} + func preparePostgres(t *testing.T, namespace string) Service { db, err := sqlx.Connect("postgres", pgTestURL) if err != nil { diff --git a/service/rule/rule.go b/service/rule/rule.go index 03d1a39..68ab0e4 100644 --- a/service/rule/rule.go +++ b/service/rule/rule.go @@ -24,7 +24,7 @@ type CriteriaConnection struct { } func (c *CriteriaConnection) Match(i interface{}) bool { - s, ok := i.(connection.StateChange) + s, ok := i.(*connection.StateChange) if !ok { return false } @@ -42,7 +42,7 @@ type CriteriaEvent struct { } func (c *CriteriaEvent) Match(i interface{}) bool { - s, ok := i.(event.StateChange) + s, ok := i.(*event.StateChange) if !ok { return false } @@ -60,7 +60,7 @@ type CriteriaObject struct { } func (c *CriteriaObject) Match(i interface{}) bool { - s, ok := i.(object.StateChange) + s, ok := i.(*object.StateChange) if !ok { return false }