Skip to content

Commit

Permalink
Implement conditional toxics
Browse files Browse the repository at this point in the history
  • Loading branch information
hwrdtm committed Aug 3, 2023
1 parent 8f9d002 commit d354567
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 12 deletions.
90 changes: 80 additions & 10 deletions toxic_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
type ToxicCollection struct {
sync.Mutex

noop *toxics.ToxicWrapper
proxy *Proxy
chain [][]*toxics.ToxicWrapper
links map[string]*ToxicLink
noop *toxics.ToxicWrapper
proxy *Proxy
chain [][]*toxics.ToxicWrapper
toxicConditions [][]*toxics.ToxicCondition
links map[string]*ToxicLink
}

func NewToxicCollection(proxy *Proxy) *ToxicCollection {
Expand All @@ -34,13 +35,16 @@ func NewToxicCollection(proxy *Proxy) *ToxicCollection {
Type: "noop",
Enabled: true,
},
proxy: proxy,
chain: make([][]*toxics.ToxicWrapper, stream.NumDirections),
links: make(map[string]*ToxicLink),
proxy: proxy,
chain: make([][]*toxics.ToxicWrapper, stream.NumDirections),
toxicConditions: make([][]*toxics.ToxicCondition, stream.NumDirections),
links: make(map[string]*ToxicLink),
}
for dir := range collection.chain {
collection.chain[dir] = make([]*toxics.ToxicWrapper, 1, toxics.Count()+1)
collection.chain[dir][0] = collection.noop
collection.toxicConditions[dir] = make([]*toxics.ToxicCondition, 1, toxics.Count()+1)
collection.toxicConditions[dir][0] = nil
}
return collection
}
Expand Down Expand Up @@ -116,6 +120,8 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er
// Set the wrapper to be enabled if no condition is specified.
if wrapper.Condition == nil {
wrapper.Enabled = true
} else {
wrapper.Condition.ToxicWrapper = wrapper
}

// Check if toxic already exists
Expand Down Expand Up @@ -208,9 +214,70 @@ func (c *ToxicCollection) StartLink(
logger = zerolog.Nop()
}

link := NewToxicLink(c.proxy, c, direction, logger)
link.Start(server, name, input, output)
c.links[name] = link
// If the direction is upstream, we need to run matchers and update
// toxics if matched.
if direction == stream.Upstream {
// Write input to the matcher writer so that we can match the input
// in parallel while piping it through the link.
streamChan := make(chan *stream.StreamChunk)
streamChanWriter := stream.NewChanWriter(streamChan)
forkedInput := io.TeeReader(input, streamChanWriter)

// Fire of a goroutine to match all conditions separately.
go c.matchAllToxicConditions(streamChan, direction)

link := NewToxicLink(c.proxy, c, direction, logger)
link.Start(server, name, forkedInput, output)
c.links[name] = link
} else {
link := NewToxicLink(c.proxy, c, direction, logger)
link.Start(server, name, input, output)
c.links[name] = link
}
}

// matchAllToxicConditions matches all conditions for a given direction, and updates
// the toxics if matched.
func (c *ToxicCollection) matchAllToxicConditions(
streamChan chan *stream.StreamChunk,
direction stream.Direction,
) {
c.Lock()
defer c.Unlock()

var logger zerolog.Logger
if c.proxy.Logger != nil {
logger = *c.proxy.Logger
} else {
logger = zerolog.Nop()
}

streamChunk := <-streamChan

// Loop through all conditions and try to match them.
// If matched, enable the toxic.
for _, condition := range c.toxicConditions[direction] {
if condition == nil {
continue
}

matched, err := condition.TryMatch(streamChunk.Data)
if err != nil {
logger.Warn().Err(err).Msg("Error matching condition")
continue
}

if matched {
// Get the toxic wrapper from the condition and enable it.
newToxicWrapper := condition.ToxicWrapper
newToxicWrapper.Enabled = true

// TODO: Do I need to call this? Currently fails when uncommented, though.
// c.chainUpdateToxic(newToxicWrapper)
}
}

return
}

func (c *ToxicCollection) RemoveLink(name string) {
Expand All @@ -236,6 +303,7 @@ func (c *ToxicCollection) chainAddToxic(toxic *toxics.ToxicWrapper) {
dir := toxic.Direction
toxic.Index = len(c.chain[dir])
c.chain[dir] = append(c.chain[dir], toxic)
c.toxicConditions[dir] = append(c.toxicConditions[dir], toxic.Condition)

// Asynchronously add the toxic to each link
wg := sync.WaitGroup{}
Expand All @@ -253,6 +321,7 @@ func (c *ToxicCollection) chainAddToxic(toxic *toxics.ToxicWrapper) {

func (c *ToxicCollection) chainUpdateToxic(toxic *toxics.ToxicWrapper) {
c.chain[toxic.Direction][toxic.Index] = toxic
c.toxicConditions[toxic.Direction][toxic.Index] = toxic.Condition

// Asynchronously update the toxic in each link
group := sync.WaitGroup{}
Expand All @@ -279,6 +348,7 @@ func (c *ToxicCollection) chainRemoveToxic(ctx context.Context, toxic *toxics.To

dir := toxic.Direction
c.chain[dir] = append(c.chain[dir][:toxic.Index], c.chain[dir][toxic.Index+1:]...)
c.toxicConditions[dir] = append(c.toxicConditions[dir][:toxic.Index], c.toxicConditions[dir][toxic.Index+1:]...)
for i := toxic.Index; i < len(c.chain[dir]); i++ {
c.chain[dir][i].Index = i
}
Expand Down
4 changes: 2 additions & 2 deletions toxics/toxic.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ type StatefulToxic interface {
}

type ToxicCondition struct {
MatcherType string `json:"matcherType"`
ToxicWrapper *ToxicWrapper `json:"-"`
MatcherType string `json:"matcherType"`

// A matcher means this toxic is only enabled when the matcher matches on any data
// passing through the link this toxic is attached to.
Expand Down Expand Up @@ -125,7 +126,6 @@ func (s *ToxicStub) Run(toxic *ToxicWrapper) {
defer close(s.running)
//#nosec
if toxic.Enabled && rand.Float32() < toxic.Toxicity {
println("Piping toxic", toxic)
toxic.Pipe(s)
} else {
new(NoopToxic).Pipe(s)
Expand Down

0 comments on commit d354567

Please sign in to comment.