Skip to content

Commit

Permalink
Merge pull request #36 from tapglue/reaction-source
Browse files Browse the repository at this point in the history
Reaction source
  • Loading branch information
xla authored Jan 31, 2017
2 parents b43a93c + 49b8414 commit 480d1f7
Show file tree
Hide file tree
Showing 18 changed files with 828 additions and 21 deletions.
28 changes: 23 additions & 5 deletions cmd/gateway-http/gateway-http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"time"

"github.com/tapglue/snaas/service/reaction"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
awsSession "github.com/aws/aws-sdk-go/aws/session"
Expand All @@ -30,6 +28,7 @@ import (
"github.com/tapglue/snaas/service/device"
"github.com/tapglue/snaas/service/event"
"github.com/tapglue/snaas/service/object"
"github.com/tapglue/snaas/service/reaction"
"github.com/tapglue/snaas/service/session"
"github.com/tapglue/snaas/service/user"
)
Expand Down Expand Up @@ -222,9 +221,10 @@ func main() {

// Setup sources.
var (
conSource connection.Source
eventSource event.Source
objectSource object.Source
conSource connection.Source
eventSource event.Source
objectSource object.Source
reactionSource reaction.Source
)

switch *source {
Expand All @@ -250,6 +250,12 @@ func main() {
logger.Log("err", err, "lifecycle", "abort")
os.Exit(1)
}

reactionSource, err = reaction.SQSSource(sqsAPI)
if err != nil {
logger.Log("err", err, "lifecycle", "abort")
os.Exit(1)
}
default:
logger.Log(
"err", fmt.Sprintf("Source type '%s' not supported", *source),
Expand Down Expand Up @@ -288,6 +294,16 @@ func main() {
)(objectSource)
objectSource = object.LogSourceMiddleware(*source, logger)(objectSource)

reactionSource = reaction.InstrumentSourceMiddleware(
component,
*source,
sourceErrCount,
sourceOpCount,
sourceOpLatency,
sourceQueueLatency,
)(reactionSource)
reactionSource = reaction.LogSourceMiddleware(*source, logger)(reactionSource)

// Setup services.
var apps app.Service
apps = app.PostgresService(pgClient)
Expand Down Expand Up @@ -366,6 +382,8 @@ func main() {
serviceOpLatency,
)(reactions)
reactions = reaction.LogServiceMiddleware(logger, storeService)(reactions)
// Combine reaction service and source.
reactions = reaction.SourcingServiceMiddleware(reactionSource)(reactions)

var sessions session.Service
sessions = session.PostgresService(pgClient)
Expand Down
46 changes: 46 additions & 0 deletions cmd/sims/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"

"github.com/tapglue/snaas/core"
serr "github.com/tapglue/snaas/error"
"github.com/tapglue/snaas/platform/sns"
"github.com/tapglue/snaas/platform/source"
platformSQS "github.com/tapglue/snaas/platform/sqs"
"github.com/tapglue/snaas/service/app"
"github.com/tapglue/snaas/service/connection"
"github.com/tapglue/snaas/service/event"
"github.com/tapglue/snaas/service/object"
"github.com/tapglue/snaas/service/reaction"
"github.com/tapglue/snaas/service/rule"
)

Expand Down Expand Up @@ -167,6 +169,50 @@ func consumeEvent(
}
}

func consumeReaction(
appFetch core.AppFetchFunc,
reactionSource reaction.Source,
batchc chan<- batch,
pipeline core.PipelineReactionFunc,
rules core.RuleListActiveFunc,
) error {
for {
change, err := reactionSource.Consume()
if err != nil {
if serr.IsEmptySource(err) {
continue
}
return err
}

currentApp, err := appForNamespace(appFetch, change.Namespace)
if err != nil {
return err
}

rs, err := rules(currentApp, rule.TypeReaction)
if err != nil {
return err
}

ms, err := pipeline(currentApp, change, rs...)
if err != nil {
return err
}

if len(ms) == 0 {
err = reactionSource.Ack(change.AckID)
if err != nil {
return err
}

continue
}

batchc <- batchMessages(currentApp, reactionSource, change.AckID, ms)
}
}

func consumeObject(
appFetch core.AppFetchFunc,
objectSource object.Source,
Expand Down
32 changes: 31 additions & 1 deletion cmd/sims/sims.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os"
"time"

"github.com/tapglue/snaas/service/rule"
"github.com/tapglue/snaas/service/reaction"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
Expand All @@ -28,6 +28,7 @@ import (
"github.com/tapglue/snaas/service/event"
"github.com/tapglue/snaas/service/object"
"github.com/tapglue/snaas/service/platform"
"github.com/tapglue/snaas/service/rule"
"github.com/tapglue/snaas/service/user"
)

Expand Down Expand Up @@ -258,6 +259,21 @@ func main() {
)(objectSource)
objectSource = object.LogSourceMiddleware(sourceService, logger)(objectSource)

reactionSource, err := reaction.SQSSource(sqsAPI)
if err != nil {
logger.Log("err", err, "lifecycle", "abort")
os.Exit(1)
}
reactionSource = reaction.InstrumentSourceMiddleware(
component,
sourceService,
sourceErrCount,
sourceOpCount,
sourceOpLatency,
sourceQueueLatency,
)(reactionSource)
reactionSource = reaction.LogSourceMiddleware(sourceService, logger)(reactionSource)

logger.Log(
"duration", time.Now().Sub(begin).Nanoseconds(),
"lifecycle", "start",
Expand Down Expand Up @@ -356,6 +372,20 @@ func main() {
}
}()

go func() {
err := consumeReaction(
core.AppFetch(apps),
reactionSource,
batchc,
core.PipelineReaction(objects, users),
core.RuleListActive(rules),
)
if err != nil {
logger.Log("err", err, "lifecycle", "abort")
os.Exit(1)
}
}()

// Distribute messages to channels.
cs := []channelFunc{
channelPush(
Expand Down
120 changes: 120 additions & 0 deletions core/pipieline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"text/template"
"time"

"github.com/tapglue/snaas/service/reaction"

"golang.org/x/text/language"

serr "github.com/tapglue/snaas/error"
Expand Down Expand Up @@ -295,6 +297,90 @@ func PipelineObject(
}
}

// PipelineReactionFunc constructs a Pipeline that by applying the provided
// rules outputs Messages.
type PipelineReactionFunc func(
*app.App,
*reaction.StateChange,
...*rule.Rule,
) (Messages, error)

// PipelineReaction constructs a Pipeline that by applying the provided rules
// outputs Messages.
func PipelineReaction(
objects object.Service,
users user.Service,
) PipelineReactionFunc {
return func(
currentApp *app.App,
change *reaction.StateChange,
rules ...*rule.Rule,
) (Messages, error) {
var (
ms = Messages{}
r = change.New

context *contextReaction
owner *user.User
parent *object.Object
parentOwner *user.User
)

owner, err := UserFetch(users)(currentApp, r.OwnerID)
if err != nil {
return nil, err
}

if r.ObjectID != 0 {
parent, err = objectFetch(objects)(currentApp, r.ObjectID)
if err != nil {
return nil, err
}

parentOwner, err = UserFetch(users)(currentApp, parent.OwnerID)
if err != nil {
return nil, err
}
}

context = &contextReaction{
Owner: owner,
Parent: parent,
ParentOwner: parentOwner,
Reaction: r,
}

for _, r := range rules {
if !r.Criteria.Match(change) {
continue
}

for _, recipient := range r.Recipients {
rs, err := recipientsReaction()(currentApp, context, recipient.Query)
if err != nil {
return nil, err
}

for _, r := range rs {
urn, err := compileTemplate(context, recipient.URN)
if err != nil {
return nil, err
}

msg, err := compileTemplate(context, recipient.Templates[language.English.String()])
if err != nil {
return nil, err
}

ms = append(ms, &Message{Message: msg, Recipient: r.ID, URN: urn})
}
}
}

return ms, nil
}
}

type contextConnection struct {
Conenction *connection.Connection
From *user.User
Expand All @@ -316,6 +402,13 @@ type contextObject struct {
ParentOwner *user.User
}

type contextReaction struct {
Owner *user.User
Parent *object.Object
ParentOwner *user.User
Reaction *reaction.Reaction
}

func compileTemplate(context interface{}, t string) (string, error) {
tmpl, err := template.New("message").Parse(t)
if err != nil {
Expand Down Expand Up @@ -518,3 +611,30 @@ func recipientsObject(
return us, nil
}
}

type recipientsReactionFunc func(
*app.App,
*contextReaction,
rule.Query,
) (user.List, error)

func recipientsReaction() recipientsReactionFunc {
return func(
currentApp *app.App,
context *contextReaction,
q rule.Query,
) (user.List, error) {
us := user.List{}

for condType := range q {
switch condType {
case queryCondParentOwner:
if context.Owner.ID != context.ParentOwner.ID {
us = append(us, context.ParentOwner)
}
}
}

return us, nil
}
}
Loading

0 comments on commit 480d1f7

Please sign in to comment.