Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observables continue being evaluated after stream completes #22

Open
w0utert opened this issue Jul 30, 2017 · 4 comments
Open

Observables continue being evaluated after stream completes #22

w0utert opened this issue Jul 30, 2017 · 4 comments

Comments

@w0utert
Copy link

w0utert commented Jul 30, 2017

When a stream (chain of observables) completes, subscribers will stop receiving events (as expected), but any observables higher up the chain that would have been evaluated if the stream was still active, are still evaluated after the stream completes.

See the following Lua snippet:

rx = require 'rx'

subject_1 = rx.Subject.create()

positive_cubes = subject_1:
  filter(function(v)
    print "--> PC 1"
    return v > 0
  end):
  map(function(v)
    print "--> PC 2"
    return v*v*v
  end):
  take(2)

positive_even_cubes_times_two = positive_cubes:
  filter(function(v)
    print "--> PEC 1"
    return (v % 2) == 0
  end):
  map(function(v)
    print "--> PEC 2"
    return v*2
  end):
  first()

negative_cubes = subject_1:
  filter(function(v)
    print "--> NC 1"
    return v < 0
  end):
  map(function(v)
    print "--> NC 2"
    return v*v*v
  end)

pc_sub = positive_cubes:dump("positiveCubes")
pec_sub = positive_even_cubes_times_two:dump("positiveEvenCubesTimesTwo")
nc_sub = negative_cubes:dump("negativeCubes")

subject_1:onNext(-1)
subject_1:onNext(4)
subject_1:onNext(3)
subject_1:onNext(5)
subject_1:onNext(2)
subject_1:onNext(4)
subject_1:onNext(-4)
subject_1:onNext(4)
subject_1:onNext(-4)

This creates three streams sharing the same subject. The positiveCubes stream completes after two values are pushed for which x^3 is positive. The positiveEvenCubesTimesTwo stream completes after one value is pushed for which x^3 is even. The negativeCubes stream never completes.

Each observable produces a side effect (the print statement). Execting the script produces the following output:

lua rx_test.lua 
--> PC 1
--> PC 1
--> NC 1
--> NC 2
negativeCubes onNext: -1
--> PC 1
--> PC 2
positiveCubes onNext: 64
--> PC 1
--> PC 2
--> PEC 1
--> PEC 2
positiveEvenCubesTimesTwo onNext: 128
positiveEvenCubesTimesTwo onCompleted
--> NC 1
--> PC 1
--> PC 2
positiveCubes onNext: 27
positiveCubes onCompleted
--> PC 1
--> PC 2
--> PEC 1
--> NC 1
--> PC 1
--> PC 2
--> PC 1
--> PC 2
--> NC 1
--> PC 1
--> PC 2
--> PC 1
--> PC 2
--> NC 1
--> PC 1
--> PC 2
--> PC 1
--> PC 2
--> NC 1
--> PC 1
--> PC 1
--> NC 1
--> NC 2
negativeCubes onNext: -64
--> PC 1
--> PC 2
--> PC 1
--> PC 2
--> NC 1
--> PC 1
--> PC 1
--> NC 1
--> NC 2
negativeCubes onNext: -64

As can be seen from the output, the observables in the positiveCubes stream are still being called after the stream completes. Interestingly, the observables in the positiveEvenCubesTimesTwo streams are also called after the stream completes, but only once, after that they are not evaluated anymore (?).

Recreating the same example using rxJS:

rx = require('rxjs/Rx');

subject_1 = new rx.Subject()

positive_cubes = subject_1.
  filter(function(v) {
    console.log('--> PC 1');
    return (v > 0);
  }).
  map(function(v) {
    console.log('--> PC 2');
    return v*v*v
  }).
  take(2);

positive_even_cubes_times_two = positive_cubes.
  filter(function(v) {
    console.log('--> PEC 1');
    return ((v % 2) == 0);
  }).
  map(function(v) {
    console.log('--> PEC 2');
    return (v*2)
  }).
  first();

negative_cubes = subject_1.
  filter(function(v) {
    console.log('--> NC 1');
    return (v < 0);
  }).
  map(function(v) {
    console.log('--> NC 2');
    return (v*v*v)
  });

positive_cubes.
  subscribe(function(x) {
    console.log('positiveCubes: onNext: ' + x)
  },
  null,
  function() {
    console.log('postitiveCubes: onCompleted');
  });

