Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics collection and endpoint #284

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.git/
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ $(DEB): tmp/build/$(SERVER_NAME)-linux-amd64 tmp/build/$(CLI_NAME)-linux-amd64
$(word 2,$^)=/usr/bin/$(CLI_NAME) \
./share/toxiproxy.conf=/etc/init/toxiproxy.conf

docker:
docker: linux
docker build --tag="shopify/toxiproxy:git" .

docker-release: linux
Expand Down
82 changes: 82 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ All endpoints are JSON.
- **POST /proxies/{proxy}/toxics/{toxic}** - Update an active toxic
- **DELETE /proxies/{proxy}/toxics/{toxic}** - Remove an active toxic
- **POST /reset** - Enable all proxies and remove all active toxics
- **GET /metrics** - Get metrics information (global)
- **GET /events** - Returns all available events history, with optional location (see bellow)
- **GET /version** - Returns the server version number

#### Populating Proxies
Expand All @@ -478,6 +480,86 @@ A `/populate` call can be included for example at application start to ensure al
exist. It is safe to make this call several times, since proxies will be untouched as long as their
fields are consistent with the new data.

#### Metrics and event history

Toxiproxy can keep metrics and record of recent events in order to enable easy visualization of real
time traffic, and enabling automated tests to verify the behaviour of the networking. This feature
makes Toxiproxy useful also for scenarios that do not involve toxics. Please note that since Toxiproxy
supports any kind of tcp stream, it does not count requests (which exists only in some protocols)
but tcp packets.

The **metrics** endpoint gives global information about number of packets that passed through a specific
proxy. For example, we have two proxies, `Main DB` and `Distributed Cahce`, so a call to the
metrics endpoint will yield a response that looks like this:
```json
{
"Distributed Cache": 103,
"Main DB": 51
}
```

The **events** endpoint gives you recent events that happened, such as client connections and disconnections,
packets transferred and failures. The event history is limited in time and number (to prevent excessive memory consumption),
both can be configured when running the server and default to 10 seconds, 100,000 events.

The available event types can be seen [here](metrics/event_types.go).

For example, a call to the `events` endpoint will yield a response that looks like this:
```json
{
"data": [
{
"client": "[::1]:50189",
"target": "127.0.0.1:4000",
"timestamp": "2020-04-07T17:12:09.914659+03:00",
"proxyName": "Distributed Cache",
"eventType": "Client Connected"
},
{
"client": "",
"target": "127.0.0.1:4000",
"timestamp": "2020-04-07T17:12:10.446332+03:00",
"proxyName": "Distributed Cache",
"eventType": "Message"
},
{
"client": "",
"target": "127.0.0.1:4000",
"timestamp": "2020-04-07T17:12:13.448622+03:00",
"proxyName": "Distributed Cache",
"eventType": "Message"
},
{
"client": "[::1]:50189",
"target": "127.0.0.1:4000",
"timestamp": "2020-04-07T17:12:15.452107+03:00",
"proxyName": "Distributed Cache",
"eventType": "Client Disconnected"
},
{
"client": "[::1]:50189",
"target": "127.0.0.1:4000",
"timestamp": "2020-04-07T17:12:19.914659+03:00",
"proxyName": "Distributed Cache",
"eventType": "Client Connected"
},
{
"client": "[::1]:50189",
"target": "127.0.0.1:4000",
"timestamp": "2020-04-07T17:12:19.914812+03:00",
"proxyName": "Distributed Cache",
"eventType": "Upstream unavailable"
}
],
"location": "a439j"
}
```
Here we see a client that connected, sent two packets (good chance that it is also two requests, if this is HTTP)
and disconnected the tcp connection. Then it tried again, but the message could not be forwarded to the target.

The `location` field can be used in consecutive calls, to get only unread messages. The next call in this example
would be `/events?afterLocation=a439j`.

### CLI Example

