From 4a93be10c32cac4e446d268513c5b3801e5517b3 Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Fri, 2 Dec 2016 15:51:40 +0100 Subject: [PATCH] Implement Application Platforms Which are used as the book-keeping glue between Apps and the SNS entity used for notification routing. * implement platform service * implement platform create handler (not mounted) * implement platform create and fetch by ARN or active per app * add first centralised errors --- TODO | 3 +- cmd/sims/channel.go | 23 +- cmd/sims/platform.go | 80 ---- cmd/sims/sims.go | 25 +- core/device.go | 8 +- core/platform.go | 131 +++++++ error/error.go | 58 +++ handler/http/device.go | 10 +- handler/http/platform.go | 90 +++++ infrastructure/terraform/template/storage.tf | 41 +- platform/sns/sns.go | 103 ++++- service/device/device.go | 14 +- service/device/helper_test.go | 3 +- service/platform/helper_test.go | 151 ++++++++ service/platform/platform.go | 84 +++++ service/platform/platform_test.go | 26 ++ service/platform/postgres.go | 375 +++++++++++++++++++ service/platform/postgres_test.go | 54 +++ 18 files changed, 1138 insertions(+), 141 deletions(-) create mode 100644 core/platform.go create mode 100644 error/error.go create mode 100644 handler/http/platform.go create mode 100644 service/platform/helper_test.go create mode 100644 service/platform/platform.go create mode 100644 service/platform/platform_test.go create mode 100644 service/platform/postgres.go create mode 100644 service/platform/postgres_test.go diff --git a/TODO b/TODO index 94195d3..f560afd 100644 --- a/TODO +++ b/TODO @@ -16,4 +16,5 @@ infrastructure: code: ✔ move event condition code @done (16-11-11 11:10) ✔ add missing event indeces @done (16-11-12 12:36) - ☐ enable TLS and load certs \ No newline at end of file + ✔ enable TLS and load certs @done (16-12-02 15:41) + ☐ Implement instrumentation and log middleware for platform.Service \ No newline at end of file diff --git a/cmd/sims/channel.go b/cmd/sims/channel.go index 921c4a5..ad41a97 100644 --- a/cmd/sims/channel.go +++ b/cmd/sims/channel.go @@ -2,9 +2,9 @@ package main import ( "github.com/tapglue/snaas/core" + pErr "github.com/tapglue/snaas/error" "github.com/tapglue/snaas/platform/sns" "github.com/tapglue/snaas/service/app" - "github.com/tapglue/snaas/service/device" ) type channelFunc func(*app.App, *message) error @@ -12,8 +12,8 @@ type channelFunc func(*app.App, *message) error func channelPush( deviceListUser core.DeviceListUserFunc, deviceSync core.DeviceSyncEndpointFunc, + fetchActive core.PlatformFetchActiveFunc, push sns.PushFunc, - pApps platformApps, ) channelFunc { return func(currentApp *app.App, msg *message) error { ds, err := deviceListUser(currentApp, msg.recipient) @@ -25,31 +25,20 @@ func channelPush( } for _, d := range ds { - pa, err := platformAppForPlatform(pApps, currentApp, d.Platform) + p, err := fetchActive(currentApp, d.Platform) if err != nil { - if isPlatformNotFound(err) { + if pErr.IsNotFound(err) { continue } return err } - d, err = deviceSync(currentApp, pa.ARN, d) + d, err = deviceSync(currentApp, p.ARN, d) if err != nil { return err } - var p sns.Platform - - switch d.Platform { - case device.PlatformAndroid: - p = sns.PlatformGCM - case device.PlatformIOS: - p = sns.PlatformAPNS - case device.PlatformIOSSandbox: - p = sns.PlatformAPNSSandbox - } - - err = push(p, d.EndpointARN, pa.Scheme, msg.urn, msg.message) + err = push(d.Platform, d.EndpointARN, p.Scheme, msg.urn, msg.message) if err != nil { if sns.IsDeliveryFailure(err) { return nil diff --git a/cmd/sims/platform.go b/cmd/sims/platform.go index 3cafc07..92392b5 100644 --- a/cmd/sims/platform.go +++ b/cmd/sims/platform.go @@ -1,73 +1,18 @@ package main import ( - "encoding/json" "errors" - "fmt" "strconv" "strings" "github.com/tapglue/snaas/core" "github.com/tapglue/snaas/service/app" - "github.com/tapglue/snaas/service/device" ) var ( ErrInvalidNamespace = errors.New("namespace invalid") - ErrPlatformNotFound = errors.New("platform not found") ) -type platformApp struct { - ARN string `json:"arn"` - Namespace string `json:"namespace"` - Scheme string `json:"scheme"` - Platform int `json:"platform"` -} - -type platformApps []*platformApp - -func (as *platformApps) Set(input string) error { - a := &platformApp{} - err := json.Unmarshal([]byte(input), a) - if err != nil { - return err - } - - *as = append(*as, a) - - return nil -} - -func (as *platformApps) String() string { - return fmt.Sprintf("%d apps", len(*as)) -} - -func appForARN( - appFetch core.AppFetchFunc, - pApps platformApps, - pARN string, -) (*app.App, error) { - var a *platformApp - - for _, pa := range pApps { - if pa.ARN == pARN { - a = pa - break - } - } - - if a == nil { - return nil, ErrPlatformNotFound - } - - id, err := namespaceToID(a.Namespace) - if err != nil { - return nil, err - } - - return appFetch(id) -} - func appForNamespace(appFetch core.AppFetchFunc, ns string) (*app.App, error) { id, err := namespaceToID(ns) if err != nil { @@ -86,28 +31,3 @@ func namespaceToID(ns string) (uint64, error) { return strconv.ParseUint(ps[1], 10, 64) } - -func isPlatformNotFound(err error) bool { - return err == ErrPlatformNotFound -} - -func platformAppForPlatform( - pApps platformApps, - a *app.App, - p device.Platform, -) (*platformApp, error) { - var pApp *platformApp - - for _, pa := range pApps { - if pa.Namespace == a.Namespace() && device.Platform(pa.Platform) == p { - pApp = pa - break - } - } - - if pApp == nil { - return nil, ErrPlatformNotFound - } - - return pApp, nil -} diff --git a/cmd/sims/sims.go b/cmd/sims/sims.go index 9ddacb0..6f0dbe4 100644 --- a/cmd/sims/sims.go +++ b/cmd/sims/sims.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/tapglue/snaas/core" + pErr "github.com/tapglue/snaas/error" "github.com/tapglue/snaas/platform/metrics" platformSNS "github.com/tapglue/snaas/platform/sns" platformSQS "github.com/tapglue/snaas/platform/sqs" @@ -24,6 +25,7 @@ import ( "github.com/tapglue/snaas/service/device" "github.com/tapglue/snaas/service/event" "github.com/tapglue/snaas/service/object" + "github.com/tapglue/snaas/service/platform" "github.com/tapglue/snaas/service/user" ) @@ -50,7 +52,6 @@ var ( func main() { var ( begin = time.Now() - pApps = platformApps{} awsID = flag.String("aws.id", "", "Identifier for AWS requests") awsRegion = flag.String("aws.region", "us-east-1", "AWS region to operate in") @@ -58,7 +59,6 @@ func main() { postgresURL = flag.String("postgres.url", "", "Postgres URL to connect to") telemetryAddr = flag.String("telemetry.addr", ":9001", "Address to expose telemetry on") ) - flag.Var(&pApps, "app", "Repeated platform apps.") flag.Parse() logger := log.NewContext( @@ -189,6 +189,11 @@ func main() { )(objects) objects = object.LogServiceMiddleware(logger, storeService)(objects) + var platforms platform.Service + platforms = platform.PostgresService(pgClient) + // TODO: Implement instrumentaiton middleware. + // TODO: Implement logging middleware. + var users user.Service users = user.PostgresService(pgClient) users = user.InstrumentMiddleware( @@ -271,9 +276,19 @@ func main() { go func() { for c := range changec { - a, err := appForARN(core.AppFetch(apps), pApps, c.Resource) + p, err := core.PlatformFetchByARN(platforms)(c.Resource) + if err != nil { + if pErr.IsNotFound(err) { + continue + } + + logger.Log("err", err, "lifecycle", "abort") + os.Exit(1) + } + + a, err := core.AppFetch(apps)(p.AppID) if err != nil { - if isPlatformNotFound(err) { + if core.IsNotFound(err) { continue } @@ -367,8 +382,8 @@ func main() { platformSNS.EndpointRetrieve(snsAPI), platformSNS.EndpointUpdate(snsAPI), ), + core.PlatformFetchActive(platforms), platformSNS.Push(snsAPI), - pApps, ), } diff --git a/core/device.go b/core/device.go index 8b6ae21..60e82f7 100644 --- a/core/device.go +++ b/core/device.go @@ -85,7 +85,7 @@ func DeviceListUser(devices device.Service) DeviceListUserFunc { return devices.Query(currentApp.Namespace(), device.QueryOptions{ Deleted: &defaultDeleted, Disabled: &defaultDeleted, - Platforms: []device.Platform{ + Platforms: []sns.Platform{ device.PlatformIOSSandbox, device.PlatformIOS, device.PlatformAndroid, @@ -168,7 +168,7 @@ type DeviceUpdateFunc func( currentApp *app.App, origin Origin, deviceID string, - platform device.Platform, + platform sns.Platform, token string, language string, ) error @@ -179,13 +179,13 @@ func DeviceUpdate(devices device.Service) DeviceUpdateFunc { currentApp *app.App, origin Origin, deviceID string, - platform device.Platform, + platform sns.Platform, token string, language string, ) error { ds, err := devices.Query(currentApp.Namespace(), device.QueryOptions{ Deleted: &defaultDeleted, - Platforms: []device.Platform{ + Platforms: []sns.Platform{ platform, }, UserIDs: []uint64{ diff --git a/core/platform.go b/core/platform.go new file mode 100644 index 0000000..e8aa269 --- /dev/null +++ b/core/platform.go @@ -0,0 +1,131 @@ +package core + +import ( + "fmt" + + pErr "github.com/tapglue/snaas/error" + "github.com/tapglue/snaas/platform/pg" + "github.com/tapglue/snaas/platform/sns" + "github.com/tapglue/snaas/service/app" + "github.com/tapglue/snaas/service/platform" +) + +var ( + defaultActive = true +) + +// PlatformCreateFunc stores the provided platform. +type PlatformCreateFunc func( + currentApp *app.App, + p *platform.Platform, + cert, key string, +) (*platform.Platform, error) + +// PlatformCreate stores the provided platform. +func PlatformCreate( + platforms platform.Service, + createAPNS sns.AppCreateAPNSFunc, + createAPNSSandbox sns.AppCreateAPNSSandboxFunc, + createAndroid sns.AppCreateGCMFunc, +) PlatformCreateFunc { + return func( + currentApp *app.App, + p *platform.Platform, + cert, key string, + ) (*platform.Platform, error) { + arn := "" + + fmt.Printf("\n%s\n%s\n%#v\n\n", cert, key, p) + + switch p.Ecosystem { + case platform.Android: + var err error + arn, err = createAndroid(p.Name, key) + if err != nil { + return nil, err + } + case platform.IOS: + var err error + arn, err = createAPNS(p.Name, cert, key) + if err != nil { + return nil, err + } + case platform.IOSSandbox: + var err error + arn, err = createAPNSSandbox(p.Name, cert, key) + if err != nil { + return nil, err + } + } + + p.AppID = currentApp.ID + p.ARN = arn + + return platforms.Put(pg.MetaNamespace, p) + } +} + +// PlatformFetchActiveFunc returns the active platform for the current app and the +// given ecosystem. +type PlatformFetchActiveFunc func(*app.App, sns.Platform) (*platform.Platform, error) + +// PlatformFetchActive returns the active platform for the current app and the +// given ecosystem. +func PlatformFetchActive(platforms platform.Service) PlatformFetchActiveFunc { + return func( + currentApp *app.App, + ecosystem sns.Platform, + ) (*platform.Platform, error) { + ps, err := platforms.Query(pg.MetaNamespace, platform.QueryOptions{ + Active: &defaultActive, + AppIDs: []uint64{ + currentApp.ID, + }, + Deleted: &defaultDeleted, + Ecosystems: []sns.Platform{ + ecosystem, + }, + }) + if err != nil { + return nil, err + } + + if len(ps) != 1 { + return nil, pErr.Wrap( + pErr.ErrNotFound, + "no active platform found for %s", + sns.PlatformIdentifiers[ecosystem], + ) + } + + return ps[0], nil + } +} + +// PlatformFetchByARNFunc returns the Platform for the given ARN. +type PlatformFetchByARNFunc func(arn string) (*platform.Platform, error) + +// PlatformFetchByARN returns the Platform for the given ARN. +func PlatformFetchByARN(platforms platform.Service) PlatformFetchByARNFunc { + return func(arn string) (*platform.Platform, error) { + ps, err := platforms.Query(pg.MetaNamespace, platform.QueryOptions{ + ARNs: []string{ + arn, + }, + Deleted: &defaultDeleted, + }) + if err != nil { + return nil, err + } + + if len(ps) != 1 { + return nil, pErr.Wrap( + pErr.ErrNotFound, + "no platform found for '%s'", + arn, + ) + } + + return ps[0], nil + } +} diff --git a/error/error.go b/error/error.go new file mode 100644 index 0000000..3de2538 --- /dev/null +++ b/error/error.go @@ -0,0 +1,58 @@ +package error + +import ( + "errors" + "fmt" +) + +const errFmt = "%s: %s" + +// General-purpose errors. +var ( + ErrNotFound = errors.New("not found") +) + +// Platform errors. +var ( + ErrInvalidPlatform = errors.New("invalid platform") +) + +// Error wrapper. +type Error struct { + err error + msg string +} + +func (e Error) Error() string { + return e.msg +} + +// IsInvalidPlatform indicates if err is ErrInvalidPlatform. +func IsInvalidPlatform(err error) bool { + return unwrapError(err) == ErrInvalidPlatform +} + +// IsNotFound indicates if err is ErrNotFouund. +func IsNotFound(err error) bool { + return unwrapError(err) == ErrNotFound +} + +// Wrap constructs an Error with proper messaaging. +func Wrap(err error, format string, args ...interface{}) error { + return &Error{ + err: err, + msg: fmt.Sprintf( + errFmt, + err, fmt.Sprintf(format, args...), + ), + } +} + +func unwrapError(err error) error { + switch e := err.(type) { + case *Error: + return e.err + } + + return err +} diff --git a/handler/http/device.go b/handler/http/device.go index 2d30b5d..f8243c3 100644 --- a/handler/http/device.go +++ b/handler/http/device.go @@ -8,7 +8,7 @@ import ( "golang.org/x/net/context" "github.com/tapglue/snaas/core" - "github.com/tapglue/snaas/service/device" + "github.com/tapglue/snaas/platform/sns" ) // DeviceDelete removes a user's device. @@ -58,15 +58,15 @@ func DeviceUpdate(fn core.DeviceUpdateFunc) Handler { type payloadDevice struct { language string - platform device.Platform + platform sns.Platform token string } func (p *payloadDevice) UnmarshalJSON(raw []byte) error { f := struct { - Language string `json:"language"` - Platform device.Platform `json:"platform"` - Token string `json:"token"` + Language string `json:"language"` + Platform sns.Platform `json:"platform"` + Token string `json:"token"` }{} err := json.Unmarshal(raw, &f) diff --git a/handler/http/platform.go b/handler/http/platform.go new file mode 100644 index 0000000..7703bea --- /dev/null +++ b/handler/http/platform.go @@ -0,0 +1,90 @@ +package http + +import ( + "encoding/json" + "net/http" + "strconv" + "time" + + "golang.org/x/net/context" + + "github.com/tapglue/snaas/core" + "github.com/tapglue/snaas/platform/sns" + "github.com/tapglue/snaas/service/platform" +) + +// PlatformCreate stores the provided platform. +func PlatformCreate(fn core.PlatformCreateFunc) Handler { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + var ( + currentApp = appFromContext(ctx) + payload = payloadPlatform{} + ) + + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + respondError(w, 0, wrapError(ErrBadRequest, err.Error())) + return + } + + p, err := fn(currentApp, payload.platform, payload.cert, payload.key) + if err != nil { + respondError(w, 0, err) + return + } + + respondJSON(w, http.StatusCreated, &payloadPlatform{platform: p}) + } +} + +type payloadPlatform struct { + cert, key string + platform *platform.Platform +} + +func (p *payloadPlatform) MarshalJSON() ([]byte, error) { + return json.Marshal(&struct { + Active bool `json:"active"` + ARN string `json:"arn"` + Deleted bool `json:"deleted"` + Ecosystem int `json:"ecosystem"` + ID string `json:"id"` + Name string `json:"name"` + Scheme string `json:"scheme"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + }{ + Active: p.platform.Active, + ARN: p.platform.ARN, + Deleted: p.platform.Deleted, + Ecosystem: int(p.platform.Ecosystem), + ID: strconv.FormatUint(p.platform.ID, 10), + Name: p.platform.Name, + Scheme: p.platform.Scheme, + CreatedAt: p.platform.CreatedAt, + UpdatedAt: p.platform.UpdatedAt, + }) +} + +func (p *payloadPlatform) UnmarshalJSON(raw []byte) error { + f := struct { + Cert string `json:"cert"` + Ecosystem sns.Platform `json:"ecosystem"` + Key string `json:"key"` + Name string `json:"name"` + Scheme string `json:"scheme"` + }{} + + if err := json.Unmarshal(raw, &f); err != nil { + return err + } + + p.cert = f.Cert + p.key = f.Key + p.platform = &platform.Platform{ + Ecosystem: f.Ecosystem, + Name: f.Name, + Scheme: f.Scheme, + } + + return nil +} diff --git a/infrastructure/terraform/template/storage.tf b/infrastructure/terraform/template/storage.tf index 306b72d..2dafcc4 100644 --- a/infrastructure/terraform/template/storage.tf +++ b/infrastructure/terraform/template/storage.tf @@ -213,25 +213,25 @@ EOF visibility_timeout_seconds = 60 } -resource "aws_sqs_queue" "endpoint-state-change-dlq" { +resource "aws_sqs_queue" "event-state-change-dlq" { delay_seconds = 0 max_message_size = 262144 message_retention_seconds = 1209600 - name = "endpoint-state-change-dlq" + name = "event-state-change-dlq" receive_wait_time_seconds = 1 visibility_timeout_seconds = 300 } -resource "aws_sqs_queue" "endpoint-state-change" { +resource "aws_sqs_queue" "event-state-change" { delay_seconds = 0 max_message_size = 262144 message_retention_seconds = 1209600 - name = "endpoint-state-change" + name = "event-state-change" receive_wait_time_seconds = 1 redrive_policy = < Android { + return pErr.Wrap(pErr.ErrInvalidPlatform, "Ecosystem '%d' not supported", p.Ecosystem) + } + + if p.Name == "" { + return pErr.Wrap(pErr.ErrInvalidPlatform, "Name must be set") + } + + if p.Scheme == "" { + return pErr.Wrap(pErr.ErrInvalidPlatform, "Scheme must be set") + } + + return nil +} + +// List is a collection of Platforms. +type List []*Platform + +// QueryOptions to narrow-down platform queries. +type QueryOptions struct { + Active *bool + ARNs []string + AppIDs []uint64 + Deleted *bool + Ecosystems []sns.Platform + IDs []uint64 +} + +// Service for platform interactions. +type Service interface { + service.Lifecycle + + Put(namespace string, platform *Platform) (*Platform, error) + Query(namespace string, opts QueryOptions) (List, error) +} + +// ServiceMiddleware is a chainable behaviour modifier for Service. +type ServiceMiddleware func(Service) Service + +func flakeNamespace(ns string) string { + return fmt.Sprintf("%s_%s", ns, "platforms") +} diff --git a/service/platform/platform_test.go b/service/platform/platform_test.go new file mode 100644 index 0000000..a1337ea --- /dev/null +++ b/service/platform/platform_test.go @@ -0,0 +1,26 @@ +package platform + +import ( + "testing" + + pErr "github.com/tapglue/snaas/error" +) + +func TestValidate(t *testing.T) { + var ( + p = testPlatform() + ps = List{ + {}, // Missing ARN + {ARN: p.ARN}, // Missing Ecosystem + {ARN: p.ARN, Ecosystem: 4}, // Unsupported Ecosystem + {ARN: p.ARN, Ecosystem: p.Ecosystem}, // Missing Name + {ARN: p.ARN, Ecosystem: p.Ecosystem, Name: p.Name}, // Missing Scheme + } + ) + + for _, p := range ps { + if have, want := p.Validate(), pErr.ErrInvalidPlatform; !pErr.IsInvalidPlatform(have) { + t.Errorf("have %v, want %v", have, want) + } + } +} diff --git a/service/platform/postgres.go b/service/platform/postgres.go new file mode 100644 index 0000000..68daca1 --- /dev/null +++ b/service/platform/postgres.go @@ -0,0 +1,375 @@ +package platform + +import ( + "fmt" + "strings" + "time" + + "github.com/jmoiron/sqlx" + + "github.com/tapglue/snaas/platform/flake" + "github.com/tapglue/snaas/platform/pg" +) + +const ( + pgInsertPlatform = `INSERT INTO + %s.platforms(active, app_id, arn, deleted, ecosystem, id, name, scheme, created_at, updated_at) + VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)` + pgUpdatePlatform = ` + UPDATE + %s.platforms + SET + active = $2, + app_id = $3, + arn = $4, + deleted = $5, + ecosystem = $6, + name = $7, + scheme = $8, + updated_at = $9 + WHERE + id = $1` + + pgClauseActive = `active = ?` + pgClauseAppIDs = `app_id IN (?)` + pgClauseARNs = `arn IN (?)` + pgClauseDeleted = `deleted = ?` + pgClauseEcosystems = `ecosystem IN (?)` + pgClauseIDs = `id IN (?)` + + pgListPlatforms = ` + SELECT + active, app_id, arn, deleted, ecosystem, id, name, scheme, created_at, updated_at + FROM + %s.platforms + %s` + + pgOrderCreatedAt = `ORDER BY created_at DESC` + + pgCreateSchema = `CREATE SCHEMA IF NOT EXISTS %s` + pgCreateTable = `CREATE TABLE IF NOT EXISTS %s.platforms( + active BOOL DEFAULT false, + app_id BIGINT NOT NULL, + arn TEXT NOT NULL, + deleted BOOL DEFAULT false, + ecosystem INT NOT NULL, + id BIGINT NOT NULL UNIQUE, + name CITEXT NOT NULL UNIQUE, + scheme TEXT NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL + )` + pgDropTable = `DROP TABLE IF EXISTS %s.platforms` +) + +type pgService struct { + db *sqlx.DB +} + +// PostgresService returns a Postgres based Service implementation. +func PostgresService(db *sqlx.DB) Service { + return &pgService{ + db: db, + } +} + +func (s *pgService) Put(ns string, p *Platform) (*Platform, error) { + if err := p.Validate(); err != nil { + return nil, err + } + + if p.ID == 0 { + return s.insert(ns, p) + } + + return s.update(ns, p) +} + +func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { + clauses, params, err := convertOpts(opts) + if err != nil { + return nil, err + } + + ps, err := s.listPlatforms(ns, clauses, params...) + if err != nil { + if pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + } + + ps, err = s.listPlatforms(ns, clauses, params...) + } + + return ps, err +} + +func (s *pgService) Setup(ns string) error { + qs := []string{ + fmt.Sprintf(pgCreateSchema, ns), + fmt.Sprintf(pgCreateTable, ns), + } + + for _, q := range qs { + _, err := s.db.Exec(q) + if err != nil { + return fmt.Errorf("setup (%s): %s", q, err) + } + } + + return nil +} + +func (s *pgService) Teardown(ns string) error { + qs := []string{ + fmt.Sprintf(pgDropTable, ns), + } + + for _, q := range qs { + _, err := s.db.Exec(q) + if err != nil { + return fmt.Errorf("teardown '%s': %s", q, err) + } + } + + return nil +} + +func (s *pgService) insert(ns string, p *Platform) (*Platform, error) { + if p.CreatedAt.IsZero() { + p.CreatedAt = time.Now().UTC() + } + + ts, err := time.Parse(pg.TimeFormat, p.CreatedAt.UTC().Format(pg.TimeFormat)) + if err != nil { + return nil, err + } + + p.CreatedAt = ts + p.UpdatedAt = ts + + id, err := flake.NextID(flakeNamespace(ns)) + if err != nil { + return nil, err + } + + p.ID = id + + var ( + params = []interface{}{ + p.Active, + p.AppID, + p.ARN, + p.Deleted, + p.Ecosystem, + p.ID, + p.Name, + p.Scheme, + p.CreatedAt, + p.UpdatedAt, + } + query = fmt.Sprintf(pgInsertPlatform, ns) + ) + + _, err = s.db.Exec(query, params...) + if err != nil { + if pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + + _, err = s.db.Exec(query, params...) + } + } + + return p, err +} + +func (s *pgService) listPlatforms( + ns string, + clauses []string, + params ...interface{}, +) (List, error) { + c := strings.Join(clauses, "\nAND") + + if len(clauses) > 0 { + c = fmt.Sprintf("WHERE %s", c) + } + + query := strings.Join([]string{ + fmt.Sprintf(pgListPlatforms, ns, c), + pgOrderCreatedAt, + }, "\n") + + query = sqlx.Rebind(sqlx.DOLLAR, query) + + rows, err := s.db.Query(query, params...) + if err != nil { + return nil, err + } + defer rows.Close() + + ps := List{} + + for rows.Next() { + p := &Platform{} + + err := rows.Scan( + &p.Active, + &p.AppID, + &p.ARN, + &p.Deleted, + &p.Ecosystem, + &p.ID, + &p.Name, + &p.Scheme, + &p.CreatedAt, + &p.UpdatedAt, + ) + if err != nil { + return nil, err + } + + p.CreatedAt = p.CreatedAt.UTC() + p.UpdatedAt = p.UpdatedAt.UTC() + + ps = append(ps, p) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return ps, nil +} + +func (s *pgService) update(ns string, p *Platform) (*Platform, error) { + now, err := time.Parse(pg.TimeFormat, time.Now().UTC().Format(pg.TimeFormat)) + if err != nil { + return nil, err + } + + p.UpdatedAt = now + + var ( + params = []interface{}{ + p.ID, + p.Active, + p.AppID, + p.ARN, + p.Deleted, + p.Ecosystem, + p.Name, + p.Scheme, + p.UpdatedAt, + } + query = fmt.Sprintf(pgUpdatePlatform, ns) + ) + + _, err = s.db.Exec(query, params...) + if err != nil { + if pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + + _, err = s.db.Exec(query, params...) + } + } + + return p, err +} + +func convertOpts(opts QueryOptions) ([]string, []interface{}, error) { + var ( + clauses = []string{} + params = []interface{}{} + ) + + if opts.Active != nil { + clause, _, err := sqlx.In(pgClauseActive, []interface{}{*opts.Active}) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, *opts.Active) + } + + if len(opts.AppIDs) > 0 { + ps := []interface{}{} + + for _, id := range opts.AppIDs { + ps = append(ps, id) + } + + clause, _, err := sqlx.In(pgClauseAppIDs, ps) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if len(opts.ARNs) > 0 { + ps := []interface{}{} + + for _, arn := range opts.ARNs { + ps = append(ps, arn) + } + + clause, _, err := sqlx.In(pgClauseARNs, ps) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if opts.Deleted != nil { + clause, _, err := sqlx.In(pgClauseDeleted, []interface{}{*opts.Deleted}) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, *opts.Deleted) + } + + if len(opts.Ecosystems) > 0 { + ps := []interface{}{} + + for _, e := range opts.Ecosystems { + ps = append(ps, e) + } + + clause, _, err := sqlx.In(pgClauseEcosystems, ps) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if len(opts.IDs) > 0 { + ps := []interface{}{} + + for _, id := range opts.IDs { + ps = append(ps, id) + } + + clause, _, err := sqlx.In(pgClauseIDs, ps) + if err != nil { + return nil, nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + return clauses, params, nil +} diff --git a/service/platform/postgres_test.go b/service/platform/postgres_test.go new file mode 100644 index 0000000..121a852 --- /dev/null +++ b/service/platform/postgres_test.go @@ -0,0 +1,54 @@ +// +build integration + +package platform + +import ( + "flag" + "fmt" + "os/user" + + "github.com/tapglue/snaas/platform/pg" + + "github.com/jmoiron/sqlx" + + "testing" +) + +var pgTestURL string + +func TestPostgresPut(t *testing.T) { + testServicePut(t, preparePostgres) +} + +func TestPostgresQuery(t *testing.T) { + testServiceQuery(t, preparePostgres) +} + +func preparePostgres(t *testing.T, namespace string) Service { + db, err := sqlx.Connect("postgres", pgTestURL) + if err != nil { + t.Fatal(err) + } + + s := PostgresService(db) + + if err := s.Teardown(namespace); err != nil { + t.Fatal(err) + } + + return s +} + +func init() { + u, err := user.Current() + if err != nil { + panic(err) + } + + d := fmt.Sprintf(pg.URLTest, u.Username) + + url := flag.String("postgres.url", d, "Postgres test connection URL") + flag.Parse() + + pgTestURL = *url +}