FluxSampleTimeout Javadoc concerns #3689
Okay, I have now done some groundwork on the operator, but I have reached a standstill. My work is present in the branch https://github.com/MikkelHJuul/reactor-core/tree/issues/3689. The implementation plainly ditches the queue (which seems to be OK, since the queue was used in an unsafe way and would only really result in caching more Publishers than needed: you only need one, and can only propagate values from that one. Overflowing would also drop elements silently). I'm very aware that some corner cases are not met in the implementation I link. My issue now is that I disagree with some of the tests, and with how the operator works/should work. This may obviously be something I simply have to keep completely backwards compatible (so we cannot change it now), but here are the cases I disagree with:
Test:

```java
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Test
void lateSubscriber() {
	Flux<Integer> input = Flux.just(4);
	int[] discards = {-1};
	Flux<Integer> sampled = input.sampleTimeout(i -> Mono.empty()) // instant playback
			.doOnDiscard(Integer.class, i -> discards[0] = i);
	ZeroRequestOnSubscribeSubscriber testSub = new ZeroRequestOnSubscribeSubscriber();
	sampled.subscribe(testSub);
	assert testSub.next == 0;
	testSub.subs.request(1);
	assert testSub.next == 0; // value still not played
	Integer val = sampled.blockFirst();
	assert testSub.next == 0; // no value appeared here
	assert val == 4; // 4 is available if you requested eagerly
	assert discards[0] == -1;
}

static class ZeroRequestOnSubscribeSubscriber extends BaseSubscriber<Integer> {
	Subscription subs;
	int next;

	@Override
	protected void hookOnSubscribe(Subscription s) {
		subs = s;
	}

	@Override
	protected void hookOnNext(Integer i) {
		next = i;
	}
}
```

I disagree with:
```java
@Test
public void scanOther() {
	...
	assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(main.other);
	...
}
```

This test asserts that two values which are both null are the same; they should be verified separately (I break this test, since main.other is set at startup, but I also think it is incorrect to test that two nulls are the same). A possible way around doing too much here is to simply document this behavior in the Javadoc.
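A minimal sketch of what I mean by verifying them separately (assuming AssertJ, as the existing tests use):

```java
// Assert each side on its own instead of relying on isSameAs(null, null) passing:
assertThat(main.other).isNull();
assertThat(test.scan(Scannable.Attr.PARENT)).isNull();
// Once main.other is assigned at startup (as in my branch), the identity check
// below becomes meaningful again:
// assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(main.other);
```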
Hi, thanks for the report and the work you did on your branch. I spent some time looking into the existing implementation and tried to frame your concerns around what I learned.
Before addressing these concerns, I believe there is some misunderstanding with regard to what ends up in the queue. Whenever a new item is emitted from the source, the implementation handling timeout companions cancels the current timeout companion and loses its reference. There is always at most one reference to a companion. The queue is used by the companion wrapper class (…). Now to the actual raised points:
However, it doesn't look incorrect per se; it does make sense to cancel the timeout Publisher. The assumption that upon cancellation the item is discarded might not hold true, though.
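To illustrate the at-most-one-companion claim, here is a rough sketch of that cancel-and-replace shape; the class and all names are hypothetical, not the actual FluxSampleTimeout internals:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

import reactor.core.Disposable;
import reactor.core.publisher.Mono;

// Hypothetical sketch only; not the real operator code.
class SingleCompanionSampler<T> {

	final AtomicReference<Disposable> current = new AtomicReference<>();
	final Function<T, Mono<?>> throttler;
	final Consumer<T> downstream;

	SingleCompanionSampler(Function<T, Mono<?>> throttler, Consumer<T> downstream) {
		this.throttler = throttler;
		this.downstream = downstream;
	}

	void onNextFromSource(T item) {
		// Subscribe a fresh companion for the latest item...
		Disposable next = throttler.apply(item).subscribe(tick -> downstream.accept(item));
		// ...then cancel the previous companion; at most one is ever referenced.
		Disposable previous = current.getAndSet(next);
		if (previous != null) {
			previous.dispose();
		}
	}
}
```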
Please let me know if I got this right and we can decide what to do about these.
Hi @chemicL, thanks for your help. I'm not entirely sure what you are asking me to provide here on all points, but I will try.
I just noted another thing: in the test above, I didn't explicitly state that the subscriber was not completed, and I just checked; it is not. So if you subscribe to a "completed" FluxSampleTimeout and do not request in onSubscribe, the subscription will hang indefinitely, since no signal can ever end/complete the subscription. That is why it should either emit and complete on late requests, or complete instantly without emitting (see the repro sketch below). I'm still not entirely sure the queue is needed, and I don't really understand the race you are describing (but it has been several weeks since I looked at the code). Please let me know if there is anything else you need me to answer or discuss.
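For reference, a minimal repro sketch of the hang, assuming reactor-test's StepVerifier; if the behavior is as I describe, no terminal signal ever arrives (the durations are arbitrary):

```java
import java.time.Duration;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@Test
void lateRequestNeverCompletes() {
	StepVerifier.create(Flux.just(4).sampleTimeout(i -> Mono.empty()), 0) // zero initial demand
			.expectSubscription()
			.expectNoEvent(Duration.ofMillis(100)) // the source is long done, yet nothing happens
			.thenRequest(1)
			.expectNoEvent(Duration.ofMillis(100)) // the late request replays nothing either
			.thenCancel()
			.verify();
}
```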
First of all, the javadoc has a `FIXME`. This ultimately helps to confuse the reader. I will clear out the question in the `FIXME` though; `maxConcurrency` is not a terrible name for the variable. Emissions from the source are added onto a queue and read from the queue rapidly thereafter, it seems. The queue/buffer therefore protects against concurrency, mitigating data loss. (It's not clear to me why this has not been passed along on the stack instead, but all of this is a bigger discussion; the same goes for the `ReplayBuffer` interfaces between `add` and `replay`.) I think the default buffer size may even be a bit beefy (who has 32 cores to overwhelm the buffer?). In any case, the buffer size here really does regard the concurrency, and nothing else; see the sketch of the overload below.
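For context, this is the overload in question; a quick sketch where the companion and the value 16 are arbitrary examples:

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// maxConcurrency bounds the internal buffer of companion publishers;
// 16 here is an arbitrary example, the default appears to be 32.
Flux<Integer> sampled = Flux.range(1, 100)
		.sampleTimeout(item -> Mono.delay(Duration.ofMillis(50)), 16);
```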
Now onto what is actually important for me to have cleared: discard support.

The operator discards elements in the queue when the downstream cancels, or when the upstream errors. Concurrently overwhelming the buffer will result in items simply losing their reference, which could result in untimely triggering. These items are neither discarded nor dropped, and their companion `Publisher`s are not cancelled. But the misleading wording tells the reader that items not part of the sampling are discarded; they are not. The trigger that a client needs to implement themselves to handle "discard" is the companion's `Publisher#cancel` (see the sketch at the end of this comment). The marble diagram hints at this, but I think the docs could be clearer here.

I can also report a possible bug: it seems to me that the companion `Publisher` of an element that is successfully emitted to a downstream that cancels on next (`cancelOnNext`) will itself be cancelled, causing the normal/successful element to become "discarded". I will investigate/recreate later.
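To make the `Publisher#cancel` point concrete, a sketch of the kind of hook a client currently has to write themselves, assuming that cancellation of the companion means the item was superseded (the println stands in for real handling):

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Client-side "discard" hook: the operator does not route superseded items
// through discard support, so we watch the companion's cancellation instead.
Flux<Integer> sampled = Flux.range(1, 10)
		.sampleTimeout(item -> Mono.delay(Duration.ofMillis(100))
				.doOnCancel(() -> System.out.println("superseded: " + item)));
```

Note that, per the possible bug above, this hook may also fire for a successfully emitted element when the downstream cancels on next.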