// Source: WhereDoesItRunTest.java (104 lines, 87 loc, 4.45 KB), from a fork of krka/futures-guide.
// Web-page navigation text and the line-number gutter left over from extraction have been dropped.
package se.krka.futures;
import com.google.common.util.concurrent.Futures;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * Demonstrations of which thread each stage of a {@link CompletableFuture} chain runs on.
 *
 * <p>Every test submits {@link #ITERATIONS} tasks to a "runner" executor. Each task builds a
 * chain whose stages append {@code Util.currThread()} (presumably the name of the executing
 * thread -- confirm against {@code Util}) to a string, and the final string is tallied. The
 * tally is printed at the end; the tests make no assertions.
 *
 * <p>Background, per the {@code CompletableFuture} documentation: a non-async dependent stage
 * ({@code thenApply}, {@code exceptionally}, {@code thenAccept}) may run on the thread that
 * completes the previous stage, or on the thread that registers the stage when the previous
 * stage is already complete. That race is why the "runner" thread shows up in some results.
 */
public class WhereDoesItRunTest {

  /** Number of chains built by each demonstration. */
  private static final int ITERATIONS = 1000000;

  /** Happy path: supply on "banana", async transform on "apple", then a non-async transform. */
  @Test
  public void testExecutor() {
    ExecutorService runner = Util.newExecutor("runner");
    ExecutorService banana = Util.newExecutor("banana");
    ExecutorService apple = Util.newExecutor("apple");
    ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    try {
      awaitAll(IntStream.range(0, ITERATIONS).parallel().mapToObj(i ->
          runner.submit(
              () -> CompletableFuture
                  .supplyAsync(() -> Util.currThread(), banana)
                  .thenApplyAsync(s -> s + " -> " + Util.currThread(), apple)
                  .thenApply(s -> s + " -> " + Util.currThread())
                  .thenAccept(s -> tally(counters, s))
          ))
          .collect(Collectors.toList()));
    } finally {
      shutdownAll(runner, banana, apple);
    }

    // Example output from one run (the split between the two rows depends on timing):
    //     999180: banana -> apple -> apple
    //        820: banana -> apple -> runner
    report(counters);
  }

  /**
   * Failure path: the non-async {@code thenApply} always throws, so the following
   * {@code thenApplyAsync(..., apple)} is skipped and {@code exceptionally} produces the value.
   */
  @Test
  public void testExecutorWithException() {
    ExecutorService runner = Util.newExecutor("runner");
    ExecutorService banana = Util.newExecutor("banana");
    ExecutorService apple = Util.newExecutor("apple");
    ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    try {
      awaitAll(IntStream.range(0, ITERATIONS).parallel().mapToObj(i ->
          runner.submit(
              () -> CompletableFuture
                  .supplyAsync(() -> Util.currThread(), banana)
                  .thenApply(s -> Util.doThrow(new RuntimeException(s + " -> " + Util.currThread())))
                  .thenApplyAsync(s -> s + " -> " + Util.currThread(), apple)
                  .exceptionally(e -> e.getMessage() + " -> " + Util.currThread())
                  .thenAccept(s -> tally(counters, s))
          ))
          .collect(Collectors.toList()));
    } finally {
      shutdownAll(runner, banana, apple);
    }

    // Note that nothing ever runs on "apple"!
    // Example output from one run (exact counts depend on timing):
    //          1: java.lang.RuntimeException: banana -> banana -> runner
    //     999997: java.lang.RuntimeException: banana -> banana -> banana
    //          2: java.lang.RuntimeException: banana -> runner -> runner
    report(counters);
  }

  /**
   * Failure path with {@code handleAsync}: unlike {@code thenApplyAsync}, the handler is invoked
   * for failed stages too, so the recovery step runs on the "apple" executor.
   */
  @Test
  public void testExecutorWithExceptionAndHandle() {
    ExecutorService runner = Util.newExecutor("runner");
    ExecutorService banana = Util.newExecutor("banana");
    ExecutorService apple = Util.newExecutor("apple");
    ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    try {
      awaitAll(IntStream.range(0, ITERATIONS).parallel().mapToObj(i ->
          runner.submit(
              () -> CompletableFuture
                  .supplyAsync(() -> Util.currThread(), banana)
                  .thenApply(s -> Util.<String>doThrow(new RuntimeException(s + " -> " + Util.currThread())))
                  // The previous stage always throws, so ex is expected to be non-null here.
                  .handleAsync((s, ex) -> ex.getMessage() + " -> " + Util.currThread(), apple)
                  // Only reached if the handler above itself fails.
                  .exceptionally(e -> e.getMessage() + " -> " + Util.currThread())
                  .thenAccept(s -> tally(counters, s))
          ))
          .collect(Collectors.toList()));
    } finally {
      shutdownAll(runner, banana, apple);
    }

    // Example output from one run:
    //    1000000: java.lang.RuntimeException: banana -> banana -> apple
    report(counters);
  }

  /**
   * Waits, in submission order, for every submitted task and then for the chain it produced.
   *
   * <p>Callers collect the submissions into a list first so that all tasks are queued before
   * any waiting starts.
   *
   * @param submissions the futures returned by submitting the chain-building tasks
   */
  private static void awaitAll(List<Future<CompletableFuture<Void>>> submissions) {
    for (Future<CompletableFuture<Void>> submission : submissions) {
      Futures.getUnchecked(submission).join();
    }
  }

  /**
   * Increments the counter for one observed thread path, creating it on first use.
   *
   * @param counters shared tally keyed by the recorded path
   * @param path the string accumulated by the chain's stages
   */
  private static void tally(ConcurrentMap<String, AtomicInteger> counters, String path) {
    counters.computeIfAbsent(path, key -> new AtomicInteger()).incrementAndGet();
  }

  /**
   * Prints one line per distinct path: the count right-aligned in 10 columns, then the path.
   *
   * @param counters the tally to print
   */
  private static void report(ConcurrentMap<String, AtomicInteger> counters) {
    counters.forEach((s, count) -> System.out.printf("%10d: %s\n", count.get(), s));
  }

  /**
   * Initiates an orderly shutdown of each executor so the test does not leave it running.
   *
   * @param executors the executors created by the calling test
   */
  private static void shutdownAll(ExecutorService... executors) {
    for (ExecutorService executor : executors) {
      executor.shutdown();
    }
  }
}