Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Bidirectional Toxics #132

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 2.2.0 (unreleased)

* Add support for bidirectional toxics (for protocol-aware toxics) #132

# 2.1.3

* Update `/version` endpoint to also return a charset of utf-8. #204
Expand Down
87 changes: 84 additions & 3 deletions CREATING_TOXICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,18 @@ and how much memory you are comfortable with using.
## Stateful toxics

If a toxic needs to store extra information for a connection such as the number of bytes
transferred (See `limit_data` toxic), a state object can be created by implementing the
`StatefulToxic` interface. This interface defines the `NewState()` function that can create
a new state object with default values set.
transferred (See the [limit_data toxic](https://github.com/Shopify/toxiproxy/blob/master/toxics/limit_data.go)),
a state object can be created by implementing the `StatefulToxic` interface. This interface
defines the `NewState()` function that can create a new state object with default values set.

```go
func (t *ExampleToxic) NewState() interface{} {
return &ExampleToxicState{
BytesRemaining: t.BytesAllowed,
SomeOtherState: true,
}
}
```

When a stateful toxic is created, the state object will be stored on the `ToxicStub` and
can be accessed from `toxic.Pipe()`:
Expand All @@ -133,6 +142,78 @@ If necessary, some global state can be stored in the toxic struct, which will no
instanced per-connection. These fields cannot have a custom default value set and will
not be thread-safe, so proper locking or atomic operations will need to be used.

## Bidirectional toxics

Regular toxics are limited to data flowing in a single direction, so they can't make decisions
for the `downstream` based on a request in the `upstream`. For things like protocol aware toxics
this is a problem.

Bidirectional toxics allow state to be shared for the `upstream` and `downstream` pipes in a single
toxic implementation. They also ensure direction-specific code is always run on the correct pipe
(a toxic that only works on the `upstream` can't be added to the `downstream`).

Creating a bidirectional toxic is done by implementing a second `Pipe()` function called `PipeUpstream()`.
The implementation is same as a regular toxic, and can be paired with other types such as a stateful toxic.

One use case of a bidirectional toxic is to mock out the backend server entirely, which is shown below:

```go
type EchoToxic struct {}

type EchoToxicState struct {
Request chan *stream.StreamChunk
}

// PipeUpstream handles the upstream direction
func (t *EchoToxic) PipeUpstream(stub *toxics.ToxicStub) {
state := stub.State.(*EchoToxicState)

for {
select {
case <-stub.Interrupt:
return
case c := <-stub.Input:
if c == nil {
// Close the downstream when the client closes
close(state.Request)
stub.Close()
return
}
// Send the data to the downstream through the state object
state.Request <- c
}
}
}

// Pipe() will only handle the downstream on a bidirectional toxic
func (t *EchoToxic) Pipe(stub *toxics.ToxicStub) {
state := stub.State.(*EchoToxicState)

for {
select {
case <-stub.Interrupt:
return
case c := <-state.Request: // Read from the upstream instead of the server
if c == nil {
stub.Close()
return
}
stub.Output <- c
}
}
}

func (t *EchoToxic) NewState() interface{} {
return &EchoToxicState{
Request: make(chan *stream.StreamChunk),
}
}
```

This example will loop back all data send to the server back to the client. Another use case seen
within toxiproxy is to filter http response modifications based on the request URL (See the
[http toxic](https://github.com/Shopify/toxiproxy/tree/master/toxics/http.go)).

## Using `io.Reader` and `io.Writer`

If your toxic involves modifying the data going through a proxy, you can use the `ChanReader`
Expand Down
2 changes: 1 addition & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func apiError(resp http.ResponseWriter, err error) bool {

type proxyToxics struct {
*Proxy
Toxics []toxics.Toxic `json:"toxics"`
Toxics []*toxics.ToxicWrapper `json:"toxics"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we change from Toxic to ToxicWrapper?

}

func proxyWithToxics(proxy *Proxy) (result proxyToxics) {
Expand Down
37 changes: 25 additions & 12 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
// Input > ToxicStub > ToxicStub > Output
//
type ToxicLink struct {
stubs []*toxics.ToxicStub
proxy *Proxy
toxics *ToxicCollection
input *stream.ChanWriter
output *stream.ChanReader
direction stream.Direction
stubs []*toxics.ToxicStub
proxy *Proxy
toxics *ToxicCollection
input *stream.ChanWriter
output *stream.ChanReader
direction stream.Direction
pairedLink *ToxicLink
}

func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Direction) *ToxicLink {
Expand Down Expand Up @@ -66,9 +67,7 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser)
link.input.Close()
}()
for i, toxic := range link.toxics.chain[link.direction] {
if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
link.stubs[i].State = stateful.NewState()
}
link.InitPairState(toxic)

go link.stubs[i].Run(toxic)
}
Expand All @@ -87,6 +86,22 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser)
}()
}

func (link *ToxicLink) InitPairState(toxic *toxics.ToxicWrapper) {
// If the toxic is stateful, create a state object or copy it from the paired link.
if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
if toxic.PairedToxic == nil || link.pairedLink.stubs[toxic.PairedToxic.Index].State == nil {
link.stubs[toxic.Index].State = stateful.NewState()
} else {
link.stubs[toxic.Index].State = link.pairedLink.stubs[toxic.PairedToxic.Index].State
}
}

// If the toxic is paired, synchronize the toxicity so they are always in the same state.
if toxic.PairedToxic != nil {
link.stubs[toxic.Index].Toxicity = link.pairedLink.stubs[toxic.PairedToxic.Index].Toxicity
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If toxicity is always initialized to the paired toxic's toxicity, where is the value actually configured?

}
}

// Add a toxic to the end of the chain.
func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) {
i := len(link.stubs)
Expand All @@ -98,9 +113,7 @@ func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) {
if link.stubs[i-1].InterruptToxic() {
link.stubs[i-1].Output = newin

if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
link.stubs[i].State = stateful.NewState()
}
link.InitPairState(toxic)

go link.stubs[i].Run(toxic)
go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1])
Expand Down
7 changes: 2 additions & 5 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package toxiproxy

import (
"errors"
"net"
"sync"

"github.com/Shopify/toxiproxy/stream"
"github.com/sirupsen/logrus"
tomb "gopkg.in/tomb.v1"

"net"
)

// Proxy represents the proxy in its entirity with all its links. The main
Expand Down Expand Up @@ -176,8 +174,7 @@ func (proxy *Proxy) server() {
proxy.connections.list[name+"upstream"] = upstream
proxy.connections.list[name+"downstream"] = client
proxy.connections.Unlock()
proxy.Toxics.StartLink(name+"upstream", client, upstream, stream.Upstream)
proxy.Toxics.StartLink(name+"downstream", upstream, client, stream.Downstream)
proxy.Toxics.StartLinks(name, client, upstream)
}
}

Expand Down
81 changes: 53 additions & 28 deletions toxic_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"strings"
"sync"

Expand Down Expand Up @@ -61,18 +62,17 @@ func (c *ToxicCollection) GetToxic(name string) *toxics.ToxicWrapper {
return c.findToxicByName(name)
}

func (c *ToxicCollection) GetToxicArray() []toxics.Toxic {
func (c *ToxicCollection) GetToxicArray() []*toxics.ToxicWrapper {
c.Lock()
defer c.Unlock()

result := make([]toxics.Toxic, 0)
result := make([]*toxics.ToxicWrapper, 0)
for dir := range c.chain {
for i, toxic := range c.chain[dir] {
if i == 0 {
// Skip the first noop toxic, it should not be visible
continue
for _, toxic := range c.chain[dir] {
if len(toxic.Name) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should have a Hidden flag rather than relying on no name?

// Skip toxics with no name, they should be hidden
result = append(result, toxic)
}
result = append(result, toxic)
}
}
return result
Expand All @@ -96,20 +96,31 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er
return nil, joinError(err, ErrBadRequestBody)
}

switch strings.ToLower(wrapper.Stream) {
case "downstream":
wrapper.Direction = stream.Downstream
case "upstream":
wrapper.Direction = stream.Upstream
default:
return nil, ErrInvalidStream
if toxics.New(wrapper) == nil {
return nil, ErrInvalidToxicType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is ErrInvalidToxicType the only kind of error we could get here?
I should understand how this works.

}
if wrapper.Name == "" {
wrapper.Name = fmt.Sprintf("%s_%s", wrapper.Type, wrapper.Stream)

if wrapper.PairedToxic == nil {
switch strings.ToLower(wrapper.Stream) {
case "downstream":
wrapper.Direction = stream.Downstream
case "upstream":
wrapper.Direction = stream.Upstream
default:
return nil, ErrInvalidStream
}
} else {
wrapper.Stream = "both"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what the behaviour should be here since there's only one option for bidirectional toxics.
Right now stream can be set to anything and it will just be changed to both by the server.

wrapper.Direction = stream.Downstream
wrapper.PairedToxic.Direction = stream.Upstream
}

if toxics.New(wrapper) == nil {
return nil, ErrInvalidToxicType
if wrapper.Name == "" {
if wrapper.PairedToxic != nil {
wrapper.Name = wrapper.Type
} else {
wrapper.Name = fmt.Sprintf("%s_%s", wrapper.Type, wrapper.Stream)
}
}

found := c.findToxicByName(wrapper.Name)
Expand All @@ -129,6 +140,9 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er
}

c.chainAddToxic(wrapper)
if wrapper.PairedToxic != nil {
c.chainAddToxic(wrapper.PairedToxic)
}
return wrapper, nil
}

Expand All @@ -151,6 +165,10 @@ func (c *ToxicCollection) UpdateToxicJson(name string, data io.Reader) (*toxics.
}
toxic.Toxicity = attrs.Toxicity

if toxic.PairedToxic != nil {
toxic.PairedToxic.Toxicity = attrs.Toxicity
c.chainUpdateToxic(toxic.PairedToxic)
}
c.chainUpdateToxic(toxic)
return toxic, nil
}
Expand All @@ -163,19 +181,30 @@ func (c *ToxicCollection) RemoveToxic(name string) error {

toxic := c.findToxicByName(name)
if toxic != nil {
if toxic.PairedToxic != nil {
c.chainRemoveToxic(toxic.PairedToxic)
}
c.chainRemoveToxic(toxic)
return nil
}
return ErrToxicNotFound
}

func (c *ToxicCollection) StartLink(name string, input io.Reader, output io.WriteCloser, direction stream.Direction) {
func (c *ToxicCollection) StartLinks(name string, client, upstream net.Conn) {
c.Lock()
defer c.Unlock()

link := NewToxicLink(c.proxy, c, direction)
link.Start(name, input, output)
c.links[name] = link
linkUp := NewToxicLink(c.proxy, c, stream.Upstream)
linkDown := NewToxicLink(c.proxy, c, stream.Downstream)

linkUp.pairedLink = linkDown
linkDown.pairedLink = linkUp

linkUp.Start(name+"upstream", client, upstream)
linkDown.Start(name+"downstream", upstream, client)

c.links[name+"upstream"] = linkUp
c.links[name+"downstream"] = linkDown
}

func (c *ToxicCollection) RemoveLink(name string) {
Expand All @@ -187,12 +216,8 @@ func (c *ToxicCollection) RemoveLink(name string) {
// All following functions assume the lock is already grabbed
func (c *ToxicCollection) findToxicByName(name string) *toxics.ToxicWrapper {
for dir := range c.chain {
for i, toxic := range c.chain[dir] {
if i == 0 {
// Skip the first noop toxic, it has no name
continue
}
if toxic.Name == name {
for _, toxic := range c.chain[dir] {
if len(toxic.Name) > 0 && toxic.Name == name {
return toxic
}
}
Expand Down
Loading