From 47bbc60e04a273347470f098dd0b8f9bc2c4fb0a Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Tue, 28 Mar 2017 13:45:10 +0200 Subject: [PATCH] Invite flow To enable a better on-boarding experience we need a way to correlate users and their inviter. One way is to keep a list of invites of a user that stores a unique id (e.g. facebook id, or generate random identifier) which can be kept through until the invitee hits the signup. If the invitee signs up and the conenction to the inviter is preserved we can automatically create connections for those so they can hit the ground running. --- cmd/gateway-http/gateway-http.go | 23 ++ core/invite.go | 51 +++++ core/user.go | 76 +++++++ handler/http/invite.go | 40 ++++ handler/http/query.go | 42 ++-- handler/http/user.go | 25 ++- platform/flake/flake.go | 8 + service/invite/helper_test.go | 137 ++++++++++++ service/invite/instrumentation.go | 110 ++++++++++ service/invite/invite.go | 46 ++++ service/invite/logging.go | 100 +++++++++ service/invite/postgres.go | 338 ++++++++++++++++++++++++++++++ service/invite/postgres_test.go | 52 +++++ service/reaction/postgres.go | 15 +- 14 files changed, 1036 insertions(+), 27 deletions(-) create mode 100644 core/invite.go create mode 100644 handler/http/invite.go create mode 100644 service/invite/helper_test.go create mode 100644 service/invite/instrumentation.go create mode 100644 service/invite/invite.go create mode 100644 service/invite/logging.go create mode 100644 service/invite/postgres.go create mode 100644 service/invite/postgres_test.go diff --git a/cmd/gateway-http/gateway-http.go b/cmd/gateway-http/gateway-http.go index accd83d..0433b26 100644 --- a/cmd/gateway-http/gateway-http.go +++ b/cmd/gateway-http/gateway-http.go @@ -27,6 +27,7 @@ import ( "github.com/tapglue/snaas/service/connection" "github.com/tapglue/snaas/service/device" "github.com/tapglue/snaas/service/event" + "github.com/tapglue/snaas/service/invite" "github.com/tapglue/snaas/service/object" "github.com/tapglue/snaas/service/reaction" "github.com/tapglue/snaas/service/session" @@ -355,6 +356,17 @@ func main() { // TODO: Implement write path to avoid stale counts. // events = event.CacheServiceMiddleware(eventCountsCache)(events) + var invites invite.Service + invites = invite.PostgresService(pgClient) + invites = invite.InstrumentServiceMiddleware( + component, + storeService, + serviceErrCount, + serviceOpCount, + serviceOpLatency, + )(invites) + invites = invite.LogServiceMiddleware(logger, storeService)(invites) + var objects object.Service objects = object.PostgresService(pgClient) objects = object.InstrumentServiceMiddleware( @@ -587,6 +599,16 @@ func main() { ), ) + // Invite routes. + current.Methods("POST").Path(`/me/invites`).Name("deviceCreate").HandlerFunc( + handler.Wrap( + withUser, + handler.InviteCreate( + core.InviteCreate(invites), + ), + ), + ) + // Post routes. current.Methods("POST").Path("/posts").Name("postCreate").HandlerFunc( handler.Wrap( @@ -867,6 +889,7 @@ func main() { withApp, handler.UserCreate( core.UserCreate(sessions, users), + core.UserCreateWithInvite(connections, invites, sessions, users), ), ), ) diff --git a/core/invite.go b/core/invite.go new file mode 100644 index 0000000..8527c88 --- /dev/null +++ b/core/invite.go @@ -0,0 +1,51 @@ +package core + +import ( + "github.com/tapglue/snaas/service/app" + "github.com/tapglue/snaas/service/invite" +) + +// InviteCreateFunc stores the key and value for the users invite. +type InviteCreateFunc func( + currentApp *app.App, + origin Origin, + key, value string, +) error + +func InviteCreate(invites invite.Service) InviteCreateFunc { + return func( + currentApp *app.App, + origin Origin, + key, value string, + ) error { + is, err := invites.Query(currentApp.Namespace(), invite.QueryOptions{ + Keys: []string{ + key, + }, + UserIDs: []uint64{ + origin.UserID, + }, + Values: []string{ + value, + }, + }) + if err != nil { + return err + } + + if len(is) == 1 { + return nil + } + + _, err = invites.Put(currentApp.Namespace(), &invite.Invite{ + Key: key, + UserID: origin.UserID, + Value: value, + }) + if err != nil { + return err + } + + return nil + } +} diff --git a/core/user.go b/core/user.go index ad829b9..8689d4d 100644 --- a/core/user.go +++ b/core/user.go @@ -9,6 +9,7 @@ import ( "github.com/tapglue/snaas/platform/generate" "github.com/tapglue/snaas/service/app" "github.com/tapglue/snaas/service/connection" + "github.com/tapglue/snaas/service/invite" "github.com/tapglue/snaas/service/session" "github.com/tapglue/snaas/service/user" ) @@ -60,6 +61,38 @@ func UserCreate( } } +// UserCreateWithInviteFunc stores the provided user, creates a session and +// sets up pending connections for open invites. +type UserCreateWithInviteFunc func( + currentApp *app.App, + origin Origin, + u *user.User, + conType connection.Type, +) (*user.User, error) + +// UserCreateWithInvite stores the provided user and creates a session. +func UserCreateWithInvite( + connections connection.Service, + invites invite.Service, + sessions session.Service, + users user.Service, +) UserCreateWithInviteFunc { + return func( + currentApp *app.App, + origin Origin, + u *user.User, + conType connection.Type, + ) (output *user.User, err error) { + defer func() { + if err == nil { + mapInvites(connections, invites, currentApp, u, conType) + } + }() + + return UserCreate(sessions, users)(currentApp, origin, u) + } +} + // UserDeleteFunc disables the user. type UserDeleteFunc func( currentApp *app.App, @@ -454,6 +487,7 @@ func UserUpdate( // UsersFetchFunc retrieves the users for the given ids. type UsersFetchFunc func(currentApp *app.App, ids ...uint64) (user.List, error) +// UsersFetch retrieves the users for the given ids. func UsersFetch(users user.Service) UsersFetchFunc { return func(currentApp *app.App, ids ...uint64) (user.List, error) { if len(ids) == 0 { @@ -664,6 +698,48 @@ func login( return u, nil } +func mapInvites( + connections connection.Service, + invites invite.Service, + currentApp *app.App, + u *user.User, + t connection.Type, +) { + for k, v := range u.SocialIDs { + is, err := invites.Query(currentApp.Namespace(), invite.QueryOptions{ + Deleted: &defaultDeleted, + Keys: []string{ + k, + }, + Values: []string{ + v, + }, + }) + if err != nil { + continue + } + + for _, i := range is { + _, err := connections.Put(currentApp.Namespace(), &connection.Connection{ + FromID: i.UserID, + State: connection.StatePending, + Type: t, + ToID: u.ID, + }) + if err != nil { + continue + } + + i.Deleted = true + + _, err = invites.Put(currentApp.Namespace(), i) + if err != nil { + continue + } + } + } +} + func passwordCompare(dec, enc string) (bool, error) { d, err := base64.StdEncoding.DecodeString(enc) if err != nil { diff --git a/handler/http/invite.go b/handler/http/invite.go new file mode 100644 index 0000000..64be218 --- /dev/null +++ b/handler/http/invite.go @@ -0,0 +1,40 @@ +package http + +import ( + "encoding/json" + "net/http" + + "golang.org/x/net/context" + + "github.com/tapglue/snaas/core" +) + +// InviteCreate stores the key and value for a users invite. +func InviteCreate(fn core.InviteCreateFunc) Handler { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + var ( + currentApp = appFromContext(ctx) + origin = originFromContext(ctx) + p = payloadInvite{} + ) + + err := json.NewDecoder(r.Body).Decode(&p) + if err != nil { + respondError(w, 0, wrapError(ErrBadRequest, err.Error())) + return + } + + err = fn(currentApp, origin, p.Key, p.Value) + if err != nil { + respondError(w, 0, err) + return + } + + respondJSON(w, http.StatusNoContent, nil) + } +} + +type payloadInvite struct { + Key string `json:"key"` + Value string `json:"value"` +} diff --git a/handler/http/query.go b/handler/http/query.go index eefa794..05d57e6 100644 --- a/handler/http/query.go +++ b/handler/http/query.go @@ -24,18 +24,19 @@ const ( headerForwardedProto = "X-Forwarded-Proto" - keyAppID = "appID" - keyCommentID = "commentID" - keyCursorAfter = "after" - keyCursorBefore = "before" - keyLimit = "limit" - keyPostID = "postID" - keyReactionType = "reactionType" - keyRuleID = "ruleID" - keyState = "state" - keyUserID = "userID" - keyUserQuery = "q" - keyWhere = "where" + keyAppID = "appID" + keyCommentID = "commentID" + keyCursorAfter = "after" + keyCursorBefore = "before" + keyInviteConnections = "invite-connections" + keyLimit = "limit" + keyPostID = "postID" + keyReactionType = "reactionType" + keyRuleID = "ruleID" + keyState = "state" + keyUserID = "userID" + keyUserQuery = "q" + keyWhere = "where" limitDefault = 25 limitMax = 50 @@ -241,6 +242,23 @@ func extractIDCursorBefore(r *http.Request) (uint64, error) { return strconv.ParseUint(string(cursor), 10, 64) } +func extractInviteConnections(r *http.Request) (bool, connection.Type) { + param := r.URL.Query().Get(keyInviteConnections) + + if param == "" { + return false, "" + } + + switch connection.Type(param) { + case connection.TypeFollow: + return true, connection.TypeFollow + case connection.TypeFriend: + return true, connection.TypeFriend + default: + return false, "" + } +} + func extractLikeOpts(r *http.Request) (event.QueryOptions, error) { return event.QueryOptions{}, nil } diff --git a/handler/http/user.go b/handler/http/user.go index f6e03f9..58da6d1 100644 --- a/handler/http/user.go +++ b/handler/http/user.go @@ -15,15 +15,18 @@ import ( ) // UserCreate stores the provided user and returns it with a valid session. -func UserCreate(fn core.UserCreateFunc) Handler { +func UserCreate( + createFn core.UserCreateFunc, + createWithInviteFn core.UserCreateWithInviteFunc, +) Handler { return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { var ( - currentApp = appFromContext(ctx) - deviceID = deviceIDFromContext(ctx) - p = payloadUser{} - tokenType = tokenTypeFromContext(ctx) - - origin = createOrigin(deviceID, tokenType, 0) + currentApp = appFromContext(ctx) + deviceID = deviceIDFromContext(ctx) + invite, conType = extractInviteConnections(r) + p = payloadUser{} + tokenType = tokenTypeFromContext(ctx) + origin = createOrigin(deviceID, tokenType, 0) ) err := json.NewDecoder(r.Body).Decode(&p) @@ -32,7 +35,13 @@ func UserCreate(fn core.UserCreateFunc) Handler { return } - u, err := fn(currentApp, origin, p.user) + var u *user.User + + if invite { + u, err = createWithInviteFn(currentApp, origin, p.user, conType) + } else { + u, err = createFn(currentApp, origin, p.user) + } if err != nil { respondError(w, 0, err) return diff --git a/platform/flake/flake.go b/platform/flake/flake.go index 1b94b55..55d682b 100644 --- a/platform/flake/flake.go +++ b/platform/flake/flake.go @@ -1,13 +1,21 @@ package flake import ( + "fmt" "time" "github.com/sony/sonyflake" ) +const fmtNamespace = "%s_%s" + var flakes = map[string]*sonyflake.Sonyflake{} +// Namespace returns the prefixed entity path. +func Namespace(prefix, entity string) string { + return fmt.Sprintf(fmtNamespace, prefix, entity) +} + // NextID returns the next safe to use ID for the given namespace. func NextID(namespace string) (uint64, error) { if _, ok := flakes[namespace]; !ok { diff --git a/service/invite/helper_test.go b/service/invite/helper_test.go new file mode 100644 index 0000000..8158b94 --- /dev/null +++ b/service/invite/helper_test.go @@ -0,0 +1,137 @@ +package invite + +import ( + "math/rand" + "reflect" + "testing" + "time" + + "github.com/tapglue/snaas/platform/generate" +) + +type prepareFunc func(t *testing.T, namespace string) Service + +func testServicePut(t *testing.T, p prepareFunc) { + var ( + invite = testInvite() + namespace = "service_put" + service = p(t, namespace) + ) + + created, err := service.Put(namespace, invite) + if err != nil { + t.Fatal(err) + } + + list, err := service.Query(namespace, QueryOptions{ + IDs: []uint64{ + created.ID, + }, + }) + if err != nil { + t.Fatal(err) + } + + if have, want := len(list), 1; have != want { + t.Fatalf("have %v, want %v", have, want) + } + if have, want := list[0], created; !reflect.DeepEqual(have, want) { + t.Errorf("have %v, want %v", have, want) + } + + created.Deleted = true + + updated, err := service.Put(namespace, created) + if err != nil { + t.Fatal(err) + } + + list, err = service.Query(namespace, QueryOptions{ + IDs: []uint64{ + updated.ID, + }, + }) + if err != nil { + t.Fatal(err) + } + + if have, want := list[0], updated; !reflect.DeepEqual(have, want) { + t.Errorf("have %v, want %v", have, want) + } +} + +func testServiceQuery(t *testing.T, p prepareFunc) { + var ( + deleted = true + namespace = "service_query" + service = p(t, namespace) + userID = uint64(rand.Int63()) + ) + + for _, i := range testList(userID) { + _, err := service.Put(namespace, i) + if err != nil { + t.Fatal(err) + } + } + + created, err := service.Put(namespace, testInvite()) + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond) + + cases := map[*QueryOptions]uint{ + &QueryOptions{}: 9, + &QueryOptions{Before: created.UpdatedAt}: 8, + &QueryOptions{Deleted: &deleted}: 5, + &QueryOptions{Keys: []string{created.Key}}: 1, + &QueryOptions{Limit: 6}: 6, + &QueryOptions{IDs: []uint64{created.ID}}: 1, + &QueryOptions{UserIDs: []uint64{userID}}: 3, + &QueryOptions{Values: []string{created.Value}}: 1, + } + + for opts, want := range cases { + list, err := service.Query(namespace, *opts) + if err != nil { + t.Fatal(err) + } + + if have := uint(len(list)); have != want { + t.Errorf("have %v, want %v", have, want) + } + } +} + +func testInvite() *Invite { + return &Invite{ + Deleted: false, + Key: generate.RandomStringSafe(24), + UserID: uint64(rand.Int63()), + Value: generate.RandomStringSafe(24), + } +} + +func testList(userID uint64) List { + is := List{} + + for i := 0; i < 5; i++ { + i := testInvite() + + i.Deleted = true + + is = append(is, i) + } + + for i := 0; i < 3; i++ { + i := testInvite() + + i.UserID = userID + + is = append(is, i) + } + + return is +} diff --git a/service/invite/instrumentation.go b/service/invite/instrumentation.go new file mode 100644 index 0000000..296f30f --- /dev/null +++ b/service/invite/instrumentation.go @@ -0,0 +1,110 @@ +package invite + +import ( + "time" + + kitmetrics "github.com/go-kit/kit/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/tapglue/snaas/platform/metrics" +) + +type instrumentService struct { + component string + errCount kitmetrics.Counter + next Service + opCount kitmetrics.Counter + opLatency *prometheus.HistogramVec + store string +} + +// InstrumentServiceMiddleware observes key apsects of Service operations and exposes +// Prometheus metrics. +func InstrumentServiceMiddleware( + component, store string, + errCount kitmetrics.Counter, + opCount kitmetrics.Counter, + opLatency *prometheus.HistogramVec, +) ServiceMiddleware { + return func(next Service) Service { + return &instrumentService{ + component: component, + errCount: errCount, + next: next, + opCount: opCount, + opLatency: opLatency, + store: store, + } + } +} + +func (s *instrumentService) Put( + ns string, + input *Invite, +) (output *Invite, err error) { + defer func(begin time.Time) { + s.track("Put", ns, begin, err) + }(time.Now()) + + return s.next.Put(ns, input) +} + +func (s *instrumentService) Query( + ns string, + opts QueryOptions, +) (list List, err error) { + defer func(begin time.Time) { + s.track("Query", ns, begin, err) + }(time.Now()) + + return s.next.Query(ns, opts) +} + +func (s *instrumentService) Setup(ns string) (err error) { + defer func(begin time.Time) { + s.track("Setup", ns, begin, err) + }(time.Now()) + + return s.next.Setup(ns) +} + +func (s *instrumentService) Teardown(ns string) (err error) { + defer func(begin time.Time) { + s.track("Teardown", ns, begin, err) + }(time.Now()) + + return s.next.Teardown(ns) +} + +func (s *instrumentService) track( + method, namespace string, + begin time.Time, + err error, +) { + if err != nil { + s.errCount.With( + metrics.FieldComponent, s.component, + metrics.FieldMethod, method, + metrics.FieldNamespace, namespace, + metrics.FieldService, entity, + metrics.FieldStore, s.store, + ).Add(1) + + return + } + + s.opCount.With( + metrics.FieldComponent, s.component, + metrics.FieldMethod, method, + metrics.FieldNamespace, namespace, + metrics.FieldService, entity, + metrics.FieldStore, s.store, + ).Add(1) + + s.opLatency.With(prometheus.Labels{ + metrics.FieldComponent: s.component, + metrics.FieldMethod: method, + metrics.FieldNamespace: namespace, + metrics.FieldService: entity, + metrics.FieldStore: s.store, + }).Observe(time.Since(begin).Seconds()) +} diff --git a/service/invite/invite.go b/service/invite/invite.go new file mode 100644 index 0000000..cabb750 --- /dev/null +++ b/service/invite/invite.go @@ -0,0 +1,46 @@ +package invite + +import ( + "time" + + "github.com/tapglue/snaas/platform/service" +) + +const entity = "invite" + +// Invite is a loose promise to create a conection if the person assoicated with +// the social id key-value signs up. +type Invite struct { + Deleted bool + ID uint64 + Key string + UserID uint64 + Value string + CreatedAt time.Time + UpdatedAt time.Time +} + +// List is a collection of Invite. +type List []*Invite + +// QueryOptions to narrow-down Invite queries. +type QueryOptions struct { + Before time.Time + Deleted *bool + IDs []uint64 + Keys []string + Limit uint + UserIDs []uint64 + Values []string +} + +// Service for Invite interactions. +type Service interface { + service.Lifecycle + + Put(namespace string, i *Invite) (*Invite, error) + Query(namespace string, opts QueryOptions) (List, error) +} + +// ServiceMiddleware is a chainable behaviour modifier for Service. +type ServiceMiddleware func(Service) Service diff --git a/service/invite/logging.go b/service/invite/logging.go new file mode 100644 index 0000000..d4735da --- /dev/null +++ b/service/invite/logging.go @@ -0,0 +1,100 @@ +package invite + +import ( + "time" + + "github.com/go-kit/kit/log" +) + +type logService struct { + logger log.Logger + next Service +} + +// LogServiceMiddleware given a logger wraps the next Service with logging +// capabilities. +func LogServiceMiddleware(logger log.Logger, store string) ServiceMiddleware { + return func(next Service) Service { + logger = log.With( + logger, + "service", entity, + "store", store, + ) + + return &logService{logger: logger, next: next} + } +} + +func (s *logService) Put(ns string, input *Invite) (output *Invite, err error) { + defer func(begin time.Time) { + ps := []interface{}{ + "duration_ns", time.Since(begin).Nanoseconds(), + "method", "Put", + "namespace", ns, + "invite_input", input, + "invite_output", output, + } + + if err != nil { + ps = append(ps, "err", err) + } + + _ = s.logger.Log(ps...) + }(time.Now()) + + return s.next.Put(ns, input) +} + +func (s *logService) Query(ns string, opts QueryOptions) (list List, err error) { + defer func(begin time.Time) { + ps := []interface{}{ + "duration_ns", time.Since(begin).Nanoseconds(), + "method", "Query", + "namespace", ns, + "invite_len", len(list), + "opts", opts, + } + + if err != nil { + ps = append(ps, "err", err) + } + + _ = s.logger.Log(ps...) + }(time.Now()) + + return s.next.Query(ns, opts) +} + +func (s *logService) Setup(ns string) (err error) { + defer func(begin time.Time) { + ps := []interface{}{ + "duration_ns", time.Since(begin).Nanoseconds(), + "method", "Setup", + "namespace", ns, + } + + if err != nil { + } + + _ = s.logger.Log(ps...) + }(time.Now()) + + return s.next.Setup(ns) +} + +func (s *logService) Teardown(ns string) (err error) { + defer func(begin time.Time) { + ps := []interface{}{ + "duration_ns", time.Since(begin).Nanoseconds(), + "method", "Teardown", + "namespace", ns, + } + + if err != nil { + } + + _ = s.logger.Log(ps...) + }(time.Now()) + + return s.next.Setup(ns) +} diff --git a/service/invite/postgres.go b/service/invite/postgres.go new file mode 100644 index 0000000..0afa23d --- /dev/null +++ b/service/invite/postgres.go @@ -0,0 +1,338 @@ +package invite + +import ( + "fmt" + "time" + + "github.com/jmoiron/sqlx" + "github.com/tapglue/snaas/platform/flake" + "github.com/tapglue/snaas/platform/pg" +) + +const ( + pgInsertInvite = `INSERT INTO + %s.invites(deleted, id, key, user_id, value, created_at, updated_at) + VALUES($1, $2, $3, $4, $5, $6, $7)` + pgUpdateInvite = ` + UPDATE + %s.invites + SET + deleted = $2, + updated_at = $3 + WHERE + id = $1` + + pgClauseBefore = `created_at < ?` + pgClauseDeleted = `deleted = ?` + pgClauseIDs = `id IN (?)` + pgClauseKeys = `key IN (?)` + pgClauseUserIDs = `user_id IN (?)` + pgClauseValues = `value IN (?)` + + pgListInvites = ` + SELECT + deleted, id, key, user_id, value, created_at, updated_at + FROM + %s.invites + %s` + + pgOrderCreatedAt = `ORDER BY created_at DESC` + + pgCreateScheme = `CREATE SCHEMA IF NOT EXISTS %s` + pgCreateTable = `CREATE TABLE IF NOT EXISTS %s.invites( + deleted BOOL DEFAULT false, + id BIGINT NOT NULL UNIQUE, + key TEXT NOT NULL, + user_id BIGINT NOT NULL, + value TEXT NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL + )` + pgDropTable = `DROP TABLE IF EXISTS %s.invites` +) + +type pgService struct { + db *sqlx.DB +} + +// PostgresService returns a Postgres based Service implementation. +func PostgresService(db *sqlx.DB) Service { + return &pgService{ + db: db, + } +} + +func (s *pgService) Put(ns string, i *Invite) (*Invite, error) { + if i.ID == 0 { + return s.insert(ns, i) + } + + return s.update(ns, i) +} + +func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { + where, params, err := convertOpts(opts) + if err != nil { + return nil, err + } + + return s.listInvites(ns, where, params...) +} + +func (s *pgService) Setup(ns string) error { + qs := []string{ + fmt.Sprintf(pgCreateScheme, ns), + fmt.Sprintf(pgCreateTable, ns), + } + + for _, q := range qs { + _, err := s.db.Exec(q) + if err != nil { + return fmt.Errorf("setup '%s': %s", q, err) + } + } + + return nil +} + +func (s *pgService) Teardown(ns string) error { + qs := []string{ + fmt.Sprintf(pgDropTable, ns), + } + + for _, q := range qs { + _, err := s.db.Exec(q) + if err != nil { + return fmt.Errorf("teardown '%s': %s", q, err) + } + } + + return nil +} + +func (s *pgService) insert(ns string, i *Invite) (*Invite, error) { + if i.CreatedAt.IsZero() { + i.CreatedAt = time.Now().UTC() + } + + ts, err := time.Parse(pg.TimeFormat, i.CreatedAt.UTC().Format(pg.TimeFormat)) + if err != nil { + return nil, err + } + + i.CreatedAt = ts + i.UpdatedAt = ts + + id, err := flake.NextID(flake.Namespace(ns, entity)) + if err != nil { + return nil, err + } + + i.ID = id + + var ( + params = []interface{}{ + i.Deleted, + i.ID, + i.Key, + i.UserID, + i.Value, + i.CreatedAt, + i.UpdatedAt, + } + query = fmt.Sprintf(pgInsertInvite, ns) + ) + + _, err = s.db.Exec(query, params...) + if err != nil && pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + + _, err = s.db.Exec(query, params...) + } + + return i, err +} + +func (s *pgService) listInvites( + ns, where string, + params ...interface{}, +) (List, error) { + query := fmt.Sprintf(pgListInvites, ns, where) + + rows, err := s.db.Query(query, params...) + if err != nil { + if pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + + return s.listInvites(ns, where, params...) + } + + return nil, err + } + defer rows.Close() + + is := List{} + + for rows.Next() { + invite := &Invite{} + + err := rows.Scan( + &invite.Deleted, + &invite.ID, + &invite.Key, + &invite.UserID, + &invite.Value, + &invite.CreatedAt, + &invite.UpdatedAt, + ) + if err != nil { + return nil, err + } + + invite.CreatedAt = invite.CreatedAt.UTC() + invite.UpdatedAt = invite.UpdatedAt.UTC() + + is = append(is, invite) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return is, nil +} + +func (s *pgService) update(ns string, i *Invite) (*Invite, error) { + now, err := time.Parse(pg.TimeFormat, time.Now().UTC().Format(pg.TimeFormat)) + if err != nil { + return nil, err + } + + i.UpdatedAt = now + + var ( + params = []interface{}{ + i.ID, + i.Deleted, + i.UpdatedAt, + } + query = fmt.Sprintf(pgUpdateInvite, ns) + ) + + _, err = s.db.Exec(query, params...) + if err != nil && pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + + _, err = s.db.Exec(query, params...) + } + + return i, err +} + +func convertOpts(opts QueryOptions) (string, []interface{}, error) { + var ( + clauses = []string{} + params = []interface{}{} + ) + + if !opts.Before.IsZero() { + clauses = append(clauses, pgClauseBefore) + params = append(params, opts.Before.UTC().Format(pg.TimeFormat)) + } + + if opts.Deleted != nil { + clause, _, err := sqlx.In(pgClauseDeleted, []interface{}{*opts.Deleted}) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, *opts.Deleted) + } + + if len(opts.IDs) > 0 { + ps := []interface{}{} + + for _, id := range opts.IDs { + ps = append(ps, id) + } + + clause, _, err := sqlx.In(pgClauseIDs, ps) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if len(opts.Keys) > 0 { + ps := []interface{}{} + + for _, k := range opts.Keys { + ps = append(ps, k) + } + + clause, _, err := sqlx.In(pgClauseKeys, ps) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if len(opts.UserIDs) > 0 { + ps := []interface{}{} + + for _, id := range opts.UserIDs { + ps = append(ps, id) + } + + clause, _, err := sqlx.In(pgClauseUserIDs, ps) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if len(opts.Values) > 0 { + ps := []interface{}{} + + for _, v := range opts.Values { + ps = append(ps, v) + } + + clause, _, err := sqlx.In(pgClauseValues, ps) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + where := "" + + if len(clauses) > 0 { + where = sqlx.Rebind(sqlx.DOLLAR, pg.ClausesToWhere(clauses...)) + } + + if !opts.Before.IsZero() { + where = fmt.Sprintf("%s\n%s", where, pgOrderCreatedAt) + } + + if opts.Limit > 0 { + where = fmt.Sprintf("%s\nLIMIT %d", where, opts.Limit) + } + + return where, params, nil +} diff --git a/service/invite/postgres_test.go b/service/invite/postgres_test.go new file mode 100644 index 0000000..9983736 --- /dev/null +++ b/service/invite/postgres_test.go @@ -0,0 +1,52 @@ +// +build integration + +package invite + +import ( + "flag" + "fmt" + "os/user" + "testing" + + "github.com/jmoiron/sqlx" + "github.com/tapglue/snaas/platform/pg" +) + +var pgTestURL string + +func TestPostgresPut(t *testing.T) { + testServicePut(t, preparePostgres) +} + +func TestPostgresQuery(t *testing.T) { + testServiceQuery(t, preparePostgres) +} + +func preparePostgres(t *testing.T, namespace string) Service { + db, err := sqlx.Connect("postgres", pgTestURL) + if err != nil { + t.Fatal(err) + } + + s := PostgresService(db) + + if err := s.Teardown(namespace); err != nil { + t.Fatal(err) + } + + return s +} + +func init() { + u, err := user.Current() + if err != nil { + panic(err) + } + + d := fmt.Sprintf(pg.URLTest, u.Username) + + url := flag.String("postgres.url", d, "Postgres test connection URL") + flag.Parse() + + pgTestURL = *url +} diff --git a/service/reaction/postgres.go b/service/reaction/postgres.go index 3ece3c1..d70bdf2 100644 --- a/service/reaction/postgres.go +++ b/service/reaction/postgres.go @@ -4,10 +4,9 @@ import ( "fmt" "time" - "github.com/tapglue/snaas/platform/flake" - "github.com/jmoiron/sqlx" + "github.com/tapglue/snaas/platform/flake" "github.com/tapglue/snaas/platform/pg" ) @@ -92,7 +91,7 @@ func (s *pgService) Count(ns string, opts QueryOptions) (uint, error) { return 0, err } - return s.countEvents(ns, where, params...) + return s.countReactions(ns, where, params...) } func (s *pgService) Put(ns string, r *Reaction) (*Reaction, error) { @@ -113,7 +112,7 @@ func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { return nil, err } - return s.listEvents(ns, where, params...) + return s.listReactions(ns, where, params...) } func (s *pgService) Setup(ns string) error { @@ -154,7 +153,7 @@ func (s *pgService) Teardown(ns string) error { return nil } -func (s *pgService) countEvents( +func (s *pgService) countReactions( ns, where string, params ...interface{}, ) (uint, error) { @@ -221,7 +220,7 @@ func (s *pgService) insert(ns string, r *Reaction) (*Reaction, error) { return r, err } -func (s *pgService) listEvents( +func (s *pgService) listReactions( ns, where string, params ...interface{}, ) (List, error) { @@ -234,8 +233,10 @@ func (s *pgService) listEvents( return nil, err } - return s.listEvents(ns, where, params...) + return s.listReactions(ns, where, params...) } + + return nil, err } defer rows.Close()