From a11dbdac5aad381da1c84ea160c609cfafb42efa Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Thu, 19 Jan 2017 16:26:28 +0100 Subject: [PATCH 1/6] Implement reaction source with logging --- cmd/gateway-http/gateway-http.go | 21 ++++-- error/error.go | 10 +++ service/event/event.go | 20 +++--- service/event/logging.go | 11 ++-- service/reaction/logging.go | 88 +++++++++++++++++++++++++ service/reaction/reaction.go | 30 +++++++++ service/reaction/sourcing.go | 58 ++++++++++++++++ service/reaction/sqs.go | 110 +++++++++++++++++++++++++++++++ 8 files changed, 328 insertions(+), 20 deletions(-) create mode 100644 service/reaction/sourcing.go create mode 100644 service/reaction/sqs.go diff --git a/cmd/gateway-http/gateway-http.go b/cmd/gateway-http/gateway-http.go index 601c6bc..9ddd352 100644 --- a/cmd/gateway-http/gateway-http.go +++ b/cmd/gateway-http/gateway-http.go @@ -7,8 +7,6 @@ import ( "os" "time" - "github.com/tapglue/snaas/service/reaction" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" awsSession "github.com/aws/aws-sdk-go/aws/session" @@ -30,6 +28,7 @@ import ( "github.com/tapglue/snaas/service/device" "github.com/tapglue/snaas/service/event" "github.com/tapglue/snaas/service/object" + "github.com/tapglue/snaas/service/reaction" "github.com/tapglue/snaas/service/session" "github.com/tapglue/snaas/service/user" ) @@ -222,9 +221,10 @@ func main() { // Setup sources. var ( - conSource connection.Source - eventSource event.Source - objectSource object.Source + conSource connection.Source + eventSource event.Source + objectSource object.Source + reactionSource reaction.Source ) switch *source { @@ -250,6 +250,12 @@ func main() { logger.Log("err", err, "lifecycle", "abort") os.Exit(1) } + + reactionSource, err = reaction.SQSSource(sqsAPI) + if err != nil { + logger.Log("err", err, "lifecycle", "abort") + os.Exit(1) + } default: logger.Log( "err", fmt.Sprintf("Source type '%s' not supported", *source), @@ -288,6 +294,9 @@ func main() { )(objectSource) objectSource = object.LogSourceMiddleware(*source, logger)(objectSource) + // TODO: Implement reaction instrumentation middleware + reactionSource = reaction.LogSourceMiddleware(*source, logger)(reactionSource) + // Setup services. var apps app.Service apps = app.PostgresService(pgClient) @@ -366,6 +375,8 @@ func main() { serviceOpLatency, )(reactions) reactions = reaction.LogServiceMiddleware(logger, storeService)(reactions) + // Combine reaction service and source. + reactions = reaction.SourcingServiceMiddleware(reactionSource)(reactions) var sessions session.Service sessions = session.PostgresService(pgClient) diff --git a/error/error.go b/error/error.go index 804d63f..22f5992 100644 --- a/error/error.go +++ b/error/error.go @@ -24,6 +24,11 @@ var ( ErrReactionNotFound = errors.New("reaction not found") ) +// Source errors. +var ( + ErrEmptySource = errors.New("empty source") +) + // User errors. var ( ErrUserExists = errors.New("user not unique") @@ -44,6 +49,11 @@ func IsDeviceDisabled(err error) bool { return unwrapError(err) == ErrDeviceDisabled } +// IsEmptySource indicates if err is ErrEmptySource. +func IsEmptySource(err error) bool { + return unwrapError(err) == ErrEmptySource +} + // IsInvalidPlatform indicates if err is ErrInvalidPlatform. func IsInvalidPlatform(err error) bool { return unwrapError(err) == ErrInvalidPlatform diff --git a/service/event/event.go b/service/event/event.go index 370771d..19136c8 100644 --- a/service/event/event.go +++ b/service/event/event.go @@ -204,16 +204,6 @@ type Service interface { // ServiceMiddleware is a chainable behaviour modifier for Service. type ServiceMiddleware func(Service) Service -// StateChange transports all information necessary to observe state changes. -type StateChange struct { - AckID string - ID string - Namespace string - New *Event - Old *Event - SentAt time.Time -} - // Source encapsulates state change notifications operations. type Source interface { source.Acker @@ -224,6 +214,16 @@ type Source interface { // SourceMiddleware is a chainable behaviour modifier for Source. type SourceMiddleware func(Source) Source +// StateChange transports all information necessary to observe state changes. +type StateChange struct { + AckID string + ID string + Namespace string + New *Event + Old *Event + SentAt time.Time +} + // Target describes the person addressed in an event. To be phased out. type Target struct { ID string `json:"id"` diff --git a/service/event/logging.go b/service/event/logging.go index 3548742..ac4a693 100644 --- a/service/event/logging.go +++ b/service/event/logging.go @@ -124,7 +124,7 @@ type logSource struct { next Source } -// LogSourceMiddleware given a Logger raps the next Source logging capabilities. +// LogSourceMiddleware given a Logger wraps the next Source logging capabilities. func LogSourceMiddleware(store string, logger log.Logger) SourceMiddleware { return func(next Source) Source { logger = log.NewContext(logger).With( @@ -143,6 +143,7 @@ func (s *logSource) Ack(id string) (err error) { defer func(begin time.Time) { ps := []interface{}{ "ack_id", id, + "duration_ns", time.Since(begin).Nanoseconds(), "method", "Ack", } @@ -166,8 +167,8 @@ func (s *logSource) Consume() (change *StateChange, err error) { if change != nil { ps = append(ps, "namespace", change.Namespace, - "object_new", change.New, - "object_old", change.Old, + "event_new", change.New, + "event_old", change.Old, ) } @@ -188,8 +189,8 @@ func (s *logSource) Propagate(ns string, old, new *Event) (id string, err error) "id", id, "method", "Propagate", "namespace", ns, - "object_new", new, - "object_old", old, + "event_new", new, + "event_old", old, } if err != nil { diff --git a/service/reaction/logging.go b/service/reaction/logging.go index f10c29f..446b251 100644 --- a/service/reaction/logging.go +++ b/service/reaction/logging.go @@ -6,6 +6,94 @@ import ( "github.com/go-kit/kit/log" ) +type logSource struct { + logger log.Logger + next Source +} + +// LogSourceMiddleware given a Logger wraps the next Source with logging +// capabilities. +func LogSourceMiddleware(store string, logger log.Logger) SourceMiddleware { + return func(next Source) Source { + logger = log.NewContext(logger).With( + "source", "reaction", + "store", store, + ) + + return &logSource{ + logger: logger, + next: next, + } + } +} + +func (s *logSource) Ack(id string) (err error) { + defer func(begin time.Time) { + ps := []interface{}{ + "ack_id", id, + "duration_ns", time.Since(begin).Nanoseconds(), + "method", "Ack", + } + + if err != nil { + ps = append(ps, "err", err) + } + + _ = s.logger.Log(ps...) + }(time.Now()) + + return s.next.Ack(id) +} + +func (s *logSource) Consume() (change *StateChange, err error) { + defer func(begin time.Time) { + ps := []interface{}{ + "duration_ns", time.Since(begin).Nanoseconds(), + "method", "Consume", + } + + if change != nil { + ps = append(ps, + "namespace", change.Namespace, + "reaction_new", change.New, + "reaction_old", change.Old, + ) + } + + if err != nil { + ps = append(ps, "err", err) + } + + _ = s.logger.Log(ps...) + }(time.Now()) + + return s.next.Consume() +} + +func (s *logSource) Propagate( + ns string, + old, new *Reaction, +) (id string, err error) { + defer func(begin time.Time) { + ps := []interface{}{ + "duration_ns", time.Since(begin).Nanoseconds(), + "id", id, + "method", "Propagate", + "namespace", ns, + "event_new", new, + "event_old", old, + } + + if err != nil { + ps = append(ps, "err", err) + } + + _ = s.logger.Log(ps...) + }(time.Now()) + + return s.next.Propagate(ns, old, new) +} + type logService struct { logger log.Logger next Service diff --git a/service/reaction/reaction.go b/service/reaction/reaction.go index 98d22a6..490823f 100644 --- a/service/reaction/reaction.go +++ b/service/reaction/reaction.go @@ -7,6 +7,7 @@ import ( serr "github.com/tapglue/snaas/error" "github.com/tapglue/snaas/platform/service" + "github.com/tapglue/snaas/platform/source" ) // Supported Reaction types. @@ -19,6 +20,10 @@ const ( TypeAngry ) +type Consumer interface { + Consume() (*StateChange, error) +} + // List is a collection of Reaction. type List []*Reaction @@ -60,6 +65,11 @@ func (m Map) ToList() List { return rs } +// Producer creates a state change notification. +type Producer interface { + Propagate(namespace string, old, new *Reaction) (string, error) +} + // QueryOptions to narrow-down queries. type QueryOptions struct { Before time.Time `json:"-"` @@ -110,6 +120,26 @@ type Service interface { // ServiceMiddleware is a chainable behaviour modifier for Service. type ServiceMiddleware func(Service) Service +// Source encapsulates state change notification operations. +type Source interface { + source.Acker + Consumer + Producer +} + +// SourceMiddleware is a chainable behaviour modifier for Source. +type SourceMiddleware func(Source) Source + +// StateChange transports all information necessary to observe state changes. +type StateChange struct { + AckID string + ID string + Namespace string + New *Reaction + Old *Reaction + SentAt time.Time +} + // Type is used to distinct Reactions by type. type Type uint diff --git a/service/reaction/sourcing.go b/service/reaction/sourcing.go new file mode 100644 index 0000000..a890ee9 --- /dev/null +++ b/service/reaction/sourcing.go @@ -0,0 +1,58 @@ +package reaction + +type sourcingService struct { + producer Producer + service Service +} + +func SourcingServiceMiddleware(producer Producer) ServiceMiddleware { + return func(service Service) Service { + return &sourcingService{ + service: service, + producer: producer, + } + } +} + +func (s *sourcingService) Count(ns string, opts QueryOptions) (uint, error) { + return s.service.Count(ns, opts) +} + +func (s *sourcingService) Put(ns string, input *Reaction) (new *Reaction, err error) { + var old *Reaction + + defer func() { + if err == nil { + _, _ = s.producer.Propagate(ns, old, new) + } + } + + if input.ID != 0 { + rs, err := s.service.Query(ns, QueryOptions{ + ID: []uint64{ + input.ID, + }, + }) + if err != nil { + return nil, err + } + + if len(rs) == 1 { + old = rs[0] + } + } + + return s.service.Put(ns, input) +} + +func (s *sourcingService) Query(ns string, opts QueryOptions) (List, error) { + return s.service.Query(ns, opts) +} + +func (s *sourcingService) Setup(ns string) error { + return s.service.Setup(ns) +} + +func (s *sourcingService) Teardown(ns string) error { + return s.service.Teardown(ns) +} diff --git a/service/reaction/sqs.go b/service/reaction/sqs.go new file mode 100644 index 0000000..c05d812 --- /dev/null +++ b/service/reaction/sqs.go @@ -0,0 +1,110 @@ +package reaction + +import ( + "encoding/json" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + serr "github.com/tapglue/snaas/error" + platformSQS "github.com/tapglue/snaas/platform/sqs" +) + +const ( + queueName = "reaction-state-change" +) + +type sqsSource struct { + api platformSQS.API + queueURL string +} + +// SQSSource returns an SQS backed Source implementation. +func SQSSource(api platformSQS.API) (Source, error) { + res, err := api.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(queueName), + }) + if err != nil { + return nil, err + } + + return &sqsSource{ + api: api, + queueURL: *res.QueueUrl, + }, nil +} + +func (s *sqsSource) Ack(id string) error { + _, err := s.api.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: aws.String(s.queueURL), + ReceiptHandle: aws.String(id), + }) + + return err +} + +func (s *sqsSource) Consume() (*StateChange, error) { + o, err := platformSQS.ReceiveMessage(s.api, s.queueURL) + if err != nil { + return nil, err + } + + if len(o.Messages) == 0 { + return nil, serr.ErrEmptySource + } + + var ( + m = o.Messages[0] + + sentAt time.Time + ) + + if attr, ok := m.MessageAttributes[platformSQS.AttributeSentAt]; ok { + t, err := time.Parse(platformSQS.FormatSentAt, *attr.StringValue) + if err != nil { + return nil, err + } + + sentAt = t + } + + f := sqsStateChange{} + + err = json.Unmarshal([]byte(*m.Body), &f) + if err != nil { + return nil, err + } + + return &StateChange{ + AckID: *m.ReceiptHandle, + ID: *m.MessageId, + Namespace: f.Namespace, + New: f.New, + Old: f.Old, + SentAt: sentAt, + }, nil +} + +func (s *sqsSource) Propagate(ns string, old, new *Reaction) (string, error) { + r, err := json.Marshal(&sqsStateChange{ + Namespace: ns, + New: new, + Old: old, + }) + if err != nil { + return "", err + } + + o, err := s.api.SendMessage(platformSQS.MessageInput(r, s.queueURL)) + if err != nil { + return "", err + } + + return *o.MessageId, nil +} + +type sqsStateChange struct { + Namespace string `json:"namespace"` + New *Reaction `json:"new"` + Old *Reaction `json:"old"` +} From 214fa1b3dc9d927334045f60b3be711c09d53b24 Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Thu, 19 Jan 2017 16:41:15 +0100 Subject: [PATCH 2/6] Query on reaction id --- service/reaction/helper_test.go | 1 + service/reaction/mem.go | 4 ++++ service/reaction/postgres.go | 17 +++++++++++++++++ service/reaction/reaction.go | 1 + service/reaction/sourcing.go | 6 +++--- 5 files changed, 26 insertions(+), 3 deletions(-) diff --git a/service/reaction/helper_test.go b/service/reaction/helper_test.go index e3f480e..63cd97c 100644 --- a/service/reaction/helper_test.go +++ b/service/reaction/helper_test.go @@ -129,6 +129,7 @@ func testServiceQuery(p prepareFunc, t *testing.T) { &QueryOptions{}: 52, // All &QueryOptions{Before: created.UpdatedAt}: 51, // Deleted &QueryOptions{Deleted: &deleted}: 5, // Deleted + &QueryOptions{IDs: []uint64{created.ID}}: 1, &QueryOptions{Limit: 11}: 11, // Deleted &QueryOptions{ObjectIDs: []uint64{objectID}}: 3, // By Object &QueryOptions{OwnerIDs: []uint64{ownerID}}: 11, // By Owner diff --git a/service/reaction/mem.go b/service/reaction/mem.go index 1050251..603bfba 100644 --- a/service/reaction/mem.go +++ b/service/reaction/mem.go @@ -121,6 +121,10 @@ func filterList(rs List, opts QueryOptions) List { continue } + if !inIDs(r.ID, opts.IDs) { + continue + } + if !inIDs(r.ObjectID, opts.ObjectIDs) { continue } diff --git a/service/reaction/postgres.go b/service/reaction/postgres.go index 54c8183..3ece3c1 100644 --- a/service/reaction/postgres.go +++ b/service/reaction/postgres.go @@ -37,6 +37,7 @@ const ( pgClauseBefore = `updated_at < ?` pgClauseDeleted = `deleted = ?` + pgClauseIDs = `id IN (?)` pgClauseObjectIDs = `object_id IN (?)` pgClauseOwnerIDs = `owner_id IN (?)` pgClauseTypes = `type IN (?)` @@ -321,6 +322,22 @@ func convertOpts(opts QueryOptions) (string, []interface{}, error) { params = append(params, *opts.Deleted) } + if len(opts.IDs) > 0 { + ps := []interface{}{} + + for _, id := range opts.IDs { + ps = append(ps, id) + } + + clause, _, err := sqlx.In(pgClauseIDs, ps) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + if len(opts.ObjectIDs) > 0 { ps := []interface{}{} diff --git a/service/reaction/reaction.go b/service/reaction/reaction.go index 490823f..ff42a52 100644 --- a/service/reaction/reaction.go +++ b/service/reaction/reaction.go @@ -74,6 +74,7 @@ type Producer interface { type QueryOptions struct { Before time.Time `json:"-"` Deleted *bool `json:"deleted,omitempty"` + IDs []uint64 `json:"-"` Limit int `json:"-"` ObjectIDs []uint64 `json:"object_ids"` OwnerIDs []uint64 `json:"owner_ids"` diff --git a/service/reaction/sourcing.go b/service/reaction/sourcing.go index a890ee9..a82bb20 100644 --- a/service/reaction/sourcing.go +++ b/service/reaction/sourcing.go @@ -25,13 +25,13 @@ func (s *sourcingService) Put(ns string, input *Reaction) (new *Reaction, err er if err == nil { _, _ = s.producer.Propagate(ns, old, new) } - } + }() if input.ID != 0 { rs, err := s.service.Query(ns, QueryOptions{ - ID: []uint64{ + IDs: []uint64{ input.ID, - }, + }, }) if err != nil { return nil, err From b8cfc7207eebbb91a3fee4b5a695a1f3f16da246 Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Tue, 31 Jan 2017 10:13:26 +0100 Subject: [PATCH 3/6] Implement instrumentation for reaction source --- cmd/gateway-http/gateway-http.go | 9 ++- service/reaction/instrumentation.go | 105 ++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 1 deletion(-) diff --git a/cmd/gateway-http/gateway-http.go b/cmd/gateway-http/gateway-http.go index 9ddd352..d9ede00 100644 --- a/cmd/gateway-http/gateway-http.go +++ b/cmd/gateway-http/gateway-http.go @@ -294,7 +294,14 @@ func main() { )(objectSource) objectSource = object.LogSourceMiddleware(*source, logger)(objectSource) - // TODO: Implement reaction instrumentation middleware + reactionSource = reaction.InstrumentSourceMiddleware( + component, + *source, + sourceErrCount, + sourceOpCount, + sourceOpLatency, + sourceQueueLatency, + )(reactionSource) reactionSource = reaction.LogSourceMiddleware(*source, logger)(reactionSource) // Setup services. diff --git a/service/reaction/instrumentation.go b/service/reaction/instrumentation.go index 42a1220..a332c99 100644 --- a/service/reaction/instrumentation.go +++ b/service/reaction/instrumentation.go @@ -121,3 +121,108 @@ func (s *instrumentService) track( metrics.FieldStore: s.store, }).Observe(time.Since(begin).Seconds()) } + +type instrumentSource struct { + component string + errCount kitmetrics.Counter + opCount kitmetrics.Counter + opLatency *prometheus.HistogramVec + queueLatency *prometheus.HistogramVec + next Source + store string +} + +func InstrumentSourceMiddleware( + component, store string, + errCount kitmetrics.Counter, + opCount kitmetrics.Counter, + opLatency *prometheus.HistogramVec, + queueLatency *prometheus.HistogramVec, +) SourceMiddleware { + return func(next Source) Source { + return &instrumentSource{ + component: component, + errCount: errCount, + opCount: opCount, + opLatency: opLatency, + queueLatency: queueLatency, + next: next, + store: store, + } + } +} + +func (s *instrumentSource) Ack(id string) (err error) { + defer func(begin time.Time) { + s.track("Ack", "", begin, err) + }(time.Now()) + + return s.next.Ack(id) +} + +func (s *instrumentSource) Consume() (change *StateChange, err error) { + defer func(begin time.Time) { + ns := "" + + if err == nil && change != nil { + ns = change.Namespace + + if !change.SentAt.IsZero() { + s.queueLatency.With(prometheus.Labels{ + metrics.FieldComponent: s.component, + metrics.FieldMethod: "Consume", + metrics.FieldNamespace: ns, + metrics.FieldSource: serviceName, + metrics.FieldStore: s.store, + }).Observe(time.Since(change.SentAt).Seconds()) + } + } + + s.track("Consume", ns, begin, err) + }(time.Now()) + + return s.next.Consume() +} + +func (s *instrumentSource) Propagate( + ns string, + old, new *Reaction, +) (id string, err error) { + defer func(begin time.Time) { + s.track("Propagate", ns, begin, err) + }(time.Now()) + + return s.next.Propagate(ns, old, new) +} + +func (s *instrumentSource) track( + method, namespace string, + begin time.Time, + err error, +) { + if err != nil { + s.errCount.With( + metrics.FieldComponent, s.component, + metrics.FieldMethod, method, + metrics.FieldNamespace, namespace, + metrics.FieldSource, serviceName, + metrics.FieldStore, s.store, + ).Add(1) + } else { + s.opCount.With( + metrics.FieldComponent, s.component, + metrics.FieldMethod, method, + metrics.FieldNamespace, namespace, + metrics.FieldSource, serviceName, + metrics.FieldStore, s.store, + ).Add(1) + + s.opLatency.With(prometheus.Labels{ + metrics.FieldComponent: s.component, + metrics.FieldMethod: method, + metrics.FieldNamespace: namespace, + metrics.FieldSource: serviceName, + metrics.FieldStore: s.store, + }).Observe(time.Since(begin).Seconds()) + } +} From aae776db05cab87f6406677176d5e6e6cc617186 Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Tue, 31 Jan 2017 12:57:26 +0100 Subject: [PATCH 4/6] Add reaction pipeline --- core/pipieline.go | 120 +++++++++++++++++++++++++++++++++++ core/pipieline_test.go | 87 +++++++++++++++++++++++++ service/reaction/reaction.go | 30 +++++++++ service/rule/rule.go | 24 +++++++ 4 files changed, 261 insertions(+) diff --git a/core/pipieline.go b/core/pipieline.go index 14c224f..20adae4 100644 --- a/core/pipieline.go +++ b/core/pipieline.go @@ -6,6 +6,8 @@ import ( "text/template" "time" + "github.com/tapglue/snaas/service/reaction" + "golang.org/x/text/language" serr "github.com/tapglue/snaas/error" @@ -295,6 +297,90 @@ func PipelineObject( } } +// PipelineReactionFunc constructs a Pipeline that by applying the provided +// rules outputs Messages. +type PipelineReactionFunc func( + *app.App, + *reaction.StateChange, + ...*rule.Rule, +) (Messages, error) + +// PipelineReaction constructs a Pipeline that by applying the provided rules +// outputs Messages. +func PipelineReaction( + objects object.Service, + users user.Service, +) PipelineReactionFunc { + return func( + currentApp *app.App, + change *reaction.StateChange, + rules ...*rule.Rule, + ) (Messages, error) { + var ( + ms = Messages{} + r = change.New + + context *contextReaction + owner *user.User + parent *object.Object + parentOwner *user.User + ) + + owner, err := UserFetch(users)(currentApp, r.OwnerID) + if err != nil { + return nil, err + } + + if r.ObjectID != 0 { + parent, err = objectFetch(objects)(currentApp, r.ObjectID) + if err != nil { + return nil, err + } + + parentOwner, err = UserFetch(users)(currentApp, parent.OwnerID) + if err != nil { + return nil, err + } + } + + context = &contextReaction{ + Owner: owner, + Parent: parent, + ParentOwner: parentOwner, + Reaction: r, + } + + for _, r := range rules { + if !r.Criteria.Match(change) { + continue + } + + for _, recipient := range r.Recipients { + rs, err := recipientsReaction()(currentApp, context, recipient.Query) + if err != nil { + return nil, err + } + + for _, r := range rs { + urn, err := compileTemplate(context, recipient.URN) + if err != nil { + return nil, err + } + + msg, err := compileTemplate(context, recipient.Templates[language.English.String()]) + if err != nil { + return nil, err + } + + ms = append(ms, &Message{Message: msg, Recipient: r.ID, URN: urn}) + } + } + } + + return ms, nil + } +} + type contextConnection struct { Conenction *connection.Connection From *user.User @@ -316,6 +402,13 @@ type contextObject struct { ParentOwner *user.User } +type contextReaction struct { + Owner *user.User + Parent *object.Object + ParentOwner *user.User + Reaction *reaction.Reaction +} + func compileTemplate(context interface{}, t string) (string, error) { tmpl, err := template.New("message").Parse(t) if err != nil { @@ -518,3 +611,30 @@ func recipientsObject( return us, nil } } + +type recipientsReactionFunc func( + *app.App, + *contextReaction, + rule.Query, +) (user.List, error) + +func recipientsReaction() recipientsReactionFunc { + return func( + currentApp *app.App, + context *contextReaction, + q rule.Query, + ) (user.List, error) { + us := user.List{} + + for condType := range q { + switch condType { + case queryCondParentOwner: + if context.Owner.ID != context.ParentOwner.ID { + us = append(us, context.ParentOwner) + } + } + } + + return us, nil + } +} diff --git a/core/pipieline_test.go b/core/pipieline_test.go index aab2e50..1fdc0a7 100644 --- a/core/pipieline_test.go +++ b/core/pipieline_test.go @@ -6,6 +6,8 @@ import ( "reflect" "testing" + "github.com/tapglue/snaas/service/reaction" + "github.com/tapglue/snaas/service/app" "github.com/tapglue/snaas/service/connection" "github.com/tapglue/snaas/service/event" @@ -190,6 +192,91 @@ func TestPipelineConnectionCondTo(t *testing.T) { } } +func TestPipelineReactionCondParentOwner(t *testing.T) { + var ( + currentApp = testApp() + objects = object.MemService() + reactions = reaction.MemService() + users = user.MemService() + ) + + // Creat Post Owner. + postOwner, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create Post. + post, err := objects.Put(currentApp.Namespace(), testPost(postOwner.ID).Object) + if err != nil { + t.Fatal(err) + } + + // Create liker. + liker, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create like. + like, err := reactions.Put(currentApp.Namespace(), &reaction.Reaction{ + ObjectID: post.ID, + OwnerID: liker.ID, + Type: reaction.TypeLike, + }) + if err != nil { + t.Fatal(err) + } + + var ( + deleted = false + ruleReactionParentOwner = &rule.Rule{ + Criteria: &rule.CriteriaReaction{ + New: &reaction.QueryOptions{ + Deleted: &deleted, + Types: []reaction.Type{ + reaction.TypeLike, + }, + }, + Old: nil, + }, + Recipients: rule.Recipients{ + { + Query: map[string]string{ + "parentOwner": "", + }, + Templates: map[string]string{ + "en": "{{.Owner.Username}} liked your post", + }, + URN: "tapglue/users/{{.Owner.ID}}", + }, + }, + } + ) + + want := Messages{ + { + Message: fmt.Sprintf("%s liked your post", liker.Username), + Recipient: postOwner.ID, + URN: fmt.Sprintf("tapglue/users/%d", liker.ID), + }, + } + + have, err := PipelineReaction(objects, users)(currentApp, &reaction.StateChange{New: like}, ruleReactionParentOwner) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(have, want) { + for i, m := range have { + fmt.Printf("[%d|%s] %v\n", m.Recipient, m.URN, m.Message[i]) + fmt.Printf("[%d|%s] %v\n\n", want[i].Recipient, want[i].URN, want[i].Message) + } + + t.Errorf("have %#v, want %#v", have, want) + } +} + func TestPipelineEventCondParentOwner(t *testing.T) { var ( currentApp = testApp() diff --git a/service/reaction/reaction.go b/service/reaction/reaction.go index ff42a52..80fa860 100644 --- a/service/reaction/reaction.go +++ b/service/reaction/reaction.go @@ -20,6 +20,7 @@ const ( TypeAngry ) +// Consumer observes state changes. type Consumer interface { Consume() (*StateChange, error) } @@ -53,6 +54,7 @@ func (rs List) OwnerIDs() []uint64 { // Map is a Reaction collection with their id as index. type Map map[uint64]*Reaction +// ToList returns a list collection. func (m Map) ToList() List { rs := List{} @@ -92,6 +94,34 @@ type Reaction struct { UpdatedAt time.Time } +// MatchOpts indicates if the Reaction matches the given QueryOptions. +func (r *Reaction) MatchOpts(opts *QueryOptions) bool { + if opts == nil { + return true + } + + if opts.Deleted != nil && r.Deleted != *opts.Deleted { + return false + } + + if len(opts.Types) > 0 { + discard := true + + for _, t := range opts.Types { + if r.Type == t { + discard = false + break + } + } + + if discard { + return false + } + } + + return true +} + // Validate checks for semantic correctness. func (r *Reaction) Validate() error { if r.ObjectID == 0 { diff --git a/service/rule/rule.go b/service/rule/rule.go index 8b2be48..368750b 100644 --- a/service/rule/rule.go +++ b/service/rule/rule.go @@ -4,6 +4,8 @@ import ( "fmt" "time" + "github.com/tapglue/snaas/service/reaction" + "github.com/tapglue/snaas/platform/service" "github.com/tapglue/snaas/platform/sns" "github.com/tapglue/snaas/service/connection" @@ -40,6 +42,28 @@ func (c *CriteriaConnection) Match(i interface{}) bool { return s.New.MatchOpts(c.New) && s.Old.MatchOpts(c.Old) } +type CriteriaReaction struct { + New *reaction.QueryOptions `json:"new"` + Old *reaction.QueryOptions `json:"old"` +} + +func (c *CriteriaReaction) Match(i interface{}) bool { + s, ok := i.(*reaction.StateChange) + if !ok { + return false + } + + if s.New == nil && s.Old == nil { + return false + } + + if s.Old == nil { + return s.New.MatchOpts(c.New) + } + + return s.New.MatchOpts(c.New) && s.Old.MatchOpts(c.Old) +} + type CriteriaEvent struct { New *event.QueryOptions `json:"new"` Old *event.QueryOptions `json:"old"` From 4596f969b294b60f603013de1c57b77988e33c83 Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Tue, 31 Jan 2017 13:25:45 +0100 Subject: [PATCH 5/6] Create reaction queue --- infrastructure/terraform/template/storage.tf | 26 ++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/infrastructure/terraform/template/storage.tf b/infrastructure/terraform/template/storage.tf index 2dafcc4..49e57e2 100644 --- a/infrastructure/terraform/template/storage.tf +++ b/infrastructure/terraform/template/storage.tf @@ -265,6 +265,32 @@ EOF visibility_timeout_seconds = 60 } +resource "aws_sqs_queue" "reaction-state-change-dlq" { + delay_seconds = 0 + max_message_size = 262144 + message_retention_seconds = 1209600 + name = "reaction-state-change-dlq" + receive_wait_time_seconds = 1 + visibility_timeout_seconds = 300 +} + +resource "aws_sqs_queue" "reaction-state-change" { + delay_seconds = 0 + max_message_size = 262144 + message_retention_seconds = 1209600 + name = "reaction-state-change" + receive_wait_time_seconds = 1 + + redrive_policy = < Date: Tue, 31 Jan 2017 13:25:55 +0100 Subject: [PATCH 6/6] Consume reaction source in SIMS --- cmd/sims/consume.go | 46 ++++++++++++++++++++++++++++++++++++++++++++ cmd/sims/sims.go | 32 +++++++++++++++++++++++++++++- service/rule/rule.go | 1 + 3 files changed, 78 insertions(+), 1 deletion(-) diff --git a/cmd/sims/consume.go b/cmd/sims/consume.go index 9255126..d61c1ec 100644 --- a/cmd/sims/consume.go +++ b/cmd/sims/consume.go @@ -7,6 +7,7 @@ import ( "github.com/aws/aws-sdk-go/service/sqs" "github.com/tapglue/snaas/core" + serr "github.com/tapglue/snaas/error" "github.com/tapglue/snaas/platform/sns" "github.com/tapglue/snaas/platform/source" platformSQS "github.com/tapglue/snaas/platform/sqs" @@ -14,6 +15,7 @@ import ( "github.com/tapglue/snaas/service/connection" "github.com/tapglue/snaas/service/event" "github.com/tapglue/snaas/service/object" + "github.com/tapglue/snaas/service/reaction" "github.com/tapglue/snaas/service/rule" ) @@ -167,6 +169,50 @@ func consumeEvent( } } +func consumeReaction( + appFetch core.AppFetchFunc, + reactionSource reaction.Source, + batchc chan<- batch, + pipeline core.PipelineReactionFunc, + rules core.RuleListActiveFunc, +) error { + for { + change, err := reactionSource.Consume() + if err != nil { + if serr.IsEmptySource(err) { + continue + } + return err + } + + currentApp, err := appForNamespace(appFetch, change.Namespace) + if err != nil { + return err + } + + rs, err := rules(currentApp, rule.TypeReaction) + if err != nil { + return err + } + + ms, err := pipeline(currentApp, change, rs...) + if err != nil { + return err + } + + if len(ms) == 0 { + err = reactionSource.Ack(change.AckID) + if err != nil { + return err + } + + continue + } + + batchc <- batchMessages(currentApp, reactionSource, change.AckID, ms) + } +} + func consumeObject( appFetch core.AppFetchFunc, objectSource object.Source, diff --git a/cmd/sims/sims.go b/cmd/sims/sims.go index 2412056..fbc5204 100644 --- a/cmd/sims/sims.go +++ b/cmd/sims/sims.go @@ -6,7 +6,7 @@ import ( "os" "time" - "github.com/tapglue/snaas/service/rule" + "github.com/tapglue/snaas/service/reaction" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -28,6 +28,7 @@ import ( "github.com/tapglue/snaas/service/event" "github.com/tapglue/snaas/service/object" "github.com/tapglue/snaas/service/platform" + "github.com/tapglue/snaas/service/rule" "github.com/tapglue/snaas/service/user" ) @@ -258,6 +259,21 @@ func main() { )(objectSource) objectSource = object.LogSourceMiddleware(sourceService, logger)(objectSource) + reactionSource, err := reaction.SQSSource(sqsAPI) + if err != nil { + logger.Log("err", err, "lifecycle", "abort") + os.Exit(1) + } + reactionSource = reaction.InstrumentSourceMiddleware( + component, + sourceService, + sourceErrCount, + sourceOpCount, + sourceOpLatency, + sourceQueueLatency, + )(reactionSource) + reactionSource = reaction.LogSourceMiddleware(sourceService, logger)(reactionSource) + logger.Log( "duration", time.Now().Sub(begin).Nanoseconds(), "lifecycle", "start", @@ -356,6 +372,20 @@ func main() { } }() + go func() { + err := consumeReaction( + core.AppFetch(apps), + reactionSource, + batchc, + core.PipelineReaction(objects, users), + core.RuleListActive(rules), + ) + if err != nil { + logger.Log("err", err, "lifecycle", "abort") + os.Exit(1) + } + }() + // Distribute messages to channels. cs := []channelFunc{ channelPush( diff --git a/service/rule/rule.go b/service/rule/rule.go index 368750b..f5b2f64 100644 --- a/service/rule/rule.go +++ b/service/rule/rule.go @@ -18,6 +18,7 @@ const ( TypeConnection Type = iota TypeEvent TypeObject + TypeReaction ) type CriteriaConnection struct {