Skip to content

Commit

Permalink
backend: Add bun
Browse files Browse the repository at this point in the history
  • Loading branch information
monstermunchkin committed Sep 26, 2024
1 parent f61ddd4 commit 638afe4
Show file tree
Hide file tree
Showing 20 changed files with 570 additions and 1,298 deletions.
21 changes: 0 additions & 21 deletions backend/postgres/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,12 @@ Configuration for the PostgreSQL connection pool of the microservice.
* postgres user to access the database
* `POSTGRES_DB` default: `postgres`
* database to access
* `POSTGRES_MAX_RETRIES` default: `5`
* Maximum number of retries before giving up
* `POSTGRES_RETRY_STATEMENT_TIMEOUT` default: `false`
* Whether to retry queries cancelled because of statement_timeout
* `POSTGRES_MIN_RETRY_BACKOFF` default: `250ms`
* Minimum backoff between each retry
* `POSTGRES_MAX_RETRY_BACKOFF` default: `4s`
* Maximum backoff between each retry
* `POSTGRES_DIAL_TIMEOUT` default: `5s`
* Dial timeout for establishing new connections
* `POSTGRES_READ_TIMEOUT` default: `30s`
* Timeout for socket reads. If reached, commands will fail with a timeout instead of blocking
* `POSTGRES_WRITE_TIMEOUT` default: `30s`
* Timeout for socket writes. If reached, commands will fail with a timeout instead of blocking.
* `POSTGRES_POOL_SIZE` default: `100`
* Maximum number of socket connections
* `POSTGRES_MIN_IDLE_CONNECTIONS` default: `10`
* Minimum number of idle connections which is useful when establishing new connection is slow
* `POSTGRES_MAX_CONN_AGE` default: `30m`
* Connection age at which client retires (closes) the connection
* `POSTGRES_POOL_TIMEOUT` default: `31s`
* Time for which client waits for free connection if all connections are busy before returning an error
* `POSTGRES_IDLE_TIMEOUT` default: `5m`
* Amount of time after which client closes idle connections
* `POSTGRES_IDLE_CHECK_FREQUENCY` default: `1m`
* Frequency of idle checks made by idle connections reaper
* `POSTGRES_HEALTH_CHECK_TABLE_NAME` default: `healthcheck`
* Name of the Table that is created to try if database is writeable
* `POSTGRES_HEALTH_CHECK_RESULT_TTL` default: `10s`
Expand All @@ -53,7 +33,6 @@ Prometheus metrics exposed.
* `pace_postgres_query_total{database}` Collects stats about the number of postgres queries made
* `pace_postgres_query_failed{database}` Collects stats about the number of postgres queries failed
* `pace_postgres_query_duration_seconds{database}` Collects performance metrics for each postgres query
* `pace_postgres_query_rows_total{database}` Collects stats about the number of rows returned by a postgres query
* `pace_postgres_query_affected_total{database}` Collects stats about the number of rows affected by a postgres query
* `pace_postgres_connection_pool_hits{database}` Collects number of times free connection was found in the pool
* `pace_postgres_connection_pool_misses{database}` Collects number of times free connection was NOT found in the pool
Expand Down
121 changes: 121 additions & 0 deletions backend/postgres/bun.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright © 2024 by PACE Telematics GmbH. All rights reserved.

package postgres

import (
"context"
"database/sql"
"net"
"os"
"path/filepath"
"strconv"
"time"

"github.com/caarlos0/env/v10"
"github.com/pace/bricks/backend/postgres/hooks"
"github.com/pace/bricks/maintenance/health/servicehealthcheck"
"github.com/prometheus/client_golang/prometheus"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect/pgdialect"
"github.com/uptrace/bun/driver/pgdriver"

"github.com/pace/bricks/maintenance/log"
)

type Config struct {
Port int `env:"POSTGRES_PORT" envDefault:"5432"`
Host string `env:"POSTGRES_HOST" envDefault:"postgres"`
Password string `env:"POSTGRES_PASSWORD" envDefault:"mysecretpassword"`
User string `env:"POSTGRES_USER" envDefault:"postgres"`
Database string `env:"POSTGRES_DB" envDefault:"postgres"`

// ApplicationName is the application name. Used in logs on Pg side.
// Only availaible from pg-9.0.
ApplicationName string `env:"POSTGRES_APPLICATION_NAME" envDefault:"-"`
// Dial timeout for establishing new connections.
DialTimeout time.Duration `env:"POSTGRES_DIAL_TIMEOUT" envDefault:"5s"`
// Name of the Table that is created to try if database is writeable
HealthCheckTableName string `env:"POSTGRES_HEALTH_CHECK_TABLE_NAME" envDefault:"healthcheck"`
// Amount of time to cache the last health check result
HealthCheckResultTTL time.Duration `env:"POSTGRES_HEALTH_CHECK_RESULT_TTL" envDefault:"10s"`
// Indicator whether write (insert,update,delete) queries should be logged
LogWrite bool `env:"POSTGRES_LOG_WRITES" envDefault:"true"`
// Indicator whether read (select) queries should be logged
LogRead bool `env:"POSTGRES_LOG_READS" envDefault:"false"`
// Timeout for socket reads. If reached, commands will fail
// with a timeout instead of blocking.
ReadTimeout time.Duration `env:"POSTGRES_READ_TIMEOUT" envDefault:"30s"`
// Timeout for socket writes. If reached, commands will fail
// with a timeout instead of blocking.
WriteTimeout time.Duration `env:"POSTGRES_WRITE_TIMEOUT" envDefault:"30s"`
}

