-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodel_test.go
44 lines (34 loc) · 1016 Bytes
/
model_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package microbatch
import "sync/atomic"
type JobInput struct{}
type JobOutput struct{}
type testBatchProcessor[T *JobInput, R *JobOutput] struct {
jobsDone atomic.Int32
}
func newTestBatchProcessor() *testBatchProcessor[*JobInput, *JobOutput] {
return &testBatchProcessor[*JobInput, *JobOutput]{}
}
func (bp *testBatchProcessor[T, R]) Process(batch []Job[*JobInput, *JobOutput]) error {
for _, j := range batch {
j(&JobInput{})
bp.incrementJobs()
}
return nil
}
func (bp *testBatchProcessor[T, R]) incrementJobs() {
for {
// Load current balance atomically
current := bp.jobsDone.Load()
// Calculate new balance
new := current + 1
// Try to update balance atomically
if bp.jobsDone.CompareAndSwap(current, new) {
return
}
}
}
func (bp *testBatchProcessor[T, R]) getJobsDone() int {
return int(bp.jobsDone.Load())
}
// Ensure testBatchProcessor conforms to the BatchProcessor interface.
var _ BatchProcessor[*JobInput, *JobOutput] = &testBatchProcessor[*JobInput, *JobOutput]{}