diff --git a/cmd/gateway-http/gateway-http.go b/cmd/gateway-http/gateway-http.go index accd83d..0433b26 100644 --- a/cmd/gateway-http/gateway-http.go +++ b/cmd/gateway-http/gateway-http.go @@ -27,6 +27,7 @@ import ( "github.com/tapglue/snaas/service/connection" "github.com/tapglue/snaas/service/device" "github.com/tapglue/snaas/service/event" + "github.com/tapglue/snaas/service/invite" "github.com/tapglue/snaas/service/object" "github.com/tapglue/snaas/service/reaction" "github.com/tapglue/snaas/service/session" @@ -355,6 +356,17 @@ func main() { // TODO: Implement write path to avoid stale counts. // events = event.CacheServiceMiddleware(eventCountsCache)(events) + var invites invite.Service + invites = invite.PostgresService(pgClient) + invites = invite.InstrumentServiceMiddleware( + component, + storeService, + serviceErrCount, + serviceOpCount, + serviceOpLatency, + )(invites) + invites = invite.LogServiceMiddleware(logger, storeService)(invites) + var objects object.Service objects = object.PostgresService(pgClient) objects = object.InstrumentServiceMiddleware( @@ -587,6 +599,16 @@ func main() { ), ) + // Invite routes. + current.Methods("POST").Path(`/me/invites`).Name("deviceCreate").HandlerFunc( + handler.Wrap( + withUser, + handler.InviteCreate( + core.InviteCreate(invites), + ), + ), + ) + // Post routes. current.Methods("POST").Path("/posts").Name("postCreate").HandlerFunc( handler.Wrap( @@ -867,6 +889,7 @@ func main() { withApp, handler.UserCreate( core.UserCreate(sessions, users), + core.UserCreateWithInvite(connections, invites, sessions, users), ), ), ) diff --git a/core/invite.go b/core/invite.go new file mode 100644 index 0000000..8527c88 --- /dev/null +++ b/core/invite.go @@ -0,0 +1,51 @@ +package core + +import ( + "github.com/tapglue/snaas/service/app" + "github.com/tapglue/snaas/service/invite" +) + +// InviteCreateFunc stores the key and value for the users invite. +type InviteCreateFunc func( + currentApp *app.App, + origin Origin, + key, value string, +) error + +func InviteCreate(invites invite.Service) InviteCreateFunc { + return func( + currentApp *app.App, + origin Origin, + key, value string, + ) error { + is, err := invites.Query(currentApp.Namespace(), invite.QueryOptions{ + Keys: []string{ + key, + }, + UserIDs: []uint64{ + origin.UserID, + }, + Values: []string{ + value, + }, + }) + if err != nil { + return err + } + + if len(is) == 1 { + return nil + } + + _, err = invites.Put(currentApp.Namespace(), &invite.Invite{ + Key: key, + UserID: origin.UserID, + Value: value, + }) + if err != nil { + return err + } + + return nil + } +} diff --git a/core/user.go b/core/user.go index ad829b9..8689d4d 100644 --- a/core/user.go +++ b/core/user.go @@ -9,6 +9,7 @@ import ( "github.com/tapglue/snaas/platform/generate" "github.com/tapglue/snaas/service/app" "github.com/tapglue/snaas/service/connection" + "github.com/tapglue/snaas/service/invite" "github.com/tapglue/snaas/service/session" "github.com/tapglue/snaas/service/user" ) @@ -60,6 +61,38 @@ func UserCreate( } } +// UserCreateWithInviteFunc stores the provided user, creates a session and +// sets up pending connections for open invites. +type UserCreateWithInviteFunc func( + currentApp *app.App, + origin Origin, + u *user.User, + conType connection.Type, +) (*user.User, error) + +// UserCreateWithInvite stores the provided user and creates a session. +func UserCreateWithInvite( + connections connection.Service, + invites invite.Service, + sessions session.Service, + users user.Service, +) UserCreateWithInviteFunc { + return func( + currentApp *app.App, + origin Origin, + u *user.User, + conType connection.Type, + ) (output *user.User, err error) { + defer func() { + if err == nil { + mapInvites(connections, invites, currentApp, u, conType) + } + }() + + return UserCreate(sessions, users)(currentApp, origin, u) + } +} + // UserDeleteFunc disables the user. type UserDeleteFunc func( currentApp *app.App, @@ -454,6 +487,7 @@ func UserUpdate( // UsersFetchFunc retrieves the users for the given ids. type UsersFetchFunc func(currentApp *app.App, ids ...uint64) (user.List, error) +// UsersFetch retrieves the users for the given ids. func UsersFetch(users user.Service) UsersFetchFunc { return func(currentApp *app.App, ids ...uint64) (user.List, error) { if len(ids) == 0 { @@ -664,6 +698,48 @@ func login( return u, nil } +func mapInvites( + connections connection.Service, + invites invite.Service, + currentApp *app.App, + u *user.User, + t connection.Type, +) { + for k, v := range u.SocialIDs { + is, err := invites.Query(currentApp.Namespace(), invite.QueryOptions{ + Deleted: &defaultDeleted, + Keys: []string{ + k, + }, + Values: []string{ + v, + }, + }) + if err != nil { + continue + } + + for _, i := range is { + _, err := connections.Put(currentApp.Namespace(), &connection.Connection{ + FromID: i.UserID, + State: connection.StatePending, + Type: t, + ToID: u.ID, + }) + if err != nil { + continue + } + + i.Deleted = true + + _, err = invites.Put(currentApp.Namespace(), i) + if err != nil { + continue + } + } + } +} + func passwordCompare(dec, enc string) (bool, error) { d, err := base64.StdEncoding.DecodeString(enc) if err != nil { diff --git a/handler/http/invite.go b/handler/http/invite.go new file mode 100644 index 0000000..64be218 --- /dev/null +++ b/handler/http/invite.go @@ -0,0 +1,40 @@ +package http + +import ( + "encoding/json" + "net/http" + + "golang.org/x/net/context" + + "github.com/tapglue/snaas/core" +) + +// InviteCreate stores the key and value for a users invite. +func InviteCreate(fn core.InviteCreateFunc) Handler { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { + var ( + currentApp = appFromContext(ctx) + origin = originFromContext(ctx) + p = payloadInvite{} + ) + + err := json.NewDecoder(r.Body).Decode(&p) + if err != nil { + respondError(w, 0, wrapError(ErrBadRequest, err.Error())) + return + } + + err = fn(currentApp, origin, p.Key, p.Value) + if err != nil { + respondError(w, 0, err) + return + } + + respondJSON(w, http.StatusNoContent, nil) + } +} + +type payloadInvite struct { + Key string `json:"key"` + Value string `json:"value"` +} diff --git a/handler/http/query.go b/handler/http/query.go index eefa794..05d57e6 100644 --- a/handler/http/query.go +++ b/handler/http/query.go @@ -24,18 +24,19 @@ const ( headerForwardedProto = "X-Forwarded-Proto" - keyAppID = "appID" - keyCommentID = "commentID" - keyCursorAfter = "after" - keyCursorBefore = "before" - keyLimit = "limit" - keyPostID = "postID" - keyReactionType = "reactionType" - keyRuleID = "ruleID" - keyState = "state" - keyUserID = "userID" - keyUserQuery = "q" - keyWhere = "where" + keyAppID = "appID" + keyCommentID = "commentID" + keyCursorAfter = "after" + keyCursorBefore = "before" + keyInviteConnections = "invite-connections" + keyLimit = "limit" + keyPostID = "postID" + keyReactionType = "reactionType" + keyRuleID = "ruleID" + keyState = "state" + keyUserID = "userID" + keyUserQuery = "q" + keyWhere = "where" limitDefault = 25 limitMax = 50 @@ -241,6 +242,23 @@ func extractIDCursorBefore(r *http.Request) (uint64, error) { return strconv.ParseUint(string(cursor), 10, 64) } +func extractInviteConnections(r *http.Request) (bool, connection.Type) { + param := r.URL.Query().Get(keyInviteConnections) + + if param == "" { + return false, "" + } + + switch connection.Type(param) { + case connection.TypeFollow: + return true, connection.TypeFollow + case connection.TypeFriend: + return true, connection.TypeFriend + default: + return false, "" + } +} + func extractLikeOpts(r *http.Request) (event.QueryOptions, error) { return event.QueryOptions{}, nil } diff --git a/handler/http/user.go b/handler/http/user.go index f6e03f9..58da6d1 100644 --- a/handler/http/user.go +++ b/handler/http/user.go @@ -15,15 +15,18 @@ import ( ) // UserCreate stores the provided user and returns it with a valid session. -func UserCreate(fn core.UserCreateFunc) Handler { +func UserCreate( + createFn core.UserCreateFunc, + createWithInviteFn core.UserCreateWithInviteFunc, +) Handler { return func(ctx context.Context, w http.ResponseWriter, r *http.Request) { var ( - currentApp = appFromContext(ctx) - deviceID = deviceIDFromContext(ctx) - p = payloadUser{} - tokenType = tokenTypeFromContext(ctx) - - origin = createOrigin(deviceID, tokenType, 0) + currentApp = appFromContext(ctx) + deviceID = deviceIDFromContext(ctx) + invite, conType = extractInviteConnections(r) + p = payloadUser{} + tokenType = tokenTypeFromContext(ctx) + origin = createOrigin(deviceID, tokenType, 0) ) err := json.NewDecoder(r.Body).Decode(&p) @@ -32,7 +35,13 @@ func UserCreate(fn core.UserCreateFunc) Handler { return } - u, err := fn(currentApp, origin, p.user) + var u *user.User + + if invite { + u, err = createWithInviteFn(currentApp, origin, p.user, conType) + } else { + u, err = createFn(currentApp, origin, p.user) + } if err != nil { respondError(w, 0, err) return diff --git a/platform/flake/flake.go b/platform/flake/flake.go index 1b94b55..55d682b 100644 --- a/platform/flake/flake.go +++ b/platform/flake/flake.go @@ -1,13 +1,21 @@ package flake import ( + "fmt" "time" "github.com/sony/sonyflake" ) +const fmtNamespace = "%s_%s" + var flakes = map[string]*sonyflake.Sonyflake{} +// Namespace returns the prefixed entity path. +func Namespace(prefix, entity string) string { + return fmt.Sprintf(fmtNamespace, prefix, entity) +} + // NextID returns the next safe to use ID for the given namespace. func NextID(namespace string) (uint64, error) { if _, ok := flakes[namespace]; !ok { diff --git a/service/invite/helper_test.go b/service/invite/helper_test.go new file mode 100644 index 0000000..8158b94 --- /dev/null +++ b/service/invite/helper_test.go @@ -0,0 +1,137 @@ +package invite + +import ( + "math/rand" + "reflect" + "testing" + "time" + + "github.com/tapglue/snaas/platform/generate" +) + +type prepareFunc func(t *testing.T, namespace string) Service + +func testServicePut(t *testing.T, p prepareFunc) { + var ( + invite = testInvite() + namespace = "service_put" + service = p(t, namespace) + ) + + created, err := service.Put(namespace, invite) + if err != nil { + t.Fatal(err) + } + + list, err := service.Query(namespace, QueryOptions{ + IDs: []uint64{ + created.ID, + }, + }) + if err != nil { + t.Fatal(err) + } + + if have, want := len(list), 1; have != want { + t.Fatalf("have %v, want %v", have, want) + } + if have, want := list[0], created; !reflect.DeepEqual(have, want) { + t.Errorf("have %v, want %v", have, want) + } + + created.Deleted = true + + updated, err := service.Put(namespace, created) + if err != nil { + t.Fatal(err) + } + + list, err = service.Query(namespace, QueryOptions{ + IDs: []uint64{ + updated.ID, + }, + }) + if err != nil { + t.Fatal(err) + } + + if have, want := list[0], updated; !reflect.DeepEqual(have, want) { + t.Errorf("have %v, want %v", have, want) + } +} + +func testServiceQuery(t *testing.T, p prepareFunc) { + var ( + deleted = true + namespace = "service_query" + service = p(t, namespace) + userID = uint64(rand.Int63()) + ) + + for _, i := range testList(userID) { + _, err := service.Put(namespace, i) + if err != nil { + t.Fatal(err) + } + } + + created, err := service.Put(namespace, testInvite()) + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond) + + cases := map[*QueryOptions]uint{ + &QueryOptions{}: 9, + &QueryOptions{Before: created.UpdatedAt}: 8, + &QueryOptions{Deleted: &deleted}: 5, + &QueryOptions{Keys: []string{created.Key}}: 1, + &QueryOptions{Limit: 6}: 6, + &QueryOptions{IDs: []uint64{created.ID}}: 1, + &QueryOptions{UserIDs: []uint64{userID}}: 3, + &QueryOptions{Values: []string{created.Value}}: 1, + } + + for opts, want := range cases { + list, err := service.Query(namespace, *opts) + if err != nil { + t.Fatal(err) + } + + if have := uint(len(list)); have != want { + t.Errorf("have %v, want %v", have, want) + } + } +} + +func testInvite() *Invite { + return &Invite{ + Deleted: false, + Key: generate.RandomStringSafe(24), + UserID: uint64(rand.Int63()), + Value: generate.RandomStringSafe(24), + } +} + +func testList(userID uint64) List { + is := List{} + + for i := 0; i < 5; i++ { + i := testInvite() + + i.Deleted = true + + is = append(is, i) + } + + for i := 0; i < 3; i++ { + i := testInvite() + + i.UserID = userID + + is = append(is, i) + } + + return is +} diff --git a/service/invite/instrumentation.go b/service/invite/instrumentation.go new file mode 100644 index 0000000..296f30f --- /dev/null +++ b/service/invite/instrumentation.go @@ -0,0 +1,110 @@ +package invite + +import ( + "time" + + kitmetrics "github.com/go-kit/kit/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/tapglue/snaas/platform/metrics" +) + +type instrumentService struct { + component string + errCount kitmetrics.Counter + next Service + opCount kitmetrics.Counter + opLatency *prometheus.HistogramVec + store string +} + +// InstrumentServiceMiddleware observes key apsects of Service operations and exposes +// Prometheus metrics. +func InstrumentServiceMiddleware( + component, store string, + errCount kitmetrics.Counter, + opCount kitmetrics.Counter, + opLatency *prometheus.HistogramVec, +) ServiceMiddleware { + return func(next Service) Service { + return &instrumentService{ + component: component, + errCount: errCount, + next: next, + opCount: opCount, + opLatency: opLatency, + store: store, + } + } +} + +func (s *instrumentService) Put( + ns string, + input *Invite, +) (output *Invite, err error) { + defer func(begin time.Time) { + s.track("Put", ns, begin, err) + }(time.Now()) + + return s.next.Put(ns, input) +} + +func (s *instrumentService) Query( + ns string, + opts QueryOptions, +) (list List, err error) { + defer func(begin time.Time) { + s.track("Query", ns, begin, err) + }(time.Now()) + + return s.next.Query(ns, opts) +} + +func (s *instrumentService) Setup(ns string) (err error) { + defer func(begin time.Time) { + s.track("Setup", ns, begin, err) + }(time.Now()) + + return s.next.Setup(ns) +} + +func (s *instrumentService) Teardown(ns string) (err error) { + defer func(begin time.Time) { + s.track("Teardown", ns, begin, err) + }(time.Now()) + + return s.next.Teardown(ns) +} + +func (s *instrumentService) track( + method, namespace string, + begin time.Time, + err error, +) { + if err != nil { + s.errCount.With( + metrics.FieldComponent, s.component, + metrics.FieldMethod, method, + metrics.FieldNamespace, namespace, + metrics.FieldService, entity, + metrics.FieldStore, s.store, + ).Add(1) + + return + } + + s.opCount.With( + metrics.FieldComponent, s.component, + metrics.FieldMethod, method, + metrics.FieldNamespace, namespace, + metrics.FieldService, entity, + metrics.FieldStore, s.store, + ).Add(1) + + s.opLatency.With(prometheus.Labels{ + metrics.FieldComponent: s.component, + metrics.FieldMethod: method, + metrics.FieldNamespace: namespace, + metrics.FieldService: entity, + metrics.FieldStore: s.store, + }).Observe(time.Since(begin).Seconds()) +} diff --git a/service/invite/invite.go b/service/invite/invite.go new file mode 100644 index 0000000..cabb750 --- /dev/null +++ b/service/invite/invite.go @@ -0,0 +1,46 @@ +package invite + +import ( + "time" + + "github.com/tapglue/snaas/platform/service" +) + +const entity = "invite" + +// Invite is a loose promise to create a conection if the person assoicated with +// the social id key-value signs up. +type Invite struct { + Deleted bool + ID uint64 + Key string + UserID uint64 + Value string + CreatedAt time.Time + UpdatedAt time.Time +} + +// List is a collection of Invite. +type List []*Invite + +// QueryOptions to narrow-down Invite queries. +type QueryOptions struct { + Before time.Time + Deleted *bool + IDs []uint64 + Keys []string + Limit uint + UserIDs []uint64 + Values []string +} + +// Service for Invite interactions. +type Service interface { + service.Lifecycle + + Put(namespace string, i *Invite) (*Invite, error) + Query(namespace string, opts QueryOptions) (List, error) +} + +// ServiceMiddleware is a chainable behaviour modifier for Service. +type ServiceMiddleware func(Service) Service diff --git a/service/invite/logging.go b/service/invite/logging.go new file mode 100644 index 0000000..d4735da --- /dev/null +++ b/service/invite/logging.go @@ -0,0 +1,100 @@ +package invite + +import ( + "time" + + "github.com/go-kit/kit/log" +) + +type logService struct { + logger log.Logger + next Service +} + +// LogServiceMiddleware given a logger wraps the next Service with logging +// capabilities. +func LogServiceMiddleware(logger log.Logger, store string) ServiceMiddleware { + return func(next Service) Service { + logger = log.With( + logger, + "service", entity, + "store", store, + ) + + return &logService{logger: logger, next: next} + } +} + +func (s *logService) Put(ns string, input *Invite) (output *Invite, err error) { + defer func(begin time.Time) { + ps := []interface{}{ + "duration_ns", time.Since(begin).Nanoseconds(), + "method", "Put", + "namespace", ns, + "invite_input", input, + "invite_output", output, + } + + if err != nil { + ps = append(ps, "err", err) + } + + _ = s.logger.Log(ps...) + }(time.Now()) + + return s.next.Put(ns, input) +} + +func (s *logService) Query(ns string, opts QueryOptions) (list List, err error) { + defer func(begin time.Time) { + ps := []interface{}{ + "duration_ns", time.Since(begin).Nanoseconds(), + "method", "Query", + "namespace", ns, + "invite_len", len(list), + "opts", opts, + } + + if err != nil { + ps = append(ps, "err", err) + } + + _ = s.logger.Log(ps...) + }(time.Now()) + + return s.next.Query(ns, opts) +} + +func (s *logService) Setup(ns string) (err error) { + defer func(begin time.Time) { + ps := []interface{}{ + "duration_ns", time.Since(begin).Nanoseconds(), + "method", "Setup", + "namespace", ns, + } + + if err != nil { + } + + _ = s.logger.Log(ps...) + }(time.Now()) + + return s.next.Setup(ns) +} + +func (s *logService) Teardown(ns string) (err error) { + defer func(begin time.Time) { + ps := []interface{}{ + "duration_ns", time.Since(begin).Nanoseconds(), + "method", "Teardown", + "namespace", ns, + } + + if err != nil { + } + + _ = s.logger.Log(ps...) + }(time.Now()) + + return s.next.Setup(ns) +} diff --git a/service/invite/postgres.go b/service/invite/postgres.go new file mode 100644 index 0000000..0afa23d --- /dev/null +++ b/service/invite/postgres.go @@ -0,0 +1,338 @@ +package invite + +import ( + "fmt" + "time" + + "github.com/jmoiron/sqlx" + "github.com/tapglue/snaas/platform/flake" + "github.com/tapglue/snaas/platform/pg" +) + +const ( + pgInsertInvite = `INSERT INTO + %s.invites(deleted, id, key, user_id, value, created_at, updated_at) + VALUES($1, $2, $3, $4, $5, $6, $7)` + pgUpdateInvite = ` + UPDATE + %s.invites + SET + deleted = $2, + updated_at = $3 + WHERE + id = $1` + + pgClauseBefore = `created_at < ?` + pgClauseDeleted = `deleted = ?` + pgClauseIDs = `id IN (?)` + pgClauseKeys = `key IN (?)` + pgClauseUserIDs = `user_id IN (?)` + pgClauseValues = `value IN (?)` + + pgListInvites = ` + SELECT + deleted, id, key, user_id, value, created_at, updated_at + FROM + %s.invites + %s` + + pgOrderCreatedAt = `ORDER BY created_at DESC` + + pgCreateScheme = `CREATE SCHEMA IF NOT EXISTS %s` + pgCreateTable = `CREATE TABLE IF NOT EXISTS %s.invites( + deleted BOOL DEFAULT false, + id BIGINT NOT NULL UNIQUE, + key TEXT NOT NULL, + user_id BIGINT NOT NULL, + value TEXT NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL + )` + pgDropTable = `DROP TABLE IF EXISTS %s.invites` +) + +type pgService struct { + db *sqlx.DB +} + +// PostgresService returns a Postgres based Service implementation. +func PostgresService(db *sqlx.DB) Service { + return &pgService{ + db: db, + } +} + +func (s *pgService) Put(ns string, i *Invite) (*Invite, error) { + if i.ID == 0 { + return s.insert(ns, i) + } + + return s.update(ns, i) +} + +func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { + where, params, err := convertOpts(opts) + if err != nil { + return nil, err + } + + return s.listInvites(ns, where, params...) +} + +func (s *pgService) Setup(ns string) error { + qs := []string{ + fmt.Sprintf(pgCreateScheme, ns), + fmt.Sprintf(pgCreateTable, ns), + } + + for _, q := range qs { + _, err := s.db.Exec(q) + if err != nil { + return fmt.Errorf("setup '%s': %s", q, err) + } + } + + return nil +} + +func (s *pgService) Teardown(ns string) error { + qs := []string{ + fmt.Sprintf(pgDropTable, ns), + } + + for _, q := range qs { + _, err := s.db.Exec(q) + if err != nil { + return fmt.Errorf("teardown '%s': %s", q, err) + } + } + + return nil +} + +func (s *pgService) insert(ns string, i *Invite) (*Invite, error) { + if i.CreatedAt.IsZero() { + i.CreatedAt = time.Now().UTC() + } + + ts, err := time.Parse(pg.TimeFormat, i.CreatedAt.UTC().Format(pg.TimeFormat)) + if err != nil { + return nil, err + } + + i.CreatedAt = ts + i.UpdatedAt = ts + + id, err := flake.NextID(flake.Namespace(ns, entity)) + if err != nil { + return nil, err + } + + i.ID = id + + var ( + params = []interface{}{ + i.Deleted, + i.ID, + i.Key, + i.UserID, + i.Value, + i.CreatedAt, + i.UpdatedAt, + } + query = fmt.Sprintf(pgInsertInvite, ns) + ) + + _, err = s.db.Exec(query, params...) + if err != nil && pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + + _, err = s.db.Exec(query, params...) + } + + return i, err +} + +func (s *pgService) listInvites( + ns, where string, + params ...interface{}, +) (List, error) { + query := fmt.Sprintf(pgListInvites, ns, where) + + rows, err := s.db.Query(query, params...) + if err != nil { + if pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + + return s.listInvites(ns, where, params...) + } + + return nil, err + } + defer rows.Close() + + is := List{} + + for rows.Next() { + invite := &Invite{} + + err := rows.Scan( + &invite.Deleted, + &invite.ID, + &invite.Key, + &invite.UserID, + &invite.Value, + &invite.CreatedAt, + &invite.UpdatedAt, + ) + if err != nil { + return nil, err + } + + invite.CreatedAt = invite.CreatedAt.UTC() + invite.UpdatedAt = invite.UpdatedAt.UTC() + + is = append(is, invite) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return is, nil +} + +func (s *pgService) update(ns string, i *Invite) (*Invite, error) { + now, err := time.Parse(pg.TimeFormat, time.Now().UTC().Format(pg.TimeFormat)) + if err != nil { + return nil, err + } + + i.UpdatedAt = now + + var ( + params = []interface{}{ + i.ID, + i.Deleted, + i.UpdatedAt, + } + query = fmt.Sprintf(pgUpdateInvite, ns) + ) + + _, err = s.db.Exec(query, params...) + if err != nil && pg.IsRelationNotFound(pg.WrapError(err)) { + if err := s.Setup(ns); err != nil { + return nil, err + } + + _, err = s.db.Exec(query, params...) + } + + return i, err +} + +func convertOpts(opts QueryOptions) (string, []interface{}, error) { + var ( + clauses = []string{} + params = []interface{}{} + ) + + if !opts.Before.IsZero() { + clauses = append(clauses, pgClauseBefore) + params = append(params, opts.Before.UTC().Format(pg.TimeFormat)) + } + + if opts.Deleted != nil { + clause, _, err := sqlx.In(pgClauseDeleted, []interface{}{*opts.Deleted}) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, *opts.Deleted) + } + + if len(opts.IDs) > 0 { + ps := []interface{}{} + + for _, id := range opts.IDs { + ps = append(ps, id) + } + + clause, _, err := sqlx.In(pgClauseIDs, ps) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if len(opts.Keys) > 0 { + ps := []interface{}{} + + for _, k := range opts.Keys { + ps = append(ps, k) + } + + clause, _, err := sqlx.In(pgClauseKeys, ps) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if len(opts.UserIDs) > 0 { + ps := []interface{}{} + + for _, id := range opts.UserIDs { + ps = append(ps, id) + } + + clause, _, err := sqlx.In(pgClauseUserIDs, ps) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + if len(opts.Values) > 0 { + ps := []interface{}{} + + for _, v := range opts.Values { + ps = append(ps, v) + } + + clause, _, err := sqlx.In(pgClauseValues, ps) + if err != nil { + return "", nil, err + } + + clauses = append(clauses, clause) + params = append(params, ps...) + } + + where := "" + + if len(clauses) > 0 { + where = sqlx.Rebind(sqlx.DOLLAR, pg.ClausesToWhere(clauses...)) + } + + if !opts.Before.IsZero() { + where = fmt.Sprintf("%s\n%s", where, pgOrderCreatedAt) + } + + if opts.Limit > 0 { + where = fmt.Sprintf("%s\nLIMIT %d", where, opts.Limit) + } + + return where, params, nil +} diff --git a/service/invite/postgres_test.go b/service/invite/postgres_test.go new file mode 100644 index 0000000..9983736 --- /dev/null +++ b/service/invite/postgres_test.go @@ -0,0 +1,52 @@ +// +build integration + +package invite + +import ( + "flag" + "fmt" + "os/user" + "testing" + + "github.com/jmoiron/sqlx" + "github.com/tapglue/snaas/platform/pg" +) + +var pgTestURL string + +func TestPostgresPut(t *testing.T) { + testServicePut(t, preparePostgres) +} + +func TestPostgresQuery(t *testing.T) { + testServiceQuery(t, preparePostgres) +} + +func preparePostgres(t *testing.T, namespace string) Service { + db, err := sqlx.Connect("postgres", pgTestURL) + if err != nil { + t.Fatal(err) + } + + s := PostgresService(db) + + if err := s.Teardown(namespace); err != nil { + t.Fatal(err) + } + + return s +} + +func init() { + u, err := user.Current() + if err != nil { + panic(err) + } + + d := fmt.Sprintf(pg.URLTest, u.Username) + + url := flag.String("postgres.url", d, "Postgres test connection URL") + flag.Parse() + + pgTestURL = *url +} diff --git a/service/reaction/postgres.go b/service/reaction/postgres.go index 3ece3c1..d70bdf2 100644 --- a/service/reaction/postgres.go +++ b/service/reaction/postgres.go @@ -4,10 +4,9 @@ import ( "fmt" "time" - "github.com/tapglue/snaas/platform/flake" - "github.com/jmoiron/sqlx" + "github.com/tapglue/snaas/platform/flake" "github.com/tapglue/snaas/platform/pg" ) @@ -92,7 +91,7 @@ func (s *pgService) Count(ns string, opts QueryOptions) (uint, error) { return 0, err } - return s.countEvents(ns, where, params...) + return s.countReactions(ns, where, params...) } func (s *pgService) Put(ns string, r *Reaction) (*Reaction, error) { @@ -113,7 +112,7 @@ func (s *pgService) Query(ns string, opts QueryOptions) (List, error) { return nil, err } - return s.listEvents(ns, where, params...) + return s.listReactions(ns, where, params...) } func (s *pgService) Setup(ns string) error { @@ -154,7 +153,7 @@ func (s *pgService) Teardown(ns string) error { return nil } -func (s *pgService) countEvents( +func (s *pgService) countReactions( ns, where string, params ...interface{}, ) (uint, error) { @@ -221,7 +220,7 @@ func (s *pgService) insert(ns string, r *Reaction) (*Reaction, error) { return r, err } -func (s *pgService) listEvents( +func (s *pgService) listReactions( ns, where string, params ...interface{}, ) (List, error) { @@ -234,8 +233,10 @@ func (s *pgService) listEvents( return nil, err } - return s.listEvents(ns, where, params...) + return s.listReactions(ns, where, params...) } + + return nil, err } defer rows.Close()