positive_even_cubes_times_two.
  subscribe(function(x) {
    console.log('positiveEvenCubesTimesTwo: onNext: ' + x)
  },
  null,
  function() {
    console.log('postitiveEvenCubesTimesTwo: onCompleted');
  });

negative_cubes.
  subscribe(function(x) {
    console.log('negativeCubes: onNext: ' + x)
  },
  null,
  function() {
    console.log('negativeCubes: onCompleted');
  });

subject_1.next(-1)
subject_1.next(4)
subject_1.next(3)
subject_1.next(5)
subject_1.next(2)
subject_1.next(4)
subject_1.next(-4)
subject_1.next(4)
subject_1.next(-4)

The output looks as expected:

node rx_test.js 
--> PC 1
--> PC 1
--> NC 1
--> NC 2
negativeCubes: onNext: -1
--> PC 1
--> PC 2
positiveCubes: onNext: 64
--> PC 1
--> PC 2
--> PEC 1
--> PEC 2
positiveEvenCubesTimesTwo: onNext: 128
postitiveEvenCubesTimesTwo: onCompleted
--> NC 1
--> PC 1
--> PC 2
positiveCubes: onNext: 27
postitiveCubes: onCompleted
--> NC 1
--> NC 1
--> NC 1
--> NC 1
--> NC 1
--> NC 2
negativeCubes: onNext: -64
--> NC 1
--> NC 1
--> NC 2
negativeCubes: onNext: -64

Observables not being disposed after the stream completes becomes a problem if the number of streams is large, or if temporary/transient streams are created. Over time, the number of observables that are being evaluated unnecessarily because they are part of a completed stream will start to add up.

@italonascimento
Copy link

Any news on this?
Maybe some help needed to close this issue?

@bb010g
Copy link

bb010g commented Jan 24, 2018

This is due to the take* operators not unsubscribing after completion. Observer would need some refactoring to support this.

@4O4
Copy link
Contributor

4O4 commented Oct 17, 2020

Hello, for anyone who might be interested in this topic: I've decided to maintain my own fork (https://github.com/4O4/lua-reactivex) which includes a refactoring in order to fix all the underlying issues and introduce a proper automatic unsubscription mechanism inspired by RxJS internals.

This was a major redesign of some core parts of the code which would probably never be merged in this repo, that's why I decided to make a friendly fork under totally different name to indicate that these two projects are totally different, yet mostly still compatible (unless you rely on bugs like the one described in this issue). You can check out what was the scope of changes in the diff in this PR: 4O4#2

I'm planning to include in this fork some other patches which were sent to this repository but never merged. Also in the meantime I discovered that the tests don't cover the code very well and are leaking, so I started to improve them and also reconfigure coverage reports to better visualize the parts of code which were assumed to be tested, but actually are not (this effort is partially finished and can be found in this PR: 4O4#4). So I guess my focus will be on incrementally improving the tests too, not only a standard feature / bugfix development.

Contributions are welcome, if anyone wants to help me with development and bringing some important patches to this fork I would really appreaciate it. Currently I'm maintaining it for my own needs, but I'm sure there are lot of people who could benefit from an updated and actively maintained version of this library. Maybe we will even be able to merge back with this original repository some day if bjornbytes will be interested, who knows. But currently it is as it is, this repo is getting a bit stale and I can't wait for critical patches forever so that's why I'm choosing this alternative path and am willing to move the development forward myself under the separate "brand" :)

@4O4
Copy link
Contributor

4O4 commented Oct 17, 2020

Here's the output of the code from the first post in this issue, while using the current version of my reactivex fork:

$ lua issue-22.lua
--> PC 1
--> PC 1
--> NC 1
--> NC 2
negativeCubes onNext: -1
--> PC 1
--> PC 2
positiveCubes onNext: 64
--> PC 1
--> PC 2
--> PEC 1
--> PEC 2
positiveEvenCubesTimesTwo onNext: 128
positiveEvenCubesTimesTwo onCompleted
--> NC 1
--> PC 1
--> PC 2
positiveCubes onNext: 27
positiveCubes onCompleted
--> NC 1
--> NC 1
--> NC 1
--> NC 1
--> NC 1
--> NC 2
negativeCubes onNext: -64
--> NC 1
--> NC 1
--> NC 2
negativeCubes onNext: -64

As you can see it is exactly the same output as expected and exactly the same as RxJS is giving. All of that is achieved without any hacky :unsubscribe()s added everywhere "just in case" ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants