Skip to content

Commit

Permalink
Fixes race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
robgonnella committed Jun 23, 2023
1 parent 0ed368d commit b557479
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 80 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ all: $(PREFIX)/ops
$(PREFIX)/ops: $(go_deps)
cd cli && go build -ldflags '-s -w' -o $(@)

$(PREFIX)/ops-dev: $(go_deps)
cd cli && go build -race -ldflags '-s -w' -o $(@)

.PHONY: ops
ops: $(PREFIX)/ops

.PHONY: dev
dev: $(PREFIX)/ops-dev

.PHONY: test
test:
go test -v ./...
go test -v -race ./...

.PHONY: mock
mock:
Expand Down
10 changes: 10 additions & 0 deletions internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Core struct {
discovery discovery.Service
serverService server.Service
log logger.Logger
eventSubscription int
evtListeners []*EventListener
serverPollListeners []*ServerPollListener
nextListenerId int
Expand Down Expand Up @@ -64,6 +65,9 @@ func New(

func (c *Core) Stop() error {
c.discovery.Stop()
if c.eventSubscription != 0 {
c.serverService.StopStream(c.eventSubscription)
}
c.cancel()
return c.ctx.Err()
}
Expand Down Expand Up @@ -146,6 +150,9 @@ func (c *Core) RemoveEventListener(id int) {

listeners := []*EventListener{}
for _, listener := range c.evtListeners {
if listener.id == id {
close(listener.channel)
}
if listener.id != id {
listeners = append(listeners, listener)
}
Expand Down Expand Up @@ -175,6 +182,9 @@ func (c *Core) RemoveServerPollListener(id int) {

listeners := []*ServerPollListener{}
for _, listener := range c.serverPollListeners {
if listener.id == id {
close(listener.channel)
}
if listener.id != id {
listeners = append(listeners, listener)
}
Expand Down
39 changes: 31 additions & 8 deletions internal/core/core_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package core_test

import (
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/robgonnella/ops/internal/config"
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestCore(t *testing.T) {
})

t.Run("registers and removes event listener", func(st *testing.T) {
evtChan := make(chan *event.Event, 1)
evtChan := make(chan *event.Event)
id := coreService.RegisterEventListener(evtChan)

assert.Equal(st, 1, id)
Expand All @@ -168,7 +168,7 @@ func TestCore(t *testing.T) {
})

t.Run("registers and removes server listener", func(st *testing.T) {
serverChan := make(chan []*server.Server, 1)
serverChan := make(chan []*server.Server)

id := coreService.RegisterServerPollListener(serverChan)

Expand All @@ -178,16 +178,39 @@ func TestCore(t *testing.T) {
})

t.Run("monitors network", func(st *testing.T) {
defer coreService.Stop()
wg := sync.WaitGroup{}
wg.Add(2)

mockServerService.EXPECT().StreamEvents(gomock.Any())
mockServerService.EXPECT().StreamEvents(gomock.Any()).Return(1)
mockServerService.EXPECT().GetAllServersInNetworkTargets(conf.Targets)
mockScanner.EXPECT().Scan()
mockScanner.EXPECT().Scan().DoAndReturn(func() ([]*discovery.DiscoveryResult, error) {
defer func() {
coreService.Stop()
wg.Done()
}()
return []*discovery.DiscoveryResult{
{
ID: "id",
Hostname: "hostname",
IP: "ip",
OS: "os",
Status: server.StatusOnline,
Ports: []discovery.Port{
{
ID: 22,
Status: discovery.PortOpen,
},
},
},
}, nil
})
mockScanner.EXPECT().Stop()
mockServerService.EXPECT().StopStream(gomock.Any()).AnyTimes()
mockServerService.EXPECT().StopStream(1).Do(func(int) {
wg.Done()
})

go coreService.Monitor()

time.Sleep(time.Millisecond * 10)
wg.Wait()
})
}
28 changes: 19 additions & 9 deletions internal/core/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ import (

// Run runs the sequence driver for the HostInstallStage
func (c *Core) Monitor() error {
evtReceiveChan := make(chan *event.Event, 100)
evtReceiveChan := make(chan *event.Event)

// create event subscription
subscription := c.serverService.StreamEvents(evtReceiveChan)

defer c.serverService.StopStream(subscription)
c.eventSubscription = c.serverService.StreamEvents(evtReceiveChan)

// Start network scanner
go c.discovery.MonitorNetwork()
Expand All @@ -24,12 +22,16 @@ func (c *Core) Monitor() error {
go c.pollForDatabaseUpdates()

for {
select {
case <-c.ctx.Done():
return c.ctx.Err()
case evt := <-evtReceiveChan:
c.handleServerEvent(evt)
evt, ok := <-evtReceiveChan
if !ok {
c.mux.Lock()
for _, listener := range c.evtListeners {
close(listener.channel)
}
c.mux.Unlock()
return nil
}
c.handleServerEvent(evt)
}
}

Expand All @@ -47,6 +49,9 @@ func (c *Core) handleServerEvent(evt *event.Event) {

c.log.Info().Fields(fields).Msg("Event Received")

c.mux.Lock()
defer c.mux.Unlock()

for _, listener := range c.evtListeners {
listener.channel <- evt
}
Expand All @@ -59,6 +64,9 @@ func (c *Core) pollForDatabaseUpdates() error {
for {
select {
case <-c.ctx.Done():
for _, listener := range c.serverPollListeners {
close(listener.channel)
}
return c.ctx.Err()
default:
if errCount >= 5 {
Expand All @@ -77,9 +85,11 @@ func (c *Core) pollForDatabaseUpdates() error {

errCount = 0

c.mux.Lock()
for _, listener := range c.serverPollListeners {
listener.channel <- response
}
c.mux.Unlock()

time.Sleep(pollTime)
}
Expand Down
18 changes: 13 additions & 5 deletions internal/server/service.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package server

import (
"context"
"errors"
"net"
"sync"

"github.com/imdario/mergo"
"github.com/robgonnella/ops/internal/config"
Expand Down Expand Up @@ -37,23 +37,21 @@ func filterChannels(channels []*eventChannel, fn func(c *eventChannel) bool) []*

// ServerService represents our server service implementation
type ServerService struct {
ctx context.Context
log logger.Logger
repo Repo
evtChans []*eventChannel
mux sync.Mutex
}

// NewService returns a new instance server service
func NewService(conf config.Config, repo Repo) *ServerService {
log := logger.New()

ctx := context.Background()

return &ServerService{
ctx: ctx,
log: log,
repo: repo,
evtChans: []*eventChannel{},
mux: sync.Mutex{},
}
}

Expand Down Expand Up @@ -175,14 +173,22 @@ func (s *ServerService) StreamEvents(send chan *event.Event) int {
send: send,
}

s.mux.Lock()
s.evtChans = append(s.evtChans, evtChan)
s.mux.Unlock()

return evtChan.id
}

func (s *ServerService) StopStream(id int) {
s.mux.Lock()
defer s.mux.Unlock()

s.log.Info().Int("channelID", id).Msg("Filtering channel")
s.evtChans = filterChannels(s.evtChans, func(c *eventChannel) bool {
if c.id == id {
close(c.send)
}
return c.id != id
})
}
Expand All @@ -193,6 +199,8 @@ func (s *ServerService) GetServer(id string) (*Server, error) {
}

func (s *ServerService) sendServerUpdateEvent(server *Server) {
s.mux.Lock()
defer s.mux.Unlock()
for _, clientChan := range s.evtChans {
clientChan.send <- &event.Event{
Type: event.SeverUpdate,
Expand Down
2 changes: 1 addition & 1 deletion internal/server/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestServerService(t *testing.T) {
})

t.Run("streams events", func(st *testing.T) {
evtChan := make(chan *event.Event, 1)
evtChan := make(chan *event.Event)

streamID := service.StreamEvents(evtChan)

Expand Down
16 changes: 14 additions & 2 deletions internal/ui/component/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/gdamore/tcell/v2"
"github.com/rivo/tview"
"github.com/robgonnella/ops/internal/config"
"github.com/robgonnella/ops/internal/logger"
"github.com/robgonnella/ops/internal/ui/key"
"github.com/robgonnella/ops/internal/ui/style"
)
Expand All @@ -21,6 +22,8 @@ func NewConfigContext(
onSelect func(id int),
onDelete func(name string, id int),
) *ConfigContext {
log := logger.New()

colHeaders := []string{"ID", "Name", "Target", "SSH-User", "SSH-Identity", "Overrides"}
table := createTable("Context", colHeaders)

Expand All @@ -31,7 +34,12 @@ func NewConfigContext(
idStr := table.GetCell(row, 0).Text
name := table.GetCell(row, 1).Text

id, _ := strconv.Atoi(idStr)
id, err := strconv.Atoi(idStr)

if err != nil {
log.Error().Err(err).Msg("failed to delete context")
return nil
}

onDelete(name, id)

Expand All @@ -41,7 +49,11 @@ func NewConfigContext(
if evt.Key() == key.KeyEnter {
row, _ := table.GetSelection()
idStr := table.GetCell(row, 0).Text
id, _ := strconv.Atoi(idStr)
id, err := strconv.Atoi(idStr)
if err != nil {
log.Error().Err(err).Msg("failed to select new context")
return nil
}
onSelect(id)
return nil
}
Expand Down
7 changes: 0 additions & 7 deletions internal/ui/component/event.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package component

import (
"context"
"strconv"

"github.com/rivo/tview"
Expand All @@ -11,8 +10,6 @@ import (
)

type EventTable struct {
ctx context.Context
cancel context.CancelFunc
table *tview.Table
columnHeaders []string
count uint
Expand All @@ -31,11 +28,7 @@ func NewEventTable() *EventTable {
"STATUS",
}

ctx, cancel := context.WithCancel(context.Background())

return &EventTable{
ctx: ctx,
cancel: cancel,
table: createTable("events", columnHeaders),
columnHeaders: columnHeaders,
count: 0,
Expand Down
8 changes: 0 additions & 8 deletions internal/ui/component/server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package component

import (
"context"

"github.com/gdamore/tcell/v2"
"github.com/rivo/tview"
"github.com/robgonnella/ops/internal/server"
Expand All @@ -11,17 +9,13 @@ import (
)

type ServerTable struct {
ctx context.Context
cancel context.CancelFunc
table *tview.Table
columnHeaders []string
}

func NewServerTable(OnSSH func(ip string)) *ServerTable {
columnHeaders := []string{"HOSTNAME", "IP", "ID", "OS", "SSH", "STATUS"}

ctx, cancel := context.WithCancel(context.Background())

table := createTable("servers", columnHeaders)

table.SetInputCapture(func(evt *tcell.EventKey) *tcell.EventKey {
Expand All @@ -36,8 +30,6 @@ func NewServerTable(OnSSH func(ip string)) *ServerTable {
})

return &ServerTable{
ctx: ctx,
cancel: cancel,
table: table,
columnHeaders: columnHeaders,
}
Expand Down
Loading

0 comments on commit b557479

Please sign in to comment.