Skip to content

Commit

Permalink
ReplaySubject;
Browse files Browse the repository at this point in the history
  • Loading branch information
bjornbytes committed Nov 13, 2015
1 parent 3fe1ee4 commit 007d9bc
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 3 deletions.
40 changes: 40 additions & 0 deletions doc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ RxLua
- [subscribe](#subscribeonnext-onerror-oncompleted)
- [onNext](#onnextvalues)
- [getValue](#getvalue)
- [ReplaySubject](#replaysubject)
- [create](#createbuffersize)
- [subscribe](#subscribeonnext-onerror-oncompleted)
- [onNext](#onnextvalues)

# Subscription

Expand Down Expand Up @@ -928,3 +932,39 @@ Pushes zero or more values to the BehaviorSubject. They will be broadcasted to a

Returns the last value emitted by the Subject, or the initial value passed to the constructor if nothing has been emitted yet.

# ReplaySubject

A Subject that provides new Subscribers with some or all of the most recently produced values upon subscription.

---

#### `.create(bufferSize)`

Creates a new ReplaySubject.

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `bufferSize` | number (optional) | | The number of values to send to new subscribers. If nil, an infinite buffer is used (note that this could lead to memory issues). |

---

#### `:subscribe(onNext, onError, onCompleted)`

Creates a new Observer and attaches it to the ReplaySubject. Immediately broadcasts the most contents of the buffer to the Observer.

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `onNext` | function | | Called when the ReplaySubject produces a value. |
| `onError` | function | | Called when the ReplaySubject terminates due to an error. |
| `onCompleted` | function | | Called when the ReplaySubject completes normally. |

---

#### `:onNext(values)`

Pushes zero or more values to the ReplaySubject. They will be broadcasted to all Observers.

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `values` | *... | | |

61 changes: 60 additions & 1 deletion rx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1952,6 +1952,64 @@ end

BehaviorSubject.__call = BehaviorSubject.onNext

--- @class ReplaySubject
-- @description A Subject that provides new Subscribers with some or all of the most recently
-- produced values upon subscription.
local ReplaySubject = setmetatable({}, Subject)
ReplaySubject.__index = ReplaySubject
ReplaySubject.__tostring = util.constant('ReplaySubject')

--- Creates a new ReplaySubject.
-- @arg {number=} bufferSize - The number of values to send to new subscribers. If nil, an infinite
-- buffer is used (note that this could lead to memory issues).
-- @returns {ReplaySubject}
function ReplaySubject.create(n)
local self = {
observers = {},
stopped = false,
buffer = {},
bufferSize = n
}

return setmetatable(self, ReplaySubject)
end

--- Creates a new Observer and attaches it to the ReplaySubject. Immediately broadcasts the most
-- contents of the buffer to the Observer.
-- @arg {function} onNext - Called when the ReplaySubject produces a value.
-- @arg {function} onError - Called when the ReplaySubject terminates due to an error.
-- @arg {function} onCompleted - Called when the ReplaySubject completes normally.
function ReplaySubject:subscribe(onNext, onError, onCompleted)
local observer

if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext, onError, onCompleted)
end

local subscription = Subject.subscribe(self, observer)

for i = 1, #self.buffer do
observer:onNext(util.unpack(self.buffer[i]))
end

return subscription
end

--- Pushes zero or more values to the ReplaySubject. They will be broadcasted to all Observers.
-- @arg {*...} values
function ReplaySubject:onNext(...)
table.insert(self.buffer, util.pack(...))
if self.bufferSize and #self.buffer > self.bufferSize then
table.remove(self.buffer, 1)
end

return Subject.onNext(self, ...)
end

ReplaySubject.__call = ReplaySubject.onNext

Observable.wrap = Observable.buffer
Observable['repeat'] = Observable.replicate

Expand All @@ -1964,5 +2022,6 @@ return {
CooperativeScheduler = CooperativeScheduler,
Subject = Subject,
AsyncSubject = AsyncSubject,
BehaviorSubject = BehaviorSubject
BehaviorSubject = BehaviorSubject,
ReplaySubject = ReplaySubject
}
63 changes: 63 additions & 0 deletions src/subjects/replaysubject.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
local Subject = require 'subjects/subject'
local Observer = require 'observer'
local util = require 'util'

--- @class ReplaySubject
-- @description A Subject that provides new Subscribers with some or all of the most recently
-- produced values upon subscription.
local ReplaySubject = setmetatable({}, Subject)
ReplaySubject.__index = ReplaySubject
ReplaySubject.__tostring = util.constant('ReplaySubject')

--- Creates a new ReplaySubject.
-- @arg {number=} bufferSize - The number of values to send to new subscribers. If nil, an infinite
-- buffer is used (note that this could lead to memory issues).
-- @returns {ReplaySubject}
function ReplaySubject.create(n)
local self = {
observers = {},
stopped = false,
buffer = {},
bufferSize = n
}

return setmetatable(self, ReplaySubject)
end

--- Creates a new Observer and attaches it to the ReplaySubject. Immediately broadcasts the most
-- contents of the buffer to the Observer.
-- @arg {function} onNext - Called when the ReplaySubject produces a value.
-- @arg {function} onError - Called when the ReplaySubject terminates due to an error.
-- @arg {function} onCompleted - Called when the ReplaySubject completes normally.
function ReplaySubject:subscribe(onNext, onError, onCompleted)
local observer

if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext, onError, onCompleted)
end

local subscription = Subject.subscribe(self, observer)

for i = 1, #self.buffer do
observer:onNext(util.unpack(self.buffer[i]))
end

return subscription
end

--- Pushes zero or more values to the ReplaySubject. They will be broadcasted to all Observers.
-- @arg {*...} values
function ReplaySubject:onNext(...)
table.insert(self.buffer, util.pack(...))
if self.bufferSize and #self.buffer > self.bufferSize then
table.remove(self.buffer, 1)
end

return Subject.onNext(self, ...)
end

ReplaySubject.__call = ReplaySubject.onNext

return ReplaySubject
87 changes: 87 additions & 0 deletions tests/replaysubject.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
describe('ReplaySubject', function()
describe('create', function()
it('returns a ReplaySubject', function()
expect(Rx.ReplaySubject.create()).to.be.an(Rx.ReplaySubject)
end)

it('sets an appropriate buffer size if it is specified', function()
local subject = Rx.ReplaySubject.create(2)
local observer = Rx.Observer.create()
local onNext = spy(observer, '_onNext')
subject:onNext(1)
subject:onNext(2)
subject:onNext(3)
subject:subscribe(observer)
expect(onNext).to.equal({{2}, {3}})
end)

it('keeps an infinite buffer if no buffer size is specified', function()
local subject = Rx.ReplaySubject.create()
local observer = Rx.Observer.create()
local onNext = spy(observer, '_onNext')
subject:onNext(1)
subject:onNext(2)
subject:onNext(3)
subject:subscribe(observer)
expect(onNext).to.equal({{1}, {2}, {3}})
end)
end)

describe('subscribe', function()
it('returns a Subscription', function()
local subject = Rx.ReplaySubject.create()
local observer = Rx.Observer.create()
expect(subject:subscribe(observer)).to.be.an(Rx.Subscription)
end)

it('accepts 3 functions as arguments', function()
local onNext, onCompleted = spy(), spy()
local subject = Rx.ReplaySubject.create()
subject:subscribe(onNext, nil, onCompleted)
subject:onNext(5)
subject:onCompleted()
expect(onNext).to.equal({{5}})
expect(#onCompleted).to.equal(1)
end)

it('calls onNext with the current buffer', function()
local subject = Rx.ReplaySubject.create(2)
local observer = Rx.Observer.create()
local onNext = spy(observer, '_onNext')
subject:onNext(1)
subject:onNext(2)
subject:onNext(3)
subject:subscribe(observer)
expect(onNext).to.equal({{2}, {3}})
end)
end)

describe('onNext', function()
it('pushes values to all subscribers', function()
local observers = {}
local spies = {}
for i = 1, 2 do
observers[i] = Rx.Observer.create()
spies[i] = spy(observers[i], '_onNext')
end

local subject = Rx.ReplaySubject.create()
subject:subscribe(observers[1])
subject:subscribe(observers[2])
subject:onNext(1)
subject:onNext(2)
subject:onNext(3)
expect(spies[1]).to.equal({{1}, {2}, {3}})
expect(spies[2]).to.equal({{1}, {2}, {3}})
end)

it('can be called using function syntax', function()
local observer = Rx.Observer.create()
local subject = Rx.ReplaySubject.create()
local onNext = spy(observer, 'onNext')
subject:subscribe(observer)
subject(4)
expect(#onNext).to.equal(1)
end)
end)
end)
3 changes: 2 additions & 1 deletion tests/runner.lua
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ else
'subscription',
'subject',
'asyncsubject',
'behaviorsubject'
'behaviorsubject',
'replaysubject'
}

for i, file in ipairs(files) do
Expand Down
4 changes: 3 additions & 1 deletion tools/concat.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ local files = {
'src/subjects/subject.lua',
'src/subjects/asyncsubject.lua',
'src/subjects/behaviorsubject.lua',
'src/subjects/replaysubject.lua',
'src/aliases.lua'
}

Expand All @@ -80,7 +81,8 @@ local footer = [[return {
CooperativeScheduler = CooperativeScheduler,
Subject = Subject,
AsyncSubject = AsyncSubject,
BehaviorSubject = BehaviorSubject
BehaviorSubject = BehaviorSubject,
ReplaySubject = ReplaySubject
}]]

local output = ''
Expand Down

0 comments on commit 007d9bc

Please sign in to comment.