Skip to content

Commit

Permalink
Add flexible messaging
Browse files Browse the repository at this point in the history
In its first inception SIMS (Signal informed Messaging Service) showed some
significant weaknesses. With the biggest one being, due to sending unfiltered
notifications for every transaction, a quick developing fatigue for the user up
to outright annoyance and/or perceiving the messages as spam. On the other hand
is the generic content in the messages themselves decreasing the relevance and
usually fall apart quickly in specialised applications.

We've proven that the system is capable of reacting to changes in apps in
a responsive manner so that we now need to address the quality of
notifications. In order to do this we going to overhaul the pipeline of how
changes are consumed in SIMS and introduce a couple of new concepts along the
way. The goal is to empower the operator with little effort to create quality
notifications.

At the heart of the new pipeline will be rules and templates. Where rules are
the configuration based on the input e.g (a new `Post` which has the `article`
tag). After creation of such rule it can be associated with a template which
can make use of variables provided (e.g. `recipient.Username`).

Some house-keeping has to be done as we haven't followed through with some of
the concepts required. Up until now we hard-coded the mappings of platform
information to internal understanding of an App. As we use `SNS` there is quite
some management going on and we need to put that information (certs, endpoint,
schema) in a persistent place that can be managed without code deploys or
issuing `SQL` statements.

* add rule abstraction and service
* add pipeline abstraction
* integrate in SIMS
  • Loading branch information
Alexander Simmerl committed Dec 6, 2016
1 parent de4c273 commit af72407
Show file tree
Hide file tree
Showing 23 changed files with 2,185 additions and 524 deletions.
18 changes: 17 additions & 1 deletion TODO
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,25 @@ infrastructure:
✔ generate pg password during setup @done (16-11-11 11:10)
☐ fix pg monitoring role
☐ setup grafana and prometheus on monitoring host
☐ setup monitoring dashboard definition
☐ vpc pairing template
☐ pganalyze template
☐ handle dependencies with gvt
☐ create remote-exec provisioner for DB bootstrap
☐ make binaries available with Github release
☐ setup alertmanager and pagerduty
☐ setup cloudwatch logging

terraformer:
☐ verify account
☐ ask for cidr_block
☐ user MFA (if possible)
☐ sync vars and key

code:
✔ move event condition code @done (16-11-11 11:10)
✔ add missing event indeces @done (16-11-12 12:36)
✔ enable TLS and load certs @done (16-12-02 15:41)
☐ Implement instrumentation and log middleware for platform.Service
☐ Implement instrumentation and log middleware for platform.Service
☐ Complete postgres rule.Service
☐ Implement instrumentation and log middleware for rule.Service
8 changes: 4 additions & 4 deletions cmd/sims/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import (
"github.com/tapglue/snaas/service/app"
)

type channelFunc func(*app.App, *message) error
type channelFunc func(*app.App, *core.Message) error

