Skip to content

Commit

Permalink
Merge pull request #23 from tapglue/flexible-messaging
Browse files Browse the repository at this point in the history
Flexible Messaging
  • Loading branch information
xla authored Dec 6, 2016
2 parents 9833e2b + af72407 commit b33afdb
Show file tree
Hide file tree
Showing 37 changed files with 3,342 additions and 634 deletions.
19 changes: 18 additions & 1 deletion TODO
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,25 @@ infrastructure:
✔ generate pg password during setup @done (16-11-11 11:10)
☐ fix pg monitoring role
☐ setup grafana and prometheus on monitoring host
☐ setup monitoring dashboard definition
☐ vpc pairing template
☐ pganalyze template
☐ handle dependencies with gvt
☐ create remote-exec provisioner for DB bootstrap
☐ make binaries available with Github release
☐ setup alertmanager and pagerduty
☐ setup cloudwatch logging

terraformer:
☐ verify account
☐ ask for cidr_block
☐ user MFA (if possible)
☐ sync vars and key

code:
✔ move event condition code @done (16-11-11 11:10)
✔ add missing event indeces @done (16-11-12 12:36)
☐ enable TLS and load certs
✔ enable TLS and load certs @done (16-12-02 15:41)
☐ Implement instrumentation and log middleware for platform.Service
☐ Complete postgres rule.Service
☐ Implement instrumentation and log middleware for rule.Service
29 changes: 9 additions & 20 deletions cmd/sims/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ package main

import (
"github.com/tapglue/snaas/core"
pErr "github.com/tapglue/snaas/error"
"github.com/tapglue/snaas/platform/sns"
"github.com/tapglue/snaas/service/app"
"github.com/tapglue/snaas/service/device"
)

type channelFunc func(*app.App, *message) error
type channelFunc func(*app.App, *core.Message) error

