Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbruens committed Mar 20, 2024
1 parent 959de84 commit 5c75727
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 211 deletions.
132 changes: 56 additions & 76 deletions cmd/outline-ss-server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

const (
// How often to report the active IP key TunnelTime.
activeIPKeyTrackerReportingInterval = 5 * time.Second
)
// How often to report the active IP key TunnelTime.
const tunnelTimeTrackerReportingInterval = 5 * time.Second

var since = time.Since
// Now is stubbable for testing.
var Now = time.Now

type outlineMetrics struct {
ipinfo.IPInfoMap
activeIPKeyTracker
tunnelTimeTracker

buildInfo *prometheus.GaugeVec
accessKeys prometheus.Gauge
Expand All @@ -45,8 +44,8 @@ type outlineMetrics struct {
timeToCipherMs *prometheus.HistogramVec
// TODO: Add time to first byte.

IPKeyTimePerKey *prometheus.CounterVec
IPKeyTimePerLocation *prometheus.CounterVec
TunnelTimePerKey *prometheus.CounterVec
TunnelTimePerLocation *prometheus.CounterVec

tcpProbes *prometheus.HistogramVec
tcpOpenConnections *prometheus.CounterVec
Expand All @@ -61,93 +60,89 @@ type outlineMetrics struct {
var _ service.TCPMetrics = (*outlineMetrics)(nil)
var _ service.UDPMetrics = (*outlineMetrics)(nil)

type ReportTunnelTimeFunc func(IPKey, ipinfo.IPInfo, time.Duration)

type activeClient struct {
IPKey IPKey
clientInfo ipinfo.IPInfo
connectionCount int
startTime time.Time
}

func (c *activeClient) IsActive() bool {
return c.connectionCount > 0
}

type IPKey struct {
ip string
accessKey string
}

type activeIPKeyTracker struct {
activeClients map[IPKey]activeClient
metricsCallback func(IPKey, time.Duration)
type tunnelTimeTracker struct {
activeClients map[IPKey]activeClient
reportTunnelTime ReportTunnelTimeFunc
}

// Reports time connected for all active clients, called at a regular interval.
func (t *activeIPKeyTracker) reportAll() {
func (t *tunnelTimeTracker) reportAll(now time.Time) {
if len(t.activeClients) == 0 {
logger.Debugf("No active clients. No IPKey activity to report.")
return
}
for _, c := range t.activeClients {
t.reportDuration(c)
t.reportDuration(c, now)
}
}

// Reports time connected for a given active client.
func (t *activeIPKeyTracker) reportDuration(c activeClient) {
connDuration := since(c.startTime)
func (t *tunnelTimeTracker) reportDuration(c activeClient, now time.Time) {
connDuration := now.Sub(c.startTime)
logger.Debugf("Reporting activity for key `%v`, duration: %v", c.IPKey.accessKey, connDuration)
t.metricsCallback(c.IPKey, connDuration)
t.reportTunnelTime(c.IPKey, c.clientInfo, connDuration)

// Reset the start time now that it's been reported.
c.startTime = time.Now()
c.startTime = Now()
t.activeClients[c.IPKey] = c
}

// Registers a new active connection for a client [net.Addr] and access key.
func (t *activeIPKeyTracker) startConnection(addr net.Addr, accessKey string) {
hostname, _, _ := net.SplitHostPort(addr.String())
func (t *tunnelTimeTracker) startConnection(clientInfo ipinfo.IPInfo, clientAddr net.Addr, accessKey string) {
hostname, _, _ := net.SplitHostPort(clientAddr.String())
ipKey := IPKey{ip: hostname, accessKey: accessKey}

c, exists := t.activeClients[ipKey]
if !exists {
c = activeClient{ipKey, 0, time.Now()}
c = activeClient{ipKey, clientInfo, 0, Now()}
}
c.connectionCount++
t.activeClients[ipKey] = c
}

// Removes an active connection for a client [net.Addr] and access key.
func (t *activeIPKeyTracker) stopConnection(addr net.Addr, accessKey string) {
hostname, _, _ := net.SplitHostPort(addr.String())
func (t *tunnelTimeTracker) stopConnection(clientAddr net.Addr, accessKey string) {
hostname, _, _ := net.SplitHostPort(clientAddr.String())
ipKey := IPKey{ip: hostname, accessKey: accessKey}

c := t.activeClients[ipKey]
c, exists := t.activeClients[ipKey]
if !exists {
logger.Warningf("Failed to find active client")
return
}
c.connectionCount--
if !c.IsActive() {
t.reportDuration(c)
if c.connectionCount <= 0 {
t.reportDuration(c, Now())
delete(t.activeClients, ipKey)
return
}
t.activeClients[ipKey] = c
}

func newActiveIPKeyTracker(callback func(IPKey, time.Duration)) *activeIPKeyTracker {
t := &activeIPKeyTracker{activeClients: make(map[IPKey]activeClient), metricsCallback: callback}
ticker := time.NewTicker(activeIPKeyTrackerReportingInterval)
done := make(chan struct{})
func newTunnelTimeTracker(report ReportTunnelTimeFunc) *tunnelTimeTracker {
tracker := &tunnelTimeTracker{activeClients: make(map[IPKey]activeClient), reportTunnelTime: report}
ticker := time.NewTicker(tunnelTimeTrackerReportingInterval)
go func() {
for {
select {
case <-ticker.C:
t.reportAll()
case <-done:
logger.Debugf("done channel %p closed", done)
ticker.Stop()
return
}
for t := range ticker.C {
tracker.reportAll(t)
}
}()
return t
return tracker
}

// newPrometheusOutlineMetrics constructs a metrics object that uses
Expand Down Expand Up @@ -205,14 +200,14 @@ func newPrometheusOutlineMetrics(ip2info ipinfo.IPInfoMap, registerer prometheus
float64(7 * 24 * time.Hour.Milliseconds()), // Week
},
}, []string{"status"}),
IPKeyTimePerKey: prometheus.NewCounterVec(prometheus.CounterOpts{
TunnelTimePerKey: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "shadowsocks",
Name: "ip_key_connectivity_seconds",
Name: "tunnel_time_seconds",
Help: "Time at least 1 connection was open for a (IP, access key) pair, per key",
}, []string{"access_key"}),
IPKeyTimePerLocation: prometheus.NewCounterVec(prometheus.CounterOpts{
TunnelTimePerLocation: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "shadowsocks",
Name: "ip_key_connectivity_seconds_per_location",
Name: "tunnel_time_seconds_per_location",
Help: "Time at least 1 connection was open for a (IP, access key) pair, per location",
}, []string{"location", "asn"}),
dataBytes: prometheus.NewCounterVec(
Expand Down Expand Up @@ -256,12 +251,12 @@ func newPrometheusOutlineMetrics(ip2info ipinfo.IPInfoMap, registerer prometheus
Help: "Entries removed from the UDP NAT table",
}),
}
m.activeIPKeyTracker = *newActiveIPKeyTracker(m.reportIPKeyActivity)
m.tunnelTimeTracker = *newTunnelTimeTracker(m.addTunnelTime)

// TODO: Is it possible to pass where to register the collectors?
registerer.MustRegister(m.buildInfo, m.accessKeys, m.ports, m.tcpProbes, m.tcpOpenConnections, m.tcpClosedConnections, m.tcpConnectionDurationMs,
m.dataBytes, m.dataBytesPerLocation, m.timeToCipherMs, m.udpPacketsFromClientPerLocation, m.udpAddedNatEntries, m.udpRemovedNatEntries,
m.IPKeyTimePerKey, m.IPKeyTimePerLocation)
m.TunnelTimePerKey, m.TunnelTimePerLocation)
return m
}

Expand All @@ -274,28 +269,18 @@ func (m *outlineMetrics) SetNumAccessKeys(numKeys int, ports int) {
m.ports.Set(float64(ports))
}

func (m *outlineMetrics) AddOpenTCPConnection(addr net.Addr) {
clientInfo, err := ipinfo.GetIPInfoFromAddr(m.IPInfoMap, addr)
if err != nil {
logger.Warningf("Failed client info lookup: %v", err)
}
logger.Debugf("Got info \"%#v\" for IP %v", clientInfo, addr.String())
func (m *outlineMetrics) AddOpenTCPConnection(clientInfo ipinfo.IPInfo) {
m.tcpOpenConnections.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)).Inc()
}

// Reports total time connected, by access key and by country.
func (m *outlineMetrics) reportIPKeyActivity(ipKey IPKey, duration time.Duration) {
m.IPKeyTimePerKey.WithLabelValues(ipKey.accessKey).Add(duration.Seconds())
ip := net.ParseIP(ipKey.ip)
clientInfo, err := ipinfo.GetIPInfoFromIP(m.IPInfoMap, ip)
if err != nil {
logger.Warningf("Failed client info lookup: %v", err)
}
m.IPKeyTimePerLocation.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)).Add(duration.Seconds())
// Reports total time connected (i.e. TunnelTime), by access key and by country.
func (m *outlineMetrics) addTunnelTime(ipKey IPKey, clientInfo ipinfo.IPInfo, duration time.Duration) {
m.TunnelTimePerKey.WithLabelValues(ipKey.accessKey).Add(duration.Seconds())
m.TunnelTimePerLocation.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN)).Add(duration.Seconds())
}

func (m *outlineMetrics) AddAuthenticatedTCPConnection(addr net.Addr, accessKey string) {
m.activeIPKeyTracker.startConnection(addr, accessKey)
func (m *outlineMetrics) AddAuthenticatedTCPConnection(clientInfo ipinfo.IPInfo, clientAddr net.Addr, accessKey string) {
m.tunnelTimeTracker.startConnection(clientInfo, clientAddr, accessKey)
}

// addIfNonZero helps avoid the creation of series that are always zero.
Expand All @@ -312,12 +297,7 @@ func asnLabel(asn int) string {
return fmt.Sprint(asn)
}

func (m *outlineMetrics) AddClosedTCPConnection(addr net.Addr, accessKey, status string, data metrics.ProxyMetrics, duration time.Duration) {
clientInfo, err := ipinfo.GetIPInfoFromAddr(m.IPInfoMap, addr)
if err != nil {
logger.Warningf("Failed client info lookup: %v", err)
}
logger.Debugf("Got info \"%#v\" for IP %v", clientInfo, addr.String())
func (m *outlineMetrics) AddClosedTCPConnection(clientInfo ipinfo.IPInfo, clientAddr net.Addr, accessKey, status string, data metrics.ProxyMetrics, duration time.Duration) {
m.tcpClosedConnections.WithLabelValues(clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN), status, accessKey).Inc()
m.tcpConnectionDurationMs.WithLabelValues(status).Observe(duration.Seconds() * 1000)
addIfNonZero(data.ClientProxy, m.dataBytes, "c>p", "tcp", accessKey)
Expand All @@ -329,7 +309,7 @@ func (m *outlineMetrics) AddClosedTCPConnection(addr net.Addr, accessKey, status
addIfNonZero(data.ProxyClient, m.dataBytes, "c<p", "tcp", accessKey)
addIfNonZero(data.ProxyClient, m.dataBytesPerLocation, "c<p", "tcp", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN))

m.activeIPKeyTracker.stopConnection(addr, accessKey)
m.tunnelTimeTracker.stopConnection(clientAddr, accessKey)
}

func (m *outlineMetrics) AddUDPPacketFromClient(clientInfo ipinfo.IPInfo, accessKey, status string, clientProxyBytes, proxyTargetBytes int) {
Expand All @@ -347,16 +327,16 @@ func (m *outlineMetrics) AddUDPPacketFromTarget(clientInfo ipinfo.IPInfo, access
addIfNonZero(int64(proxyClientBytes), m.dataBytesPerLocation, "c<p", "udp", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN))
}

func (m *outlineMetrics) AddUDPNatEntry(addr net.Addr, accessKey string) {
func (m *outlineMetrics) AddUDPNatEntry(clientInfo ipinfo.IPInfo, clientAddr net.Addr, accessKey string) {
m.udpAddedNatEntries.Inc()

m.activeIPKeyTracker.startConnection(addr, accessKey)
m.tunnelTimeTracker.startConnection(clientInfo, clientAddr, accessKey)
}

func (m *outlineMetrics) RemoveUDPNatEntry(addr net.Addr, accessKey string) {
func (m *outlineMetrics) RemoveUDPNatEntry(clientInfo ipinfo.IPInfo, clientAddr net.Addr, accessKey string) {
m.udpRemovedNatEntries.Inc()

m.activeIPKeyTracker.stopConnection(addr, accessKey)
m.tunnelTimeTracker.stopConnection(clientAddr, accessKey)
}

func (m *outlineMetrics) AddTCPProbe(status, drainResult string, port int, clientProxyBytes int64) {
Expand Down
Loading

0 comments on commit 5c75727

Please sign in to comment.