Skip to content

Commit

Permalink
fix: issue ReactiveX#341
Browse files Browse the repository at this point in the history
  • Loading branch information
si3nloong committed Aug 31, 2022
1 parent 7d96f8e commit e8a0edc
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
33 changes: 33 additions & 0 deletions item.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,39 @@ func send(ctx context.Context, ch chan<- Item, items ...interface{}) {
}
}

func sendSingleItem(ctx context.Context, ch chan<- Item, strategy CloseChannelStrategy, items ...interface{}) {
if strategy == CloseChannel {
defer close(ch)
}
for _, currentItem := range items {
switch item := currentItem.(type) {
default:
rt := reflect.TypeOf(item)
switch rt.Kind() {
default:
Of(item).SendContext(ctx, ch)
case reflect.Chan:
in := reflect.ValueOf(currentItem)
for {
v, ok := in.Recv()
if !ok {
return
}
currentItem := v.Interface()
switch item := currentItem.(type) {
default:
Of(item).SendContext(ctx, ch)
case error:
Error(item).SendContext(ctx, ch)
}
}
}
case error:
Error(item).SendContext(ctx, ch)
}
}
}

// Error checks if an item is an error.
func (i Item) Error() bool {
return i.E != nil
Expand Down
2 changes: 1 addition & 1 deletion iterable_just.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ func (i *justIterable) Observe(opts ...Option) <-chan Item {
option := parseOptions(append(i.opts, opts...)...)
next := option.buildChannel()

go SendItems(option.buildContext(emptyContext), next, CloseChannel, i.items)
go sendSingleItem(option.buildContext(emptyContext), next, CloseChannel, i.items...)
return next
}
4 changes: 3 additions & 1 deletion single.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package rxgo

import "context"
import (
"context"
)

// Single is a observable with a single element.
type Single interface {
Expand Down

0 comments on commit e8a0edc

Please sign in to comment.