func channelPush(
deviceListUser core.DeviceListUserFunc,
deviceSync core.DeviceSyncEndpointFunc,
fetchActive core.PlatformFetchActiveFunc,
push sns.PushFunc,
) channelFunc {
return func(currentApp *app.App, msg *message) error {
ds, err := deviceListUser(currentApp, msg.recipient)
return func(currentApp *app.App, msg *core.Message) error {
ds, err := deviceListUser(currentApp, msg.Recipient)
if err != nil {
return err
}
Expand All @@ -38,7 +38,7 @@ func channelPush(
return err
}

err = push(d.Platform, d.EndpointARN, p.Scheme, msg.urn, msg.message)
err = push(d.Platform, d.EndpointARN, p.Scheme, msg.URN, msg.Message)
if err != nil {
if sns.IsDeliveryFailure(err) {
return nil
Expand Down
98 changes: 43 additions & 55 deletions cmd/sims/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,67 +14,58 @@ import (
"github.com/tapglue/snaas/service/connection"
"github.com/tapglue/snaas/service/event"
"github.com/tapglue/snaas/service/object"
"github.com/tapglue/snaas/service/rule"
)

type ackFunc func() error

type batch struct {
ackFunc ackFunc
app *app.App
messages messages
messages core.Messages
}

type message struct {
message string
recipient uint64
urn string
}

type messages []*message

func consumeConnection(
appFetch core.AppFetchFunc,
conSource connection.Source,
batchc chan<- batch,
ruleFns ...conRuleFunc,
pipeline core.PipelineConnectionFunc,
rules core.RuleListActiveFunc,
) error {
for {
c, err := conSource.Consume()
change, err := conSource.Consume()
if err != nil {
if connection.IsEmptySource(err) {
continue
}
return err
}

currentApp, err := appForNamespace(appFetch, c.Namespace)
currentApp, err := appForNamespace(appFetch, change.Namespace)
if err != nil {
return err
}

ms := messages{}

for _, rule := range ruleFns {
msgs, err := rule(currentApp, c)
if err != nil {
return err
}
rs, err := rules(currentApp, rule.TypeConnection)
if err != nil {
return err
}

for _, msg := range msgs {
ms = append(ms, msg)
}
ms, err := pipeline(currentApp, change, rs...)
if err != nil {
return err
}

if len(ms) == 0 {
err := conSource.Ack(c.AckID)
err := conSource.Ack(change.AckID)
if err != nil {
return err
}

continue
}

batchc <- batchMessages(currentApp, conSource, c.AckID, ms)
batchc <- batchMessages(currentApp, conSource, change.AckID, ms)
}
}

Expand Down Expand Up @@ -131,103 +122,100 @@ func consumeEndpointChange(
}
}
}

func consumeEvent(
appFetch core.AppFetchFunc,
eventSource event.Source,
batchc chan<- batch,
ruleFns ...eventRuleFunc,
pipeline core.PipelineEventFunc,
rules core.RuleListActiveFunc,
) error {
for {
c, err := eventSource.Consume()
change, err := eventSource.Consume()
if err != nil {
if event.IsEmptySource(err) {
continue
}
return err
}

currentApp, err := appForNamespace(appFetch, c.Namespace)
currentApp, err := appForNamespace(appFetch, change.Namespace)
if err != nil {
return err
}

ms := messages{}

for _, rule := range ruleFns {
rs, err := rule(currentApp, c)
if err != nil {
return err
}
rs, err := rules(currentApp, rule.TypeEvent)
if err != nil {
return err
}

for _, msg := range rs {
ms = append(ms, msg)
}
ms, err := pipeline(currentApp, change, rs...)
if err != nil {
return err
}

if len(ms) == 0 {
err = eventSource.Ack(c.AckID)
err = eventSource.Ack(change.AckID)
if err != nil {
return err
}

continue
}

batchc <- batchMessages(currentApp, eventSource, c.AckID, ms)
batchc <- batchMessages(currentApp, eventSource, change.AckID, ms)
}
}

func consumeObject(
appFetch core.AppFetchFunc,
objectSource object.Source,
batchc chan<- batch,
ruleFns ...objectRuleFunc,
pipeline core.PipelineObjectFunc,
rules core.RuleListActiveFunc,
) error {
for {
c, err := objectSource.Consume()
change, err := objectSource.Consume()
if err != nil {
if object.IsEmptySource(err) {
continue
}
return err
}

currentApp, err := appForNamespace(appFetch, c.Namespace)
currentApp, err := appForNamespace(appFetch, change.Namespace)
if err != nil {
return err
}

ms := messages{}

for _, rule := range ruleFns {
rs, err := rule(currentApp, c)
if err != nil {
return err
}
rs, err := rules(currentApp, rule.TypeObject)
if err != nil {
return err
}

for _, msg := range rs {
ms = append(ms, msg)
}
ms, err := pipeline(currentApp, change, rs...)
if err != nil {
return err
}

if len(ms) == 0 {
err := objectSource.Ack(c.AckID)
err := objectSource.Ack(change.AckID)
if err != nil {
return err
}

continue
}

batchc <- batchMessages(currentApp, objectSource, c.AckID, ms)
batchc <- batchMessages(currentApp, objectSource, change.AckID, ms)
}
}

func batchMessages(
currentApp *app.App,
acker source.Acker,
ackID string,
ms messages,
ms core.Messages,
) batch {
return batch{
ackFunc: func(acked bool, ackID string) ackFunc {
Expand Down
Loading

0 comments on commit af72407

Please sign in to comment.