From 424d96e3bc44aa8f5f2c3f9fc6a760e342175b2f Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Mon, 28 Nov 2016 11:18:18 +0100 Subject: [PATCH 1/3] Draft Flexible Messaging --- doc/flexible-messaging.md | 50 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 doc/flexible-messaging.md diff --git a/doc/flexible-messaging.md b/doc/flexible-messaging.md new file mode 100644 index 0000000..3b0e6fb --- /dev/null +++ b/doc/flexible-messaging.md @@ -0,0 +1,50 @@ +# Flexible Messaging + +In its first inception SIMS (Signal informed Messaging Service) showed some significant weaknesses. With the biggest one being, due to sending unfiltered notifications for every transaction, a quick developing fatigue for the user up to outright annoyance and/or perceiving the messages as spam. On the other hand is the generic content in the messages themselves decreasing the relevance and usually fall apart quickly in specialised applications. + +We've proven that the system is capable of reacting to changes in apps in a responsive manner so that we now need to address the quality of notifications. In order to do this we going to overhaul the pipeline of how changes are consumed in SIMS and introduce a couple of new concepts along the way. The goal is to empower the operator with little effort to create quality notifications. + +At the heart of the new pipeline will be rules and templates. Where rules are the configuration based on the input e.g (a new `Post` which has the `article` tag). After creation of such rule it can be associated with a template which can make use of variables provided (e.g. `recipient.Username`). + +### Components + +Some house-keeping has to be done as we haven't followed through with some of the concepts required. Up until now we hard-coded the mappings of platform information to internal understanding of an App. As we use `SNS` there is quite some management going on and we need to put that information (certs, endpoint, schema) in a persistent place that can be managed without code deploys or issuing `SQL` statements. + +#### Pipeline + +* **rule**: Determines if and who message should be send to based on configurable criteria and what the message content looks like. +* **criteria**: Old and new entity information which can be used in conditions. +* **recipient**: Users which stand in relation to the entity (e.g. owner of a post). +* **template**: Interpolate string which has a set of variables to work with and is created per recipient and language. +* **var**: Piece of information that can be used in templates for personalisation. + + +#### Platform + +Is the representation of an AWS Platform Application and is used to track important information and alleviate the need for direct interaction with SNS. + +``` go +// Platform supported for a Device. +const ( + IOSSandbox Ecosystem = iota + 1 + IOS + Android +) + +// Ecosystem of a device. +type Ecosystem uint8 + +// Platform represents an ecosystem like Android or iOS. +type Platform struct { + Active bool + ARN string + Ecosystem Ecosystem + Scheme string +} +``` + +### Tasks + +- [ ] Implement Platform +- [ ] Implement Rules +- [ ] Update SIMS \ No newline at end of file From 4a93be10c32cac4e446d268513c5b3801e5517b3 Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Fri, 2 Dec 2016 15:51:40 +0100 Subject: [PATCH 2/3] Implement Application Platforms Which are used as the book-keeping glue between Apps and the SNS entity used for notification routing. * implement platform service * implement platform create handler (not mounted) * implement platform create and fetch by ARN or active per app * add first centralised errors --- TODO | 3 +- cmd/sims/channel.go | 23 +- cmd/sims/platform.go | 80 ---- cmd/sims/sims.go | 25 +- core/device.go | 8 +- core/platform.go | 131 +++++++ error/error.go | 58 +++ handler/http/device.go | 10 +- handler/http/platform.go | 90 +++++ infrastructure/terraform/template/storage.tf | 41 +- platform/sns/sns.go | 103 ++++- service/device/device.go | 14 +- service/device/helper_test.go | 3 +- service/platform/helper_test.go | 151 ++++++++ service/platform/platform.go | 84 +++++ service/platform/platform_test.go | 26 ++ service/platform/postgres.go | 375 +++++++++++++++++++ service/platform/postgres_test.go | 54 +++ 18 files changed, 1138 insertions(+), 141 deletions(-) create mode 100644 core/platform.go create mode 100644 error/error.go create mode 100644 handler/http/platform.go create mode 100644 service/platform/helper_test.go create mode 100644 service/platform/platform.go create mode 100644 service/platform/platform_test.go create mode 100644 service/platform/postgres.go create mode 100644 service/platform/postgres_test.go diff --git a/TODO b/TODO index 94195d3..f560afd 100644 --- a/TODO +++ b/TODO @@ -16,4 +16,5 @@ infrastructure: code: ✔ move event condition code @done (16-11-11 11:10) ✔ add missing event indeces @done (16-11-12 12:36) - ☐ enable TLS and load certs \ No newline at end of file + ✔ enable TLS and load certs @done (16-12-02 15:41) + ☐ Implement instrumentation and log middleware for platform.Service \ No newline at end of file diff --git a/cmd/sims/channel.go b/cmd/sims/channel.go index 921c4a5..ad41a97 100644 --- a/cmd/sims/channel.go +++ b/cmd/sims/channel.go @@ -2,9 +2,9 @@ package main import ( "github.com/tapglue/snaas/core" + pErr "github.com/tapglue/snaas/error" "github.com/tapglue/snaas/platform/sns" "github.com/tapglue/snaas/service/app" - "github.com/tapglue/snaas/service/device" ) type channelFunc func(*app.App, *message) error @@ -12,8 +12,8 @@ type channelFunc func(*app.App, *message) error func channelPush( deviceListUser core.DeviceListUserFunc, deviceSync core.DeviceSyncEndpointFunc, + fetchActive core.PlatformFetchActiveFunc, push sns.PushFunc, - pApps platformApps, ) channelFunc { return func(currentApp *app.App, msg *message) error { ds, err := deviceListUser(currentApp, msg.recipient) @@ -25,31 +25,20 @@ func channelPush( } for _, d := range ds { - pa, err := platformAppForPlatform(pApps, currentApp, d.Platform) + p, err := fetchActive(currentApp, d.Platform) if err != nil { - if isPlatformNotFound(err) { + if pErr.IsNotFound(err) { continue } return err } - d, err = deviceSync(currentApp, pa.ARN, d) + d, err = deviceSync(currentApp, p.ARN, d) if err != nil { return err } - var p sns.Platform - - switch d.Platform { - case device.PlatformAndroid: - p = sns.PlatformGCM - case device.PlatformIOS: - p = sns.PlatformAPNS - case device.PlatformIOSSandbox: - p = sns.PlatformAPNSSandbox - } - - err = push(p, d.EndpointARN, pa.Scheme, msg.urn, msg.message) + err = push(d.Platform, d.EndpointARN, p.Scheme, msg.urn, msg.message) if err != nil { if sns.IsDeliveryFailure(err) { return nil diff --git a/cmd/sims/platform.go b/cmd/sims/platform.go index 3cafc07..92392b5 100644 --- a/cmd/sims/platform.go +++ b/cmd/sims/platform.go @@ -1,73 +1,18 @@ package main import ( - "encoding/json" "errors" - "fmt" "strconv" "strings" "github.com/tapglue/snaas/core" "github.com/tapglue/snaas/service/app" - "github.com/tapglue/snaas/service/device" ) var ( ErrInvalidNamespace = errors.New("namespace invalid") - ErrPlatformNotFound = errors.New("platform not found") ) -type platformApp struct { - ARN string `json:"arn"` - Namespace string `json:"namespace"` - Scheme string `json:"scheme"` - Platform int `json:"platform"` -} - -type platformApps []*platformApp - -func (as *platformApps) Set(input string) error { - a := &platformApp{} - err := json.Unmarshal([]byte(input), a) - if err != nil { - return err - } - - *as = append(*as, a) - - return nil -} - -func (as *platformApps) String() string { - return fmt.Sprintf("%d apps", len(*as)) -} - -func appForARN( - appFetch core.AppFetchFunc, - pApps platformApps, - pARN string, -) (*app.App, error) { - var a *platformApp - - for _, pa := range pApps { - if pa.ARN == pARN { - a = pa - break - } - } - - if a == nil { - return nil, ErrPlatformNotFound - } - - id, err := namespaceToID(a.Namespace) - if err != nil { - return nil, err - } - - return appFetch(id) -} - func appForNamespace(appFetch core.AppFetchFunc, ns string) (*app.App, error) { id, err := namespaceToID(ns) if err != nil { @@ -86,28 +31,3 @@ func namespaceToID(ns string) (uint64, error) { return strconv.ParseUint(ps[1], 10, 64) } - -func isPlatformNotFound(err error) bool { - return err == ErrPlatformNotFound -} - -func platformAppForPlatform( - pApps platformApps, - a *app.App, - p device.Platform, -) (*platformApp, error) { - var pApp *platformApp - - for _, pa := range pApps { - if pa.Namespace == a.Namespace() && device.Platform(pa.Platform) == p { - pApp = pa - break - } - } - - if pApp == nil { - return nil, ErrPlatformNotFound - } - - return pApp, nil -} diff --git a/cmd/sims/sims.go b/cmd/sims/sims.go index 9ddacb0..6f0dbe4 100644 --- a/cmd/sims/sims.go +++ b/cmd/sims/sims.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/tapglue/snaas/core" + pErr "github.com/tapglue/snaas/error" "github.com/tapglue/snaas/platform/metrics" platformSNS "github.com/tapglue/snaas/platform/sns" platformSQS "github.com/tapglue/snaas/platform/sqs" @@ -24,6 +25,7 @@ import ( "github.com/tapglue/snaas/service/device" "github.com/tapglue/snaas/service/event" "github.com/tapglue/snaas/service/object" + "github.com/tapglue/snaas/service/platform" "github.com/tapglue/snaas/service/user" ) @@ -50,7 +52,6 @@ var ( func main() { var ( begin = time.Now() - pApps = platformApps{} awsID = flag.String("aws.id", "", "Identifier for AWS requests") awsRegion = flag.String("aws.region", "us-east-1", "AWS region to operate in") @@ -58,7 +59,6 @@ func main() { postgresURL = flag.String("postgres.url", "", "Postgres URL to connect to") telemetryAddr = flag.String("telemetry.addr", ":9001", "Address to expose telemetry on") ) - flag.Var(&pApps, "app", "Repeated platform apps.") flag.Parse() logger := log.NewContext( @@ -189,6 +189,11 @@ func main() { )(objects) objects = object.LogServiceMiddleware(logger, storeService)(objects) + var platforms platform.Service + platforms = platform.PostgresService(pgClient) + // TODO: Implement instrumentaiton middleware. + // TODO: Implement logging middleware. + var users user.Service users = user.PostgresService(pgClient) users = user.InstrumentMiddleware( @@ -271,9 +276,19 @@ func main() { go func() { for c := range changec { - a, err := appForARN(core.AppFetch(apps), pApps, c.Resource) + p, err := core.PlatformFetchByARN(platforms)(c.Resource) + if err != nil { + if pErr.IsNotFound(err) { + continue + } + + logger.Log("err", err, "lifecycle", "abort") + os.Exit(1) + } + + a, err := core.AppFetch(apps)(p.AppID) if err != nil { - if isPlatformNotFound(err) { + if core.IsNotFound(err) { continue } @@ -367,8 +382,8 @@ func main() { platformSNS.EndpointRetrieve(snsAPI), platformSNS.EndpointUpdate(snsAPI), ), + core.PlatformFetchActive(platforms), platformSNS.Push(snsAPI), - pApps, ), } diff --git a/core/device.go b/core/device.go index 8b6ae21..60e82f7 100644 --- a/core/device.go +++ b/core/device.go @@ -85,7 +85,7 @@ func DeviceListUser(devices device.Service) DeviceListUserFunc { return devices.Query(currentApp.Namespace(), device.QueryOptions{ Deleted: &defaultDeleted, Disabled: &defaultDeleted, - Platforms: []device.Platform{ + Platforms: []sns.Platform{ device.PlatformIOSSandbox, device.PlatformIOS, device.PlatformAndroid, @@ -168,7 +168,7 @@ type DeviceUpdateFunc func( currentApp *app.App, origin Origin, deviceID string, - platform device.Platform, + platform sns.Platform, token string, language string, ) error @@ -179,13 +179,13 @@ func DeviceUpdate(devices device.Service) DeviceUpdateFunc { currentApp *app.App, origin Origin, deviceID string, - platform device.Platform, + platform sns.Platform, token string, language string, ) error { ds, err := devices.Query(currentApp.Namespace(), device.QueryOptions{ Deleted: &defaultDeleted, - Platforms: []device.Platform{ + Platforms: []sns.Platform{ platform, }, UserIDs: []uint64{ diff --git a/core/platform.go b/core/platform.go new file mode 100644 index 0000000..e8aa269 --- /dev/null +++ b/core/platform.go @@ -0,0 +1,131 @@ +package core + +import ( + "fmt" + + pErr "github.com/tapglue/snaas/error" + "github.com/tapglue/snaas/platform/pg" + "github.com/tapglue/snaas/platform/sns" + "github.com/tapglue/snaas/service/app" + "github.com/tapglue/snaas/service/platform" +) + +var ( + defaultActive = true +) + +// PlatformCreateFunc stores the provided platform. +type PlatformCreateFunc func( + currentApp *app.App, + p *platform.Platform, + cert, key string, +) (*platform.Platform, error) + +// PlatformCreate stores the provided platform. +func PlatformCreate( + platforms platform.Service, + createAPNS sns.AppCreateAPNSFunc, + createAPNSSandbox sns.AppCreateAPNSSandboxFunc, + createAndroid sns.AppCreateGCMFunc, +) PlatformCreateFunc { + return func( + currentApp *app.App, + p *platform.Platform, + cert, key string, + ) (*platform.Platform, error) { + arn := "" + + fmt.Printf("\n%s\n%s\n%#v\n\n", cert, key, p) + + switch p.Ecosystem { + case platform.Android: + var err error + arn, err = createAndroid(p.Name, key) + if err != nil { + return nil, err + } + case platform.IOS: + var err error + arn, err = createAPNS(p.Name, cert, key) + if err != nil { + return nil, err + } + case platform.IOSSandbox: + var err error + arn, err = createAPNSSandbox(p.Name, cert, key) + if err != nil { + return nil, err + } + } + + p.AppID = currentApp.ID + p.ARN = arn + + return platforms.Put(pg.MetaNamespace, p) + } +} + +// PlatformFetchActiveFunc returns the active platform for the current app and the +// given ecosystem. +type PlatformFetchActiveFunc func(*app.App, sns.Platform) (*platform.Platform, error) + +// PlatformFetchActive returns the active platform for the current app and the +// given ecosystem. +func PlatformFetchActive(platforms platform.Service) PlatformFetchActiveFunc { + return func( + currentApp *app.App, + ecosystem sns.Platform, + ) (*platform.Platform, error) { + ps, err := platforms.Query(pg.MetaNamespace, platform.QueryOptions{ + Active: &defaultActive, + AppIDs: []uint64{ + currentApp.ID, + }, + Deleted: &defaultDeleted, + Ecosystems: []sns.Platform{ + ecosystem, + }, + }) + if err != nil { + return nil, err + } + + if len(ps) != 1 { + return nil, pErr.Wrap( + pErr.ErrNotFound, + "no active platform found for %s", + sns.PlatformIdentifiers[ecosystem], + ) + } + + return ps[0], nil + } +} + +// PlatformFetchByARNFunc returns the Platform for the given ARN. +type PlatformFetchByARNFunc func(arn string) (*platform.Platform, error) + +// PlatformFetchByARN returns the Platform for the given ARN. +func PlatformFetchByARN(platforms platform.Service) PlatformFetchByARNFunc { + return func(arn string) (*platform.Platform, error) { + ps, err := platforms.Query(pg.MetaNamespace, platform.QueryOptions{ + ARNs: []string{ + arn, + }, + Deleted: &defaultDeleted, + }) + if err != nil { + return nil, err + } + + if len(ps) != 1 { + return nil, pErr.Wrap( + pErr.ErrNotFound, + "no platform found for '%s'", + arn, + ) + } + + return ps[0], nil + } +} diff --git a/error/error.go b/error/error.go new file mode 100644 index 0000000..3de2538 --- /dev/null +++ b/error/error.go @@ -0,0 +1,58 @@ +package error + +import ( + "errors" + "fmt" +) + +const errFmt = "%s: %s" + +// General-purpose errors. +var ( + ErrNotFound = errors.New("not found") +) + +// Platform errors. +var ( + ErrInvalidPlatform = errors.New("invalid platform") +) + +// Error wrapper. +type Error struct { + err error + msg string +} + +func (e Error) Error() string { + return e.msg +} + +// IsInvalidPlatform indicates if err is ErrInvalidPlatform. +func IsInvalidPlatform(err error) bool { + return unwrapError(err) == ErrInvalidPlatform +} + +// IsNotFound indicates if err is ErrNotFouund. +func IsNotFound(err error) bool { + return unwrapError(err) == ErrNotFound +} + +// Wrap constructs an Error with proper messaaging. +func Wrap(err error, format string, args ...interface{}) error { + return &Error{ + err: err, + msg: fmt.Sprintf( + errFmt, + err, fmt.Sprintf(format, args...), + ), + } +} + +func unwrapError(err error) error { + switch e := err.(type) { + case *Error: + return e.err + } + + return err +} diff --git a/handler/http/device.go b/handler/http/device.go index 2d30b5d..f8243c3 100644 --- a/handler/http/device.go +++ b/handler/http/device.go @@ -8,7 +8,7 @@ import ( "golang.org/x/net/context" "github.com/tapglue/snaas/core" - "github.com/tapglue/snaas/service/device" + "github.com/tapglue/snaas/platform/sns" ) // DeviceDelete removes a user's device. @@ -58,15 +58,15 @@ func DeviceUpdate(fn core.DeviceUpdateFunc) Handler { type payloadDevice struct { language string - platform device.Platform + platform sns.Platform token string } func (p *payloadDevice) UnmarshalJSON(raw []byte) error { f := struct { - Language string `json:"language"` - Platform device.Platform `json:"platform"` - Token string `json:"token"` + Language string `json:"language"` + Platform sns.Platform `json:"platform"` + Token string `json:"token"` }{} err := json.Unmarshal(raw, &f) diff --git a/handler/http/platform.go b/handler/http/platform.go new file mode 100644 index 0000000..7703bea --- /dev/null +++ b/handler/http/platform.go @@ -0,0 +1,90 @@ +package http + +import ( + "encoding/json" + "net/http" + "strconv" + "time" + + "golang.org/x/net/context" + + "github.com/tapglue/snaas/core" + "github.com/tapglue/snaas/platform/sns" + "github.com/tapglue/snaas/service/platform" +) + +// PlatformCreate stores the provided platform. +func PlatformCreate(fn core.PlatformCreateFunc) Handler { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + var ( + currentApp = appFromContext(ctx) + payload = payloadPlatform{} + ) + + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + respondError(w, 0, wrapError(ErrBadRequest, err.Error())) + return + } + + p, err := fn(currentApp, payload.platform, payload.cert, payload.key) + if err != nil { + respondError(w, 0, err) + return + } + + respondJSON(w, http.StatusCreated, &payloadPlatform{platform: p}) + } +} + +type payloadPlatform struct { + cert, key string + platform *platform.Platform +} + +func (p *payloadPlatform) MarshalJSON() ([]byte, error) { + return json.Marshal(&struct { + Active bool `json:"active"` + ARN string `json:"arn"` + Deleted bool `json:"deleted"` + Ecosystem int `json:"ecosystem"` + ID string `json:"id"` + Name string `json:"name"` + Scheme string `json:"scheme"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + }{ + Active: p.platform.Active, + ARN: p.platform.ARN, + Deleted: p.platform.Deleted, + Ecosystem: int(p.platform.Ecosystem), + ID: strconv.FormatUint(p.platform.ID, 10), + Name: p.platform.Name, + Scheme: p.platform.Scheme, + CreatedAt: p.platform.CreatedAt, + UpdatedAt: p.platform.UpdatedAt, + }) +} + +func (p *payloadPlatform) UnmarshalJSON(raw []byte) error { + f := struct { + Cert string `json:"cert"` + Ecosystem sns.Platform `json:"ecosystem"` + Key string `json:"key"` + Name string `json:"name"` + Scheme string `json:"scheme"` + }{} + + if err := json.Unmarshal(raw, &f); err != nil { + return err + } + + p.cert = f.Cert + p.key = f.Key + p.platform = &platform.Platform{ + Ecosystem: f.Ecosystem, + Name: f.Name, + Scheme: f.Scheme, + } + + return nil +} diff --git a/infrastructure/terraform/template/storage.tf b/infrastructure/terraform/template/storage.tf index 306b72d..2dafcc4 100644 --- a/infrastructure/terraform/template/storage.tf +++ b/infrastructure/terraform/template/storage.tf @@ -213,25 +213,25 @@ EOF visibility_timeout_seconds = 60 } -resource "aws_sqs_queue" "endpoint-state-change-dlq" { +resource "aws_sqs_queue" "event-state-change-dlq" { delay_seconds = 0 max_message_size = 262144 message_retention_seconds = 1209600 - name = "endpoint-state-change-dlq" + name = "event-state-change-dlq" receive_wait_time_seconds = 1 visibility_timeout_seconds = 300 } -resource "aws_sqs_queue" "endpoint-state-change" { +resource "aws_sqs_queue" "event-state-change" { delay_seconds = 0 max_message_size = 262144 message_retention_seconds = 1209600 - name = "endpoint-state-change" + name = "event-state-change" receive_wait_time_seconds = 1 redrive_policy = < Android { + return pErr.Wrap(pErr.ErrInvalidPlatform, "Ecosystem '%d' not supported", p.Ecosystem) + } + + if p.Name == "" { + return pErr.Wrap(pErr.ErrInvalidPlatform, "Name must be set") + } + + if p.Scheme == "" { + return pErr.Wrap(pErr.ErrInvalidPlatform, "Scheme must be set") + } + + return nil +} + +// List is a collection of Platforms. +type List []*Platform + +// QueryOptions to narrow-down platform queries. +type QueryOptions struct { + Active *bool + ARNs []string + AppIDs []uint64 + Deleted *bool + Ecosystems []sns.Platform + IDs []uint64 +} + +// Service for platform interactions. +type Service interface { + service.Lifecycle + + Put(namespace string, platform *Platform) (*Platform, error) + Query(namespace string, opts QueryOptions) (List, error) +} + +// ServiceMiddleware is a chainable behaviour modifier for Service. +type ServiceMiddleware func(Service) Service + +func flakeNamespace(ns string) string { + return fmt.Sprintf("%s_%s", ns, "platforms") +} diff --git a/service/platform/platform_test.go b/service/platform/platform_test.go new file mode 100644 index 0000000..a1337ea --- /dev/null +++ b/service/platform/platform_test.go @@ -0,0 +1,26 @@ +package platform + +import ( + "testing" + + pErr "github.com/tapglue/snaas/error" +) + +func TestValidate(t *testing.T) { + var ( + p = testPlatform() + ps = List{ + {}, // Missing ARN + {ARN: p.ARN}, // Missing Ecosystem + {ARN: p.ARN, Ecosystem: 4}, // Unsupported Ecosystem + {ARN: p.ARN, Ecosystem: p.Ecosystem}, // Missing Name + {ARN: p.ARN, Ecosystem: p.Ecosystem, Name: p.Name}, // Missing Scheme + } + ) + + for _, p := range ps { + if have, want := p.Validate(), pErr.ErrInvalidPlatform; !pErr.IsInvalidPlatform(have) { + t.Errorf("have %v, want %v", have, want) + } + } +} diff --git a/service/platform/postgres.go b/service/platform/postgres.go new file mode 100644 index 0000000..68daca1 --- /dev/null +++ b/service/platform/postgres.go @@ -0,0 +1,375 @@ +package platform + +import ( + "fmt" + "strings" + "time" + + "github.com/jmoiron/sqlx" + + "github.com/tapglue/snaas/platform/flake" + "github.com/tapglue/snaas/platform/pg" +) + +const ( + pgInsertPlatform = `INSERT INTO + %s.platforms(active, app_id, arn, deleted, ecosystem, id, name, scheme, created_at, updated_at) + VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)` + pgUpdatePlatform = ` + UPDATE + %s.platforms + SET + active = $2, + app_id = $3, + arn = $4, + deleted = $5, + ecosystem = $6, + name = $7, + scheme = $8, + updated_at = $9 + WHERE + id = $1` + + pgClauseActive = `active = ?` + pgClauseAppIDs = `app_id IN (?)` + pgClauseARNs = `arn IN (?)` + pgClauseDeleted = `deleted = ?` + pgClauseEcosystems = `ecosystem IN (?)` + pgClauseIDs = `id IN (?)` + + pgListPlatforms = ` + SELECT + active, app_id, arn, deleted, ecosystem, id, name, scheme, created_at, updated_at + FROM + %s.platforms + %s` + + pgOrderCreatedAt = `ORDER BY created_at DESC` + + pgCreateSchema = `CREATE SCHEMA IF NOT EXISTS %s` + pgCreateTable = `CREATE TABLE IF NOT EXISTS %s.platforms( + active BOOL DEFAULT false, + app_id BIGINT NOT NULL, + arn TEXT NOT NULL, + deleted BOOL DEFAULT false, + ecosystem INT NOT NULL, + id BIGINT NOT NULL UNIQUE, + name CITEXT NOT NULL UNIQUE, + scheme TEXT NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL + )` + pgDropTable = `DROP TABLE IF EXISTS %s.platforms` +) + +type pgService struct { + db *sqlx.DB +} + +// PostgresService returns a Postgres based Service implementation. +func PostgresService(db *sqlx.DB) Service { + return &pgService{ + db: db, + } +} + +func (s *pgService) Put(ns string, p *Platform) (*Platform, error) { + if err := p.Validate(); err != nil { + return nil, err + } + + if p.ID == 0 { + return s.insert(ns, p) + } + + return s.update(ns, p) +} + +func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { + clauses, params, err := convertOpts(opts) + if err != nil { + return nil, err + } + + ps, err := s.listPlatforms(ns, clauses, params...) + if err != nil { + if pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + } + + ps, err = s.listPlatforms(ns, clauses, params...) + } + + return ps, err +} + +func (s *pgService) Setup(ns string) error { + qs := []string{ + fmt.Sprintf(pgCreateSchema, ns), + fmt.Sprintf(pgCreateTable, ns), + } + + for _, q := range qs { + _, err := s.db.Exec(q) + if err != nil { + return fmt.Errorf("setup (%s): %s", q, err) + } + } + + return nil +} + +func (s *pgService) Teardown(ns string) error { + qs := []string{ + fmt.Sprintf(pgDropTable, ns), + } + + for _, q := range qs { + _, err := s.db.Exec(q) + if err != nil { + return fmt.Errorf("teardown '%s': %s", q, err) + } + } + + return nil +} + +func (s *pgService) insert(ns string, p *Platform) (*Platform, error) { + if p.CreatedAt.IsZero() { + p.CreatedAt = time.Now().UTC() + } + + ts, err := time.Parse(pg.TimeFormat, p.CreatedAt.UTC().Format(pg.TimeFormat)) + if err != nil { + return nil, err + } + + p.CreatedAt = ts + p.UpdatedAt = ts + + id, err := flake.NextID(flakeNamespace(ns)) + if err != nil { + return nil, err + } + + p.ID = id + + var ( + params = []interface{}{ + p.Active, + p.AppID, + p.ARN, + p.Deleted, + p.Ecosystem, + p.ID, + p.Name, + p.Scheme, + p.CreatedAt, + p.UpdatedAt, + } + query = fmt.Sprintf(pgInsertPlatform, ns) + ) + + _, err = s.db.Exec(query, params...) + if err != nil { + if pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + + _, err = s.db.Exec(query, params...) + } + } + + return p, err +} + +func (s *pgService) listPlatforms( + ns string, + clauses []string, + params ...interface{}, +) (List, error) { + c := strings.Join(clauses, "\nAND") + + if len(clauses) > 0 { + c = fmt.Sprintf("WHERE %s", c) + } + + query := strings.Join([]string{ + fmt.Sprintf(pgListPlatforms, ns, c), + pgOrderCreatedAt, + }, "\n") + + query = sqlx.Rebind(sqlx.DOLLAR, query) + + rows, err := s.db.Query(query, params...) + if err != nil { + return nil, err + } + defer rows.Close() + + ps := List{} + + for rows.Next() { + p := &Platform{} + + err := rows.Scan( + &p.Active, + &p.AppID, + &p.ARN, + &p.Deleted, + &p.Ecosystem, + &p.ID, + &p.Name, + &p.Scheme, + &p.CreatedAt, + &p.UpdatedAt, + ) + if err != nil { + return nil, err + } + + p.CreatedAt = p.CreatedAt.UTC() + p.UpdatedAt = p.UpdatedAt.UTC() + + ps = append(ps, p) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return ps, nil +} + +func (s *pgService) update(ns string, p *Platform) (*Platform, error) { + now, err := time.Parse(pg.TimeFormat, time.Now().UTC().Format(pg.TimeFormat)) + if err != nil { + return nil, err + } + + p.UpdatedAt = now + + var ( + params = []interface{}{ + p.ID, + p.Active, + p.AppID, + p.ARN, + p.Deleted, + p.Ecosystem, + p.Name, + p.Scheme, + p.UpdatedAt, + } + query = fmt.Sprintf(pgUpdatePlatform, ns) + ) + + _, err = s.db.Exec(query, params...) + if err != nil { + if pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + + _, err = s.db.Exec(query, params...) + } + } + + return p, err +} + +func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { + var ( + clauses = []string{} + params = []interface{}{} + ) + + if opts.Active != nil { + clause, _, err := sqlx.In(pgClauseActive, []interface{}{*opts.Active}) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, *opts.Active) + } + + if len(opts.AppIDs) > 0 { + ps := []interface{}{} + + for _, id := range opts.AppIDs { + ps = append(ps, id) + } + + clause, _, err := sqlx.In(pgClauseAppIDs, ps) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if len(opts.ARNs) > 0 { + ps := []interface{}{} + + for _, arn := range opts.ARNs { + ps = append(ps, arn) + } + + clause, _, err := sqlx.In(pgClauseARNs, ps) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if opts.Deleted != nil { + clause, _, err := sqlx.In(pgClauseDeleted, []interface{}{*opts.Deleted}) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, *opts.Deleted) + } + + if len(opts.Ecosystems) > 0 { + ps := []interface{}{} + + for _, e := range opts.Ecosystems { + ps = append(ps, e) + } + + clause, _, err := sqlx.In(pgClauseEcosystems, ps) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if len(opts.IDs) > 0 { + ps := []interface{}{} + + for _, id := range opts.IDs { + ps = append(ps, id) + } + + clause, _, err := sqlx.In(pgClauseIDs, ps) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + return clauses, params, nil +} diff --git a/service/platform/postgres_test.go b/service/platform/postgres_test.go new file mode 100644 index 0000000..121a852 --- /dev/null +++ b/service/platform/postgres_test.go @@ -0,0 +1,54 @@ +// +build integration + +package platform + +import ( + "flag" + "fmt" + "os/user" + + "github.com/tapglue/snaas/platform/pg" + + "github.com/jmoiron/sqlx" + + "testing" +) + +var pgTestURL string + +func TestPostgresPut(t *testing.T) { + testServicePut(t, preparePostgres) +} + +func TestPostgresQuery(t *testing.T) { + testServiceQuery(t, preparePostgres) +} + +func preparePostgres(t *testing.T, namespace string) Service { + db, err := sqlx.Connect("postgres", pgTestURL) + if err != nil { + t.Fatal(err) + } + + s := PostgresService(db) + + if err := s.Teardown(namespace); err != nil { + t.Fatal(err) + } + + return s +} + +func init() { + u, err := user.Current() + if err != nil { + panic(err) + } + + d := fmt.Sprintf(pg.URLTest, u.Username) + + url := flag.String("postgres.url", d, "Postgres test connection URL") + flag.Parse() + + pgTestURL = *url +} From af724073980b11800476cc65a5b7e35df44fe549 Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Tue, 6 Dec 2016 19:03:12 +0100 Subject: [PATCH 3/3] Add flexible messaging In its first inception SIMS (Signal informed Messaging Service) showed some significant weaknesses. With the biggest one being, due to sending unfiltered notifications for every transaction, a quick developing fatigue for the user up to outright annoyance and/or perceiving the messages as spam. On the other hand is the generic content in the messages themselves decreasing the relevance and usually fall apart quickly in specialised applications. We've proven that the system is capable of reacting to changes in apps in a responsive manner so that we now need to address the quality of notifications. In order to do this we going to overhaul the pipeline of how changes are consumed in SIMS and introduce a couple of new concepts along the way. The goal is to empower the operator with little effort to create quality notifications. At the heart of the new pipeline will be rules and templates. Where rules are the configuration based on the input e.g (a new `Post` which has the `article` tag). After creation of such rule it can be associated with a template which can make use of variables provided (e.g. `recipient.Username`). Some house-keeping has to be done as we haven't followed through with some of the concepts required. Up until now we hard-coded the mappings of platform information to internal understanding of an App. As we use `SNS` there is quite some management going on and we need to put that information (certs, endpoint, schema) in a persistent place that can be managed without code deploys or issuing `SQL` statements. * add rule abstraction and service * add pipeline abstraction * integrate in SIMS --- TODO | 18 +- cmd/sims/channel.go | 8 +- cmd/sims/consume.go | 98 ++--- cmd/sims/rules.go | 348 ---------------- cmd/sims/sims.go | 42 +- core/pipieline.go | 506 ++++++++++++++++++++++ core/pipieline_test.go | 580 ++++++++++++++++++++++++++ core/rules.go | 22 + core/user_test.go | 1 + service/connection/connection.go | 57 ++- service/connection/connection_test.go | 32 ++ service/event/event.go | 60 ++- service/event/event_test.go | 30 ++ service/object/mem.go | 37 +- service/object/object.go | 89 +++- service/object/object_test.go | 38 +- service/platform/postgres.go | 45 +- service/platform/postgres_test.go | 5 +- service/rule/postgres.go | 317 ++++++++++++++ service/rule/postgres_test.go | 204 +++++++++ service/rule/rule.go | 140 +++++++ service/user/mem.go | 18 +- service/user/user.go | 14 + 23 files changed, 2185 insertions(+), 524 deletions(-) delete mode 100644 cmd/sims/rules.go create mode 100644 core/pipieline.go create mode 100644 core/pipieline_test.go create mode 100644 core/rules.go create mode 100644 service/connection/connection_test.go create mode 100644 service/event/event_test.go create mode 100644 service/rule/postgres.go create mode 100644 service/rule/postgres_test.go create mode 100644 service/rule/rule.go diff --git a/TODO b/TODO index f560afd..470c808 100644 --- a/TODO +++ b/TODO @@ -12,9 +12,25 @@ infrastructure: ✔ generate pg password during setup @done (16-11-11 11:10) ☐ fix pg monitoring role ☐ setup grafana and prometheus on monitoring host + ☐ setup monitoring dashboard definition + ☐ vpc pairing template + ☐ pganalyze template + ☐ handle dependencies with gvt + ☐ create remote-exec provisioner for DB bootstrap + ☐ make binaries available with Github release + ☐ setup alertmanager and pagerduty + ☐ setup cloudwatch logging + +terraformer: + ☐ verify account + ☐ ask for cidr_block + ☐ user MFA (if possible) + ☐ sync vars and key code: ✔ move event condition code @done (16-11-11 11:10) ✔ add missing event indeces @done (16-11-12 12:36) ✔ enable TLS and load certs @done (16-12-02 15:41) - ☐ Implement instrumentation and log middleware for platform.Service \ No newline at end of file + ☐ Implement instrumentation and log middleware for platform.Service + ☐ Complete postgres rule.Service + ☐ Implement instrumentation and log middleware for rule.Service diff --git a/cmd/sims/channel.go b/cmd/sims/channel.go index ad41a97..70f5864 100644 --- a/cmd/sims/channel.go +++ b/cmd/sims/channel.go @@ -7,7 +7,7 @@ import ( "github.com/tapglue/snaas/service/app" ) -type channelFunc func(*app.App, *message) error +type channelFunc func(*app.App, *core.Message) error func channelPush( deviceListUser core.DeviceListUserFunc, @@ -15,8 +15,8 @@ func channelPush( fetchActive core.PlatformFetchActiveFunc, push sns.PushFunc, ) channelFunc { - return func(currentApp *app.App, msg *message) error { - ds, err := deviceListUser(currentApp, msg.recipient) + return func(currentApp *app.App, msg *core.Message) error { + ds, err := deviceListUser(currentApp, msg.Recipient) if err != nil { return err } @@ -38,7 +38,7 @@ func channelPush( return err } - err = push(d.Platform, d.EndpointARN, p.Scheme, msg.urn, msg.message) + err = push(d.Platform, d.EndpointARN, p.Scheme, msg.URN, msg.Message) if err != nil { if sns.IsDeliveryFailure(err) { return nil diff --git a/cmd/sims/consume.go b/cmd/sims/consume.go index 95f474d..9255126 100644 --- a/cmd/sims/consume.go +++ b/cmd/sims/consume.go @@ -14,6 +14,7 @@ import ( "github.com/tapglue/snaas/service/connection" "github.com/tapglue/snaas/service/event" "github.com/tapglue/snaas/service/object" + "github.com/tapglue/snaas/service/rule" ) type ackFunc func() error @@ -21,25 +22,18 @@ type ackFunc func() error type batch struct { ackFunc ackFunc app *app.App - messages messages + messages core.Messages } -type message struct { - message string - recipient uint64 - urn string -} - -type messages []*message - func consumeConnection( appFetch core.AppFetchFunc, conSource connection.Source, batchc chan<- batch, - ruleFns ...conRuleFunc, + pipeline core.PipelineConnectionFunc, + rules core.RuleListActiveFunc, ) error { for { - c, err := conSource.Consume() + change, err := conSource.Consume() if err != nil { if connection.IsEmptySource(err) { continue @@ -47,26 +41,23 @@ func consumeConnection( return err } - currentApp, err := appForNamespace(appFetch, c.Namespace) + currentApp, err := appForNamespace(appFetch, change.Namespace) if err != nil { return err } - ms := messages{} - - for _, rule := range ruleFns { - msgs, err := rule(currentApp, c) - if err != nil { - return err - } + rs, err := rules(currentApp, rule.TypeConnection) + if err != nil { + return err + } - for _, msg := range msgs { - ms = append(ms, msg) - } + ms, err := pipeline(currentApp, change, rs...) + if err != nil { + return err } if len(ms) == 0 { - err := conSource.Ack(c.AckID) + err := conSource.Ack(change.AckID) if err != nil { return err } @@ -74,7 +65,7 @@ func consumeConnection( continue } - batchc <- batchMessages(currentApp, conSource, c.AckID, ms) + batchc <- batchMessages(currentApp, conSource, change.AckID, ms) } } @@ -131,14 +122,16 @@ func consumeEndpointChange( } } } + func consumeEvent( appFetch core.AppFetchFunc, eventSource event.Source, batchc chan<- batch, - ruleFns ...eventRuleFunc, + pipeline core.PipelineEventFunc, + rules core.RuleListActiveFunc, ) error { for { - c, err := eventSource.Consume() + change, err := eventSource.Consume() if err != nil { if event.IsEmptySource(err) { continue @@ -146,26 +139,23 @@ func consumeEvent( return err } - currentApp, err := appForNamespace(appFetch, c.Namespace) + currentApp, err := appForNamespace(appFetch, change.Namespace) if err != nil { return err } - ms := messages{} - - for _, rule := range ruleFns { - rs, err := rule(currentApp, c) - if err != nil { - return err - } + rs, err := rules(currentApp, rule.TypeEvent) + if err != nil { + return err + } - for _, msg := range rs { - ms = append(ms, msg) - } + ms, err := pipeline(currentApp, change, rs...) + if err != nil { + return err } if len(ms) == 0 { - err = eventSource.Ack(c.AckID) + err = eventSource.Ack(change.AckID) if err != nil { return err } @@ -173,7 +163,7 @@ func consumeEvent( continue } - batchc <- batchMessages(currentApp, eventSource, c.AckID, ms) + batchc <- batchMessages(currentApp, eventSource, change.AckID, ms) } } @@ -181,10 +171,11 @@ func consumeObject( appFetch core.AppFetchFunc, objectSource object.Source, batchc chan<- batch, - ruleFns ...objectRuleFunc, + pipeline core.PipelineObjectFunc, + rules core.RuleListActiveFunc, ) error { for { - c, err := objectSource.Consume() + change, err := objectSource.Consume() if err != nil { if object.IsEmptySource(err) { continue @@ -192,26 +183,23 @@ func consumeObject( return err } - currentApp, err := appForNamespace(appFetch, c.Namespace) + currentApp, err := appForNamespace(appFetch, change.Namespace) if err != nil { return err } - ms := messages{} - - for _, rule := range ruleFns { - rs, err := rule(currentApp, c) - if err != nil { - return err - } + rs, err := rules(currentApp, rule.TypeObject) + if err != nil { + return err + } - for _, msg := range rs { - ms = append(ms, msg) - } + ms, err := pipeline(currentApp, change, rs...) + if err != nil { + return err } if len(ms) == 0 { - err := objectSource.Ack(c.AckID) + err := objectSource.Ack(change.AckID) if err != nil { return err } @@ -219,7 +207,7 @@ func consumeObject( continue } - batchc <- batchMessages(currentApp, objectSource, c.AckID, ms) + batchc <- batchMessages(currentApp, objectSource, change.AckID, ms) } } @@ -227,7 +215,7 @@ func batchMessages( currentApp *app.App, acker source.Acker, ackID string, - ms messages, + ms core.Messages, ) batch { return batch{ ackFunc: func(acked bool, ackID string) ackFunc { diff --git a/cmd/sims/rules.go b/cmd/sims/rules.go deleted file mode 100644 index 0eeecb0..0000000 --- a/cmd/sims/rules.go +++ /dev/null @@ -1,348 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/tapglue/snaas/core" - "github.com/tapglue/snaas/service/app" - "github.com/tapglue/snaas/service/connection" - "github.com/tapglue/snaas/service/event" - "github.com/tapglue/snaas/service/object" - "github.com/tapglue/snaas/service/user" -) - -// Messaging. -const ( - fmtCommentPost = `%s commented on a Post.` - fmtCommentPostOwn = `%s commented on your Post.` - fmtFollow = `%s started following you` - fmtFriendConfirmed = `%s accepted your friend request.` - fmtFriendRequest = `%s sent you a friend request.` - fmtLikePost = `%s liked a Post.` - fmtLikePostOwn = `%s liked your Post.` - fmtPostCreated = `%s created a new Post.` -) - -// Deep-linking URNs. -const ( - urnComment = `tapglue/posts/%d/comments/%d` - urnPost = `tapglue/posts/%d` - urnUser = `tapglue/users/%d` -) - -type conRuleFunc func(*app.App, *connection.StateChange) (messages, error) -type eventRuleFunc func(*app.App, *event.StateChange) (messages, error) -type objectRuleFunc func(*app.App, *object.StateChange) (messages, error) - -func conRuleFollower(userFetch core.UserFetchFunc) conRuleFunc { - return func( - currentApp *app.App, - change *connection.StateChange, - ) (messages, error) { - if change.Old != nil || - change.New.State != connection.StateConfirmed || - change.New.Type != connection.TypeFollow { - return nil, nil - } - - origin, err := userFetch(currentApp, change.New.FromID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - target, err := userFetch(currentApp, change.New.ToID) - if err != nil { - return nil, fmt.Errorf("target fetch: %s", err) - } - - return messages{ - { - message: fmtToMessage(fmtFollow, origin), - recipient: target.ID, - urn: fmt.Sprintf(urnUser, origin.ID), - }, - }, nil - } -} - -func conRuleFriendConfirmed(userFetch core.UserFetchFunc) conRuleFunc { - return func( - currentApp *app.App, - change *connection.StateChange, - ) (messages, error) { - if change.Old == nil || - change.Old.Type != connection.TypeFriend || - change.New.State != connection.StatePending || - change.New.Type != connection.TypeFriend { - return nil, nil - } - - origin, err := userFetch(currentApp, change.New.FromID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - target, err := userFetch(currentApp, change.New.ToID) - if err != nil { - return nil, fmt.Errorf("target fetch: %s", err) - } - - return messages{ - { - message: fmtToMessage(fmtFriendConfirmed, target), - recipient: origin.ID, - urn: fmt.Sprintf(urnUser, origin.ID), - }, - }, nil - } -} - -func conRuleFriendRequest(userFetch core.UserFetchFunc) conRuleFunc { - return func( - currentApp *app.App, - change *connection.StateChange, - ) (messages, error) { - if change.Old != nil || - change.New.State != connection.StatePending || - change.New.Type != connection.TypeFriend { - return nil, nil - } - - origin, err := userFetch(currentApp, change.New.FromID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - target, err := userFetch(currentApp, change.New.ToID) - if err != nil { - return nil, fmt.Errorf("target fetch: %s", err) - } - - return messages{ - { - message: fmtToMessage(fmtFriendRequest, origin), - recipient: target.ID, - urn: fmt.Sprintf(urnUser, origin.ID), - }, - }, nil - } -} - -func eventRuleLikeCreated( - followerIDs core.ConnectionFollowerIDsFunc, - friendIDs core.ConnectionFriendIDsFunc, - postFetch core.PostFetchFunc, - userFetch core.UserFetchFunc, - usersFetch core.UsersFetchFunc, -) eventRuleFunc { - return func( - currentApp *app.App, - change *event.StateChange, - ) (messages, error) { - if change.Old != nil || - change.New.Enabled == false || - !core.IsLike(change.New) { - return nil, nil - } - - post, err := postFetch(currentApp, change.New.ObjectID) - if err != nil { - return nil, fmt.Errorf("post fetch: %s", err) - } - - origin, err := userFetch(currentApp, change.New.UserID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - owner, err := userFetch(currentApp, post.OwnerID) - if err != nil { - return nil, fmt.Errorf("owner fetch: %s", err) - } - - followIDs, err := followerIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - fIDs, err := friendIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - ids := filterIDs(append(followIDs, fIDs...), owner.ID) - - rs, err := usersFetch(currentApp, ids...) - if err != nil { - return nil, err - } - - rs = append(rs, owner) - ms := messages{} - - for _, recipient := range rs { - f := fmtLikePost - - if post.OwnerID == recipient.ID { - f = fmtLikePostOwn - } - - ms = append(ms, &message{ - message: fmtToMessage(f, origin), - recipient: recipient.ID, - urn: fmt.Sprintf(urnPost, post.ID), - }) - } - - return ms, nil - } -} - -func objectRuleCommentCreated( - followerIDs core.ConnectionFollowerIDsFunc, - friendIDs core.ConnectionFriendIDsFunc, - postFetch core.PostFetchFunc, - userFetch core.UserFetchFunc, - usersFetch core.UsersFetchFunc, -) objectRuleFunc { - return func( - currentApp *app.App, - change *object.StateChange, - ) (messages, error) { - if change.Old != nil || - change.New.Deleted == true || - !core.IsComment(change.New) { - return nil, nil - } - - post, err := postFetch(currentApp, change.New.ObjectID) - if err != nil { - return nil, fmt.Errorf("post fetch: %s", err) - } - - origin, err := userFetch(currentApp, change.New.OwnerID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - owner, err := userFetch(currentApp, post.OwnerID) - if err != nil { - return nil, fmt.Errorf("owner fetch: %s", err) - } - - followIDs, err := followerIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - fIDs, err := friendIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - ids := filterIDs(append(followIDs, fIDs...), owner.ID) - - rs, err := usersFetch(currentApp, ids...) - if err != nil { - return nil, err - } - - rs = append(rs, owner) - ms := messages{} - - for _, recipient := range rs { - f := fmtCommentPost - - if post.OwnerID == recipient.ID { - f = fmtCommentPostOwn - } - - ms = append(ms, &message{ - message: fmtToMessage(f, origin), - recipient: recipient.ID, - urn: fmt.Sprintf(urnComment, post.ID, change.New.ID), - }) - } - - return ms, nil - } -} - -func objectRulePostCreated( - followerIDs core.ConnectionFollowerIDsFunc, - friendIDs core.ConnectionFriendIDsFunc, - userFetch core.UserFetchFunc, - usersFetch core.UsersFetchFunc, -) objectRuleFunc { - return func( - currentApp *app.App, - change *object.StateChange, - ) (messages, error) { - if change.Old != nil || - change.New.Deleted == true || - !core.IsPost(change.New) { - return nil, nil - } - - origin, err := userFetch(currentApp, change.New.OwnerID) - if err != nil { - return nil, fmt.Errorf("origin fetch: %s", err) - } - - followIDs, err := followerIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - fIDs, err := friendIDs(currentApp, origin.ID) - if err != nil { - return nil, err - } - - rs, err := usersFetch(currentApp, append(followIDs, fIDs...)...) - if err != nil { - return nil, err - } - - ms := messages{} - - for _, recipient := range rs { - ms = append(ms, &message{ - message: fmtToMessage(fmtPostCreated, origin), - recipient: recipient.ID, - urn: fmt.Sprintf(urnPost, change.New.ID), - }) - } - - return ms, nil - } -} - -func filterIDs(ids []uint64, fs ...uint64) []uint64 { - var ( - is = []uint64{} - seen = map[uint64]struct{}{} - ) - - for _, id := range fs { - seen[id] = struct{}{} - } - - for _, id := range ids { - if _, ok := seen[id]; ok { - continue - } - - is = append(is, id) - } - - return is -} - -func fmtToMessage(f string, u *user.User) string { - if u.Firstname != "" { - return fmt.Sprintf(f, u.Firstname) - } - - return fmt.Sprintf(f, u.Username) -} diff --git a/cmd/sims/sims.go b/cmd/sims/sims.go index 6f0dbe4..2412056 100644 --- a/cmd/sims/sims.go +++ b/cmd/sims/sims.go @@ -6,6 +6,8 @@ import ( "os" "time" + "github.com/tapglue/snaas/service/rule" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" awsSession "github.com/aws/aws-sdk-go/aws/session" @@ -194,6 +196,11 @@ func main() { // TODO: Implement instrumentaiton middleware. // TODO: Implement logging middleware. + var rules rule.Service + rules = rule.PostgresService(pgClient) + // TODO: Implement instrumentaiton middleware. + // TODO: Implement logging middleware. + var users user.Service users = user.PostgresService(pgClient) users = user.InstrumentMiddleware( @@ -312,15 +319,8 @@ func main() { core.AppFetch(apps), conSource, batchc, - conRuleFollower( - core.UserFetch(users), - ), - conRuleFriendConfirmed( - core.UserFetch(users), - ), - conRuleFriendRequest( - core.UserFetch(users), - ), + core.PipelineConnection(users), + core.RuleListActive(rules), ) if err != nil { logger.Log("err", err, "lifecycle", "abort") @@ -333,13 +333,8 @@ func main() { core.AppFetch(apps), eventSource, batchc, - eventRuleLikeCreated( - core.ConnectionFollowerIDs(connections), - core.ConnectionFriendIDs(connections), - core.PostFetch(objects), - core.UserFetch(users), - core.UsersFetch(users), - ), + core.PipelineEvent(objects, users), + core.RuleListActive(rules), ) if err != nil { logger.Log("err", err, "lifecycle", "abort") @@ -352,19 +347,8 @@ func main() { core.AppFetch(apps), objectSource, batchc, - objectRuleCommentCreated( - core.ConnectionFollowerIDs(connections), - core.ConnectionFriendIDs(connections), - core.PostFetch(objects), - core.UserFetch(users), - core.UsersFetch(users), - ), - objectRulePostCreated( - core.ConnectionFollowerIDs(connections), - core.ConnectionFriendIDs(connections), - core.UserFetch(users), - core.UsersFetch(users), - ), + core.PipelineObject(connections, objects, users), + core.RuleListActive(rules), ) if err != nil { logger.Log("err", err, "lifecycle", "abort") diff --git a/core/pipieline.go b/core/pipieline.go new file mode 100644 index 0000000..b06bf07 --- /dev/null +++ b/core/pipieline.go @@ -0,0 +1,506 @@ +package core + +import ( + "bytes" + "encoding/json" + "text/template" + "time" + + "golang.org/x/text/language" + + serr "github.com/tapglue/snaas/error" + "github.com/tapglue/snaas/service/app" + "github.com/tapglue/snaas/service/connection" + "github.com/tapglue/snaas/service/event" + "github.com/tapglue/snaas/service/object" + "github.com/tapglue/snaas/service/rule" + "github.com/tapglue/snaas/service/user" +) + +const ( + queryCondOwnerFriends = "ownerFriends" + queryCondObjectOwner = "objectOwner" + queryCondOwner = "owner" + queryCondParentOwner = "parentOwner" + queryCondUserFrom = "userFrom" + queryCondUserTo = "userTo" +) + +// Message is the envelope which holds the templated message produced by a +// Pipeline together with the recipient and the URN to deliver with it. +type Message struct { + Message string + Recipient uint64 + URN string +} + +// Messages is a Message collection +type Messages []*Message + +// PipelineConnectionFunc constructs a Pipeline that by applying the provided +// rules outputs messages. +type PipelineConnectionFunc func( + *app.App, + *connection.StateChange, + ...*rule.Rule, +) (Messages, error) + +// PipelineConnection constructs a Pipeline that by applying the provided +// rules outputs Messages. +func PipelineConnection(users user.Service) PipelineConnectionFunc { + return func( + currentApp *app.App, + change *connection.StateChange, + rules ...*rule.Rule, + ) (Messages, error) { + var ( + ms = Messages{} + c = change.New + + context *contextConnection + from, to *user.User + ) + + if change.New == nil { + return Messages{}, nil + } + + from, err := UserFetch(users)(currentApp, c.FromID) + if err != nil { + return nil, err + } + + to, err = UserFetch(users)(currentApp, c.ToID) + if err != nil { + return nil, err + } + + context = &contextConnection{ + Conenction: c, + From: from, + To: to, + } + + for _, r := range rules { + if !r.Criteria.Match(change) { + continue + } + + for _, recipient := range r.Recipients { + rs, err := recipientsConnection()(currentApp, context, recipient.Query) + if err != nil { + return nil, err + } + + for _, r := range rs { + urn, err := compileTemplate(context, recipient.URN) + if err != nil { + return nil, err + } + + msg, err := compileTemplate(context, recipient.Templates[language.English.String()]) + if err != nil { + return nil, err + } + + ms = append(ms, &Message{Message: msg, Recipient: r.ID, URN: urn}) + } + } + } + + return ms, nil + } +} + +// PipelineEventFunc constructs a Pipeline that by applying the provided rules +// outputs Messages. +type PipelineEventFunc func( + *app.App, + *event.StateChange, + ...*rule.Rule, +) (Messages, error) + +// PipelineEvent constructs a Pipeline that by applying the provided rules +// outputs Messages. +func PipelineEvent( + objects object.Service, + users user.Service, +) PipelineEventFunc { + return func( + currentApp *app.App, + change *event.StateChange, + rules ...*rule.Rule, + ) (Messages, error) { + var ( + ms = Messages{} + e = change.New + + context *contextEvent + owner *user.User + parent *object.Object + parentOwner *user.User + ) + + owner, err := UserFetch(users)(currentApp, e.UserID) + if err != nil { + return nil, err + } + + if e.ObjectID != 0 { + parent, err = objectFetch(objects)(currentApp, e.ObjectID) + if err != nil { + return nil, err + } + + parentOwner, err = UserFetch(users)(currentApp, parent.OwnerID) + if err != nil { + return nil, err + } + } + + context = &contextEvent{ + Event: e, + Owner: owner, + Parent: parent, + ParentOwner: parentOwner, + } + + for _, r := range rules { + if !r.Criteria.Match(change) { + continue + } + + for _, recipient := range r.Recipients { + rs, err := recipientsEvent()(currentApp, context, recipient.Query) + if err != nil { + return nil, err + } + + for _, r := range rs { + urn, err := compileTemplate(context, recipient.URN) + if err != nil { + return nil, err + } + + msg, err := compileTemplate(context, recipient.Templates[language.English.String()]) + if err != nil { + return nil, err + } + + ms = append(ms, &Message{Message: msg, Recipient: r.ID, URN: urn}) + } + } + } + + return ms, nil + } +} + +// PipelineObjectFunc constructs a Pipeline that by appplying the provided +// rules outputs Messages. +type PipelineObjectFunc func( + *app.App, + *object.StateChange, + ...*rule.Rule, +) (Messages, error) + +// PipelineObject constructs a Pipeline that by appplying the provided rules +// outputs Messages. +func PipelineObject( + connections connection.Service, + objects object.Service, + users user.Service, +) PipelineObjectFunc { + return func( + currentApp *app.App, + change *object.StateChange, + rules ...*rule.Rule, + ) (Messages, error) { + var ( + ms = Messages{} + o = change.New + + context *contextObject + parent *object.Object + parentOwner *user.User + ) + + if change.New == nil { + return Messages{}, nil + } + + owner, err := UserFetch(users)(currentApp, change.New.OwnerID) + if err != nil { + return nil, err + } + + if o.ObjectID != 0 { + parent, err = objectFetch(objects)(currentApp, o.ObjectID) + if err != nil { + return nil, err + } + + parentOwner, err = UserFetch(users)(currentApp, parent.OwnerID) + if err != nil { + return nil, err + } + } + + context = &contextObject{ + Object: change.New, + Owner: owner, + Parent: parent, + ParentOwner: parentOwner, + } + + for _, r := range rules { + if !r.Criteria.Match(change) { + continue + } + + for _, recipient := range r.Recipients { + rs, err := objectRecipients( + connections, + objects, + users, + )(currentApp, context, recipient.Query) + if err != nil { + return nil, err + } + + for _, r := range rs { + urn, err := compileTemplate(context, recipient.URN) + if err != nil { + return nil, err + } + + msg, err := compileTemplate(context, recipient.Templates[language.English.String()]) + if err != nil { + return nil, err + } + + ms = append(ms, &Message{Message: msg, Recipient: r.ID, URN: urn}) + } + } + } + + return ms, nil + } +} + +func compileTemplate(context interface{}, t string) (string, error) { + tmpl, err := template.New("message").Parse(t) + if err != nil { + return "", err + } + + buf := bytes.NewBuffer([]byte{}) + + err = tmpl.Execute(buf, context) + if err != nil { + return "", err + } + + return buf.String(), nil +} + +func filterIDs(ids []uint64, fs ...uint64) []uint64 { + var ( + is = []uint64{} + seen = map[uint64]struct{}{} + ) + + for _, id := range fs { + seen[id] = struct{}{} + } + + for _, id := range ids { + if _, ok := seen[id]; ok { + continue + } + + is = append(is, id) + } + + return is +} + +type objectFetchFunc func(*app.App, uint64) (*object.Object, error) + +func objectFetch(objects object.Service) objectFetchFunc { + return func(currentApp *app.App, id uint64) (*object.Object, error) { + os, err := objects.Query(currentApp.Namespace(), object.QueryOptions{ + ID: &id, + }) + if err != nil { + return nil, err + } + + if len(os) != 1 { + return nil, serr.Wrap(serr.ErrNotFound, "object missign for '%d'", id) + } + + return os[0], nil + } +} + +type objectRecipientsFunc func(*app.App, *contextObject, rule.Query) (user.List, error) + +func objectRecipients( + connections connection.Service, + objects object.Service, + users user.Service, +) objectRecipientsFunc { + return func( + currentApp *app.App, + context *contextObject, + q rule.Query, + ) (user.List, error) { + ids := []uint64{} + + for condType, condTemplate := range q { + switch condType { + case queryCondObjectOwner: + opts, err := queryOptsFromTemplate(context, condTemplate) + if err != nil { + return nil, err + } + + ownerIDs, err := ownerIDsFetch(objects, currentApp.Namespace(), opts) + if err != nil { + return nil, err + } + + ids = append(ids, ownerIDs...) + case queryCondOwnerFriends: + friendIDs, err := ConnectionFriendIDs(connections)(currentApp, context.Owner.ID) + if err != nil { + return nil, err + } + + ids = append(ids, friendIDs...) + case queryCondOwner: + ids = append(ids, context.ParentOwner.ID) + } + } + + ids = filterIDs(ids, context.Owner.ID) + + us, err := user.ListFromIDs(users, currentApp.Namespace(), ids...) + if err != nil { + return nil, err + } + + return us, nil + } +} + +func ownerIDsFetch( + objects object.Service, + ns string, + opts object.QueryOptions, +) ([]uint64, error) { + opts.Before = time.Now() + + os, err := objects.Query(ns, opts) + if err != nil { + return nil, err + } + + return os.OwnerIDs(), nil +} + +func queryOptsFromTemplate(context *contextObject, t string) (object.QueryOptions, error) { + opts := object.QueryOptions{} + + tmpl, err := template.New("onwerIDs").Parse(t) + if err != nil { + return opts, err + } + + buf := bytes.NewBuffer([]byte{}) + + err = tmpl.Execute(buf, context) + if err != nil { + return opts, err + } + + err = json.Unmarshal(buf.Bytes(), &opts) + if err != nil { + return opts, err + } + + return opts, nil +} + +type recipientsConnectionFunc func( + *app.App, + *contextConnection, + rule.Query, +) (user.List, error) + +func recipientsConnection() recipientsConnectionFunc { + return func( + currentApp *app.App, + context *contextConnection, + q rule.Query, + ) (user.List, error) { + us := user.List{} + + for condType := range q { + switch condType { + case queryCondUserFrom: + us = append(us, context.From) + case queryCondUserTo: + us = append(us, context.To) + } + } + + return us, nil + } +} + +type recipientsEventFunc func( + *app.App, + *contextEvent, + rule.Query, +) (user.List, error) + +func recipientsEvent() recipientsEventFunc { + return func( + currentApp *app.App, + context *contextEvent, + q rule.Query, + ) (user.List, error) { + us := user.List{} + + for condType := range q { + switch condType { + case queryCondParentOwner: + us = append(us, context.ParentOwner) + } + } + + return us, nil + } +} + +type contextConnection struct { + Conenction *connection.Connection + From *user.User + To *user.User +} + +type contextEvent struct { + Event *event.Event + Owner *user.User + Parent *object.Object + ParentOwner *user.User +} + +type contextObject struct { + Object *object.Object + Owner *user.User + Parent *object.Object + ParentOwner *user.User +} diff --git a/core/pipieline_test.go b/core/pipieline_test.go new file mode 100644 index 0000000..10630c9 --- /dev/null +++ b/core/pipieline_test.go @@ -0,0 +1,580 @@ +package core + +import ( + "fmt" + "math/rand" + "reflect" + "testing" + + "github.com/tapglue/snaas/service/app" + "github.com/tapglue/snaas/service/connection" + "github.com/tapglue/snaas/service/event" + "github.com/tapglue/snaas/service/object" + "github.com/tapglue/snaas/service/rule" + "github.com/tapglue/snaas/service/user" +) + +func TestPipelineConnectionCondFrom(t *testing.T) { + var ( + currentApp = testApp() + connections = connection.MemService() + users = user.MemService() + ) + + // Create friend target. + target, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create friend origin. + origin, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Friend request. + old, err := connections.Put(currentApp.Namespace(), &connection.Connection{ + Enabled: true, + FromID: origin.ID, + State: connection.StatePending, + ToID: target.ID, + Type: connection.TypeFriend, + }) + if err != nil { + t.Fatal(err) + } + + // Confirm request. + new, err := connections.Put(currentApp.Namespace(), &connection.Connection{ + Enabled: true, + FromID: origin.ID, + State: connection.StateConfirmed, + ToID: target.ID, + Type: connection.TypeFriend, + }) + if err != nil { + t.Fatal(err) + } + + var ( + enabled = true + ruleConnectionTo = &rule.Rule{ + Criteria: &rule.CriteriaConnection{ + New: &connection.QueryOptions{ + Enabled: &enabled, + States: []connection.State{ + connection.StateConfirmed, + }, + Types: []connection.Type{ + connection.TypeFriend, + }, + }, + Old: &connection.QueryOptions{ + Enabled: &enabled, + States: []connection.State{ + connection.StatePending, + }, + Types: []connection.Type{ + connection.TypeFriend, + }, + }, + }, + Recipients: rule.Recipients{ + { + Query: map[string]string{ + "userFrom": "", + }, + Templates: map[string]string{ + "en": "{{.To.Username}} accepted your friend request", + }, + URN: "tapglue/users/{{.To.ID}}", + }, + }, + } + ) + + want := Messages{ + { + Message: fmt.Sprintf("%s accepted your friend request", target.Username), + Recipient: origin.ID, + URN: fmt.Sprintf("tapglue/users/%d", target.ID), + }, + } + + have, err := PipelineConnection(users)(currentApp, &connection.StateChange{New: new, Old: old}, ruleConnectionTo) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(have, want) { + t.Errorf("have %#v, want %#v", have, want) + } +} + +func TestPipelineConnectionCondTo(t *testing.T) { + var ( + currentApp = testApp() + connections = connection.MemService() + users = user.MemService() + ) + + // Create friend target. + target, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create friend origin. + origin, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Friend request. + con, err := connections.Put(currentApp.Namespace(), &connection.Connection{ + Enabled: true, + FromID: origin.ID, + State: connection.StatePending, + ToID: target.ID, + Type: connection.TypeFriend, + }) + if err != nil { + t.Fatal(err) + } + + var ( + enabled = true + ruleConnectionTo = &rule.Rule{ + Criteria: &rule.CriteriaConnection{ + New: &connection.QueryOptions{ + Enabled: &enabled, + States: []connection.State{ + connection.StatePending, + }, + Types: []connection.Type{ + connection.TypeFriend, + }, + }, + Old: nil, + }, + Recipients: rule.Recipients{ + { + Query: map[string]string{ + "userTo": "", + }, + Templates: map[string]string{ + "en": "{{.From.Username}} sent you a friend request", + }, + URN: "tapglue/users/{{.From.ID}}", + }, + }, + } + ) + + want := Messages{ + { + Message: fmt.Sprintf("%s sent you a friend request", origin.Username), + Recipient: target.ID, + URN: fmt.Sprintf("tapglue/users/%d", origin.ID), + }, + } + + have, err := PipelineConnection(users)(currentApp, &connection.StateChange{New: con}, ruleConnectionTo) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(have, want) { + t.Errorf("have %#v, want %#v", have, want) + } +} + +func TestPipelineEventCondParentOwner(t *testing.T) { + var ( + currentApp = testApp() + events = event.MemService() + objects = object.MemService() + users = user.MemService() + ) + + // Creat Post Owner. + postOwner, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create Post. + post, err := objects.Put(currentApp.Namespace(), testPost(postOwner.ID).Object) + if err != nil { + t.Fatal(err) + } + + // Create liker. + liker, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create like. + like, err := events.Put(currentApp.Namespace(), &event.Event{ + Enabled: true, + ObjectID: post.ID, + Owned: true, + Type: TypeLike, + UserID: liker.ID, + }) + if err != nil { + t.Fatal(err) + } + + var ( + enabled = true + ruleEventParentOwner = &rule.Rule{ + Criteria: &rule.CriteriaEvent{ + New: &event.QueryOptions{ + Enabled: &enabled, + Owned: &enabled, + Types: []string{ + TypeLike, + }, + }, + Old: nil, + }, + Recipients: rule.Recipients{ + { + Query: map[string]string{ + "parentOwner": "", + }, + Templates: map[string]string{ + "en": "{{.Owner.Username}} liked your post", + }, + URN: "tapglue/users/{{.Owner.ID}}", + }, + }, + } + ) + + want := Messages{ + { + Message: fmt.Sprintf("%s liked your post", liker.Username), + Recipient: postOwner.ID, + URN: fmt.Sprintf("tapglue/users/%d", liker.ID), + }, + } + + have, err := PipelineEvent(objects, users)(currentApp, &event.StateChange{New: like}, ruleEventParentOwner) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(have, want) { + for i, m := range have { + fmt.Printf("[%d|%s] %s\n", m.Recipient, m.URN, m.Message) + fmt.Printf("[%d|%s] %s\n\n", want[i].Recipient, want[i].URN, want[i].Message) + } + + t.Errorf("have %#v, want %#v", have, want) + } +} + +func TestPipelineObjectCondFriends(t *testing.T) { + var ( + currentApp = testApp() + connections = connection.MemService() + objects = object.MemService() + users = user.MemService() + ) + + // Creat Post Owner. + postOwner, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create Post. + post, err := objects.Put(currentApp.Namespace(), testPost(postOwner.ID).Object) + if err != nil { + t.Fatal(err) + } + + // Create frist friend. + friend1, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create first connection. + _, err = connections.Put(currentApp.Namespace(), &connection.Connection{ + Enabled: true, + FromID: postOwner.ID, + State: connection.StateConfirmed, + ToID: friend1.ID, + Type: connection.TypeFriend, + }) + if err != nil { + t.Fatal(err) + } + + // Create second friend. + friend2, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create second connection. + _, err = connections.Put(currentApp.Namespace(), &connection.Connection{ + Enabled: true, + FromID: friend2.ID, + State: connection.StateConfirmed, + ToID: postOwner.ID, + Type: connection.TypeFriend, + }) + if err != nil { + t.Fatal(err) + } + + // Create creep who is not friends with post owner. + _, err = users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + ruleObjectOwner := &rule.Rule{ + Criteria: &rule.CriteriaObject{ + New: &object.QueryOptions{ + Owned: &defaultOwned, + Tags: []string{"review"}, + Types: []string{TypePost}, + }, + Old: nil, + }, + Recipients: rule.Recipients{ + { + Query: map[string]string{ + "ownerFriends": "", + }, + Templates: map[string]string{ + "en": "{{.Owner.Username}} just added a review", + }, + URN: "tapglue/posts/{{.Object.ID}}", + }, + }, + } + + want := Messages{ + { + Recipient: friend2.ID, + Message: fmt.Sprintf("%s just added a review", postOwner.Username), + URN: fmt.Sprintf("tapglue/posts/%d", post.ID), + }, + { + Recipient: friend1.ID, + Message: fmt.Sprintf("%s just added a review", postOwner.Username), + URN: fmt.Sprintf("tapglue/posts/%d", post.ID), + }, + } + + have, err := PipelineObject( + connections, + objects, + users, + )(currentApp, &object.StateChange{New: post}, ruleObjectOwner) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(have, want) { + t.Errorf("have %#v, want %#v", have, want) + } +} + +func TestPipelineObjectCondObjectOwner(t *testing.T) { + var ( + currentApp = testApp() + connections = connection.MemService() + objects = object.MemService() + users = user.MemService() + ) + + // Creat Post Owner. + postOwner, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create Post. + post, err := objects.Put(currentApp.Namespace(), testPost(postOwner.ID).Object) + if err != nil { + t.Fatal(err) + } + + // Create frist commenter. + commenter1, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create first comment. + _, err = objects.Put(currentApp.Namespace(), testComment(commenter1.ID, post)) + if err != nil { + t.Fatal(err) + } + + // Create second commenter. + commenter2, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create second comment. + _, err = objects.Put(currentApp.Namespace(), testComment(commenter2.ID, post)) + if err != nil { + t.Fatal(err) + } + + // Create final commenter. + commenter3, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Craete final comment, which we test against. + comment3, err := objects.Put(currentApp.Namespace(), testComment(commenter3.ID, post)) + if err != nil { + t.Fatal(err) + } + + ruleObjectOwner := &rule.Rule{ + Criteria: &rule.CriteriaObject{ + New: &object.QueryOptions{ + Owned: &defaultOwned, + Types: []string{TypeComment}, + }, + Old: nil, + }, + Recipients: rule.Recipients{ + { + Query: map[string]string{ + "objectOwner": `{ "object_ids": [ {{.Parent.ID}} ], "owned": true, "types": [ "tg_comment" ]}`, + }, + Templates: map[string]string{ + "en": "{{.Owner.Username}} also commented on {{.ParentOwner.Username}}s post", + }, + URN: "tapglue/posts/{{.Parent.ID}}/comments/{{.Object.ID}}", + }, + }, + } + + want := Messages{ + { + Recipient: commenter2.ID, + Message: fmt.Sprintf("%s also commented on %ss post", commenter3.Username, postOwner.Username), + URN: fmt.Sprintf("tapglue/posts/%d/comments/%d", post.ID, comment3.ID), + }, + { + Recipient: commenter1.ID, + Message: fmt.Sprintf("%s also commented on %ss post", commenter3.Username, postOwner.Username), + URN: fmt.Sprintf("tapglue/posts/%d/comments/%d", post.ID, comment3.ID), + }, + } + + have, err := PipelineObject( + connections, + objects, + users, + )(currentApp, &object.StateChange{New: comment3}, ruleObjectOwner) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(have, want) { + t.Errorf("have %#v, want %#v", have, want) + } +} + +func TestPipelineObjectCondOwner(t *testing.T) { + var ( + currentApp = testApp() + connections = connection.MemService() + objects = object.MemService() + users = user.MemService() + ) + + // Creat Post Owner. + postOwner, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create Post. + post, err := objects.Put(currentApp.Namespace(), testPost(postOwner.ID).Object) + if err != nil { + t.Fatal(err) + } + + // Create commenter. + commenter, err := users.Put(currentApp.Namespace(), testUser()) + if err != nil { + t.Fatal(err) + } + + // Create comment. + comment, err := objects.Put(currentApp.Namespace(), testComment(commenter.ID, post)) + if err != nil { + t.Fatal(err) + } + + ruleObjectOwner := &rule.Rule{ + Criteria: &rule.CriteriaObject{ + New: &object.QueryOptions{ + Owned: &defaultOwned, + Types: []string{TypeComment}, + }, + Old: nil, + }, + Recipients: rule.Recipients{ + { + Query: map[string]string{ + "owner": "", + }, + Templates: map[string]string{ + "en": "{{.Owner.Username}} commented on your post", + }, + URN: "tapglue/posts/{{.Parent.ID}}/comments/{{.Object.ID}}", + }, + }, + } + + want := Messages{ + { + Recipient: postOwner.ID, + Message: fmt.Sprintf("%s commented on your post", commenter.Username), + URN: fmt.Sprintf("tapglue/posts/%d/comments/%d", post.ID, comment.ID), + }, + } + + have, err := PipelineObject( + connections, + objects, + users, + )(currentApp, &object.StateChange{New: comment}, ruleObjectOwner) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(have, want) { + for i, m := range have { + fmt.Printf("[%d|%s] %s\n", m.Recipient, m.URN, m.Message) + fmt.Printf("[%d|%s] %s\n\n", want[i].Recipient, want[i].URN, want[i].Message) + } + + t.Errorf("have %#v, want %#v", have, want) + } +} + +func testApp() *app.App { + return &app.App{ + ID: uint64(rand.Int63()), + } +} diff --git a/core/rules.go b/core/rules.go new file mode 100644 index 0000000..9d33e6e --- /dev/null +++ b/core/rules.go @@ -0,0 +1,22 @@ +package core + +import ( + "github.com/tapglue/snaas/service/app" + "github.com/tapglue/snaas/service/rule" +) + +// RuleListActiveFunc returns all active rules for the current App. +type RuleListActiveFunc func(*app.App, rule.Type) (rule.List, error) + +// RuleListActive returns all active rules for the current App. +func RuleListActive(rules rule.Service) RuleListActiveFunc { + return func(currentApp *app.App, ruleType rule.Type) (rule.List, error) { + return rules.Query(currentApp.Namespace(), rule.QueryOptions{ + Active: &defaultActive, + Deleted: &defaultDeleted, + Types: []rule.Type{ + ruleType, + }, + }) + } +} diff --git a/core/user_test.go b/core/user_test.go index d043bde..5a3e251 100644 --- a/core/user_test.go +++ b/core/user_test.go @@ -99,5 +99,6 @@ func testUser() *user.User { ), Enabled: true, Password: generate.RandomString(8), + Username: generate.RandomStringSafe(6), } } diff --git a/service/connection/connection.go b/service/connection/connection.go index 2408602..bf393a0 100644 --- a/service/connection/connection.go +++ b/service/connection/connection.go @@ -36,6 +36,47 @@ type Consumer interface { Consume() (*StateChange, error) } +// MatchOpts indicates if the Connection matches the given QueryOptions. +func (c *Connection) MatchOpts(opts *QueryOptions) bool { + if opts == nil { + return true + } + + if opts.Enabled != nil && c.Enabled != *opts.Enabled { + return false + } + + if len(opts.States) > 0 { + discard := true + + for _, s := range opts.States { + if c.State == s { + discard = false + } + } + + if discard { + return false + } + } + + if len(opts.Types) > 0 { + discard := true + + for _, t := range opts.Types { + if c.Type == t { + discard = false + } + } + + if discard { + return false + } + } + + return true +} + // Validate performs checks on the Connection values for completeness and // correctness. func (c Connection) Validate() error { @@ -108,14 +149,14 @@ type Producer interface { // QueryOptions are used to narrow down Connection queries. type QueryOptions struct { - After time.Time - Before time.Time - Enabled *bool - FromIDs []uint64 - Limit int - States []State - ToIDs []uint64 - Types []Type + After time.Time `json:"-"` + Before time.Time `json:"-"` + Enabled *bool `json:"enabled,omitempty"` + FromIDs []uint64 `json:"from_ids,omitempty"` + Limit int `json:"-"` + States []State `json:"states,omitempty"` + ToIDs []uint64 `json:"to_ids,omitempty"` + Types []Type `json:"types,omitempty"` } // Service for connection interactions. diff --git a/service/connection/connection_test.go b/service/connection/connection_test.go new file mode 100644 index 0000000..0489c76 --- /dev/null +++ b/service/connection/connection_test.go @@ -0,0 +1,32 @@ +package connection + +import "testing" + +func TestConnectionMatchOpts(t *testing.T) { + var ( + disabled = false + enabled = true + c = &Connection{ + Enabled: true, + FromID: 1, + State: StateConfirmed, + ToID: 2, + Type: TypeFriend, + } + cases = map[*QueryOptions]bool{ + nil: true, + &QueryOptions{Enabled: &disabled}: false, + &QueryOptions{Enabled: &enabled}: true, + &QueryOptions{States: []State{StatePending}}: false, + &QueryOptions{States: []State{StateConfirmed}}: true, + &QueryOptions{Types: []Type{TypeFollow}}: false, + &QueryOptions{Types: []Type{TypeFriend}}: true, + } + ) + + for opts, want := range cases { + if have := c.MatchOpts(opts); have != want { + t.Errorf("have %v, want %v", have, want) + } + } +} diff --git a/service/event/event.go b/service/event/event.go index 0309b47..370771d 100644 --- a/service/event/event.go +++ b/service/event/event.go @@ -54,6 +54,38 @@ type Event struct { UpdatedAt time.Time `json:"updated_at"` } +// MatchOpts indicates if the Event matches the given QueryOptions. +func (e *Event) MatchOpts(opts *QueryOptions) bool { + if opts == nil { + return true + } + + if opts.Enabled != nil && e.Enabled != *opts.Enabled { + return false + } + + if opts.Owned != nil && e.Owned != *opts.Owned { + return false + } + + if len(opts.Types) > 0 { + discard := true + + for _, t := range opts.Types { + if e.Type == t { + discard = false + break + } + } + + if discard { + return false + } + } + + return true +} + // Validate performs semantic checks on the passed Event values for correctness. func (e Event) Validate() error { if e.Type == "" { @@ -141,20 +173,20 @@ type Producer interface { // QueryOptions are used to narrow down Event queries. type QueryOptions struct { - After time.Time - Before time.Time - Enabled *bool - ExternalObjectIDs []string - ExternalObjectTypes []string - IDs []uint64 - Limit int - ObjectIDs []uint64 - Owned *bool - TargetIDs []string - TargetTypes []string - Types []string - UserIDs []uint64 - Visibilities []Visibility + After time.Time `json:"-"` + Before time.Time `json:"-"` + Enabled *bool `json:"enabled"` + ExternalObjectIDs []string `json:"-"` + ExternalObjectTypes []string `json:"-"` + IDs []uint64 `json:"ids"` + Limit int `json:"-"` + ObjectIDs []uint64 `json:"object_ids"` + Owned *bool `json:"owned"` + TargetIDs []string `json:"-"` + TargetTypes []string `json:"-"` + Types []string `json:"types"` + UserIDs []uint64 `json:"user_ids"` + Visibilities []Visibility `json:"visibilities"` } // Period is a pre-defined time duration. diff --git a/service/event/event_test.go b/service/event/event_test.go new file mode 100644 index 0000000..172ff59 --- /dev/null +++ b/service/event/event_test.go @@ -0,0 +1,30 @@ +package event + +import "testing" + +func TestEventMatchOpts(t *testing.T) { + var ( + disabled = false + enabled = true + e = &Event{ + Enabled: true, + Owned: true, + Type: "signal", + } + cases = map[*QueryOptions]bool{ + nil: true, + &QueryOptions{Enabled: &disabled}: false, + &QueryOptions{Enabled: &enabled}: true, + &QueryOptions{Owned: &disabled}: false, + &QueryOptions{Owned: &enabled}: true, + &QueryOptions{Types: []string{"not-signal"}}: false, + &QueryOptions{Types: []string{"signal"}}: true, + } + ) + + for opts, want := range cases { + if have := e.MatchOpts(opts); have != want { + t.Errorf("have %v, want %v", have, want) + } + } +} diff --git a/service/object/mem.go b/service/object/mem.go index da6bb5a..038dcda 100644 --- a/service/object/mem.go +++ b/service/object/mem.go @@ -2,6 +2,7 @@ package object import ( "math" + "sort" "time" "github.com/tapglue/snaas/platform/flake" @@ -28,7 +29,7 @@ func (s *memService) Count(ns string, opts QueryOptions) (int, error) { return 0, ErrNamespaceNotFound } - return len(filterMap(bucket, opts)), nil + return len(filterList(listFromMap(bucket), opts)), nil } func (s *memService) Put(ns string, object *Object) (*Object, error) { @@ -104,7 +105,7 @@ func (s *memService) Query(ns string, opts QueryOptions) (List, error) { return nil, ErrNamespaceNotFound } - return filterMap(bucket, opts), nil + return filterList(listFromMap(bucket), opts), nil } func (s *memService) Setup(ns string) error { @@ -145,10 +146,22 @@ func inIDs(id uint64, ids []uint64) bool { return keep } -func filterMap(om Map, opts QueryOptions) List { - os := []*Object{} +func listFromMap(om Map) List { + os := List{} - for id, object := range om { + for _, object := range om { + os = append(os, object) + } + + sort.Sort(os) + + return os +} + +func filterList(os List, opts QueryOptions) List { + rs := List{} + + for _, object := range os { if !opts.Before.IsZero() && object.CreatedAt.UTC().After(opts.Before.UTC()) { continue } @@ -167,7 +180,7 @@ func filterMap(om Map, opts QueryOptions) List { continue } - if opts.ID != nil && id != *opts.ID { + if opts.ID != nil && object.ID != *opts.ID { continue } @@ -197,20 +210,20 @@ func filterMap(om Map, opts QueryOptions) List { continue } - os = append(os, object) + rs = append(rs, object) } - if len(os) == 0 { - return os + if len(rs) == 0 { + return rs } if opts.Limit > 0 { - l := math.Min(float64(len(os)), float64(opts.Limit)) + l := math.Min(float64(len(rs)), float64(opts.Limit)) - return os[:int(l)] + return rs[:int(l)] } - return os + return rs } func inTypes(ty string, ts []string) bool { diff --git a/service/object/object.go b/service/object/object.go index 8e96d99..a3e563b 100644 --- a/service/object/object.go +++ b/service/object/object.go @@ -113,11 +113,23 @@ func (c Contents) Validate() error { // List is an Object collection. type List []*Object +func (l List) Len() int { + return len(l) +} + +func (l List) Less(i, j int) bool { + return l[i].CreatedAt.After(l[j].CreatedAt) +} + +func (l List) Swap(i, j int) { + l[i], l[j] = l[j], l[i] +} + // OwnerIDs returns all user ids of the associated object owners. -func (os List) OwnerIDs() []uint64 { +func (l List) OwnerIDs() []uint64 { ids := []uint64{} - for _, o := range os { + for _, o := range l { ids = append(ids, o.OwnerID) } @@ -149,6 +161,57 @@ type Object struct { Visibility Visibility `json:"visibility"` } +// MatchOpts indicates if the Object matches the given QueryOptions. +func (o *Object) MatchOpts(opts *QueryOptions) bool { + if opts == nil { + return true + } + + if o.Deleted != opts.Deleted { + return false + } + + if opts.Owned != nil && o.Owned != *opts.Owned { + return false + } + + if len(opts.Tags) > 0 && len(o.Tags) == 0 { + return false + } + + if len(opts.Tags) > 0 { + for _, t := range opts.Tags { + discard := true + + for _, tag := range o.Tags { + if tag == t { + discard = false + } + } + + if discard { + return false + } + } + } + + if len(opts.Types) > 0 { + discard := true + + for _, t := range opts.Types { + if o.Type == t { + discard = false + } + } + + if discard { + return false + } + } + + return true +} + // Validate returns an error if a constraint on the Object is not full-filled. func (o *Object) Validate() error { if len(o.Attachments) > 5 { @@ -211,17 +274,17 @@ type Producer interface { // QueryOptions are passed to narrow down query for objects. type QueryOptions struct { - Before time.Time - Deleted bool - ExternalIDs []string - ID *uint64 - Limit int - ObjectIDs []uint64 - OwnerIDs []uint64 - Owned *bool - Tags []string - Types []string - Visibilities []Visibility + Before time.Time `json:"-"` + Deleted bool `json:"deleted,omitempty"` + ExternalIDs []string `json:"-"` + ID *uint64 `json:"id,omitempty"` + Limit int `json:"-"` + ObjectIDs []uint64 `json:"object_ids,omitempty"` + OwnerIDs []uint64 `json:"owner_ids,omitempty"` + Owned *bool `json:"owned,omitempty"` + Tags []string `json:"tags,omitempty"` + Types []string `json:"types,omitempty"` + Visibilities []Visibility `json:"visibilities,omitempty"` } // Restrictions is the composite to regulate common interactions on Posts. diff --git a/service/object/object_test.go b/service/object/object_test.go index f7e5d35..24b6acc 100644 --- a/service/object/object_test.go +++ b/service/object/object_test.go @@ -62,8 +62,40 @@ func TestAttachmentValidate(t *testing.T) { } } +func TestObjectMatchOpts(t *testing.T) { + var ( + owned = true + o = &Object{ + Deleted: false, + Owned: false, + Tags: []string{ + "tag1", + "tag2", + }, + Type: "foo", + } + cases = map[*QueryOptions]bool{ + nil: true, + &QueryOptions{Deleted: true}: false, + &QueryOptions{Deleted: false}: true, + &QueryOptions{Owned: &owned}: false, + &QueryOptions{Tags: []string{"tag3", "tag4"}}: false, + &QueryOptions{Tags: []string{"tag1"}}: true, + &QueryOptions{Tags: []string{"tag1", "tag2"}}: true, + &QueryOptions{Types: []string{"bar"}}: false, + &QueryOptions{Types: []string{"foo"}}: true, + } + ) + + for opts, want := range cases { + if have := o.MatchOpts(opts); have != want { + t.Errorf("have %v, want %v: %v", have, want, opts) + } + } +} + func TestObjectValidate(t *testing.T) { - for _, o := range []*Object{ + cases := List{ // Too many Attachments { Attachments: []Attachment{ @@ -109,7 +141,9 @@ func TestObjectValidate(t *testing.T) { Type: "recipe", Visibility: 50, }, - } { + } + + for _, o := range cases { if have, want := o.Validate(), ErrInvalidObject; !IsInvalidObject(have) { t.Errorf("have %v, want %v", have, want) } diff --git a/service/platform/postgres.go b/service/platform/postgres.go index 68daca1..c20a1a8 100644 --- a/service/platform/postgres.go +++ b/service/platform/postgres.go @@ -2,7 +2,6 @@ package platform import ( "fmt" - "strings" "time" "github.com/jmoiron/sqlx" @@ -86,12 +85,12 @@ func (s *pgService) Put(ns string, p *Platform) (*Platform, error) { } func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { - clauses, params, err := convertOpts(opts) + where, params, err := convertOpts(opts) if err != nil { return nil, err } - ps, err := s.listPlatforms(ns, clauses, params...) + ps, err := s.listPlatforms(ns, where, params...) if err != nil { if pg.IsRelationNotFound(pg.WrapError(err)) { if err := s.Setup(ns); err != nil { @@ -99,7 +98,7 @@ func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { } } - ps, err = s.listPlatforms(ns, clauses, params...) + ps, err = s.listPlatforms(ns, where, params...) } return ps, err @@ -187,22 +186,10 @@ func (s *pgService) insert(ns string, p *Platform) (*Platform, error) { } func (s *pgService) listPlatforms( - ns string, - clauses []string, + ns, where string, params ...interface{}, ) (List, error) { - c := strings.Join(clauses, "\nAND") - - if len(clauses) > 0 { - c = fmt.Sprintf("WHERE %s", c) - } - - query := strings.Join([]string{ - fmt.Sprintf(pgListPlatforms, ns, c), - pgOrderCreatedAt, - }, "\n") - - query = sqlx.Rebind(sqlx.DOLLAR, query) + query := fmt.Sprintf(pgListPlatforms, ns, where) rows, err := s.db.Query(query, params...) if err != nil { @@ -281,7 +268,7 @@ func (s *pgService) update(ns string, p *Platform) (*Platform, error) { return p, err } -func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { +func convertOpts(opts QueryOptions) (string, []interface{}, error) { var ( clauses = []string{} params = []interface{}{} @@ -290,7 +277,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { if opts.Active != nil { clause, _, err := sqlx.In(pgClauseActive, []interface{}{*opts.Active}) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -306,7 +293,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { clause, _, err := sqlx.In(pgClauseAppIDs, ps) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -322,7 +309,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { clause, _, err := sqlx.In(pgClauseARNs, ps) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -332,7 +319,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { if opts.Deleted != nil { clause, _, err := sqlx.In(pgClauseDeleted, []interface{}{*opts.Deleted}) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -348,7 +335,7 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { clause, _, err := sqlx.In(pgClauseEcosystems, ps) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) @@ -364,12 +351,18 @@ func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { clause, _, err := sqlx.In(pgClauseIDs, ps) if err != nil { - return nil, nil, err + return "", nil, err } clauses = append(clauses, clause) params = append(params, ps...) } - return clauses, params, nil + where := "" + + if len(clauses) > 0 { + where = sqlx.Rebind(sqlx.DOLLAR, pg.ClausesToWhere(clauses...)) + } + + return where, params, nil } diff --git a/service/platform/postgres_test.go b/service/platform/postgres_test.go index 121a852..b448739 100644 --- a/service/platform/postgres_test.go +++ b/service/platform/postgres_test.go @@ -6,12 +6,11 @@ import ( "flag" "fmt" "os/user" - - "github.com/tapglue/snaas/platform/pg" + "testing" "github.com/jmoiron/sqlx" - "testing" + "github.com/tapglue/snaas/platform/pg" ) var pgTestURL string diff --git a/service/rule/postgres.go b/service/rule/postgres.go new file mode 100644 index 0000000..6ca2307 --- /dev/null +++ b/service/rule/postgres.go @@ -0,0 +1,317 @@ +package rule + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/jmoiron/sqlx" + "github.com/tapglue/snaas/platform/flake" + "github.com/tapglue/snaas/platform/pg" +) + +const ( + pgInsertRule = `INSERT INTO + %s.rules(active, criteria, deleted, ecosystem, id, name, recipients, type, created_at, updated_at) + VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)` + + pgClauseActive = `active = ?` + pgClauseDeleted = `deleted = ?` + pgClauseIDs = `id IN (?)` + pgClauseTypes = `type IN (?)` + + pgListRules = ` + SELECT + active, criteria, deleted, ecosystem, id, name, recipients, type, created_at, updated_at + FROM + %s.rules + %s` + pgOrderCreatedAt = `ORDER BY created_at DESC` + + pgCreateSchema = `CREATE SCHEMA IF NOT EXISTS %s` + pgCreateTable = `CREATE TABLE IF NOT EXISTS %s.rules( + active BOOL DEFAULT false, + criteria JSONB NOT NULL, + deleted bool DEFAULT false, + ecosystem INT, + id BIGINT NOT NULL UNIQUE, + name TEXT NOT NULL, + recipients JSONB NOT NULL, + type INT NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL + )` + pgDropTable = `DROP TABLE IF EXISTS %s.rules` +) + +type pgService struct { + db *sqlx.DB +} + +// PostgresService returns a Postgres based Service implementation. +func PostgresService(db *sqlx.DB) Service { + return &pgService{ + db: db, + } +} + +func (s *pgService) Put(ns string, r *Rule) (*Rule, error) { + if err := r.Validate(); err != nil { + return nil, err + } + + if r.ID == 0 { + return s.insert(ns, r) + } + + return nil, fmt.Errorf("Put/Update not implementee") +} + +func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { + where, params, err := convertOpts(opts) + if err != nil { + return nil, err + } + + rs, err := s.listRules(ns, where, params...) + if err != nil { + if pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + } + + rs, err = s.listRules(ns, where, params...) + } + + return rs, err +} + +func (s *pgService) Setup(ns string) error { + qs := []string{ + fmt.Sprintf(pgCreateSchema, ns), + fmt.Sprintf(pgCreateTable, ns), + } + + for _, q := range qs { + _, err := s.db.Exec(q) + if err != nil { + return fmt.Errorf("setup '%s': %s", q, err) + } + } + + return nil +} + +func (s *pgService) Teardown(ns string) error { + qs := []string{ + fmt.Sprintf(pgDropTable, ns), + } + + for _, q := range qs { + _, err := s.db.Exec(q) + if err != nil { + return fmt.Errorf("teardown '%s': %s", q, err) + } + } + + return nil +} + +func (s *pgService) insert(ns string, r *Rule) (*Rule, error) { + if r.CreatedAt.IsZero() { + r.CreatedAt = time.Now().UTC() + } + + ts, err := time.Parse(pg.TimeFormat, r.CreatedAt.UTC().Format(pg.TimeFormat)) + if err != nil { + return nil, err + } + + r.CreatedAt = ts + r.UpdatedAt = ts + + id, err := flake.NextID(flakeNamespace(ns)) + if err != nil { + return nil, err + } + + r.ID = id + + criteria, err := json.Marshal(r.Criteria) + if err != nil { + return nil, err + } + + recipients, err := json.Marshal(r.Recipients) + if err != nil { + return nil, err + } + + var ( + params = []interface{}{ + r.Active, + criteria, + r.Deleted, + r.Ecosystem, + r.ID, + r.Name, + recipients, + r.Type, + r.CreatedAt, + r.UpdatedAt, + } + query = fmt.Sprintf(pgInsertRule, ns) + ) + + _, err = s.db.Exec(query, params...) + if err != nil { + if pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + + _, err = s.db.Exec(query, params...) + } else { + return nil, err + } + } + + return r, err +} + +func (s *pgService) listRules( + ns, where string, + params ...interface{}, +) (List, error) { + query := fmt.Sprintf(pgListRules, ns, where) + + rows, err := s.db.Query(query, params...) + if err != nil { + return nil, err + } + defer rows.Close() + + rs := List{} + + for rows.Next() { + var ( + criteria = []byte{} + recipients = []byte{} + r = &Rule{} + ) + + err := rows.Scan( + &r.Active, + &criteria, + &r.Deleted, + &r.Ecosystem, + &r.ID, + &r.Name, + &recipients, + &r.Type, + &r.CreatedAt, + &r.UpdatedAt, + ) + if err != nil { + return nil, err + } + + switch r.Type { + case TypeConnection: + r.Criteria = &CriteriaConnection{} + case TypeEvent: + r.Criteria = &CriteriaEvent{} + case TypeObject: + r.Criteria = &CriteriaObject{} + default: + return nil, fmt.Errorf("type not supported") + } + + if err := json.Unmarshal(criteria, r.Criteria); err != nil { + return nil, err + } + + if err := json.Unmarshal(recipients, &r.Recipients); err != nil { + return nil, err + } + + r.CreatedAt = r.CreatedAt.UTC() + r.UpdatedAt = r.UpdatedAt.UTC() + + rs = append(rs, r) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return rs, nil +} + +func convertOpts(opts QueryOptions) (string, []interface{}, error) { + var ( + clauses = []string{} + params = []interface{}{} + ) + + if opts.Active != nil { + clause, _, err := sqlx.In(pgClauseActive, []interface{}{*opts.Active}) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, *opts.Active) + } + + if opts.Deleted != nil { + clause, _, err := sqlx.In(pgClauseDeleted, []interface{}{*opts.Deleted}) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, *opts.Deleted) + } + + if len(opts.IDs) > 0 { + ps := []interface{}{} + + for _, id := range opts.IDs { + ps = append(ps, id) + } + + clause, _, err := sqlx.In(pgClauseIDs, ps) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if len(opts.Types) > 0 { + ps := []interface{}{} + + for _, t := range opts.Types { + ps = append(ps, t) + } + + clause, _, err := sqlx.In(pgClauseTypes, ps) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + where := "" + + if len(clauses) > 0 { + where = sqlx.Rebind(sqlx.DOLLAR, pg.ClausesToWhere(clauses...)) + } + + return where, params, nil +} diff --git a/service/rule/postgres_test.go b/service/rule/postgres_test.go new file mode 100644 index 0000000..d7e011b --- /dev/null +++ b/service/rule/postgres_test.go @@ -0,0 +1,204 @@ +// +build integration + +package rule + +import ( + "flag" + "fmt" + "os/user" + "reflect" + "testing" + + "github.com/jmoiron/sqlx" + + "github.com/tapglue/snaas/platform/pg" + "github.com/tapglue/snaas/platform/sns" + "github.com/tapglue/snaas/service/connection" + "github.com/tapglue/snaas/service/event" + "github.com/tapglue/snaas/service/object" +) + +var pgTestURL string + +func TestPostgresPut(t *testing.T) { + var ( + disabled = false + enabled = true + namespace = "service_put_connection" + service = preparePostgres(t, namespace) + ) + + rules := List{ + { + Active: true, + Criteria: &CriteriaConnection{ + New: &connection.QueryOptions{ + States: []connection.State{ + connection.StateConfirmed, + }, + Types: []connection.Type{ + connection.TypeFriend, + }, + }, + Old: &connection.QueryOptions{ + States: []connection.State{ + connection.StatePending, + }, + Types: []connection.Type{ + connection.TypeFriend, + }, + }, + }, + Deleted: false, + Ecosystem: sns.PlatformAPNS, + Name: "Friend confirm", + Recipients: Recipients{ + { + Query: map[string]string{ + "foo": "bar", + }, + Templates: map[string]string{ + "en": "Where we mesage.", + }, + URN: "", + }, + }, + Type: TypeConnection, + }, + { + Active: true, + Criteria: &CriteriaEvent{ + New: &event.QueryOptions{ + Enabled: &disabled, + Types: []string{ + "signal", + }, + }, + Old: &event.QueryOptions{ + Enabled: &enabled, + Types: []string{ + "signal", + }, + }, + }, + Deleted: false, + Ecosystem: sns.PlatformAPNS, + Name: "Friend confirm", + Recipients: Recipients{ + { + Query: map[string]string{ + "foo": "bar", + }, + Templates: map[string]string{ + "en": "Where we mesage.", + }, + URN: "", + }, + }, + Type: TypeEvent, + }, + { + Active: true, + Criteria: &CriteriaObject{ + New: &object.QueryOptions{ + Owned: &enabled, + Types: []string{ + "review", + }, + Tags: []string{ + "movie", + "official", + }, + }, + Old: &object.QueryOptions{ + Owned: &enabled, + Types: []string{ + "review", + }, + Tags: []string{ + "movie", + }, + }, + }, + Deleted: false, + Ecosystem: sns.PlatformAPNS, + Name: "Friend confirm", + Recipients: Recipients{ + { + Query: map[string]string{ + "foo": "bar", + }, + Templates: map[string]string{ + "en": "Where we mesage.", + }, + URN: "", + }, + }, + Type: TypeObject, + }, + } + + for _, r := range rules { + created, err := service.Put(namespace, r) + if err != nil { + t.Fatal(err) + } + + deleted := false + + list, err := service.Query(namespace, QueryOptions{ + Deleted: &deleted, + IDs: []uint64{ + created.ID, + }, + }) + if err != nil { + t.Fatal(err) + } + + if have, want := len(list), 1; have != want { + t.Fatalf("have %v, want %v", have, want) + } + if have, want := list[0], created; !reflect.DeepEqual(have, want) { + t.Errorf("\nhave %v\nwant %v", have, want) + } + } +} + +func TestPostgresQuery(t *testing.T) { + // var ( + // activate = true + // deleted = true + // namespace = "service_query" + // service = preparePostgres(t, namespace) + // ) +} + +func preparePostgres(t *testing.T, namespace string) Service { + db, err := sqlx.Connect("postgres", pgTestURL) + if err != nil { + t.Fatal(err) + } + + s := PostgresService(db) + + if err := s.Teardown(namespace); err != nil { + t.Fatal(err) + } + + return s +} + +func init() { + u, err := user.Current() + if err != nil { + panic(err) + } + + d := fmt.Sprintf(pg.URLTest, u.Username) + + url := flag.String("postgres.url", d, "Postgres test connection URL") + flag.Parse() + + pgTestURL = *url +} diff --git a/service/rule/rule.go b/service/rule/rule.go new file mode 100644 index 0000000..68ab0e4 --- /dev/null +++ b/service/rule/rule.go @@ -0,0 +1,140 @@ +package rule + +import ( + "fmt" + "time" + + "github.com/tapglue/snaas/platform/service" + "github.com/tapglue/snaas/platform/sns" + "github.com/tapglue/snaas/service/connection" + "github.com/tapglue/snaas/service/event" + "github.com/tapglue/snaas/service/object" +) + +// Type to distinct between different stored criteria. +const ( + TypeConnection Type = iota + TypeEvent + TypeObject +) + +type CriteriaConnection struct { + New *connection.QueryOptions `json:"new"` + Old *connection.QueryOptions `json:"old"` +} + +func (c *CriteriaConnection) Match(i interface{}) bool { + s, ok := i.(*connection.StateChange) + if !ok { + return false + } + + if s.New == nil && s.Old == nil { + return false + } + + return s.New.MatchOpts(c.New) && s.Old.MatchOpts(c.Old) +} + +type CriteriaEvent struct { + New *event.QueryOptions `json:"new"` + Old *event.QueryOptions `json:"old"` +} + +func (c *CriteriaEvent) Match(i interface{}) bool { + s, ok := i.(*event.StateChange) + if !ok { + return false + } + + if s.New == nil && s.Old == nil { + return false + } + + return s.New.MatchOpts(c.New) && s.Old.MatchOpts(c.Old) +} + +type CriteriaObject struct { + New *object.QueryOptions `json:"new"` + Old *object.QueryOptions `json:"old"` +} + +func (c *CriteriaObject) Match(i interface{}) bool { + s, ok := i.(*object.StateChange) + if !ok { + return false + } + + if s.New == nil && s.Old == nil { + return false + } + + return s.New.MatchOpts(c.New) && s.Old.MatchOpts(c.Old) +} + +// List is a Rule collection. +type List []*Rule + +// Matcher to determine if a given state-change should trigger the Rule. +type Matcher interface { + Match(c interface{}) bool +} + +// Query is a mapping for templated Recipient lookups. +type Query map[string]string + +// QueryOptions to narrow-down Rule queries. +type QueryOptions struct { + Active *bool + Deleted *bool + IDs []uint64 + Types []Type +} + +// Recipient is an abstract description of how to lookup users and template the +// messaging as well as meta-information. +type Recipient struct { + Query Query `json:"query"` + Templates Templates `json:"templates"` + URN string `json:"urn"` +} + +// Recipients is a Recipient collection. +type Recipients []Recipient + +// Rule is a data container to parametrise Pipelines. +type Rule struct { + Active bool + Criteria Matcher + Deleted bool + Ecosystem sns.Platform + ID uint64 + Name string + Recipients Recipients + Type Type + CreatedAt time.Time + UpdatedAt time.Time +} + +// Validate checks for semantic correctness. +func (r *Rule) Validate() error { + return nil +} + +// Service for rule interactions. +type Service interface { + service.Lifecycle + + Put(namespace string, r *Rule) (*Rule, error) + Query(namespace string, opts QueryOptions) (List, error) +} + +// Templates map languages to template strings. +type Templates map[string]string + +// Type indicates for which entity the criterias are encoded in the rule. +type Type uint8 + +func flakeNamespace(ns string) string { + return fmt.Sprintf("%s_%s", ns, "rules") +} diff --git a/service/user/mem.go b/service/user/mem.go index 4711da7..c4424c8 100644 --- a/service/user/mem.go +++ b/service/user/mem.go @@ -23,7 +23,7 @@ func (s *memService) Count(ns string, opts QueryOptions) (int, error) { return 0, err } - return len(filterMap(s.users[ns], opts)), nil + return len(filterList(s.users[ns].ToList(), opts)), nil } func (s *memService) Put(ns string, input *User) (*User, error) { @@ -89,7 +89,7 @@ func (s *memService) Query(ns string, opts QueryOptions) (List, error) { return nil, err } - us := filterMap(s.users[ns], opts) + us := filterList(s.users[ns].ToList(), opts) if opts.Limit > 0 && len(us) > opts.Limit { us = us[:opts.Limit] @@ -110,7 +110,7 @@ func (s *memService) Search(ns string, opts QueryOptions) (List, error) { opts.Lastnames = nil opts.Usernames = nil - us := filterMap(s.users[ns], opts) + us := filterList(s.users[ns].ToList(), opts) us = searchUsers(us, sOpts) return us, nil @@ -153,10 +153,10 @@ func copy(u *User) *User { return &old } -func filterMap(um Map, opts QueryOptions) List { - us := List{} +func filterList(us List, opts QueryOptions) List { + rs := List{} - for id, u := range um { + for _, u := range us { if !inTypes(u.CustomID, opts.CustomIDs) { continue } @@ -173,7 +173,7 @@ func filterMap(um Map, opts QueryOptions) List { continue } - if !inIDs(id, opts.IDs) { + if !inIDs(u.ID, opts.IDs) { continue } @@ -201,10 +201,10 @@ func filterMap(um Map, opts QueryOptions) List { continue } - us = append(us, u) + rs = append(rs, u) } - return us + return rs } func inIDs(id uint64, ids []uint64) bool { diff --git a/service/user/user.go b/service/user/user.go index 846c371..3698956 100644 --- a/service/user/user.go +++ b/service/user/user.go @@ -2,6 +2,7 @@ package user import ( "fmt" + "sort" "time" "github.com/asaskevich/govalidator" @@ -60,6 +61,19 @@ func (m Map) Merge(x Map) Map { return m } +// ToList returns the Map as an ordered List. +func (m Map) ToList() List { + us := List{} + + for _, u := range m { + us = append(us, u) + } + + sort.Sort(us) + + return us +} + // Metadata is a bucket to provide additional user information. type Metadata map[string]interface{}