Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filter class to dask and do the tests for it #283

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

xuzifan08
Copy link

Added filter class to dask.py and four tests associated with it.

@codecov
Copy link

codecov bot commented Nov 25, 2019

Codecov Report

Merging #283 into master will decrease coverage by 0.89%.
The diff coverage is 33.33%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master     #283     +/-   ##
=========================================
- Coverage   94.69%   93.79%   -0.9%     
=========================================
  Files          13       13             
  Lines        1620     1644     +24     
=========================================
+ Hits         1534     1542      +8     
- Misses         86      102     +16
Impacted Files Coverage Δ
streamz/dask.py 87.78% <33.33%> (-12.22%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a4f2a27...b7472e6. Read the comment docs.

@@ -131,6 +131,67 @@ def test_buffer(c, s, a, b):
assert source.loop == c.loop


@pytest.mark.slow
def test_filter(backend):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to remove the backend arg here, and in the scatter statement. Streamz only has a dask backend at the moment.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I remove the backend for all the four tests I have added?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes please

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already changed the four tests but the build is still failed

@@ -140,6 +157,24 @@ def update(self, x, who=None):
return self._emit(result)


@DaskStream.register_api()
class filter(DaskStream):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you also need the modifications to the gather and other nodes as well.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made changes to gather already. I also compared other nodes. There is a slightly difference between the dask.starmap and parallel.starmap. Do I need to change that one?

@xuzifan08
Copy link
Author

Hi, @martindurant @mrocklin, this is Zifan. I'm working on this branch with @CJ-Wright .

We added class filter() to dask and modified class gather() according to parallel.py (including the associated tests). However, we ran into an issue which causes the test test_integration_from_stream fail in the test_dataframe.py. The test passes on the Stream but fails on Daskstream. After troubleshooting for a while, we guess there is something wrong with the process of serializing the code to different dask clusters, but we are not sure what the specific issue is.

Can we pick your brains about how we can resolve it? Thanks!

@martindurant
Copy link
Member

Sorry for not following up here - are you still interested in this, @xuzifan08 ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants