diff --git a/control.go b/control.go index 300e24d..07ab0ea 100644 --- a/control.go +++ b/control.go @@ -1,5 +1,3 @@ -// Copyright (C) 2024 Center for High Performance Computing - package main import ( @@ -19,8 +17,6 @@ import ( const MAX_UINT64 uint64 = ^uint64(0) -// A subset of available properties to modify. -// See https://man7.org/linux/man-pages/man5/systemd.resource-control.5.html. var ( CPUAccounting = "CPUAccounting" CPUQuotaPerSecUSec = "CPUQuotaPerSecUSec" @@ -29,17 +25,6 @@ var ( MemoryMax = "MemoryMax" ) -type systemdConn struct { - conn *systemd.Conn - ctx context.Context -} - -func newSystemdConn() (systemdConn, error) { - ctx := context.Background() - conn, err := systemd.NewSystemConnectionContext(ctx) - return systemdConn{conn, ctx}, err -} - type controlProperty struct { Name string `json:"name"` Value string `json:"value"` @@ -83,15 +68,16 @@ func controlHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) } - sysconn, err := newSystemdConn() + ctx := context.Background() + conn, err := systemd.NewSystemConnectionContext(ctx) if err != nil { slog.Warn("unable to connect to systemd", "err", err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } - defer sysconn.conn.Close() + defer conn.Close() - err = sysconn.conn.SetUnitPropertiesContext(sysconn.ctx, unit, request.Runtime, property) + err = conn.SetUnitPropertiesContext(ctx, unit, request.Runtime, property) if err != nil { slog.Warn("unable to set property", "err", err.Error(), "property", property, "unit", unit) http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/control_test.go b/control_test.go index 29d786d..15f16f6 100644 --- a/control_test.go +++ b/control_test.go @@ -1,10 +1,6 @@ package main import ( - "bytes" - "encoding/json" - "net/http" - "net/http/httptest" "testing" ) @@ -82,28 +78,3 @@ func TestTransform(t *testing.T) { t.Fail() } } - -// this test requires priviledge -func _TestControlHandler(t *testing.T) { - mockControlRequest := map[string]interface{}{ - "unit": "user-0.slice", - "property": map[string]interface{}{ - "name": "CPUAccounting", - "value": "true", - }, - "runtime": true, - } - payload, _ := json.Marshal(mockControlRequest) - req := httptest.NewRequest("POST", "/control", bytes.NewReader(payload)) - w := httptest.NewRecorder() - ControlHandler(w, req) - resp := w.Result() - var cr controlResponse - err := json.NewDecoder(resp.Body).Decode(&cr) - if err != nil { - t.Fail() - } - if resp.StatusCode != http.StatusOK { - t.Fail() - } -} diff --git a/go.mod b/go.mod index c3934f9..93d5e1b 100644 --- a/go.mod +++ b/go.mod @@ -7,4 +7,19 @@ toolchain go1.22.5 require ( github.com/coreos/go-systemd/v22 v22.5.0 github.com/godbus/dbus/v5 v5.1.0 + github.com/prometheus/client_golang v1.20.3 + github.com/prometheus/procfs v0.15.1 ) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + golang.org/x/sys v0.22.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) + +replace github.com/coreos/go-systemd/v22 => github.com/jay-mckay/go-systemd/v22 v22.0.0 diff --git a/go.sum b/go.sum index 5e54cec..12dde30 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,29 @@ -github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= -github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/jay-mckay/go-systemd/v22 v22.0.0 h1:nXxwYusmnPs+IvtAPhMibtBpaiK6LAz2YnQFPUytMms= +github.com/jay-mckay/go-systemd/v22 v22.0.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/prometheus/client_golang v1.20.3 h1:oPksm4K8B+Vt35tUhw6GbSNSgVlVSBH0qELP/7u83l4= +github.com/prometheus/client_golang v1.20.3/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= diff --git a/main.go b/main.go index 6c2eda9..12d43cf 100644 --- a/main.go +++ b/main.go @@ -1,5 +1,3 @@ -// Copyright (C) 2024 Center for High Performance Computing - package main import ( @@ -19,30 +17,32 @@ func authorize(next http.Handler, secret string) http.Handler { }) } -func newHandler() http.Handler { - mux := http.NewServeMux() - mux.Handle("/control", ControlHandler) - mux.Handle("/", http.NotFoundHandler()) - var handler http.Handler = mux - return handler -} - func main() { var ( + pattern string listenAddr string certFile string keyFile string bearerToken string insecure bool + collectProc bool ) + flag.StringVar(&pattern, "pattern", "user-*.slice", "unit pattern to match units on") flag.StringVar(&listenAddr, "listenAddr", ":2112", "address to listen on for telemetry") flag.StringVar(&certFile, "certFile", "", "file containing certificate to use for tls") flag.StringVar(&keyFile, "keyFile", "", "file containing key to use for tls") flag.StringVar(&bearerToken, "bearerToken", "", "bearer token to use for authentication") flag.BoolVar(&insecure, "insecure", false, "disable tls and bearer token authentication") + flag.BoolVar(&collectProc, "collectProc", false, "enable the collection of process metrics") flag.Parse() + mux := http.NewServeMux() + mux.Handle("/control", ControlHandler) + mux.Handle("/metrics", MetricsHandler(pattern, collectProc)) + mux.Handle("/", http.NotFoundHandler()) + var handler http.Handler = mux + if !insecure { if certFile == "" { log.Fatal("certificate required for use with tls") @@ -54,11 +54,9 @@ func main() { log.Fatal("token of length > 16 required for authentication") } - handler := authorize(newHandler(), bearerToken) - log.Fatal(http.ListenAndServeTLS(listenAddr, certFile, keyFile, handler)) + log.Fatal(http.ListenAndServeTLS(listenAddr, certFile, keyFile, authorize(handler, bearerToken))) } else { - handler := newHandler() log.Println("running in insecure mode") log.Fatal(http.ListenAndServe(listenAddr, handler)) } diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..00152fd --- /dev/null +++ b/metrics.go @@ -0,0 +1,258 @@ +package main + +import ( + "context" + "log" + "net/http" + "os/user" + "regexp" + + systemd "github.com/coreos/go-systemd/v22/dbus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/procfs" +) + +func MetricsHandler(pattern string, collectProc bool) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + registry := prometheus.NewRegistry() + collector := NewCollector(pattern, collectProc) + registry.MustRegister(collector) + h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{}) + h.ServeHTTP(w, r) + } +} + +var namespace = "systemd_unit" +var labels = []string{"unit", "username"} +var procLabels = []string{"unit", "username", "proc"} + +type Collector struct { + pattern string + collectProc bool + memoryAccounting *prometheus.Desc + memoryMax *prometheus.Desc + memoryMin *prometheus.Desc + memoryHigh *prometheus.Desc + memoryLow *prometheus.Desc + memoryCurrent *prometheus.Desc + cpuAccounting *prometheus.Desc + cpuUsage *prometheus.Desc + cpuQuota *prometheus.Desc + procCPU *prometheus.Desc + procMemory *prometheus.Desc + procCount *prometheus.Desc +} + +type Metric struct { + memoryAccounting bool + memoryMax uint64 + memoryMin uint64 + memoryHigh uint64 + memoryLow uint64 + memoryCurrent uint64 + cpuAccounting bool + cpuUsage uint64 + cpuQuota uint64 + unit string + username string + processes map[string]*Process +} + +type Process struct { + cpu float64 + memory uint64 + count uint64 +} + +func NewCollector(pattern string, collectProc bool) *Collector { + return &Collector{ + pattern: pattern, + collectProc: collectProc, + memoryAccounting: prometheus.NewDesc(prometheus.BuildFQName(namespace, "memory", "accounting"), + "Whether memory accounting is enabled", labels, nil), + memoryMax: prometheus.NewDesc(prometheus.BuildFQName(namespace, "memory", "max_bytes"), + "Memory maximum limit", labels, nil), + memoryMin: prometheus.NewDesc(prometheus.BuildFQName(namespace, "memory", "min_bytes"), + "Memory minimum limit", labels, nil), + memoryHigh: prometheus.NewDesc(prometheus.BuildFQName(namespace, "memory", "high_bytes"), + "Memory high limit", labels, nil), + memoryLow: prometheus.NewDesc(prometheus.BuildFQName(namespace, "memory", "low_bytes"), + "Memory low limit", labels, nil), + memoryCurrent: prometheus.NewDesc(prometheus.BuildFQName(namespace, "memory", "current_bytes"), + "Resident shared size memory usage", labels, nil), + cpuAccounting: prometheus.NewDesc(prometheus.BuildFQName(namespace, "cpu", "accounting"), + "Whether CPU accounting is enabled", labels, nil), + cpuUsage: prometheus.NewDesc(prometheus.BuildFQName(namespace, "cpu", "usage_ns"), + "Total CPU usage", labels, nil), + cpuQuota: prometheus.NewDesc(prometheus.BuildFQName(namespace, "cpu", "quota_ns_per_s"), + "CPU Quota", labels, nil), + procCPU: prometheus.NewDesc(prometheus.BuildFQName(namespace, "proc", "cpu_seconds"), + "Aggregate CPU usage for this process", procLabels, nil), + procMemory: prometheus.NewDesc(prometheus.BuildFQName(namespace, "proc", "memory_bytes"), + "Aggregate memory usage for this process", procLabels, nil), + procCount: prometheus.NewDesc(prometheus.BuildFQName(namespace, "proc", "count"), + "Instance count of this process", procLabels, nil), + } +} + +func (c *Collector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.memoryAccounting + ch <- c.memoryMax + ch <- c.memoryMin + ch <- c.memoryHigh + ch <- c.memoryLow + ch <- c.memoryCurrent + ch <- c.cpuAccounting + ch <- c.cpuUsage + ch <- c.cpuQuota + if c.collectProc { + ch <- c.procCPU + ch <- c.procMemory + ch <- c.procCount + } +} + +func (c *Collector) Collect(ch chan<- prometheus.Metric) { + metrics := c.collectMetrics() + for _, m := range metrics { + ch <- prometheus.MustNewConstMetric(c.memoryAccounting, prometheus.GaugeValue, b2f(m.memoryAccounting), m.unit, m.username) + ch <- prometheus.MustNewConstMetric(c.memoryMax, prometheus.GaugeValue, float64(m.memoryMax), m.unit, m.username) + ch <- prometheus.MustNewConstMetric(c.memoryMin, prometheus.GaugeValue, float64(m.memoryMin), m.unit, m.username) + ch <- prometheus.MustNewConstMetric(c.memoryHigh, prometheus.GaugeValue, float64(m.memoryHigh), m.unit, m.username) + ch <- prometheus.MustNewConstMetric(c.memoryLow, prometheus.GaugeValue, float64(m.memoryLow), m.unit, m.username) + ch <- prometheus.MustNewConstMetric(c.memoryCurrent, prometheus.GaugeValue, float64(m.memoryCurrent), m.unit, m.username) + ch <- prometheus.MustNewConstMetric(c.cpuAccounting, prometheus.GaugeValue, b2f(m.cpuAccounting), m.unit, m.username) + ch <- prometheus.MustNewConstMetric(c.cpuUsage, prometheus.CounterValue, float64(m.cpuUsage), m.unit, m.username) + ch <- prometheus.MustNewConstMetric(c.cpuQuota, prometheus.CounterValue, float64(m.cpuQuota), m.unit, m.username) + if c.collectProc { + for name, p := range m.processes { + ch <- prometheus.MustNewConstMetric(c.procCPU, prometheus.GaugeValue, p.cpu, m.unit, m.username, name) + ch <- prometheus.MustNewConstMetric(c.procMemory, prometheus.GaugeValue, float64(p.memory), m.unit, m.username, name) + ch <- prometheus.MustNewConstMetric(c.procCount, prometheus.GaugeValue, float64(p.count), m.unit, m.username, name) + } + } + } +} + +func (c *Collector) collectMetrics() []Metric { + + var metrics []Metric + ctx := context.Background() + conn, err := systemd.NewSystemConnectionContext(ctx) + if err != nil { + log.Println(err) + return metrics + } + defer conn.Close() + + units, err := conn.ListUnitsByPatternsContext(ctx, []string{}, []string{c.pattern}) + if err != nil { + log.Println(err) + return metrics + } + + for _, unit := range units { + props, err := conn.GetUnitTypePropertiesContext(ctx, unit.Name, "Slice") + if err != nil { + log.Println(err) + continue + } + metric := Metric{ + memoryAccounting: props["MemoryAccounting"].(bool), + memoryMax: props["MemoryMax"].(uint64), + memoryMin: props["MemoryMin"].(uint64), + memoryHigh: props["MemoryHigh"].(uint64), + memoryLow: props["MemoryLow"].(uint64), + memoryCurrent: props["MemoryCurrent"].(uint64), + cpuAccounting: props["CPUAccounting"].(bool), + cpuUsage: props["CPUUsageNSec"].(uint64), + cpuQuota: props["CPUQuotaPerSecUSec"].(uint64), + unit: unit.Name, + username: lookupUsername(unit), + } + if c.collectProc { + procs, err := collectProcesses(conn, ctx, unit.Name) + if err != nil { + log.Println(err) + } else { + metric.processes = procs + } + } + metrics = append(metrics, metric) + } + return metrics +} + +func collectProcesses(conn *systemd.Conn, ctx context.Context, unit string) (map[string]*Process, error) { + processes := make(map[string]*Process) + procs, err := conn.GetUnitProcesses(ctx, unit) + if err != nil { + return processes, err + } + + fs, err := procfs.NewDefaultFS() + if err != nil { + return processes, err + } + + for _, p := range procs { + proc, err := fs.Proc(int(p.PID)) + if err != nil { + log.Println(err) + continue + } + + comm, err := proc.Comm() + if err != nil { + log.Println(err) + continue + } + + stat, err := proc.Stat() + if err != nil { + log.Println(err) + continue + } + + smaps, err := proc.ProcSMapsRollup() + if err != nil { + log.Println(err) + continue + } + + val, ok := processes[comm] + if !ok { + processes[comm] = &Process{cpu: stat.CPUTime(), memory: smaps.Pss, count: 1} + } else { + val.cpu += stat.CPUTime() + val.memory += smaps.Pss + val.count += 1 + } + } + return processes, nil +} + +func lookupUsername(unit systemd.UnitStatus) string { + pattern := `^user-(\d+)\.slice$` + re := regexp.MustCompile(pattern) + match := re.FindStringSubmatch(unit.Name) + + if len(match) < 1 { + return "unknown user" + } + + user, err := user.LookupId(match[1]) + if err != nil { + return "unknown user" + } + + return user.Username +} + +func b2f(b bool) float64 { + if !b { + return -1.0 + } + return 1.0 +}