var cfg Config

func init() {
prometheus.MustRegister(hooks.MetricQueryTotal)
prometheus.MustRegister(hooks.MetricQueryFailed)
prometheus.MustRegister(hooks.MetricQueryDurationSeconds)
prometheus.MustRegister(hooks.MetricQueryAffectedTotal)

err := env.Parse(&cfg)
if err != nil {
log.Fatalf("Failed to parse postgres environment: %v", err)
}

// if the application name is unset infer it from the:
// jaeger service name , service name or executable name
if cfg.ApplicationName == "-" {
names := []string{
os.Getenv("JAEGER_SERVICE_NAME"),
os.Getenv("SERVICE_NAME"),
filepath.Base(os.Args[0]),
}
for _, name := range names {
if name != "" {
cfg.ApplicationName = name
break
}
}
}

servicehealthcheck.RegisterHealthCheck("postgresdefault", NewHealthCheck(NewDB(context.Background())))
}

func NewDB(ctx context.Context, options ...ConfigOption) *bun.DB {
for _, opt := range options {
opt(&cfg)
}

connector := pgdriver.NewConnector(
pgdriver.WithAddr(net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port))),
pgdriver.WithApplicationName(cfg.ApplicationName),
pgdriver.WithDatabase(cfg.Database),
pgdriver.WithDialTimeout(cfg.DialTimeout),
pgdriver.WithPassword(cfg.Password),
pgdriver.WithReadTimeout(cfg.ReadTimeout),
pgdriver.WithUser(cfg.User),
pgdriver.WithWriteTimeout(cfg.WriteTimeout),
)

sqldb := sql.OpenDB(connector)
db := bun.NewDB(sqldb, pgdialect.New())

log.Ctx(ctx).Info().Str("addr", connector.Config().Addr).
Str("user", connector.Config().User).
Str("database", connector.Config().Database).
Str("as", connector.Config().AppName).
Msg("PostgreSQL connection pool created")

// Add hooks
db.AddQueryHook(&hooks.TracingHook{})
db.AddQueryHook(hooks.NewMetricsHook(cfg.Host, cfg.Database))

if cfg.LogWrite || cfg.LogRead {
db.AddQueryHook(hooks.NewLoggingHook(cfg.LogRead, cfg.LogWrite))
} else {
log.Ctx(ctx).Warn().Msg("Connection pool has logging queries disabled completely")
}

return db
}
38 changes: 0 additions & 38 deletions backend/postgres/errors.go

This file was deleted.

44 changes: 0 additions & 44 deletions backend/postgres/errors_test.go

This file was deleted.

92 changes: 92 additions & 0 deletions backend/postgres/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright © 2024 by PACE Telematics GmbH. All rights reserved.

package postgres

import (
"context"
"database/sql"
"time"

"github.com/pace/bricks/maintenance/health/servicehealthcheck"
"github.com/uptrace/bun"
)

type queryExecutor interface {
Exec(ctx context.Context, dest ...interface{}) (sql.Result, error)
}

// HealthCheck checks the state of a postgres connection. It must not be changed
// after it was registered as a health check.
type HealthCheck struct {
state servicehealthcheck.ConnectionState

createTableQueryExecutor queryExecutor
deleteQueryExecutor queryExecutor
dropTableQueryExecutor queryExecutor
insertQueryExecutor queryExecutor
selectQueryExecutor queryExecutor
}

type healthcheck struct {
bun.BaseModel

OK bool `bun:"column:ok"`
}

// NewHealthCheck creates a new HealthCheck instance.
func NewHealthCheck(db *bun.DB) *HealthCheck {
return &HealthCheck{
createTableQueryExecutor: db.NewCreateTable().Table(cfg.HealthCheckTableName).Model((*healthcheck)(nil)),
deleteQueryExecutor: db.NewDelete().Table(cfg.HealthCheckTableName),
dropTableQueryExecutor: db.NewDropTable().Table(cfg.HealthCheckTableName).IfExists(),
insertQueryExecutor: db.NewInsert().Table(cfg.HealthCheckTableName).Model(&healthcheck{OK: true}),
selectQueryExecutor: db.NewSelect().Table(cfg.HealthCheckTableName).Column("1"),
}
}

// Init initializes the test table
func (h *HealthCheck) Init(ctx context.Context) error {
_, err := h.createTableQueryExecutor.Exec(ctx)

return err
}

// HealthCheck performs the read test on the database. If enabled, it performs a
// write test as well.
func (h *HealthCheck) HealthCheck(ctx context.Context) servicehealthcheck.HealthCheckResult {
if time.Since(h.state.LastChecked()) <= cfg.HealthCheckResultTTL {
// the last result of the Health Check is still not outdated
return h.state.GetState()
}

// Readcheck
if _, err := h.selectQueryExecutor.Exec(ctx); err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}

// writecheck - add Data to configured Table
if _, err := h.insertQueryExecutor.Exec(ctx); err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}

// and while we're at it, check delete as well (so as not to clutter the database
// because UPSERT is impractical here
if _, err := h.deleteQueryExecutor.Exec(ctx); err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}

// If no error occurred set the State of this Health Check to healthy
h.state.SetHealthy()

return h.state.GetState()
}

// CleanUp drops the test table.
func (h *HealthCheck) CleanUp(ctx context.Context) error {
_, err := h.dropTableQueryExecutor.Exec(ctx)

return err
}
Loading

0 comments on commit 638afe4

Please sign in to comment.