Skip to content

Commit

Permalink
Merge pull request #7 from vansante/fixDataRace
Browse files Browse the repository at this point in the history
Fix data races in the emitting of events + add test
  • Loading branch information
vansante authored Dec 17, 2018
2 parents dcd6ffb + bc86fb3 commit f730fc6
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 24 deletions.
51 changes: 27 additions & 24 deletions emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (

// Emitter is the base struct which manages event subscriptions and calls all registered handlers on event emits.
type Emitter struct {
sync.RWMutex

mu sync.RWMutex
async bool
capturers []*Capturer
listeners map[EventType][]*Listener
Expand All @@ -28,32 +27,36 @@ func NewEmitter(async bool) (em *Emitter) {
// EmitEvent emits the given event to all listeners and capturers
func (em *Emitter) EmitEvent(event EventType, arguments ...interface{}) {
// If we have no single listeners for this event, skip
em.mu.RLock()
if len(em.listenersOnce) > 0 {
em.mu.RUnlock()
// Get a full lock, we are changing a map
em.Lock()
em.mu.Lock()
// Copy the slice
listenersOnce := em.listenersOnce[event]
// Create new empty slice
em.listenersOnce[event] = make([]*Listener, 0)
em.Unlock()
em.mu.Unlock()

// No lock needed, we are working with an inaccessible copy
em.emitListenerEvents(listenersOnce, arguments)
} else {
em.mu.RUnlock()
}

em.mu.RLock()
// If we have no listeners for this event, skip
if len(em.listeners[event]) > 0 {
em.RLock()
em.emitListenerEvents(em.listeners[event], arguments)
em.RUnlock()
}
em.mu.RUnlock()

em.mu.RLock()
// If we have no capturers, skip
if len(em.capturers) > 0 {
em.RLock()
em.emitCapturerEvents(em.capturers, event, arguments)
em.RUnlock()
}
em.mu.RUnlock()
}

func (em *Emitter) emitListenerEvents(listeners []*Listener, arguments []interface{}) {
Expand All @@ -78,8 +81,8 @@ func (em *Emitter) emitCapturerEvents(capturers []*Capturer, event EventType, ar

// AddListener adds a listener for the given event type
func (em *Emitter) AddListener(event EventType, handler HandleFunc) (listener *Listener) {
em.Lock()
defer em.Unlock()
em.mu.Lock()
defer em.mu.Unlock()

listener = &Listener{
handler: handler,
Expand All @@ -90,8 +93,8 @@ func (em *Emitter) AddListener(event EventType, handler HandleFunc) (listener *L

// ListenOnce adds a listener for the given event type that removes itself after it has been fired once
func (em *Emitter) ListenOnce(event EventType, handler HandleFunc) (listener *Listener) {
em.Lock()
defer em.Unlock()
em.mu.Lock()
defer em.mu.Unlock()

listener = &Listener{
handler: handler,
Expand All @@ -102,8 +105,8 @@ func (em *Emitter) ListenOnce(event EventType, handler HandleFunc) (listener *Li

// AddCapturer adds an event capturer for all events
func (em *Emitter) AddCapturer(handler CaptureFunc) (capturer *Capturer) {
em.Lock()
defer em.Unlock()
em.mu.Lock()
defer em.mu.Unlock()

capturer = &Capturer{
handler: handler,
Expand All @@ -114,8 +117,8 @@ func (em *Emitter) AddCapturer(handler CaptureFunc) (capturer *Capturer) {

// RemoveListener removes the registered given listener for the given event
func (em *Emitter) RemoveListener(event EventType, listener *Listener) {
em.Lock()
defer em.Unlock()
em.mu.Lock()
defer em.mu.Unlock()

for index, list := range em.listeners[event] {
if list == listener {
Expand All @@ -135,25 +138,25 @@ func (em *Emitter) RemoveListener(event EventType, listener *Listener) {

// RemoveAllListenersForEvent removes all registered listeners for a given event type
func (em *Emitter) RemoveAllListenersForEvent(event EventType) {
em.Lock()
defer em.Unlock()
em.mu.Lock()
defer em.mu.Unlock()

em.listeners[event] = make([]*Listener, 0)
}

// RemoveAllListeners removes all registered listeners for all event types
func (em *Emitter) RemoveAllListeners() {
em.Lock()
defer em.Unlock()
em.mu.Lock()
defer em.mu.Unlock()

em.listeners = make(map[EventType][]*Listener)
em.listenersOnce = make(map[EventType][]*Listener)
}

// RemoveCapturer removes the given capturer
func (em *Emitter) RemoveCapturer(capturer *Capturer) {
em.Lock()
defer em.Unlock()
em.mu.Lock()
defer em.mu.Unlock()

for index, capt := range em.capturers {
if capt == capturer {
Expand All @@ -165,8 +168,8 @@ func (em *Emitter) RemoveCapturer(capturer *Capturer) {

// RemoveAllCapturers removes all registered capturers
func (em *Emitter) RemoveAllCapturers() {
em.Lock()
defer em.Unlock()
em.mu.Lock()
defer em.mu.Unlock()

em.capturers = make([]*Capturer, 0)
}
Expand Down
26 changes: 26 additions & 0 deletions emitter_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eventemitter

import (
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -129,3 +130,28 @@ func TestEmitNonAsyncRecursive(t *testing.T) {
t.Fail()
}
}

func TestMultipleRoutineEmitListen(t *testing.T) {
e := NewEmitter(true)

wg := sync.WaitGroup{}
wg.Add(20)

for i := 0; i < 10; i++ {
go func() {
e.EmitEvent(EventType("blurp"), 1, 2, 3)
e.EmitEvent(EventType("worst"), 321)
wg.Done()
}()
}

for i := 0; i < 10; i++ {
go func() {
e.AddListener(EventType("blurp"), func(arguments ...interface{}) {})
e.AddCapturer(func(event EventType, arguments ...interface{}) {})
wg.Done()
}()
}

wg.Wait()
}

0 comments on commit f730fc6

Please sign in to comment.