Skip to content

Commit

Permalink
Merge pull request #236 from ReactiveX/time-fix
Browse files Browse the repository at this point in the history
Fixing time based observable
  • Loading branch information
teivah committed Mar 22, 2020
2 parents e94d354 + fe66c9f commit 037b748
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 125 deletions.
224 changes: 169 additions & 55 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,39 +451,64 @@ func (o *ObservableImpl) BufferWithTime(timespan Duration, opts ...Option) Obser
}

f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
defer close(next)
observe := o.Observe(opts...)
buffer := make([]interface{}, 0)
stop := make(chan struct{})
mutex := sync.Mutex{}

checkBuffer := func() {
mutex.Lock()
if len(buffer) != 0 {
if !Of(buffer).SendContext(ctx, next) {
mutex.Unlock()
return
}
buffer = make([]interface{}, 0)
}
mutex.Unlock()
}

go func() {
defer close(next)
duration := timespan.duration()
for {
select {
case <-stop:
checkBuffer()
return
case <-ctx.Done():
return
case <-time.After(duration):
checkBuffer()
}
}
}()

for {
select {
case <-ctx.Done():
close(stop)
return
case item, ok := <-observe:
if !ok {
if len(buffer) != 0 {
Of(buffer).SendContext(ctx, next)
}
close(stop)
return
}
if item.Error() {
item.SendContext(ctx, next)
if option.getErrorStrategy() == StopOnError {
close(stop)
return
}
} else {
mutex.Lock()
buffer = append(buffer, item.V)
}
case <-time.After(timespan.duration()):
if len(buffer) != 0 {
if !Of(buffer).SendContext(ctx, next) {
return
}
buffer = make([]interface{}, 0)
mutex.Unlock()
}
}
}
}

return customObservableOperator(f, opts...)
}

Expand All @@ -498,42 +523,69 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt
}

f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
defer close(next)
observe := o.Observe(opts...)
buffer := make([]interface{}, 0)
stop := make(chan struct{})
send := make(chan struct{})
mutex := sync.Mutex{}

checkBuffer := func() {
mutex.Lock()
if len(buffer) != 0 {
if !Of(buffer).SendContext(ctx, next) {
mutex.Unlock()
return
}
buffer = make([]interface{}, 0)
}
mutex.Unlock()
}

go func() {
defer close(next)
duration := timespan.duration()
for {
select {
case <-send:
checkBuffer()
case <-stop:
checkBuffer()
return
case <-ctx.Done():
return
case <-time.After(duration):
checkBuffer()
}
}
}()

for {
select {
case <-ctx.Done():
return
case item, ok := <-observe:
if !ok {
if len(buffer) != 0 {
Of(buffer).SendContext(ctx, next)
}
close(stop)
close(send)
return
}
if item.Error() {
item.SendContext(ctx, next)
if option.getErrorStrategy() == StopOnError {
close(stop)
close(send)
return
}
} else {
mutex.Lock()
buffer = append(buffer, item.V)
if len(buffer) == count {
if !Of(buffer).SendContext(ctx, next) {
return
}
buffer = make([]interface{}, 0)
mutex.Unlock()
send <- struct{}{}
} else {
mutex.Unlock()
}
}
case <-time.After(timespan.duration()):
if len(buffer) != 0 {
if !Of(buffer).SendContext(ctx, next) {
return
}
buffer = make([]interface{}, 0)
}
}
}
}
Expand Down Expand Up @@ -2676,47 +2728,77 @@ func (o *ObservableImpl) WindowWithTime(timespan Duration, opts ...Option) Obser
}

f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
defer close(next)
observe := o.Observe(opts...)
ch := option.buildChannel()
done := make(chan struct{})
empty := true
mutex := sync.Mutex{}
if !Of(FromChannel(ch)).SendContext(ctx, next) {
return
}

go func() {
defer func() {
mutex.Lock()
close(ch)
mutex.Unlock()
}()
defer close(next)
for {
select {
case <-ctx.Done():
return
case <-done:
return
case <-time.After(timespan.duration()):
mutex.Lock()
if empty {
mutex.Unlock()
continue
}
close(ch)
empty = true
ch = option.buildChannel()
if !Of(FromChannel(ch)).SendContext(ctx, next) {
close(done)
return
}
mutex.Unlock()
}
}
}()

for {
select {
case <-ctx.Done():
close(ch)
return
case <-done:
return
case item, ok := <-observe:
if !ok {
close(ch)
close(done)
return
}
if item.Error() {
mutex.Lock()
if !item.SendContext(ctx, ch) {
mutex.Unlock()
close(done)
return
}
mutex.Unlock()
if option.getErrorStrategy() == StopOnError {
close(ch)
close(done)
return
}
}
mutex.Lock()
if !item.SendContext(ctx, ch) {
mutex.Unlock()
return
}
empty = false
case <-time.After(timespan.duration()):
if empty {
continue
}
close(ch)
ch = option.buildChannel()
empty = true
if !Of(FromChannel(ch)).SendContext(ctx, next) {
return
}
mutex.Unlock()
}
}
}
Expand All @@ -2735,55 +2817,87 @@ func (o *ObservableImpl) WindowWithTimeOrCount(timespan Duration, count int, opt
}

f := func(ctx context.Context, next chan Item, option Option, opts ...Option) {
defer close(next)
observe := o.Observe(opts...)
ch := option.buildChannel()
done := make(chan struct{})
mutex := sync.Mutex{}
iCount := 0
if !Of(FromChannel(ch)).SendContext(ctx, next) {
return
}

go func() {
defer func() {
mutex.Lock()
close(ch)
mutex.Unlock()
}()
defer close(next)
for {
select {
case <-ctx.Done():
return
case <-done:
return
case <-time.After(timespan.duration()):
mutex.Lock()
if iCount == 0 {
mutex.Unlock()
continue
}
close(ch)
iCount = 0
ch = option.buildChannel()
if !Of(FromChannel(ch)).SendContext(ctx, next) {
close(done)
return
}
mutex.Unlock()
}
}
}()

for {
select {
case <-ctx.Done():
close(ch)
return
case <-done:
return
case item, ok := <-observe:
if !ok {
close(ch)
close(done)
return
}
if item.Error() {
mutex.Lock()
if !item.SendContext(ctx, ch) {
mutex.Unlock()
close(done)
return
}
mutex.Unlock()
if option.getErrorStrategy() == StopOnError {
close(ch)
close(done)
return
}
}
mutex.Lock()
if !item.SendContext(ctx, ch) {
mutex.Unlock()
return
}
iCount++
if iCount == count {
close(ch)
ch = option.buildChannel()
iCount = 0
ch = option.buildChannel()
if !Of(FromChannel(ch)).SendContext(ctx, next) {
mutex.Unlock()
close(done)
return
}
}
case <-time.After(timespan.duration()):
if iCount == 0 {
continue
}
close(ch)
ch = option.buildChannel()
iCount = 0
if !Of(FromChannel(ch)).SendContext(ctx, next) {
return
}
mutex.Unlock()
}
}
}
Expand Down

0 comments on commit 037b748

Please sign in to comment.