From 49b84145c6e68a92048807c3adeec0f307fff473 Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Tue, 31 Jan 2017 13:25:55 +0100 Subject: [PATCH] Consume reaction source in SIMS --- cmd/sims/consume.go | 46 ++++++++++++++++++++++++++++++++++++++++++++ cmd/sims/sims.go | 32 +++++++++++++++++++++++++++++- service/rule/rule.go | 1 + 3 files changed, 78 insertions(+), 1 deletion(-) diff --git a/cmd/sims/consume.go b/cmd/sims/consume.go index 9255126..d61c1ec 100644 --- a/cmd/sims/consume.go +++ b/cmd/sims/consume.go @@ -7,6 +7,7 @@ import ( "github.com/aws/aws-sdk-go/service/sqs" "github.com/tapglue/snaas/core" + serr "github.com/tapglue/snaas/error" "github.com/tapglue/snaas/platform/sns" "github.com/tapglue/snaas/platform/source" platformSQS "github.com/tapglue/snaas/platform/sqs" @@ -14,6 +15,7 @@ import ( "github.com/tapglue/snaas/service/connection" "github.com/tapglue/snaas/service/event" "github.com/tapglue/snaas/service/object" + "github.com/tapglue/snaas/service/reaction" "github.com/tapglue/snaas/service/rule" ) @@ -167,6 +169,50 @@ func consumeEvent( } } +func consumeReaction( + appFetch core.AppFetchFunc, + reactionSource reaction.Source, + batchc chan<- batch, + pipeline core.PipelineReactionFunc, + rules core.RuleListActiveFunc, +) error { + for { + change, err := reactionSource.Consume() + if err != nil { + if serr.IsEmptySource(err) { + continue + } + return err + } + + currentApp, err := appForNamespace(appFetch, change.Namespace) + if err != nil { + return err + } + + rs, err := rules(currentApp, rule.TypeReaction) + if err != nil { + return err + } + + ms, err := pipeline(currentApp, change, rs...) + if err != nil { + return err + } + + if len(ms) == 0 { + err = reactionSource.Ack(change.AckID) + if err != nil { + return err + } + + continue + } + + batchc <- batchMessages(currentApp, reactionSource, change.AckID, ms) + } +} + func consumeObject( appFetch core.AppFetchFunc, objectSource object.Source, diff --git a/cmd/sims/sims.go b/cmd/sims/sims.go index 2412056..fbc5204 100644 --- a/cmd/sims/sims.go +++ b/cmd/sims/sims.go @@ -6,7 +6,7 @@ import ( "os" "time" - "github.com/tapglue/snaas/service/rule" + "github.com/tapglue/snaas/service/reaction" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -28,6 +28,7 @@ import ( "github.com/tapglue/snaas/service/event" "github.com/tapglue/snaas/service/object" "github.com/tapglue/snaas/service/platform" + "github.com/tapglue/snaas/service/rule" "github.com/tapglue/snaas/service/user" ) @@ -258,6 +259,21 @@ func main() { )(objectSource) objectSource = object.LogSourceMiddleware(sourceService, logger)(objectSource) + reactionSource, err := reaction.SQSSource(sqsAPI) + if err != nil { + logger.Log("err", err, "lifecycle", "abort") + os.Exit(1) + } + reactionSource = reaction.InstrumentSourceMiddleware( + component, + sourceService, + sourceErrCount, + sourceOpCount, + sourceOpLatency, + sourceQueueLatency, + )(reactionSource) + reactionSource = reaction.LogSourceMiddleware(sourceService, logger)(reactionSource) + logger.Log( "duration", time.Now().Sub(begin).Nanoseconds(), "lifecycle", "start", @@ -356,6 +372,20 @@ func main() { } }() + go func() { + err := consumeReaction( + core.AppFetch(apps), + reactionSource, + batchc, + core.PipelineReaction(objects, users), + core.RuleListActive(rules), + ) + if err != nil { + logger.Log("err", err, "lifecycle", "abort") + os.Exit(1) + } + }() + // Distribute messages to channels. cs := []channelFunc{ channelPush( diff --git a/service/rule/rule.go b/service/rule/rule.go index 368750b..f5b2f64 100644 --- a/service/rule/rule.go +++ b/service/rule/rule.go @@ -18,6 +18,7 @@ const ( TypeConnection Type = iota TypeEvent TypeObject + TypeReaction ) type CriteriaConnection struct {