Skip to content

Commit

Permalink
switch node names to machine ID (#251)
Browse files Browse the repository at this point in the history
* WIP switch node names to machine ID

* fix replica clone

* updates

* fixes

* TODO

* fix application_name on replica startup

* move to function

* reuse code

* upgrade primary

* clean up the diff

* fix tests

* silence warning

* missed a few

* fix deepsource callout

* make restart-repmgrd more resilient

* make pg_unregister work with new names

* Accept migration failures

* add missing panic

* update pg_unregister comment

* remove old comment
  • Loading branch information
benwaffle authored Aug 13, 2024
1 parent 7a8ac4d commit a39f76f
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 47 deletions.
9 changes: 8 additions & 1 deletion bin/restart-repmgrd
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
#!/bin/bash

kill `cat /tmp/repmgrd.pid`
if [ -f /tmp/repmgrd.pid ]; then
PID=$(cat /tmp/repmgrd.pid)

# Check if the process is running
if ps -p $PID > /dev/null 2>&1; then
kill $PID
fi
fi
11 changes: 10 additions & 1 deletion cmd/pg_unregister/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/base64"
"errors"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -44,7 +45,15 @@ func processUnregistration(ctx context.Context) error {
defer func() { _ = conn.Close(ctx) }()

member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes))
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
// for historical reasons, old versions of flyctl passes in the 6pn as the hostname
// most likely this won't work because the hostname does not resolve if the machine is stopped,
// but we try anyway
member, err = node.RepMgr.MemberBy6PN(ctx, conn, string(hostnameBytes))
if err != nil {
return fmt.Errorf("failed to resolve member by hostname and 6pn: %s", err)
}
} else if err != nil {
return fmt.Errorf("failed to resolve member: %s", err)
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ require (
github.com/hashicorp/consul/api v1.18.0
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgx/v5 v5.5.4
github.com/olekukonko/tablewriter v0.0.5
github.com/pkg/errors v0.9.1
github.com/pkg/term v1.1.0
github.com/spf13/cobra v1.8.1
github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2
golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3
golang.org/x/sync v0.1.0
Expand All @@ -36,8 +38,6 @@ require (
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
Expand Down
65 changes: 62 additions & 3 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/fly-apps/postgres-flex/internal/privnet"
"github.com/fly-apps/postgres-flex/internal/utils"
"github.com/jackc/pgx/v5"
"golang.org/x/exp/slices"
)

type Node struct {
AppName string
MachineID string
PrivateIP string
PrimaryRegion string
DataDir string
Expand Down Expand Up @@ -52,6 +54,8 @@ func NewNode() (*Node, error) {

node.PrivateIP = ipv6.String()

node.MachineID = os.Getenv("FLY_MACHINE_ID")

node.PrimaryRegion = os.Getenv("PRIMARY_REGION")
if node.PrimaryRegion == "" {
return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set")
Expand Down Expand Up @@ -89,6 +93,7 @@ func NewNode() (*Node, error) {
PasswordConfigPath: "/data/.pgpass",
DataDir: node.DataDir,
PrivateIP: node.PrivateIP,
MachineID: node.MachineID,
Port: 5433,
DatabaseName: "repmgr",
Credentials: node.ReplCredentials,
Expand Down Expand Up @@ -265,7 +270,7 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to resolve member role: %s", err)
}

// Restart repmgrd in the event the IP changes for an already registered node.
// Restart repmgrd in the event the machine ID changes for an already registered node.
// This can happen if the underlying volume is moved to a different node.
daemonRestartRequired := n.RepMgr.daemonRestartRequired(member)

Expand All @@ -279,6 +284,8 @@ func (n *Node) PostInit(ctx context.Context) error {
if err := Quarantine(ctx, n, primary); err != nil {
return fmt.Errorf("failed to quarantine failed primary: %s", err)
}

panic(err)
} else if errors.Is(err, ErrZombieDiscovered) {
log.Printf("[ERROR] The majority of registered members agree that '%s' is the real primary.\n", primary)
// Turn member read-only
Expand All @@ -292,10 +299,10 @@ func (n *Node) PostInit(ctx context.Context) error {
}

// This should never happen
if primary != n.PrivateIP {
if primary != n.RepMgr.machineIdToDNS(n.MachineID) {
return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen",
primary,
n.PrivateIP,
n.RepMgr.machineIdToDNS(n.MachineID),
)
}

Expand All @@ -311,6 +318,11 @@ func (n *Node) PostInit(ctx context.Context) error {
}
}
case StandbyRoleName:
if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil {
log.Printf("[ERROR] failed to migrate node name: %s", err)
// We try to bring the standby up anyway
}

// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
return fmt.Errorf("failed to register existing standby: %s", err)
Expand Down Expand Up @@ -527,3 +539,50 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro

return nil
}

// migrate node name from 6pn to machine ID if needed
func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error {
primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
if err != nil {
return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
}

primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.Hostname)
if err != nil {
return fmt.Errorf("failed to establish connection to primary: %s", err)
}
defer func() { _ = primaryConn.Close(ctx) }()

rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
if err != nil {
return fmt.Errorf("failed to query pg_stat_replication: %s", err)
}
defer rows.Close()

var applicationNames []string
for rows.Next() {
var applicationName string
if err := rows.Scan(&applicationName); err != nil {
return fmt.Errorf("failed to scan application_name: %s", err)
}
applicationNames = append(applicationNames, applicationName)
}
if err := rows.Err(); err != nil {
return fmt.Errorf("failed to iterate over rows: %s", err)
}

// if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql
if slices.Contains(applicationNames, n.PrivateIP) {
log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")

if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
return fmt.Errorf("failed to clone standby: %s", err)
}

if err := admin.ReloadPostgresConfig(ctx, repConn); err != nil {
return fmt.Errorf("failed to reload postgresql: %s", err)
}
}

return nil
}
4 changes: 2 additions & 2 deletions internal/flypg/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {

for _, member := range members {
if member.Role == PrimaryRoleName {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target)
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, target)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err)
Expand All @@ -85,7 +85,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error {
}

for _, member := range members {
endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint)
endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, RestartHaproxyEndpoint)
resp, err := http.Get(endpoint)
if err != nil {
log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err)
Expand Down
91 changes: 81 additions & 10 deletions internal/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type RepMgr struct {
PrimaryRegion string
Region string
PrivateIP string
MachineID string
DataDir string
DatabaseName string
Credentials admin.Credential
Expand Down Expand Up @@ -161,10 +162,12 @@ func (r *RepMgr) setDefaults() error {
return err
}

hostname := r.machineIdToDNS(r.MachineID)

conf := ConfigMap{
"node_id": nodeID,
"node_name": fmt.Sprintf("'%s'", r.PrivateIP),
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", r.PrivateIP, r.Port, r.Credentials.Username, r.DatabaseName),
"node_name": fmt.Sprintf("'%s'", hostname),
"conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", hostname, r.Port, r.Credentials.Username, r.DatabaseName),
"data_directory": fmt.Sprintf("'%s'", r.DataDir),
"failover": "'automatic'",
"use_replication_slots": "yes",
Expand Down Expand Up @@ -276,7 +279,7 @@ func (*RepMgr) restartDaemon() error {
}

func (r *RepMgr) daemonRestartRequired(m *Member) bool {
return m.Hostname != r.PrivateIP
return m.Hostname != r.MachineID
}

func (r *RepMgr) unregisterWitness(id int) error {
Expand All @@ -301,14 +304,14 @@ func (r *RepMgr) rejoinCluster(hostname string) error {
return err
}

func (r *RepMgr) clonePrimary(ipStr string) error {
func (r *RepMgr) clonePrimary(hostname string) error {
cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
return fmt.Errorf("failed to create pg directory: %s", err)
}

cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -c -F",
ipStr,
hostname,
r.Port,
r.DatabaseName,
r.Credentials.Username,
Expand All @@ -322,6 +325,21 @@ func (r *RepMgr) clonePrimary(ipStr string) error {
return nil
}

func (r *RepMgr) regenReplicationConf(ctx context.Context) error {
// TODO: do we need -c?
if _, err := utils.RunCmd(ctx, "postgres",
"repmgr", "--replication-conf-only",
"-h", "",
"-p", fmt.Sprint(r.Port),
"-d", r.DatabaseName,
"-U", r.Credentials.Username,
"-f", r.ConfigPath,
"standby", "clone", "-F"); err != nil {
return fmt.Errorf("failed to regenerate replication conf: %s", err)
}
return nil
}

type Member struct {
ID int
Hostname string
Expand Down Expand Up @@ -431,26 +449,56 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri
return &member, nil
}

// MemberBy6PN returns a member by its 6PN address.
func (r *RepMgr) MemberBy6PN(ctx context.Context, pg *pgx.Conn, ip string) (*Member, error) {
members, err := r.Members(ctx, pg)
if err != nil {
return nil, err
}

resolver := privnet.GetResolver()
var lastErr error
for _, member := range members {
ips, err := resolver.LookupIPAddr(ctx, member.Hostname)
if err != nil {
lastErr = err
continue
}

for _, addr := range ips {
if addr.IP.String() == ip {
return &member, nil
}
}
}

if lastErr != nil {
return nil, fmt.Errorf("no matches found for %s, and error encountered: %s", ip, lastErr)
}

return nil, nil
}

func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) {
ips, err := r.InRegionPeerIPs(ctx)
machineIds, err := r.InRegionPeerMachines(ctx)
if err != nil {
return nil, err
}

var target *Member

for _, ip := range ips {
if ip.String() == r.PrivateIP {
for _, machineId := range machineIds {
if machineId == r.MachineID {
continue
}

conn, err := r.NewRemoteConnection(ctx, ip.String())
conn, err := r.NewRemoteConnection(ctx, r.machineIdToDNS(machineId))
if err != nil {
continue
}
defer func() { _ = conn.Close(ctx) }()

member, err := r.MemberByHostname(ctx, conn, ip.String())
member, err := r.MemberByHostname(ctx, conn, r.machineIdToDNS(machineId))
if err != nil {
continue
}
Expand All @@ -477,6 +525,21 @@ func (r *RepMgr) InRegionPeerIPs(ctx context.Context) ([]net.IPAddr, error) {
return privnet.AllPeers(ctx, targets)
}

func (r *RepMgr) InRegionPeerMachines(ctx context.Context) ([]string, error) {
machines, err := privnet.AllMachines(ctx, r.AppName)
if err != nil {
return nil, err
}

var machineIDs []string
for _, machine := range machines {
if machine.Region == r.PrimaryRegion {
machineIDs = append(machineIDs, machine.Id)
}
}
return machineIDs, nil
}

func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error) {
ips, err := r.InRegionPeerIPs(ctx)
if err != nil {
Expand Down Expand Up @@ -514,3 +577,11 @@ func (r *RepMgr) UnregisterMember(member Member) error {
func (r *RepMgr) eligiblePrimary() bool {
return r.Region == r.PrimaryRegion
}

func (r *RepMgr) machineIdToDNS(nodeName string) string {
if len(nodeName) != 14 {
panic("invalid machine id")
}

return fmt.Sprintf("%s.vm.%s.internal", nodeName, r.AppName)
}
6 changes: 4 additions & 2 deletions internal/flypg/repmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestRepmgrInitialization(t *testing.T) {
UserConfigPath: repgmrUserConfigFilePath,
PasswordConfigPath: repgmrPasswordConfigFilePath,
DataDir: repmgrTestDirectory,
MachineID: "abcdefg1234567",
PrivateIP: "127.0.0.1",
Credentials: admin.Credential{
Username: "user",
Expand Down Expand Up @@ -91,8 +92,8 @@ func TestRepmgrInitialization(t *testing.T) {
t.Fatal(err)
}

if config["node_name"] != "'127.0.0.1'" {
t.Fatalf("expected node_name to be '127.0.0.1', got %v", config["node_name"])
if config["node_name"] != "'abcdefg1234567.vm.test-app.internal'" {
t.Fatalf("expected node_name to be 'abcdefg1234567.vm.test-app.internal', got %v", config["node_name"])
}

if config["location"] != "'dev'" {
Expand Down Expand Up @@ -122,6 +123,7 @@ func TestRepmgrNodeIDGeneration(t *testing.T) {

DataDir: repmgrTestDirectory,
PrivateIP: "127.0.0.1",
MachineID: "abcdefg1234567",
Port: 5433,
DatabaseName: "repmgr",
Credentials: admin.Credential{
Expand Down
Loading

0 comments on commit a39f76f

Please sign in to comment.