Skip to content

Commit

Permalink
fix: race on batch processing (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
chameleon82 authored Nov 2, 2023
1 parent c4f4cb6 commit 8152194
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 1 deletion.
14 changes: 14 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,17 @@ jobs:
uses: codecov/codecov-action@v3
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

test-race:
runs-on: ubuntu-latest
steps:
- name: Checkout Repo
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v4
with:
go-version: ${{ env.DEFAULT_GO_VERSION }}
check-latest: true
cache-dependency-path: "**/go.sum"
- name: Run tests with race detector
run: make test-race
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@
test-coverage:
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out -o coverage.html

.PHONY: test-race
test-race:
go test -race ./...
20 changes: 19 additions & 1 deletion sdk/logs/batch_log_record_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,13 @@ type batchLogRecordProcessor struct {
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
stopped atomic.Bool
}

func (lrp *batchLogRecordProcessor) Shutdown(ctx context.Context) error {
var err error
lrp.stopOnce.Do(func() {
lrp.stopped.Store(true)
wait := make(chan struct{})
go func() {
close(lrp.stopCh)
Expand Down Expand Up @@ -199,7 +201,12 @@ func NewBatchLogRecordProcessor(exporter LogRecordExporter, options ...BatchLogR
return blp
}

func (lrp batchLogRecordProcessor) OnEmit(rol ReadableLogRecord) {
func (lrp *batchLogRecordProcessor) OnEmit(rol ReadableLogRecord) {

// Do not enqueue spans after Shutdown.
if lrp.stopped.Load() {
return
}
// Do not enqueue logs if we are just going to drop them.
if lrp.e == nil {
return
Expand Down Expand Up @@ -322,6 +329,17 @@ func (lrp *batchLogRecordProcessor) enqueue(sd ReadableLogRecord) {

// ForceFlush exports all ended logs that have not yet been exported.
func (lrp *batchLogRecordProcessor) ForceFlush(ctx context.Context) error {

// Interrupt if context is already canceled.
if err := ctx.Err(); err != nil {
return err
}
// Do nothing after Shutdown.
// Do not enqueue spans after Shutdown.
if lrp.stopped.Load() {
return nil
}

var err error
if lrp.e != nil {
flushCh := make(chan struct{})
Expand Down

0 comments on commit 8152194

Please sign in to comment.