8 |
|
import java.util.concurrent.*; |
9 |
|
|
10 |
|
/** |
11 |
< |
* Create PRODUCERS publishers each with PROCESSORS processors |
11 |
> |
* Creates PRODUCERS publishers each with PROCESSORS processors |
12 |
|
* each with CONSUMERS subscribers, each sent ITEMS items, with |
13 |
< |
* max CAP buffering; repeat REPS times |
13 |
> |
* max CAP buffering; repeats REPS times |
14 |
|
*/ |
15 |
|
public class SubmissionPublisherLoops4 { |
16 |
|
static final int ITEMS = 1 << 20; |
36 |
|
" CONSUMERS: " + CONSUMERS + |
37 |
|
" CAP: " + CAP); |
38 |
|
for (int rep = 0; rep < reps; ++rep) { |
39 |
< |
long startTime = System.nanoTime(); |
40 |
< |
for (int i = 0; i < PRODUCERS; ++i) |
41 |
< |
new Pub().fork(); |
42 |
< |
phaser.arriveAndAwaitAdvance(); |
43 |
< |
long elapsed = System.nanoTime() - startTime; |
44 |
< |
double secs = ((double)elapsed) / (1000L * 1000 * 1000); |
45 |
< |
double ips = NEXTS / secs; |
46 |
< |
System.out.printf("Time: %7.2f", secs); |
47 |
< |
System.out.printf(" items per sec: %14.2f\n", ips); |
48 |
< |
System.out.println(ForkJoinPool.commonPool()); |
39 |
> |
oneRun(); |
40 |
> |
Thread.sleep(1000); |
41 |
|
} |
42 |
|
} |
43 |
|
|
44 |
+ |
static void oneRun() throws Exception { |
45 |
+ |
long startTime = System.nanoTime(); |
46 |
+ |
for (int i = 0; i < PRODUCERS; ++i) |
47 |
+ |
new Pub().fork(); |
48 |
+ |
phaser.arriveAndAwaitAdvance(); |
49 |
+ |
long elapsed = System.nanoTime() - startTime; |
50 |
+ |
double secs = ((double)elapsed) / (1000L * 1000 * 1000); |
51 |
+ |
double ips = NEXTS / secs; |
52 |
+ |
System.out.printf("Time: %7.2f", secs); |
53 |
+ |
System.out.printf(" items per sec: %14.2f\n", ips); |
54 |
+ |
System.out.println(ForkJoinPool.commonPool()); |
55 |
+ |
} |
56 |
+ |
|
57 |
|
static final class Sub implements Flow.Subscriber<Boolean> { |
58 |
|
int count; |
59 |
|
Flow.Subscription subscription; |