877 |
|
*/ |
878 |
|
@SuppressWarnings("serial") |
879 |
|
static final class ConsumerTask<T> extends ForkJoinTask<Void> |
880 |
< |
implements Runnable { |
880 |
> |
implements Runnable, CompletableFuture.AsynchronousCompletionTask { |
881 |
|
final BufferedSubscription<T> consumer; |
882 |
|
ConsumerTask(BufferedSubscription<T> consumer) { |
883 |
|
this.consumer = consumer; |
930 |
|
* Blocking control relies on the "waiter" field. Producers set |
931 |
|
* the field before trying to block, but must then recheck (via |
932 |
|
* offer) before parking. Signalling then just unparks and clears |
933 |
< |
* waiter field. If the producer and consumer are both in the same |
934 |
< |
* ForkJoinPool, or consumers are running in commonPool, the |
935 |
< |
* producer attempts to help run consumer tasks that it forked |
936 |
< |
* before blocking. To avoid potential cycles, only one level of |
937 |
< |
* helping is currently supported. |
933 |
> |
* waiter field. If the producer and/or consumer are using a |
934 |
> |
* ForkJoinPool, the producer attempts to help run consumer tasks |
935 |
> |
* via ForkJoinPool.helpAsyncBlocker before blocking. |
936 |
|
* |
937 |
|
* This class uses @Contended and heuristic field declaration |
938 |
|
* ordering to reduce false-sharing-based memory contention among |
952 |
|
volatile long demand; // # unfilled requests |
953 |
|
int maxCapacity; // reduced on OOME |
954 |
|
int putStat; // offer result for ManagedBlocker |
957 |
– |
int helpDepth; // nested helping depth (at most 1) |
955 |
|
volatile int ctl; // atomic run state flags |
956 |
|
volatile int head; // next position to take |
957 |
|
int tail; // next position to put |
1104 |
|
* initial offer return 0. |
1105 |
|
*/ |
1106 |
|
final int submit(T item) { |
1107 |
< |
int stat; Executor e; ForkJoinWorkerThread w; |
1108 |
< |
if ((stat = offer(item)) == 0 && helpDepth == 0 && |
1112 |
< |
((e = executor) instanceof ForkJoinPool)) { |
1113 |
< |
helpDepth = 1; |
1114 |
< |
Thread thread = Thread.currentThread(); |
1115 |
< |
if ((thread instanceof ForkJoinWorkerThread) && |
1116 |
< |
((w = (ForkJoinWorkerThread)thread)).getPool() == e) |
1117 |
< |
stat = internalHelpConsume(w.workQueue, item); |
1118 |
< |
else if (e == ForkJoinPool.commonPool()) |
1119 |
< |
stat = externalHelpConsume |
1120 |
< |
(ForkJoinPool.commonSubmitterQueue(), item); |
1121 |
< |
helpDepth = 0; |
1122 |
< |
} |
1123 |
< |
if (stat == 0 && (stat = offer(item)) == 0) { |
1107 |
> |
int stat; |
1108 |
> |
if ((stat = offer(item)) == 0) { |
1109 |
|
putItem = item; |
1110 |
|
timeout = 0L; |
1111 |
< |
try { |
1112 |
< |
ForkJoinPool.managedBlock(this); |
1113 |
< |
} catch (InterruptedException ie) { |
1114 |
< |
timeout = INTERRUPTED; |
1111 |
> |
putStat = 0; |
1112 |
> |
ForkJoinPool.helpAsyncBlocker(executor, this); |
1113 |
> |
if ((stat = putStat) == 0) { |
1114 |
> |
try { |
1115 |
> |
ForkJoinPool.managedBlock(this); |
1116 |
> |
} catch (InterruptedException ie) { |
1117 |
> |
timeout = INTERRUPTED; |
1118 |
> |
} |
1119 |
> |
stat = putStat; |
1120 |
|
} |
1131 |
– |
stat = putStat; |
1121 |
|
if (timeout < 0L) |
1122 |
|
Thread.currentThread().interrupt(); |
1123 |
|
} |
1125 |
|
} |
1126 |
|
|
1127 |
|
/** |
1139 |
– |
* Tries helping for FJ submitter. |
1140 |
– |
*/ |
1141 |
– |
private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) { |
1142 |
– |
int stat = 0; |
1143 |
– |
if (w != null) { |
1144 |
– |
ForkJoinTask<?> t; |
1145 |
– |
while ((t = w.peek()) != null && (t instanceof ConsumerTask)) { |
1146 |
– |
if ((stat = offer(item)) != 0 || !w.tryUnpush(t)) |
1147 |
– |
break; |
1148 |
– |
((ConsumerTask<?>)t).consumer.consume(); |
1149 |
– |
} |
1150 |
– |
} |
1151 |
– |
return stat; |
1152 |
– |
} |
1153 |
– |
|
1154 |
– |
/** |
1155 |
– |
* Tries helping for non-FJ submitter. |
1156 |
– |
*/ |
1157 |
– |
private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) { |
1158 |
– |
int stat = 0; |
1159 |
– |
if (w != null) { |
1160 |
– |
ForkJoinTask<?> t; |
1161 |
– |
while ((t = w.peek()) != null && (t instanceof ConsumerTask)) { |
1162 |
– |
if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t)) |
1163 |
– |
break; |
1164 |
– |
((ConsumerTask<?>)t).consumer.consume(); |
1165 |
– |
} |
1166 |
– |
} |
1167 |
– |
return stat; |
1168 |
– |
} |
1169 |
– |
|
1170 |
– |
/** |
1128 |
|
* Timeout version; similar to submit. |
1129 |
|
*/ |
1130 |
|
final int timedOffer(T item, long nanos) { |
1131 |
< |
int stat; Executor e; |
1132 |
< |
if ((stat = offer(item)) == 0 && helpDepth == 0 && |
1176 |
< |
((e = executor) instanceof ForkJoinPool)) { |
1177 |
< |
Thread thread = Thread.currentThread(); |
1178 |
< |
if (((thread instanceof ForkJoinWorkerThread) && |
1179 |
< |
((ForkJoinWorkerThread)thread).getPool() == e) || |
1180 |
< |
e == ForkJoinPool.commonPool()) { |
1181 |
< |
helpDepth = 1; |
1182 |
< |
ForkJoinTask<?> t; |
1183 |
< |
long deadline = System.nanoTime() + nanos; |
1184 |
< |
while ((t = ForkJoinTask.peekNextLocalTask()) != null && |
1185 |
< |
(t instanceof ConsumerTask)) { |
1186 |
< |
if ((stat = offer(item)) != 0 || |
1187 |
< |
(nanos = deadline - System.nanoTime()) <= 0L || |
1188 |
< |
!t.tryUnfork()) |
1189 |
< |
break; |
1190 |
< |
((ConsumerTask<?>)t).consumer.consume(); |
1191 |
< |
} |
1192 |
< |
helpDepth = 0; |
1193 |
< |
} |
1194 |
< |
} |
1195 |
< |
if (stat == 0 && (stat = offer(item)) == 0 && |
1196 |
< |
(timeout = nanos) > 0L) { |
1131 |
> |
int stat; |
1132 |
> |
if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) { |
1133 |
|
putItem = item; |
1134 |
< |
try { |
1135 |
< |
ForkJoinPool.managedBlock(this); |
1136 |
< |
} catch (InterruptedException ie) { |
1137 |
< |
timeout = INTERRUPTED; |
1134 |
> |
putStat = 0; |
1135 |
> |
ForkJoinPool.helpAsyncBlocker(executor, this); |
1136 |
> |
if ((stat = putStat) == 0) { |
1137 |
> |
try { |
1138 |
> |
ForkJoinPool.managedBlock(this); |
1139 |
> |
} catch (InterruptedException ie) { |
1140 |
> |
timeout = INTERRUPTED; |
1141 |
> |
} |
1142 |
> |
stat = putStat; |
1143 |
|
} |
1203 |
– |
stat = putStat; |
1144 |
|
if (timeout < 0L) |
1145 |
|
Thread.currentThread().interrupt(); |
1146 |
|
} |