* portion of the elements, and so may be amenable to parallel
* execution.
*
- * This interface exports a subset of expected JDK8
+ *
This interface exports a subset of expected JDK8
* functionality.
*
*
Sample usage: Here is one (of the several) ways to compute
@@ -671,7 +693,10 @@ public class ConcurrentHashMapV8
try {
wait();
} catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
+ try {
+ Thread.currentThread().interrupt();
+ } catch (SecurityException ignore) {
+ }
}
}
else
@@ -2380,14 +2405,14 @@ public class ConcurrentHashMapV8
* across threads, iteration terminates if a bounds checks fails
* for a table read.
*
- * This class extends ForkJoinTask to streamline parallel
- * iteration in bulk operations (see BulkTask). This adds only an
- * int of space overhead, which is close enough to negligible in
- * cases where it is not needed to not worry about it. Because
- * ForkJoinTask is Serializable, but iterators need not be, we
- * need to add warning suppressions.
+ * This class extends CountedCompleter to streamline parallel
+ * iteration in bulk operations. This adds only a few fields of
+ * space overhead, which is small enough in cases where it is not
+ * needed to not worry about it. Because CountedCompleter is
+ * Serializable, but iterators need not be, we need to add warning
+ * suppressions.
*/
- @SuppressWarnings("serial") static class Traverser extends ForkJoinTask {
+ @SuppressWarnings("serial") static class Traverser extends CountedCompleter {
final ConcurrentHashMapV8 map;
Node next; // the next entry to use
Object nextKey; // cached key field of next
@@ -2397,24 +2422,28 @@ public class ConcurrentHashMapV8
int baseIndex; // current index of initial table
int baseLimit; // index bound for initial table
int baseSize; // initial table size
+ int batch; // split control
/** Creates iterator for all entries in the table. */
Traverser(ConcurrentHashMapV8 map) {
this.map = map;
}
- /** Creates iterator for split() methods */
- Traverser(Traverser it) {
- ConcurrentHashMapV8 m; Node[] t;
- if ((m = this.map = it.map) == null)
- t = null;
- else if ((t = it.tab) == null && // force parent tab initialization
- (t = it.tab = m.table) != null)
- it.baseLimit = it.baseSize = t.length;
- this.tab = t;
- this.baseSize = it.baseSize;
- it.baseLimit = this.index = this.baseIndex =
- ((this.baseLimit = it.baseLimit) + it.baseIndex + 1) >>> 1;
+ /** Creates iterator for split() methods and task constructors */
+ Traverser(ConcurrentHashMapV8 map, Traverser it, int batch) {
+ super(it);
+ this.batch = batch;
+ if ((this.map = map) != null && it != null) { // split parent
+ Node[] t;
+ if ((t = it.tab) == null &&
+ (t = it.tab = map.table) != null)
+ it.baseLimit = it.baseSize = t.length;
+ this.tab = t;
+ this.baseSize = it.baseSize;
+ int hi = this.baseLimit = it.baseLimit;
+ it.baseLimit = this.index = this.baseIndex =
+ (hi + it.baseIndex + 1) >>> 1;
+ }
}
/**
@@ -2467,9 +2496,39 @@ public class ConcurrentHashMapV8
}
public final boolean hasMoreElements() { return hasNext(); }
- public final void setRawResult(Object x) { }
- public R getRawResult() { return null; }
- public boolean exec() { return true; }
+
+ public void compute() { } // default no-op CountedCompleter body
+
+ /**
+ * Returns a batch value > 0 if this task should (and must) be
+ * split, if so, adding to pending count, and in any case
+ * updating batch value. The initial batch value is approx
+ * exp2 of the number of times (minus one) to split task by
+ * two before executing leaf action. This value is faster to
+ * compute and more convenient to use as a guide to splitting
+ * than is the depth, since it is used while dividing by two
+ * anyway.
+ */
+ final int preSplit() {
+ ConcurrentHashMapV8 m; int b; Node[] t; ForkJoinPool pool;
+ if ((b = batch) < 0 && (m = map) != null) { // force initialization
+ if ((t = tab) == null && (t = tab = m.table) != null)
+ baseLimit = baseSize = t.length;
+ if (t != null) {
+ long n = m.counter.sum();
+ int par = ((pool = getPool()) == null) ?
+ ForkJoinPool.getCommonPoolParallelism() :
+ pool.getParallelism();
+ int sp = par << 3; // slack of 8
+ b = (n <= 0L) ? 0 : (n < (long)sp) ? (int)n : sp;
+ }
+ }
+ b = (b <= 1 || baseIndex == baseLimit) ? 0 : (b >>> 1);
+ if ((batch = b) > 0)
+ addToPendingCount(1);
+ return b;
+ }
+
}
/* ---------------- Public operations -------------- */
@@ -2609,8 +2668,8 @@ public class ConcurrentHashMapV8
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMapV8 may
* contain more mappings than can be represented as an int. The
- * value returned is a snapshot; the actual count may differ if
- * there are ongoing concurrent insertions or removals.
+ * value returned is an estimate; the actual count may differ if
+ * there are concurrent insertions or removals.
*
* @return the number of mappings
*/
@@ -2713,7 +2772,7 @@ public class ConcurrentHashMapV8
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
- * The value can be retrieved by calling the {@code get} method
+ *
The value can be retrieved by calling the {@code get} method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
@@ -3010,7 +3069,7 @@ public class ConcurrentHashMapV8
/**
* Returns a {@link Collection} view of the values contained in this map.
* The collection is backed by the map, so changes to the map are
- * reflected in the collection, and vice-versa.
+ * reflected in the collection, and vice-versa.
*/
public ValuesView values() {
ValuesView vs = values;
@@ -3171,13 +3230,13 @@ public class ConcurrentHashMapV8
@SuppressWarnings("serial") static final class KeyIterator extends Traverser
implements Spliterator, Enumeration {
KeyIterator(ConcurrentHashMapV8 map) { super(map); }
- KeyIterator(Traverser it) {
- super(it);
+ KeyIterator(ConcurrentHashMapV8 map, Traverser it) {
+ super(map, it, -1);
}
public KeyIterator split() {
if (nextKey != null)
throw new IllegalStateException();
- return new KeyIterator(this);
+ return new KeyIterator(map, this);
}
@SuppressWarnings("unchecked") public final K next() {
if (nextVal == null && advance() == null)
@@ -3193,13 +3252,13 @@ public class ConcurrentHashMapV8
@SuppressWarnings("serial") static final class ValueIterator extends Traverser
implements Spliterator, Enumeration {
ValueIterator(ConcurrentHashMapV8 map) { super(map); }
- ValueIterator(Traverser it) {
- super(it);
+ ValueIterator(ConcurrentHashMapV8 map, Traverser it) {
+ super(map, it, -1);
}
public ValueIterator split() {
if (nextKey != null)
throw new IllegalStateException();
- return new ValueIterator(this);
+ return new ValueIterator(map, this);
}
@SuppressWarnings("unchecked") public final V next() {
@@ -3216,13 +3275,13 @@ public class ConcurrentHashMapV8
@SuppressWarnings("serial") static final class EntryIterator extends Traverser
implements Spliterator> {
EntryIterator(ConcurrentHashMapV8 map) { super(map); }
- EntryIterator(Traverser it) {
- super(it);
+ EntryIterator(ConcurrentHashMapV8 map, Traverser it) {
+ super(map, it, -1);
}
public EntryIterator split() {
if (nextKey != null)
throw new IllegalStateException();
- return new EntryIterator(this);
+ return new EntryIterator(map, this);
}
@SuppressWarnings("unchecked") public final Map.Entry next() {
@@ -3278,6 +3337,14 @@ public class ConcurrentHashMapV8
}
}
+ /**
+ * Returns exportable snapshot entry for the given key and value
+ * when write-through can't or shouldn't be used.
+ */
+ static AbstractMap.SimpleEntry entryFor(K k, V v) {
+ return new AbstractMap.SimpleEntry(k, v);
+ }
+
/* ---------------- Serialization Support -------------- */
/**
@@ -4247,7 +4314,7 @@ public class ConcurrentHashMapV8
return ForkJoinTasks.reduceKeysToLong
(map, transformer, basis, reducer).invoke();
}
-
+
/**
* Returns the result of accumulating the given transformation
* of all keys using the given reducer to combine values, and
@@ -4266,7 +4333,7 @@ public class ConcurrentHashMapV8
return ForkJoinTasks.reduceKeysToInt
(map, transformer, basis, reducer).invoke();
}
-
+
}
/**
@@ -4388,7 +4455,7 @@ public class ConcurrentHashMapV8
return ForkJoinTasks.reduceValues
(map, transformer, reducer).invoke();
}
-
+
/**
* Returns the result of accumulating the given transformation
* of all values using the given reducer to combine values,
@@ -4665,7 +4732,7 @@ public class ConcurrentHashMapV8
(ConcurrentHashMapV8 map,
BiAction action) {
if (action == null) throw new NullPointerException();
- return new ForEachMappingTask(map, null, -1, null, action);
+ return new ForEachMappingTask(map, null, -1, action);
}
/**
@@ -4686,7 +4753,7 @@ public class ConcurrentHashMapV8
if (transformer == null || action == null)
throw new NullPointerException();
return new ForEachTransformedMappingTask
- (map, null, -1, null, transformer, action);
+ (map, null, -1, transformer, action);
}
/**
@@ -4706,7 +4773,7 @@ public class ConcurrentHashMapV8
BiFun super K, ? super V, ? extends U> searchFunction) {
if (searchFunction == null) throw new NullPointerException();
return new SearchMappingsTask
- (map, null, -1, null, searchFunction,
+ (map, null, -1, searchFunction,
new AtomicReference());
}
@@ -4815,7 +4882,7 @@ public class ConcurrentHashMapV8
(ConcurrentHashMapV8 map,
Action action) {
if (action == null) throw new NullPointerException();
- return new ForEachKeyTask(map, null, -1, null, action);
+ return new ForEachKeyTask(map, null, -1, action);
}
/**
@@ -4836,7 +4903,7 @@ public class ConcurrentHashMapV8
if (transformer == null || action == null)
throw new NullPointerException();
return new ForEachTransformedKeyTask
- (map, null, -1, null, transformer, action);
+ (map, null, -1, transformer, action);
}
/**
@@ -4856,7 +4923,7 @@ public class ConcurrentHashMapV8
Fun super K, ? extends U> searchFunction) {
if (searchFunction == null) throw new NullPointerException();
return new SearchKeysTask
- (map, null, -1, null, searchFunction,
+ (map, null, -1, searchFunction,
new AtomicReference());
}
@@ -4982,7 +5049,7 @@ public class ConcurrentHashMapV8
(ConcurrentHashMapV8 map,
Action action) {
if (action == null) throw new NullPointerException();
- return new ForEachValueTask(map, null, -1, null, action);
+ return new ForEachValueTask(map, null, -1, action);
}
/**
@@ -5002,7 +5069,7 @@ public class ConcurrentHashMapV8
if (transformer == null || action == null)
throw new NullPointerException();
return new ForEachTransformedValueTask
- (map, null, -1, null, transformer, action);
+ (map, null, -1, transformer, action);
}
/**
@@ -5022,7 +5089,7 @@ public class ConcurrentHashMapV8
Fun super V, ? extends U> searchFunction) {
if (searchFunction == null) throw new NullPointerException();
return new SearchValuesTask
- (map, null, -1, null, searchFunction,
+ (map, null, -1, searchFunction,
new AtomicReference());
}
@@ -5148,7 +5215,7 @@ public class ConcurrentHashMapV8
(ConcurrentHashMapV8 map,
Action> action) {
if (action == null) throw new NullPointerException();
- return new ForEachEntryTask(map, null, -1, null, action);
+ return new ForEachEntryTask(map, null, -1, action);
}
/**
@@ -5168,7 +5235,7 @@ public class ConcurrentHashMapV8
if (transformer == null || action == null)
throw new NullPointerException();
return new ForEachTransformedEntryTask
- (map, null, -1, null, transformer, action);
+ (map, null, -1, transformer, action);
}
/**
@@ -5188,7 +5255,7 @@ public class ConcurrentHashMapV8
Fun, ? extends U> searchFunction) {
if (searchFunction == null) throw new NullPointerException();
return new SearchEntriesTask
- (map, null, -1, null, searchFunction,
+ (map, null, -1, searchFunction,
new AtomicReference());
}
@@ -5306,162 +5373,6 @@ public class ConcurrentHashMapV8
// -------------------------------------------------------
- /**
- * Base for FJ tasks for bulk operations. This adds a variant of
- * CountedCompleters and some split and merge bookkeeping to
- * iterator functionality. The forEach and reduce methods are
- * similar to those illustrated in CountedCompleter documentation,
- * except that bottom-up reduction completions perform them within
- * their compute methods. The search methods are like forEach
- * except they continually poll for success and exit early. Also,
- * exceptions are handled in a simpler manner, by just trying to
- * complete root task exceptionally.
- */
- @SuppressWarnings("serial") static abstract class BulkTask extends Traverser {
- final BulkTask parent; // completion target
- int batch; // split control; -1 for unknown
- int pending; // completion control
-
- BulkTask(ConcurrentHashMapV8 map, BulkTask parent,
- int batch) {
- super(map);
- this.parent = parent;
- this.batch = batch;
- if (parent != null && map != null) { // split parent
- Node[] t;
- if ((t = parent.tab) == null &&
- (t = parent.tab = map.table) != null)
- parent.baseLimit = parent.baseSize = t.length;
- this.tab = t;
- this.baseSize = parent.baseSize;
- int hi = this.baseLimit = parent.baseLimit;
- parent.baseLimit = this.index = this.baseIndex =
- (hi + parent.baseIndex + 1) >>> 1;
- }
- }
-
- /**
- * Forces root task to complete.
- * @param ex if null, complete normally, else exceptionally
- * @return false to simplify use
- */
- final boolean tryCompleteComputation(Throwable ex) {
- for (BulkTask a = this;;) {
- BulkTask p = a.parent;
- if (p == null) {
- if (ex != null)
- a.completeExceptionally(ex);
- else
- a.quietlyComplete();
- return false;
- }
- a = p;
- }
- }
-
- /**
- * Version of tryCompleteComputation for function screening checks
- */
- final boolean abortOnNullFunction() {
- return tryCompleteComputation(new Error("Unexpected null function"));
- }
-
- // utilities
-
- /** CompareAndSet pending count */
- final boolean casPending(int cmp, int val) {
- return U.compareAndSwapInt(this, PENDING, cmp, val);
- }
-
- /**
- * Returns approx exp2 of the number of times (minus one) to
- * split task by two before executing leaf action. This value
- * is faster to compute and more convenient to use as a guide
- * to splitting than is the depth, since it is used while
- * dividing by two anyway.
- */
- final int batch() {
- ConcurrentHashMapV8 m; int b; Node[] t; ForkJoinPool pool;
- if ((b = batch) < 0 && (m = map) != null) { // force initialization
- if ((t = tab) == null && (t = tab = m.table) != null)
- baseLimit = baseSize = t.length;
- if (t != null) {
- long n = m.counter.sum();
- int par = ((pool = getPool()) == null) ?
- ForkJoinPool.getCommonPoolParallelism() :
- pool.getParallelism();
- int sp = par << 3; // slack of 8
- b = batch = (n <= 0L) ? 0 : (n < (long)sp) ? (int)n : sp;
- }
- }
- return b;
- }
-
- /**
- * Returns exportable snapshot entry.
- */
- static AbstractMap.SimpleEntry entryFor(K k, V v) {
- return new AbstractMap.SimpleEntry(k, v);
- }
-
- // Unsafe mechanics
- private static final sun.misc.Unsafe U;
- private static final long PENDING;
- static {
- try {
- U = getUnsafe();
- PENDING = U.objectFieldOffset
- (BulkTask.class.getDeclaredField("pending"));
- } catch (Exception e) {
- throw new Error(e);
- }
- }
- }
-
- /**
- * Base class for non-reductive actions
- */
- @SuppressWarnings("serial") static abstract class BulkAction extends BulkTask {
- BulkAction nextTask;
- BulkAction(ConcurrentHashMapV8 map, BulkTask parent,
- int batch, BulkAction nextTask) {
- super(map, parent, batch);
- this.nextTask = nextTask;
- }
-
- /**
- * Try to complete task and upward parents. Upon hitting
- * non-completed parent, if a non-FJ task, try to help out the
- * computation.
- */
- final void tryComplete(BulkAction subtasks) {
- BulkTask a = this, s = a;
- for (int c;;) {
- if ((c = a.pending) == 0) {
- if ((a = (s = a).parent) == null) {
- s.quietlyComplete();
- break;
- }
- }
- else if (a.casPending(c, c - 1)) {
- if (subtasks != null && !inForkJoinPool()) {
- while ((s = a.parent) != null)
- a = s;
- while (!a.isDone()) {
- BulkAction next = subtasks.nextTask;
- if (subtasks.tryUnfork())
- subtasks.exec();
- if ((subtasks = next) == null)
- break;
- }
- }
- break;
- }
- }
- }
-
- }
-
/*
* Task classes. Coded in a regular but ugly format/style to
* simplify checks that each variant differs in the right way from
@@ -5469,665 +5380,508 @@ public class ConcurrentHashMapV8
*/
@SuppressWarnings("serial") static final class ForEachKeyTask
- extends BulkAction {
+ extends Traverser {
final Action action;
ForEachKeyTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachKeyTask nextTask,
+ (ConcurrentHashMapV8 m, Traverser p, int b,
Action action) {
- super(m, p, b, nextTask);
+ super(m, p, b);
this.action = action;
}
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Action action = this.action;
- if (action == null)
- return abortOnNullFunction();
- ForEachKeyTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachKeyTask
- (map, this, b >>>= 1, subtasks, action)).fork();
- }
- while (advance() != null)
- action.apply((K)nextKey);
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
- }
- tryComplete(subtasks);
- return false;
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Action action;
+ if ((action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachKeyTask(map, this, b, action).fork();
+ while (advance() != null)
+ action.apply((K)nextKey);
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachValueTask
- extends BulkAction {
+ extends Traverser {
final Action action;
ForEachValueTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachValueTask nextTask,
+ (ConcurrentHashMapV8 m, Traverser p, int b,
Action action) {
- super(m, p, b, nextTask);
+ super(m, p, b);
this.action = action;
}
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Action action = this.action;
- if (action == null)
- return abortOnNullFunction();
- ForEachValueTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachValueTask
- (map, this, b >>>= 1, subtasks, action)).fork();
- }
- Object v;
- while ((v = advance()) != null)
- action.apply((V)v);
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
- }
- tryComplete(subtasks);
- return false;
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Action action;
+ if ((action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachValueTask(map, this, b, action).fork();
+ Object v;
+ while ((v = advance()) != null)
+ action.apply((V)v);
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachEntryTask
- extends BulkAction {
+ extends Traverser {
final Action> action;
ForEachEntryTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachEntryTask nextTask,
+ (ConcurrentHashMapV8 m, Traverser p, int b,
Action> action) {
- super(m, p, b, nextTask);
+ super(m, p, b);
this.action = action;
}
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Action> action = this.action;
- if (action == null)
- return abortOnNullFunction();
- ForEachEntryTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachEntryTask
- (map, this, b >>>= 1, subtasks, action)).fork();
- }
- Object v;
- while ((v = advance()) != null)
- action.apply(entryFor((K)nextKey, (V)v));
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
- }
- tryComplete(subtasks);
- return false;
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Action> action;
+ if ((action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachEntryTask(map, this, b, action).fork();
+ Object v;
+ while ((v = advance()) != null)
+ action.apply(entryFor((K)nextKey, (V)v));
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachMappingTask
- extends BulkAction {
+ extends Traverser {
final BiAction action;
ForEachMappingTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachMappingTask nextTask,
+ (ConcurrentHashMapV8 m, Traverser p, int b,
BiAction action) {
- super(m, p, b, nextTask);
+ super(m, p, b);
this.action = action;
}
- @SuppressWarnings("unchecked") public final boolean exec() {
- final BiAction action = this.action;
- if (action == null)
- return abortOnNullFunction();
- ForEachMappingTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachMappingTask
- (map, this, b >>>= 1, subtasks, action)).fork();
- }
- Object v;
- while ((v = advance()) != null)
- action.apply((K)nextKey, (V)v);
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
- }
- tryComplete(subtasks);
- return false;
+ @SuppressWarnings("unchecked") public final void compute() {
+ final BiAction action;
+ if ((action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachMappingTask(map, this, b, action).fork();
+ Object v;
+ while ((v = advance()) != null)
+ action.apply((K)nextKey, (V)v);
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachTransformedKeyTask
- extends BulkAction {
+ extends Traverser {
final Fun super K, ? extends U> transformer;
final Action action;
ForEachTransformedKeyTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachTransformedKeyTask nextTask,
- Fun super K, ? extends U> transformer,
- Action action) {
- super(m, p, b, nextTask);
- this.transformer = transformer;
- this.action = action;
-
- }
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Fun super K, ? extends U> transformer =
- this.transformer;
- final Action action = this.action;
- if (transformer == null || action == null)
- return abortOnNullFunction();
- ForEachTransformedKeyTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachTransformedKeyTask
- (map, this, b >>>= 1, subtasks, transformer, action)).fork();
- }
- U u;
- while (advance() != null) {
- if ((u = transformer.apply((K)nextKey)) != null)
- action.apply(u);
- }
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
+ (ConcurrentHashMapV8 m, Traverser p, int b,
+ Fun super K, ? extends U> transformer, Action action) {
+ super(m, p, b);
+ this.transformer = transformer; this.action = action;
+ }
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Fun super K, ? extends U> transformer;
+ final Action action;
+ if ((transformer = this.transformer) == null ||
+ (action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachTransformedKeyTask
+ (map, this, b, transformer, action).fork();
+ U u;
+ while (advance() != null) {
+ if ((u = transformer.apply((K)nextKey)) != null)
+ action.apply(u);
}
- tryComplete(subtasks);
- return false;
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachTransformedValueTask
- extends BulkAction {
+ extends Traverser {
final Fun super V, ? extends U> transformer;
final Action action;
ForEachTransformedValueTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachTransformedValueTask nextTask,
- Fun super V, ? extends U> transformer,
- Action action) {
- super(m, p, b, nextTask);
- this.transformer = transformer;
- this.action = action;
-
- }
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Fun super V, ? extends U> transformer =
- this.transformer;
- final Action action = this.action;
- if (transformer == null || action == null)
- return abortOnNullFunction();
- ForEachTransformedValueTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachTransformedValueTask
- (map, this, b >>>= 1, subtasks, transformer, action)).fork();
- }
- Object v; U u;
- while ((v = advance()) != null) {
- if ((u = transformer.apply((V)v)) != null)
- action.apply(u);
- }
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
+ (ConcurrentHashMapV8 m, Traverser p, int b,
+ Fun super V, ? extends U> transformer, Action action) {
+ super(m, p, b);
+ this.transformer = transformer; this.action = action;
+ }
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Fun super V, ? extends U> transformer;
+ final Action action;
+ if ((transformer = this.transformer) == null ||
+ (action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachTransformedValueTask
+ (map, this, b, transformer, action).fork();
+ Object v; U u;
+ while ((v = advance()) != null) {
+ if ((u = transformer.apply((V)v)) != null)
+ action.apply(u);
}
- tryComplete(subtasks);
- return false;
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachTransformedEntryTask
- extends BulkAction {
+ extends Traverser {
final Fun, ? extends U> transformer;
final Action action;
ForEachTransformedEntryTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachTransformedEntryTask nextTask,
- Fun, ? extends U> transformer,
- Action action) {
- super(m, p, b, nextTask);
- this.transformer = transformer;
- this.action = action;
-
- }
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Fun, ? extends U> transformer =
- this.transformer;
- final Action action = this.action;
- if (transformer == null || action == null)
- return abortOnNullFunction();
- ForEachTransformedEntryTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachTransformedEntryTask
- (map, this, b >>>= 1, subtasks, transformer, action)).fork();
- }
- Object v; U u;
- while ((v = advance()) != null) {
- if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
- action.apply(u);
- }
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
+ (ConcurrentHashMapV8 m, Traverser p, int b,
+ Fun, ? extends U> transformer, Action action) {
+ super(m, p, b);
+ this.transformer = transformer; this.action = action;
+ }
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Fun, ? extends U> transformer;
+ final Action action;
+ if ((transformer = this.transformer) == null ||
+ (action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachTransformedEntryTask
+ (map, this, b, transformer, action).fork();
+ Object v; U u;
+ while ((v = advance()) != null) {
+ if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
+ action.apply(u);
}
- tryComplete(subtasks);
- return false;
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachTransformedMappingTask
- extends BulkAction {
+ extends Traverser {
final BiFun super K, ? super V, ? extends U> transformer;
final Action action;
ForEachTransformedMappingTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachTransformedMappingTask nextTask,
+ (ConcurrentHashMapV8