Skip to content

Commit

Permalink
streamd: follow prometheus naming conv and srt labels
Browse files Browse the repository at this point in the history
  • Loading branch information
hmelder committed Dec 5, 2024
1 parent bed6387 commit 32947ed
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 102 deletions.
216 changes: 119 additions & 97 deletions streamd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,155 +11,177 @@ type httpServer struct {
daemonController
}

// Minimalist prometheus exporter
func (h *httpServer) metrics(w http.ResponseWriter, r *http.Request) {
m := h.metricsSnapshot()
func writeSRTStatsMeta(w http.ResponseWriter) {
fmt.Fprintln(w, "# HELP srt_callers Current number of subscribers to the SRT stream")
fmt.Fprintln(w, "# TYPE srt_callers gauge")

/* CPU */
fmt.Fprintln(w, "# HELP srt_send_bytes_total Total bytes sent across all callers")
fmt.Fprintln(w, "# TYPE srt_send_bytes_total counter")

cpuTime := m.cpu.Time.UnixMilli()
fmt.Fprintf(w, "# HELP linux_proc_user Time spent in user mode, in ticks\n")
fmt.Fprintf(w, "# TYPE linux_proc_user gauge\n")
fmt.Fprintf(w, "linux_proc_user %d %d\n", m.cpu.User, cpuTime)
fmt.Fprintln(w, "# HELP srt_send_rate Send rate in Mbps")
fmt.Fprintln(w, "# TYPE srt_send_rate gauge")

fmt.Fprintf(w, "# HELP linux_proc_system Time spent in system mode, in ticks\n")
fmt.Fprintf(w, "# TYPE linux_proc_system gauge\n")
fmt.Fprintf(w, "linux_proc_system %d %d\n", m.cpu.System, cpuTime)
fmt.Fprintln(w, "# HELP srt_bandwidth Bandwidth in Mbps")
fmt.Fprintln(w, "# TYPE srt_bandwidth gauge")

fmt.Fprintf(w, "# HELP linux_proc_iowait Time spent waiting for I/O to complete, in ticks\n")
fmt.Fprintf(w, "# TYPE linux_proc_iowait gauge\n")
fmt.Fprintf(w, "linux_proc_iowait %d %d\n", m.cpu.Iowait, cpuTime)
fmt.Fprintln(w, "# HELP srt_rtt_seconds RTT in s")
fmt.Fprintln(w, "# TYPE srt_rtt_seconds gauge")

fmt.Fprintf(w, "# HELP linux_proc_irq Time spent servicing interrupts, in ticks\n")
fmt.Fprintf(w, "# TYPE linux_proc_irq gauge\n")
fmt.Fprintf(w, "linux_proc_irq %d %d\n", m.cpu.Irq, cpuTime)
fmt.Fprintln(w, "# HELP srt_negotiated_latency_seconds Negotiated latency in s")
fmt.Fprintln(w, "# TYPE srt_negotiated_latency_seconds gauge")

fmt.Fprintf(w, "# HELP linux_proc_softirq Time spent servicing soft interrupts, in ticks\n")
fmt.Fprintf(w, "# TYPE linux_proc_softirq gauge\n")
fmt.Fprintf(w, "linux_proc_softirq %d %d\n", m.cpu.SoftIrq, cpuTime)
fmt.Fprintln(w, "# HELP srt_sent_bytes_total Total bytes sent")
fmt.Fprintln(w, "# TYPE srt_sent_bytes_total counter")

/* Memory */
fmt.Fprintln(w, "# HELP srt_retransmitted_bytes_total Total bytes retransmitted")
fmt.Fprintln(w, "# TYPE srt_retransmitted_bytes_total counter")

memTime := m.mem.Time.UnixMilli()
fmt.Fprintf(w, "# HELP linux_mem_used Amount of memory used, in kB\n")
fmt.Fprintf(w, "# TYPE linux_mem_used gauge\n")
fmt.Fprintf(w, "linux_mem_used %d %d\n", m.mem.MemUsed, memTime)
fmt.Fprintln(w, "# HELP srt_sent_dropped_bytes_total Total bytes retransmitted")
fmt.Fprintln(w, "# TYPE srt_sent_dropped_bytes_total counter")

fmt.Fprintf(w, "# HELP linux_mem_free Amount of free memory, in kB\n")
fmt.Fprintf(w, "# TYPE linux_mem_free gauge\n")
fmt.Fprintf(w, "linux_mem_free %d %d\n", m.mem.MemFree, memTime)
fmt.Fprintln(w, "# HELP srt_packets_sent_total Total packets sent")
fmt.Fprintln(w, "# TYPE srt_packets_sent_total counter")

/* TODO(hugo) I/O Status metrics via `iostat` when recording is implemented */
fmt.Fprintln(w, "# HELP srt_packets_sent_lost_total Total packets lost")
fmt.Fprintln(w, "# TYPE srt_packets_sent_lost_total counter")

/* Load Average */

// The timestamp is an int64 (milliseconds since epoch, i.e. 1970-01-01
// 00:00:00 UTC, excluding leap seconds), represented as required by Go's
// ParseInt() function.
loadAvgTime := m.loadAvg.Time.UnixMilli()
fmt.Fprintf(w, "# HELP load_avg_one Load average over one minute\n")
fmt.Fprintf(w, "# TYPE load_avg_one gauge\n")
fmt.Fprintf(w, "load_avg_one %f %d\n", m.loadAvg.One, loadAvgTime)
fmt.Fprintln(w, "# HELP srt_packets_sent_dropped_total Total packets dropped")
fmt.Fprintln(w, "# TYPE srt_packets_sent_dropped_total counter")

fmt.Fprintf(w, "# HELP load_avg_five Load average over five minutes\n")
fmt.Fprintf(w, "# TYPE load_avg_five gauge\n")
fmt.Fprintf(w, "load_avg_five %f %d\n", m.loadAvg.Five, loadAvgTime)
fmt.Fprintln(w, "# HELP srt_packets_retransmitted_total Total packets retransmitted")
fmt.Fprintln(w, "# TYPE srt_packets_retransmitted_total counter")

fmt.Fprintf(w, "# HELP load_avg_fifteen Load average over fifteen minutes\n")
fmt.Fprintf(w, "# TYPE load_avg_fifteen gauge\n")
fmt.Fprintf(w, "load_avg_fifteen %f %d\n", m.loadAvg.Fifteen, loadAvgTime)
fmt.Fprintln(w, "# HELP srt_packets_ack_received_total Number of acks received")
fmt.Fprintln(w, "# TYPE srt_packets_ack_received_total counter")

/* SRT Statistics */
fmt.Fprintln(w, "# HELP srt_packets_nack_received_total Number of nacks received")
fmt.Fprintln(w, "# TYPE srt_packets_nack_received_total counter")
}

srtTime := m.compSinkStats.time.UnixMilli()
fmt.Fprintf(w, "# HELP srt_callers Current number of subscribers to the SRT stream\n")
fmt.Fprintf(w, "# TYPE srt_callers gauge\n")
fmt.Fprintf(w, "srt_callers %d %d\n", len(m.compSinkStats.callers), srtTime)
func writeSRTStats(w http.ResponseWriter, s *srtStats, sink string) {
srtTime := s.time.UnixMilli()
fmt.Fprintf(w, "srt_callers{sink=\"%s\"} %d %d\n", sink, len(s.callers), srtTime)

// Total bytes sent
fmt.Fprintf(w, "# HELP srt_bytes_send_total Total bytes sent across all callers\n")
fmt.Fprintf(w, "# TYPE srt_bytes_send_total counter\n")
fmt.Fprintf(w, "srt_bytes_send_total %d %d\n", m.compSinkStats.bytesSendTotal, srtTime)
fmt.Fprintf(w, "srt_send_bytes_total{sink=\"%s\"} %d %d\n", sink, s.bytesSendTotal, srtTime)

// Send rate per caller
// TODO: caller should be identified by IP, but that field is NULL in the srt stats structure
for i, caller := range m.compSinkStats.callers {
common := fmt.Sprintf("caller=\"%d\"", i)
for _, caller := range s.callers {
common := fmt.Sprintf("address=\"%s\", port=\"%d\", sink=\"%s\"", caller.callerAddress.String(), caller.callerPort, sink)

// Send Rate
fmt.Fprintf(w, "# HELP srt_send_rate Send rate in Mbps\n")
fmt.Fprintf(w, "# TYPE srt_send_rate gauge\n")
fmt.Fprintf(w, "srt_send_rate{%s} %f %d\n", common, caller.sendRateMbps, srtTime)

// Bandwidth
fmt.Fprintf(w, "# HELP srt_bandwidth Bandwidth in Mbps\n")
fmt.Fprintf(w, "# TYPE srt_bandwidth gauge\n")
fmt.Fprintf(w, "srt_bandwidth{%s} %f %d\n", common, caller.bandwidthMbps, srtTime)

// Round-trip time (RTT)
fmt.Fprintf(w, "# HELP srt_rtt RTT in ms\n")
fmt.Fprintf(w, "# TYPE srt_rtt gauge\n")
fmt.Fprintf(w, "srt_rtt{%s} %f %d\n", common, caller.rttMS, srtTime)
fmt.Fprintf(w, "srt_rtt_seconds{%s} %f %d\n", common, caller.rttMS/1000, srtTime)

// Negotiated Latency
fmt.Fprintf(w, "# HELP srt_negotiated_latency Negotiated latency in ms\n")
fmt.Fprintf(w, "# TYPE srt_negotiated_latency gauge\n")
fmt.Fprintf(w, "srt_negotiated_latency{%s} %d %d\n", common, caller.negotiatedLatencyMS, srtTime)
fmt.Fprintf(w, "srt_negotiated_latency_seconds{%s} %d %d\n", common, caller.negotiatedLatencyMS/1000, srtTime)

// Bytes sent
fmt.Fprintf(w, "# HELP srt_bytes_sent Total bytes sent\n")
fmt.Fprintf(w, "# TYPE srt_bytes_sent gauge\n")
fmt.Fprintf(w, "srt_bytes_sent{%s} %d %d\n", common, caller.bytesSent, srtTime)
fmt.Fprintf(w, "srt_sent_bytes_total{%s} %d %d\n", common, caller.bytesSent, srtTime)

// Bytes Retransmitted
fmt.Fprintf(w, "# HELP srt_bytes_retransmitted Total bytes retransmitted\n")
fmt.Fprintf(w, "# TYPE srt_bytes_retransmitted gauge\n")
fmt.Fprintf(w, "srt_bytes_retransmitted{%s} %d %d\n", common, caller.bytesRetransmitted, srtTime)
fmt.Fprintf(w, "srt_retransmitted_bytes_total{%s} %d %d\n", common, caller.bytesRetransmitted, srtTime)

// Bytes Sent Dropped
fmt.Fprintf(w, "# HELP srt_bytes_send_dropped Total bytes retransmitted\n")
fmt.Fprintf(w, "# TYPE srt_bytes_send_dropped gauge\n")
fmt.Fprintf(w, "srt_bytes_send_dropped{%s} %d %d\n", common, caller.bytesSentDropped, srtTime)
fmt.Fprintf(w, "srt_sent_dropped_bytes_total{%s} %d %d\n", common, caller.bytesSentDropped, srtTime)

// Packets sent
fmt.Fprintf(w, "# HELP srt_packets_sent Total packets sent\n")
fmt.Fprintf(w, "# TYPE srt_packets_sent gauge\n")
fmt.Fprintf(w, "srt_packets_sent{%s} %d %d\n", common, caller.packetsSent, srtTime)
fmt.Fprintf(w, "srt_packets_sent_total{%s} %d %d\n", common, caller.packetsSent, srtTime)

// Packets Sent Lost
fmt.Fprintf(w, "# HELP srt_packets_sent_lost Total packets lost\n")
fmt.Fprintf(w, "# TYPE srt_packets_sent_lost gauge\n")
fmt.Fprintf(w, "srt_packets_sent_lost{%s} %d %d\n", common, caller.packetsSentLost, srtTime)
fmt.Fprintf(w, "srt_packets_sent_lost_total{%s} %d %d\n", common, caller.packetsSentLost, srtTime)

// Packets Sent Dropped
fmt.Fprintf(w, "# HELP srt_packets_sent_dropped Total packets dropped\n")
fmt.Fprintf(w, "# TYPE srt_packets_sent_dropped gauge\n")
fmt.Fprintf(w, "srt_packets_sent_dropped{%s} %d %d\n", common, caller.packetsSentDropped, srtTime)
fmt.Fprintf(w, "srt_packets_sent_dropped_total{%s} %d %d\n", common, caller.packetsSentDropped, srtTime)

// Packets Retransmitted
fmt.Fprintf(w, "# HELP srt_packets_retransmitted Total packets retransmitted\n")
fmt.Fprintf(w, "# TYPE srt_packets_retransmitted gauge\n")
fmt.Fprintf(w, "srt_packets_retransmitted{%s} %d %d\n", common, caller.packetsRetransmitted, srtTime)
fmt.Fprintf(w, "srt_packets_retransmitted_total{%s} %d %d\n", common, caller.packetsRetransmitted, srtTime)

// Packets Ack Received
fmt.Fprintf(w, "# HELP srt_packets_ack_received Number of acks received\n")
fmt.Fprintf(w, "# TYPE srt_packets_ack_received gauge\n")
fmt.Fprintf(w, "srt_packets_ack_received{%s} %d %d\n", common, caller.packetAckReceived, srtTime)
fmt.Fprintf(w, "srt_packets_ack_received_total{%s} %d %d\n", common, caller.packetAckReceived, srtTime)

// Packets Nack Received
fmt.Fprintf(w, "# HELP srt_packets_nack_received Number of nacks received\n")
fmt.Fprintf(w, "# TYPE srt_packets_nack_received gauge\n")
fmt.Fprintf(w, "srt_packets_nack_received{%s} %d %d\n", common, caller.packetNackReceived, srtTime)

// TODO(hugo): Add receive metrics from 'srtCallerStats'?
fmt.Fprintf(w, "srt_packets_nack_received_total{%s} %d %d\n", common, caller.packetNackReceived, srtTime)
}

}

// Minimalist prometheus exporter
func (h *httpServer) metrics(w http.ResponseWriter, r *http.Request) {
m := h.metricsSnapshot()

/* CPU */

cpuTime := m.cpu.Time.UnixMilli()
fmt.Fprintf(w, "# HELP linux_proc_user_total Time spent in user mode, in ticks\n")
fmt.Fprintf(w, "# TYPE linux_proc_user_total counter\n")
fmt.Fprintf(w, "linux_proc_user_total %d %d\n", m.cpu.User, cpuTime)

fmt.Fprintf(w, "# HELP linux_proc_system_total Time spent in system mode, in ticks\n")
fmt.Fprintf(w, "# TYPE linux_proc_system_total counter\n")
fmt.Fprintf(w, "linux_proc_system_total %d %d\n", m.cpu.System, cpuTime)

fmt.Fprintf(w, "# HELP linux_proc_iowait_total Time spent waiting for I/O to complete, in ticks\n")
fmt.Fprintf(w, "# TYPE linux_proc_iowait_total counter\n")
fmt.Fprintf(w, "linux_proc_iowait_total %d %d\n", m.cpu.Iowait, cpuTime)

fmt.Fprintf(w, "# HELP linux_proc_irq_total Time spent servicing interrupts, in ticks\n")
fmt.Fprintf(w, "# TYPE linux_proc_irq_total counter\n")
fmt.Fprintf(w, "linux_proc_irq_total %d %d\n", m.cpu.Irq, cpuTime)

fmt.Fprintf(w, "# HELP linux_proc_softirq_total Time spent servicing soft interrupts, in ticks\n")
fmt.Fprintf(w, "# TYPE linux_proc_softirq_total counter\n")
fmt.Fprintf(w, "linux_proc_softirq_total %d %d\n", m.cpu.SoftIrq, cpuTime)

/* Memory */

memTime := m.mem.Time.UnixMilli()
fmt.Fprintf(w, "# HELP linux_mem_used_bytes Amount of memory used, in bytes\n")
fmt.Fprintf(w, "# TYPE linux_mem_used_bytes gauge\n")
fmt.Fprintf(w, "linux_mem_used_bytes %d %d\n", m.mem.MemUsed*1024, memTime)

fmt.Fprintf(w, "# HELP linux_mem_free_bytes Amount of free memory, in bytes\n")
fmt.Fprintf(w, "# TYPE linux_mem_free_bytes gauge\n")
fmt.Fprintf(w, "linux_mem_free_bytes %d %d\n", m.mem.MemFree*1024, memTime)

/* TODO(hugo) I/O Status metrics via `iostat` when recording is implemented */

/* Load Average */

// The timestamp is an int64 (milliseconds since epoch, i.e. 1970-01-01
// 00:00:00 UTC, excluding leap seconds), represented as required by Go's
// ParseInt() function.
loadAvgTime := m.loadAvg.Time.UnixMilli()
fmt.Fprintf(w, "# HELP load_avg_one Load average over one minute\n")
fmt.Fprintf(w, "# TYPE load_avg_one gauge\n")
fmt.Fprintf(w, "load_avg_one %f %d\n", m.loadAvg.One, loadAvgTime)

fmt.Fprintf(w, "# HELP load_avg_five Load average over five minutes\n")
fmt.Fprintf(w, "# TYPE load_avg_five gauge\n")
fmt.Fprintf(w, "load_avg_five %f %d\n", m.loadAvg.Five, loadAvgTime)

fmt.Fprintf(w, "# HELP load_avg_fifteen Load average over fifteen minutes\n")
fmt.Fprintf(w, "# TYPE load_avg_fifteen gauge\n")
fmt.Fprintf(w, "load_avg_fifteen %f %d\n", m.loadAvg.Fifteen, loadAvgTime)

/* SRT Statistics */

writeSRTStatsMeta(w)
writeSRTStats(w, &m.compSinkStats, "combined")
writeSRTStats(w, &m.presentSinkStats, "present")
writeSRTStats(w, &m.camSinkStats, "camera")

/* GStreamer Statistics */

for k, v := range m.pipelineStats.qosEvents {
fmt.Fprintf(w, "# HELP gst_qos_events Number of qos events\n")
fmt.Fprintf(w, "# TYPE gst_qos_events gauge\n")
fmt.Fprintf(w, "gst_qos_events{source=\"%s\"} %d\n", k, v)
fmt.Fprintf(w, "# HELP gst_qos_events_total Number of qos events\n")
fmt.Fprintf(w, "# TYPE gst_qos_events_total gauge\n")
fmt.Fprintf(w, "gst_qos_events_total{source=\"%s\"} %d\n", k, v)
}
}

Expand Down
17 changes: 12 additions & 5 deletions streamd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
)

type metrics struct {
compSinkStats srtStats
pipelineStats pipelineStats // Updated by bus watch on main thread
cpu systemstat.CPUSample
mem systemstat.MemSample
loadAvg systemstat.LoadAvgSample
compSinkStats srtStats
presentSinkStats srtStats
camSinkStats srtStats
pipelineStats pipelineStats // Updated by bus watch on main thread
cpu systemstat.CPUSample
mem systemstat.MemSample
loadAvg systemstat.LoadAvgSample
}

func (d *daemon) metricsProcess(ctx context.Context) {
Expand All @@ -26,19 +28,24 @@ func (d *daemon) metricsProcess(ctx context.Context) {
mem := systemstat.GetMemSample()
loadAvg := systemstat.GetLoadAvgSample()

// []*srtStats{combStats, presentStats, camStats}
srtStats, err := d.srtStatistics()
if err != nil {
klog.Warningf("failed to retrieve statistics from srtsinks: %v", err)
continue
}

srtCompStats := srtStats[0]
srtPresentStats := srtStats[1]
srtCamStats := srtStats[2]

d.mu.Lock()
d.metrics.cpu = cpu
d.metrics.mem = mem
d.metrics.loadAvg = loadAvg
d.metrics.compSinkStats = *srtCompStats
d.metrics.presentSinkStats = *srtPresentStats
d.metrics.camSinkStats = *srtCamStats
d.mu.Unlock()

time.Sleep(time.Second * 1)
Expand Down

0 comments on commit 32947ed

Please sign in to comment.