func channelPush(
deviceListUser core.DeviceListUserFunc,
deviceSync core.DeviceSyncEndpointFunc,
fetchActive core.PlatformFetchActiveFunc,
push sns.PushFunc,
pApps platformApps,
) channelFunc {
return func(currentApp *app.App, msg *message) error {
ds, err := deviceListUser(currentApp, msg.recipient)
return func(currentApp *app.App, msg *core.Message) error {
ds, err := deviceListUser(currentApp, msg.Recipient)
if err != nil {
return err
}
Expand All @@ -25,31 +25,20 @@ func channelPush(
}

for _, d := range ds {
pa, err := platformAppForPlatform(pApps, currentApp, d.Platform)
p, err := fetchActive(currentApp, d.Platform)
if err != nil {
if isPlatformNotFound(err) {
if pErr.IsNotFound(err) {
continue
}
return err
}

d, err = deviceSync(currentApp, pa.ARN, d)
d, err = deviceSync(currentApp, p.ARN, d)
if err != nil {
return err
}

var p sns.Platform

switch d.Platform {
case device.PlatformAndroid:
p = sns.PlatformGCM
case device.PlatformIOS:
p = sns.PlatformAPNS
case device.PlatformIOSSandbox:
p = sns.PlatformAPNSSandbox
}

err = push(p, d.EndpointARN, pa.Scheme, msg.urn, msg.message)
err = push(d.Platform, d.EndpointARN, p.Scheme, msg.URN, msg.Message)
if err != nil {
if sns.IsDeliveryFailure(err) {
return nil
Expand Down
98 changes: 43 additions & 55 deletions cmd/sims/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,67 +14,58 @@ import (
"github.com/tapglue/snaas/service/connection"
"github.com/tapglue/snaas/service/event"
"github.com/tapglue/snaas/service/object"
"github.com/tapglue/snaas/service/rule"
)

type ackFunc func() error

type batch struct {
ackFunc ackFunc
app *app.App
messages messages
messages core.Messages
}

type message struct {
message string
recipient uint64
urn string
}

type messages []*message

func consumeConnection(
appFetch core.AppFetchFunc,
conSource connection.Source,
batchc chan<- batch,
ruleFns ...conRuleFunc,
pipeline core.PipelineConnectionFunc,
rules core.RuleListActiveFunc,
) error {
for {
c, err := conSource.Consume()
change, err := conSource.Consume()
if err != nil {
if connection.IsEmptySource(err) {
continue
}
return err
}

currentApp, err := appForNamespace(appFetch, c.Namespace)
currentApp, err := appForNamespace(appFetch, change.Namespace)
if err != nil {
return err
}

ms := messages{}

for _, rule := range ruleFns {
msgs, err := rule(currentApp, c)
if err != nil {
return err
}
rs, err := rules(currentApp, rule.TypeConnection)
if err != nil {
return err
}

for _, msg := range msgs {
ms = append(ms, msg)
}
ms, err := pipeline(currentApp, change, rs...)
if err != nil {
return err
}

if len(ms) == 0 {
err := conSource.Ack(c.AckID)
err := conSource.Ack(change.AckID)
if err != nil {
return err
}

continue
}

batchc <- batchMessages(currentApp, conSource, c.AckID, ms)
batchc <- batchMessages(currentApp, conSource, change.AckID, ms)
}
}

Expand Down Expand Up @@ -131,103 +122,100 @@ func consumeEndpointChange(
}
}
}

func consumeEvent(
appFetch core.AppFetchFunc,
eventSource event.Source,
batchc chan<- batch,
ruleFns ...eventRuleFunc,
pipeline core.PipelineEventFunc,
rules core.RuleListActiveFunc,
) error {
for {
c, err := eventSource.Consume()
change, err := eventSource.Consume()
if err != nil {
if event.IsEmptySource(err) {
continue
}
return err
}

currentApp, err := appForNamespace(appFetch, c.Namespace)
currentApp, err := appForNamespace(appFetch, change.Namespace)
if err != nil {
return err
}

ms := messages{}

for _, rule := range ruleFns {
rs, err := rule(currentApp, c)
if err != nil {
return err
}
rs, err := rules(currentApp, rule.TypeEvent)
if err != nil {
return err
}

for _, msg := range rs {
ms = append(ms, msg)
}
ms, err := pipeline(currentApp, change, rs...)
if err != nil {
return err
}

if len(ms) == 0 {
err = eventSource.Ack(c.AckID)
err = eventSource.Ack(change.AckID)
if err != nil {
return err
}

continue
}

batchc <- batchMessages(currentApp, eventSource, c.AckID, ms)
batchc <- batchMessages(currentApp, eventSource, change.AckID, ms)
}
}

func consumeObject(
appFetch core.AppFetchFunc,
objectSource object.Source,
batchc chan<- batch,
ruleFns ...objectRuleFunc,
pipeline core.PipelineObjectFunc,
rules core.RuleListActiveFunc,
) error {
for {
c, err := objectSource.Consume()
change, err := objectSource.Consume()
if err != nil {
if object.IsEmptySource(err) {
continue
}
return err
}

currentApp, err := appForNamespace(appFetch, c.Namespace)
currentApp, err := appForNamespace(appFetch, change.Namespace)
if err != nil {
return err
}

ms := messages{}

for _, rule := range ruleFns {
rs, err := rule(currentApp, c)
if err != nil {
return err
}
rs, err := rules(currentApp, rule.TypeObject)
if err != nil {
return err
}

for _, msg := range rs {
ms = append(ms, msg)
}
ms, err := pipeline(currentApp, change, rs...)
if err != nil {
return err
}

if len(ms) == 0 {
err := objectSource.Ack(c.AckID)
err := objectSource.Ack(change.AckID)
if err != nil {
return err
}

continue
}

batchc <- batchMessages(currentApp, objectSource, c.AckID, ms)
batchc <- batchMessages(currentApp, objectSource, change.AckID, ms)
}
}

func batchMessages(
currentApp *app.App,
acker source.Acker,
ackID string,
ms messages,
ms core.Messages,
) batch {
return batch{
ackFunc: func(acked bool, ackID string) ackFunc {
Expand Down
Loading

0 comments on commit b33afdb

Please sign in to comment.