Skip to content

Commit

Permalink
backend: Add bun
Browse files Browse the repository at this point in the history
  • Loading branch information
monstermunchkin committed Sep 25, 2024
1 parent f0191f9 commit d1221df
Show file tree
Hide file tree
Showing 6 changed files with 496 additions and 0 deletions.
123 changes: 123 additions & 0 deletions backend/bun/bun.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package bun

import (
"context"
"database/sql"
"net"
"os"
"path/filepath"
"strconv"
"time"

"github.com/caarlos0/env/v10"
"github.com/pace/bricks/backend/bun/hooks"
"github.com/pace/bricks/maintenance/health/servicehealthcheck"
"github.com/prometheus/client_golang/prometheus"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect/pgdialect"
"github.com/uptrace/bun/driver/pgdriver"

"github.com/pace/bricks/maintenance/log"
)

type Config struct {
Port int `env:"POSTGRES_PORT" envDefault:"5432"`
Host string `env:"POSTGRES_HOST" envDefault:"postgres"`
Password string `env:"POSTGRES_PASSWORD" envDefault:"mysecretpassword"`
User string `env:"POSTGRES_USER" envDefault:"postgres"`
Database string `env:"POSTGRES_DB" envDefault:"postgres"`

// ApplicationName is the application name. Used in logs on Pg side.
// Only availaible from pg-9.0.
ApplicationName string `env:"POSTGRES_APPLICATION_NAME" envDefault:"-"`
// Dial timeout for establishing new connections.
DialTimeout time.Duration `env:"POSTGRES_DIAL_TIMEOUT" envDefault:"5s"`
// Name of the Table that is created to try if database is writeable
HealthCheckTableName string `env:"POSTGRES_HEALTH_CHECK_TABLE_NAME" envDefault:"healthcheck"`
// Amount of time to cache the last health check result
HealthCheckResultTTL time.Duration `env:"POSTGRES_HEALTH_CHECK_RESULT_TTL" envDefault:"10s"`
// Indicator whether write (insert,update,delete) queries should be logged
LogWrite bool `env:"POSTGRES_LOG_WRITES" envDefault:"true"`
// Indicator whether read (select) queries should be logged
LogRead bool `env:"POSTGRES_LOG_READS" envDefault:"false"`
// Timeout for socket reads. If reached, commands will fail
// with a timeout instead of blocking.
ReadTimeout time.Duration `env:"POSTGRES_READ_TIMEOUT" envDefault:"30s"`
// Timeout for socket writes. If reached, commands will fail
// with a timeout instead of blocking.
WriteTimeout time.Duration `env:"POSTGRES_WRITE_TIMEOUT" envDefault:"30s"`
}

var (
cfg Config
)

func init() {
prometheus.MustRegister(hooks.MetricQueryTotal)
prometheus.MustRegister(hooks.MetricQueryFailed)
prometheus.MustRegister(hooks.MetricQueryDurationSeconds)
prometheus.MustRegister(hooks.MetricQueryAffectedTotal)

err := env.Parse(&cfg)
if err != nil {
log.Fatalf("Failed to parse postgres environment: %v", err)
}

// if the application name is unset infer it from the:
// jaeger service name , service name or executable name
if cfg.ApplicationName == "-" {
names := []string{
os.Getenv("JAEGER_SERVICE_NAME"),
os.Getenv("SERVICE_NAME"),
filepath.Base(os.Args[0]),
}
for _, name := range names {
if name != "" {
cfg.ApplicationName = name
break
}
}
}

servicehealthcheck.RegisterHealthCheck("postgresdefault", &HealthCheck{
db: NewDB(context.Background()),
})
}

func NewDB(ctx context.Context, options ...ConfigOption) *bun.DB {
for _, opt := range options {
opt(&cfg)
}

connector := pgdriver.NewConnector(
pgdriver.WithAddr(net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port))),
pgdriver.WithApplicationName(cfg.ApplicationName),
pgdriver.WithDatabase(cfg.Database),
pgdriver.WithDialTimeout(cfg.DialTimeout),
pgdriver.WithPassword(cfg.Password),
pgdriver.WithReadTimeout(cfg.ReadTimeout),
pgdriver.WithUser(cfg.User),
pgdriver.WithWriteTimeout(cfg.WriteTimeout),
)

sqldb := sql.OpenDB(connector)
db := bun.NewDB(sqldb, pgdialect.New())

log.Ctx(ctx).Info().Str("addr", connector.Config().Addr).
Str("user", connector.Config().User).
Str("database", connector.Config().Database).
Str("as", connector.Config().AppName).
Msg("PostgreSQL connection pool created")

// Add hooks
db.AddQueryHook(&hooks.TracingHook{})
db.AddQueryHook(hooks.NewMetricsHook(cfg.Host, cfg.Database))

if cfg.LogWrite || cfg.LogRead {
db.AddQueryHook(hooks.NewLoggingHook(cfg.LogRead, cfg.LogWrite))
} else {
log.Ctx(ctx).Warn().Msg("Connection pool has logging queries disabled completely")
}

return db
}
73 changes: 73 additions & 0 deletions backend/bun/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright © 2019 by PACE Telematics GmbH. All rights reserved.

package bun

import (
"context"
"time"

"github.com/pace/bricks/maintenance/health/servicehealthcheck"
"github.com/uptrace/bun"
)