```bash
Expand Down
47 changes: 47 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package toxiproxy
import (
"encoding/json"
"fmt"
"github.com/Shopify/toxiproxy/metrics"
"log"
"net"
"net/http"
"os"
"strconv"
"strings"

"github.com/Shopify/toxiproxy/toxics"
Expand Down Expand Up @@ -72,6 +74,8 @@ func (server *ApiServer) Listen(host string, port string) {
r.HandleFunc("/proxies/{proxy}/toxics/{toxic}", server.ToxicUpdate).Methods("POST")
r.HandleFunc("/proxies/{proxy}/toxics/{toxic}", server.ToxicDelete).Methods("DELETE")

r.HandleFunc("/events", server.GetMetricEvents).Methods("GET")
r.HandleFunc("/metrics", server.GetMetrics).Methods("GET")
r.HandleFunc("/version", server.Version).Methods("GET")

http.Handle("/", StopBrowsersMiddleware(r))
Expand Down Expand Up @@ -108,6 +112,49 @@ func (server *ApiServer) ProxyIndex(response http.ResponseWriter, request *http.
}
}

func (server *ApiServer) GetMetricEvents(response http.ResponseWriter, request *http.Request) {
afterLocation := request.URL.Query().Get("afterLocation")
var result metrics.EventsAndLocation
if len(afterLocation) >= 1 {
locationNum, err := strconv.Atoi(afterLocation)
if err != nil {
apiError(response, newError("afterLocation must be a one returned from this api", http.StatusBadRequest))
return
}
result = metrics.GetMetricEventsStartingFrom(locationNum)
} else {
result = metrics.GetMetricEvents()
}

body, err := json.Marshal(result)

if apiError(response, err) {
return
}

response.Header().Set("Content-Type", "application/json")
_, err = response.Write(body)
if err != nil {
logrus.Warn("ProxyIndex: Failed to write response to client", err)
}
}

func (server *ApiServer) GetMetrics(response http.ResponseWriter, request *http.Request) {
data := metrics.GetMetrics()

body, err := json.Marshal(data)

if apiError(response, err) {
return
}

response.Header().Set("Content-Type", "application/json")
_, err = response.Write(body)
if err != nil {
logrus.Warn("ProxyIndex: Failed to write response to client", err)
}
}

func (server *ApiServer) ResetState(response http.ResponseWriter, request *http.Request) {
proxies := server.Collection.Proxies()

Expand Down
7 changes: 7 additions & 0 deletions cmd/toxiproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"flag"
"github.com/Shopify/toxiproxy/metrics"
"math/rand"
"os"
"os/signal"
Expand All @@ -14,17 +15,23 @@ import (
var host string
var port string
var config string
var metricsTimeToKeep string
var metricsMaxEvents int

func init() {
flag.StringVar(&host, "host", "localhost", "Host for toxiproxy's API to listen on")
flag.StringVar(&port, "port", "8474", "Port for toxiproxy's API to listen on")
flag.StringVar(&config, "config", "", "JSON file containing proxies to create on startup")
flag.StringVar(&metricsTimeToKeep, "metrics-time", "10s", "Oldest age of events to keep in toxiproxy metrics (e.g. 20s)")
flag.IntVar(&metricsMaxEvents, "metrics-max", 100000, "Max num of events to keep in toxiproxy events")
seed := flag.Int64("seed", time.Now().UTC().UnixNano(), "Seed for randomizing toxics with")
flag.Parse()
rand.Seed(*seed)
}

func main() {
metrics.InitSettings(metricsTimeToKeep, metricsMaxEvents)

server := toxiproxy.NewServer()
if len(config) > 0 {
server.PopulateConfig(config)
Expand Down
38 changes: 33 additions & 5 deletions link.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package toxiproxy

import (
"github.com/Shopify/toxiproxy/metrics"
"io"
"net"
"time"

"github.com/Shopify/toxiproxy/stream"
"github.com/Shopify/toxiproxy/toxics"
Expand Down Expand Up @@ -45,23 +48,40 @@ func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Di
next = make(chan *stream.StreamChunk)
}

link.stubs[i] = toxics.NewToxicStub(last, next)
proxyName := "Unknown"
proxyUpstream := "Unknown"
if proxy != nil {
proxyName = proxy.Name
proxyUpstream = proxy.Upstream
}
link.stubs[i] = toxics.NewToxicStub(last, next, proxyName, proxyUpstream)
last = next
}
link.output = stream.NewChanReader(last)
return link
}

// Start the link with the specified toxics
func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) {
func (link *ToxicLink) Start(name string, source net.Conn, dest io.WriteCloser) {
// assigned here so can be safely used in go routines
proxyName := link.proxy.Name
upstreamName := link.proxy.Upstream

go func() {
bytes, err := io.Copy(link.input, source)
if err != nil {
logrus.WithFields(logrus.Fields{
"name": link.proxy.Name,
"name": proxyName,
"bytes": bytes,
"err": err,
}).Warn("Source terminated")

metrics.RegisterEvent(metrics.Event{
EventType: metrics.ClientDisconnected,
Client: source.RemoteAddr().String(),
Upstream: upstreamName,
ProxyName: proxyName,
Time: time.Now()})
}
link.input.Close()
}()
Expand All @@ -76,7 +96,7 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser)
bytes, err := io.Copy(dest, link.output)
if err != nil {
logrus.WithFields(logrus.Fields{
"name": link.proxy.Name,
"name": proxyName,
"bytes": bytes,
"err": err,
}).Warn("Destination terminated")
Expand All @@ -92,7 +112,15 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) {
i := len(link.stubs)

newin := make(chan *stream.StreamChunk, toxic.BufferSize)
link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output))

proxyName := "Unknown"
proxyUpstream := "Unknown"
if link.proxy != nil {
proxyName = link.proxy.Name
proxyUpstream = link.proxy.Upstream
}

link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output, proxyName, proxyUpstream))

// Interrupt the last toxic so that we don't have a race when moving channels
if link.stubs[i-1].InterruptToxic() {
Expand Down
6 changes: 6 additions & 0 deletions metrics/event_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package metrics

const ClientConnected = "Client Connected"
const UpstreamUnavailable = "Upstream Unavailable"
const Message = "Message"
const ClientDisconnected = "Client Disconnected"
Loading