// HealthCheck checks the state of a postgres connection. It must not be changed
// after it was registered as a health check.
type HealthCheck struct {
state servicehealthcheck.ConnectionState
db *bun.DB
}

type healthcheck struct {
bun.BaseModel

OK bool `bun:"column:ok"`
}

// Init initializes the test table
func (h *HealthCheck) Init(ctx context.Context) error {
_, err := h.db.NewCreateTable().Table(cfg.HealthCheckTableName).Model((*healthcheck)(nil)).Exec(ctx)

return err
}

// HealthCheck performs the read test on the database. If enabled, it performs a
// write test as well.
func (h *HealthCheck) HealthCheck(ctx context.Context) servicehealthcheck.HealthCheckResult {
if time.Since(h.state.LastChecked()) <= cfg.HealthCheckResultTTL {
// the last result of the Health Check is still not outdated
return h.state.GetState()
}

// Readcheck
if _, err := h.db.NewSelect().Column("1").Exec(ctx); err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}

hc := &healthcheck{OK: true}

// writecheck - add Data to configured Table
_, err := h.db.NewInsert().Model(hc).Exec(ctx)
if err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}

// and while we're at it, check delete as well (so as not to clutter the database
// because UPSERT is impractical here
_, err = h.db.NewDelete().Exec(ctx)
if err != nil {
h.state.SetErrorState(err)
return h.state.GetState()
}
// If no error occurred set the State of this Health Check to healthy
h.state.SetHealthy()
return h.state.GetState()
}

// CleanUp drops the test table.
func (h *HealthCheck) CleanUp(ctx context.Context) error {
_, err := h.db.NewDropTable().Table(cfg.HealthCheckTableName).IfExists().Exec(ctx)

return err
}
84 changes: 84 additions & 0 deletions backend/bun/hooks/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package hooks

import (
"context"
"strings"
"time"

"github.com/rs/zerolog"
"github.com/uptrace/bun"

"github.com/pace/bricks/maintenance/log"
)

type queryMode int

const (
readMode queryMode = iota
writeMode queryMode = iota
)

type LoggingHook struct {
logReadQueries bool
logWriteQueries bool
}

func NewLoggingHook(logRead bool, logWrite bool) *LoggingHook {
return &LoggingHook{
logReadQueries: logRead,
logWriteQueries: logWrite,
}
}

func (h *LoggingHook) BeforeQuery(ctx context.Context, event *bun.QueryEvent) context.Context {
return ctx
}

func (h *LoggingHook) AfterQuery(ctx context.Context, event *bun.QueryEvent) {
// we can only and should only perfom the following check if we have the information availaible
mode := determineQueryMode(event.Query)

if mode == readMode && !h.logReadQueries {
return
}

if mode == writeMode && !h.logWriteQueries {
return
}

dur := float64(time.Since(event.StartTime)) / float64(time.Millisecond)

// check if log context is given
var logger *zerolog.Logger
if ctx != nil {
logger = log.Ctx(ctx)
} else {
logger = log.Logger()
}

// add general info
logEvent := logger.Debug().
Float64("duration", dur).
Str("sentry:category", "postgres")

// add error or result set info
if event.Err != nil {
logEvent = logEvent.Err(event.Err)
} else {
rowsAffected, err := event.Result.RowsAffected()
if err == nil {
logEvent = logEvent.Int64("affected", rowsAffected)
}
}

logEvent.Msg(event.Query)
}

// determineQueryMode is a poorman's attempt at checking whether the query is a read or write to the database.
// Feel free to improve.
func determineQueryMode(qry string) queryMode {
if strings.HasPrefix(strings.ToLower(strings.TrimSpace(qry)), "select") {
return readMode
}
return writeMode
}
80 changes: 80 additions & 0 deletions backend/bun/hooks/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package hooks

import (
"context"
"math"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/uptrace/bun"
)

var (
MetricQueryTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pace_postgres_query_total",
Help: "Collects stats about the number of postgres queries made",
},
[]string{"database"},
)
MetricQueryFailed = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pace_postgres_query_failed",
Help: "Collects stats about the number of postgres queries failed",
},
[]string{"database"},
)
MetricQueryDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "pace_postgres_query_duration_seconds",
Help: "Collect performance metrics for each postgres query",
Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 60},
},
[]string{"database"},
)
MetricQueryAffectedTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pace_postgres_query_affected_total",
Help: "Collects stats about the number of rows affected by a postgres query",
},
[]string{"database"},
)
)

type MetricsHook struct {
addr string
database string
}

func NewMetricsHook(addr string, database string) *MetricsHook {
return &MetricsHook{
addr: addr,
database: database,
}
}

func (h *MetricsHook) BeforeQuery(ctx context.Context, event *bun.QueryEvent) context.Context {
return ctx
}

func (h *MetricsHook) AfterQuery(ctx context.Context, event *bun.QueryEvent) {
dur := float64(time.Since(event.StartTime)) / float64(time.Millisecond)

labels := prometheus.Labels{
"database": h.addr + "/" + h.database,
}

MetricQueryTotal.With(labels).Inc()

if event.Err != nil {
MetricQueryFailed.With(labels).Inc()
} else {
r := event.Result
rowsAffected, err := r.RowsAffected()
if err == nil {
MetricQueryAffectedTotal.With(labels).Add(math.Max(0, float64(rowsAffected)))
}
}

MetricQueryDurationSeconds.With(labels).Observe(dur)
}
Loading

0 comments on commit d1221df

Please sign in to comment.