--- jsr166/src/jsr166e/ConcurrentHashMapV8.java 2012/11/18 03:07:22 1.77
+++ jsr166/src/jsr166e/ConcurrentHashMapV8.java 2012/12/08 14:10:38 1.81
@@ -28,6 +28,28 @@ import java.util.concurrent.atomic.Atomi
import java.io.Serializable;
+import java.util.Comparator;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.Collection;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.AbstractCollection;
+import java.util.Hashtable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Enumeration;
+import java.util.ConcurrentModificationException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.atomic.AtomicReference;
+
+import java.io.Serializable;
+
/**
* A hash table supporting full concurrency of retrievals and
* high expected concurrency for updates. This class obeys the
@@ -40,7 +62,7 @@ import java.io.Serializable;
* interoperable with {@code Hashtable} in programs that rely on its
* thread safety but not on its synchronization details.
*
- *
Retrieval operations (including {@code get}) generally do not
+ *
Retrieval operations (including {@code get}) generally do not
* block, so may overlap with update operations (including {@code put}
* and {@code remove}). Retrievals reflect the results of the most
* recently completed update operations holding upon their
@@ -61,7 +83,7 @@ import java.io.Serializable;
* that may be adequate for monitoring or estimation purposes, but not
* for program control.
*
- *
The table is dynamically expanded when there are too many
+ *
The table is dynamically expanded when there are too many
* collisions (i.e., keys that have distinct hash codes but fall into
* the same slot modulo the table size), with the expected average
* effect of maintaining roughly two bins per mapping (corresponding
@@ -82,13 +104,13 @@ import java.io.Serializable;
* {@code hashCode()} is a sure way to slow down performance of any
* hash table.
*
- *
A {@link Set} projection of a ConcurrentHashMapV8 may be created
+ *
A {@link Set} projection of a ConcurrentHashMapV8 may be created
* (using {@link #newKeySet()} or {@link #newKeySet(int)}), or viewed
* (using {@link #keySet(Object)} when only keys are of interest, and the
* mapped values are (perhaps transiently) not used or all take the
* same mapping value.
*
- *
A ConcurrentHashMapV8 can be used as scalable frequency map (a
+ *
A ConcurrentHashMapV8 can be used as scalable frequency map (a
* form of histogram or multiset) by using {@link LongAdder} values
* and initializing via {@link #computeIfAbsent}. For example, to add
* a count to a {@code ConcurrentHashMapV8 freqs}, you
@@ -99,7 +121,7 @@ import java.io.Serializable;
* optional methods of the {@link Map} and {@link Iterator}
* interfaces.
*
- * Like {@link Hashtable} but unlike {@link HashMap}, this class
+ *
Like {@link Hashtable} but unlike {@link HashMap}, this class
* does not allow {@code null} to be used as a key or value.
*
*
ConcurrentHashMapV8s support parallel operations using the {@link
@@ -184,7 +206,7 @@ import java.io.Serializable;
* arguments can be supplied using {@code new
* AbstractMap.SimpleEntry(k,v)}.
*
- *
Bulk operations may complete abruptly, throwing an
+ *
Bulk operations may complete abruptly, throwing an
* exception encountered in the application of a supplied
* function. Bear in mind when handling such exceptions that other
* concurrently executing functions could also have thrown
@@ -199,7 +221,7 @@ import java.io.Serializable;
* Similarly, parallelization may not lead to much actual parallelism
* if all processors are busy performing unrelated tasks.
*
- *
All arguments to all task methods must be non-null.
+ *
All arguments to all task methods must be non-null.
*
*
jsr166e note: During transition, this class
* uses nested functional interfaces with different names but the
@@ -225,7 +247,7 @@ public class ConcurrentHashMapV8
* portion of the elements, and so may be amenable to parallel
* execution.
*
- * This interface exports a subset of expected JDK8
+ *
This interface exports a subset of expected JDK8
* functionality.
*
*
Sample usage: Here is one (of the several) ways to compute
@@ -671,7 +693,10 @@ public class ConcurrentHashMapV8
try {
wait();
} catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
+ try {
+ Thread.currentThread().interrupt();
+ } catch (SecurityException ignore) {
+ }
}
}
else
@@ -842,16 +867,17 @@ public class ConcurrentHashMapV8
if (c != (pc = pk.getClass()) ||
!(k instanceof Comparable) ||
(dir = ((Comparable)k).compareTo((Comparable)pk)) == 0) {
- dir = (c == pc) ? 0 : c.getName().compareTo(pc.getName());
- TreeNode r = null, s = null, pl, pr;
- if (dir >= 0) {
- if ((pl = p.left) != null && h <= pl.hash)
- s = pl;
+ if ((dir = (c == pc) ? 0 :
+ c.getName().compareTo(pc.getName())) == 0) {
+ TreeNode r = null, pl, pr; // check both sides
+ if ((pr = p.right) != null && h >= pr.hash &&
+ (r = getTreeNode(h, k, pr)) != null)
+ return r;
+ else if ((pl = p.left) != null && h <= pl.hash)
+ dir = -1;
+ else // nothing there
+ return null;
}
- else if ((pr = p.right) != null && h >= pr.hash)
- s = pr;
- if (s != null && (r = getTreeNode(h, k, s)) != null)
- return r;
}
}
else
@@ -906,11 +932,14 @@ public class ConcurrentHashMapV8
if (c != (pc = pk.getClass()) ||
!(k instanceof Comparable) ||
(dir = ((Comparable)k).compareTo((Comparable)pk)) == 0) {
- dir = (c == pc) ? 0 : c.getName().compareTo(pc.getName());
- TreeNode r = null, s = null, pl, pr;
- if (dir >= 0) {
- if ((pl = p.left) != null && h <= pl.hash)
- s = pl;
+ TreeNode s = null, r = null, pr;
+ if ((dir = (c == pc) ? 0 :
+ c.getName().compareTo(pc.getName())) == 0) {
+ if ((pr = p.right) != null && h >= pr.hash &&
+ (r = getTreeNode(h, k, pr)) != null)
+ return r;
+ else // continue left
+ dir = -1;
}
else if ((pr = p.right) != null && h >= pr.hash)
s = pr;
@@ -2380,14 +2409,14 @@ public class ConcurrentHashMapV8
* across threads, iteration terminates if a bounds checks fails
* for a table read.
*
- * This class extends ForkJoinTask to streamline parallel
- * iteration in bulk operations (see BulkTask). This adds only an
- * int of space overhead, which is close enough to negligible in
- * cases where it is not needed to not worry about it. Because
- * ForkJoinTask is Serializable, but iterators need not be, we
- * need to add warning suppressions.
+ * This class extends CountedCompleter to streamline parallel
+ * iteration in bulk operations. This adds only a few fields of
+ * space overhead, which is small enough in cases where it is not
+ * needed to not worry about it. Because CountedCompleter is
+ * Serializable, but iterators need not be, we need to add warning
+ * suppressions.
*/
- @SuppressWarnings("serial") static class Traverser extends ForkJoinTask {
+ @SuppressWarnings("serial") static class Traverser extends CountedCompleter {
final ConcurrentHashMapV8 map;
Node next; // the next entry to use
Object nextKey; // cached key field of next
@@ -2397,24 +2426,28 @@ public class ConcurrentHashMapV8
int baseIndex; // current index of initial table
int baseLimit; // index bound for initial table
int baseSize; // initial table size
+ int batch; // split control
/** Creates iterator for all entries in the table. */
Traverser(ConcurrentHashMapV8 map) {
this.map = map;
}
- /** Creates iterator for split() methods */
- Traverser(Traverser it) {
- ConcurrentHashMapV8 m; Node[] t;
- if ((m = this.map = it.map) == null)
- t = null;
- else if ((t = it.tab) == null && // force parent tab initialization
- (t = it.tab = m.table) != null)
- it.baseLimit = it.baseSize = t.length;
- this.tab = t;
- this.baseSize = it.baseSize;
- it.baseLimit = this.index = this.baseIndex =
- ((this.baseLimit = it.baseLimit) + it.baseIndex + 1) >>> 1;
+ /** Creates iterator for split() methods and task constructors */
+ Traverser(ConcurrentHashMapV8 map, Traverser it, int batch) {
+ super(it);
+ this.batch = batch;
+ if ((this.map = map) != null && it != null) { // split parent
+ Node[] t;
+ if ((t = it.tab) == null &&
+ (t = it.tab = map.table) != null)
+ it.baseLimit = it.baseSize = t.length;
+ this.tab = t;
+ this.baseSize = it.baseSize;
+ int hi = this.baseLimit = it.baseLimit;
+ it.baseLimit = this.index = this.baseIndex =
+ (hi + it.baseIndex + 1) >>> 1;
+ }
}
/**
@@ -2467,9 +2500,39 @@ public class ConcurrentHashMapV8
}
public final boolean hasMoreElements() { return hasNext(); }
- public final void setRawResult(Object x) { }
- public R getRawResult() { return null; }
- public boolean exec() { return true; }
+
+ public void compute() { } // default no-op CountedCompleter body
+
+ /**
+ * Returns a batch value > 0 if this task should (and must) be
+ * split, if so, adding to pending count, and in any case
+ * updating batch value. The initial batch value is approx
+ * exp2 of the number of times (minus one) to split task by
+ * two before executing leaf action. This value is faster to
+ * compute and more convenient to use as a guide to splitting
+ * than is the depth, since it is used while dividing by two
+ * anyway.
+ */
+ final int preSplit() {
+ ConcurrentHashMapV8 m; int b; Node[] t; ForkJoinPool pool;
+ if ((b = batch) < 0 && (m = map) != null) { // force initialization
+ if ((t = tab) == null && (t = tab = m.table) != null)
+ baseLimit = baseSize = t.length;
+ if (t != null) {
+ long n = m.counter.sum();
+ int par = ((pool = getPool()) == null) ?
+ ForkJoinPool.getCommonPoolParallelism() :
+ pool.getParallelism();
+ int sp = par << 3; // slack of 8
+ b = (n <= 0L) ? 0 : (n < (long)sp) ? (int)n : sp;
+ }
+ }
+ b = (b <= 1 || baseIndex == baseLimit) ? 0 : (b >>> 1);
+ if ((batch = b) > 0)
+ addToPendingCount(1);
+ return b;
+ }
+
}
/* ---------------- Public operations -------------- */
@@ -2609,8 +2672,8 @@ public class ConcurrentHashMapV8
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMapV8 may
* contain more mappings than can be represented as an int. The
- * value returned is a snapshot; the actual count may differ if
- * there are ongoing concurrent insertions or removals.
+ * value returned is an estimate; the actual count may differ if
+ * there are concurrent insertions or removals.
*
* @return the number of mappings
*/
@@ -2713,7 +2776,7 @@ public class ConcurrentHashMapV8
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
- * The value can be retrieved by calling the {@code get} method
+ *
The value can be retrieved by calling the {@code get} method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
@@ -3171,13 +3234,13 @@ public class ConcurrentHashMapV8
@SuppressWarnings("serial") static final class KeyIterator extends Traverser
implements Spliterator, Enumeration {
KeyIterator(ConcurrentHashMapV8 map) { super(map); }
- KeyIterator(Traverser it) {
- super(it);
+ KeyIterator(ConcurrentHashMapV8 map, Traverser it) {
+ super(map, it, -1);
}
public KeyIterator split() {
if (nextKey != null)
throw new IllegalStateException();
- return new KeyIterator(this);
+ return new KeyIterator(map, this);
}
@SuppressWarnings("unchecked") public final K next() {
if (nextVal == null && advance() == null)
@@ -3193,13 +3256,13 @@ public class ConcurrentHashMapV8
@SuppressWarnings("serial") static final class ValueIterator extends Traverser
implements Spliterator, Enumeration {
ValueIterator(ConcurrentHashMapV8 map) { super(map); }
- ValueIterator(Traverser it) {
- super(it);
+ ValueIterator(ConcurrentHashMapV8 map, Traverser it) {
+ super(map, it, -1);
}
public ValueIterator split() {
if (nextKey != null)
throw new IllegalStateException();
- return new ValueIterator(this);
+ return new ValueIterator(map, this);
}
@SuppressWarnings("unchecked") public final V next() {
@@ -3216,13 +3279,13 @@ public class ConcurrentHashMapV8
@SuppressWarnings("serial") static final class EntryIterator extends Traverser
implements Spliterator> {
EntryIterator(ConcurrentHashMapV8 map) { super(map); }
- EntryIterator(Traverser it) {
- super(it);
+ EntryIterator(ConcurrentHashMapV8 map, Traverser it) {
+ super(map, it, -1);
}
public EntryIterator split() {
if (nextKey != null)
throw new IllegalStateException();
- return new EntryIterator(this);
+ return new EntryIterator(map, this);
}
@SuppressWarnings("unchecked") public final Map.Entry next() {
@@ -3278,6 +3341,14 @@ public class ConcurrentHashMapV8
}
}
+ /**
+ * Returns exportable snapshot entry for the given key and value
+ * when write-through can't or shouldn't be used.
+ */
+ static AbstractMap.SimpleEntry entryFor(K k, V v) {
+ return new AbstractMap.SimpleEntry(k, v);
+ }
+
/* ---------------- Serialization Support -------------- */
/**
@@ -4665,7 +4736,7 @@ public class ConcurrentHashMapV8
(ConcurrentHashMapV8 map,
BiAction action) {
if (action == null) throw new NullPointerException();
- return new ForEachMappingTask(map, null, -1, null, action);
+ return new ForEachMappingTask(map, null, -1, action);
}
/**
@@ -4686,7 +4757,7 @@ public class ConcurrentHashMapV8
if (transformer == null || action == null)
throw new NullPointerException();
return new ForEachTransformedMappingTask
- (map, null, -1, null, transformer, action);
+ (map, null, -1, transformer, action);
}
/**
@@ -4706,7 +4777,7 @@ public class ConcurrentHashMapV8
BiFun super K, ? super V, ? extends U> searchFunction) {
if (searchFunction == null) throw new NullPointerException();
return new SearchMappingsTask
- (map, null, -1, null, searchFunction,
+ (map, null, -1, searchFunction,
new AtomicReference());
}
@@ -4815,7 +4886,7 @@ public class ConcurrentHashMapV8
(ConcurrentHashMapV8 map,
Action action) {
if (action == null) throw new NullPointerException();
- return new ForEachKeyTask(map, null, -1, null, action);
+ return new ForEachKeyTask(map, null, -1, action);
}
/**
@@ -4836,7 +4907,7 @@ public class ConcurrentHashMapV8
if (transformer == null || action == null)
throw new NullPointerException();
return new ForEachTransformedKeyTask
- (map, null, -1, null, transformer, action);
+ (map, null, -1, transformer, action);
}
/**
@@ -4856,7 +4927,7 @@ public class ConcurrentHashMapV8
Fun super K, ? extends U> searchFunction) {
if (searchFunction == null) throw new NullPointerException();
return new SearchKeysTask
- (map, null, -1, null, searchFunction,
+ (map, null, -1, searchFunction,
new AtomicReference());
}
@@ -4982,7 +5053,7 @@ public class ConcurrentHashMapV8
(ConcurrentHashMapV8 map,
Action action) {
if (action == null) throw new NullPointerException();
- return new ForEachValueTask(map, null, -1, null, action);
+ return new ForEachValueTask(map, null, -1, action);
}
/**
@@ -5002,7 +5073,7 @@ public class ConcurrentHashMapV8
if (transformer == null || action == null)
throw new NullPointerException();
return new ForEachTransformedValueTask
- (map, null, -1, null, transformer, action);
+ (map, null, -1, transformer, action);
}
/**
@@ -5022,7 +5093,7 @@ public class ConcurrentHashMapV8
Fun super V, ? extends U> searchFunction) {
if (searchFunction == null) throw new NullPointerException();
return new SearchValuesTask
- (map, null, -1, null, searchFunction,
+ (map, null, -1, searchFunction,
new AtomicReference());
}
@@ -5148,7 +5219,7 @@ public class ConcurrentHashMapV8
(ConcurrentHashMapV8 map,
Action> action) {
if (action == null) throw new NullPointerException();
- return new ForEachEntryTask(map, null, -1, null, action);
+ return new ForEachEntryTask(map, null, -1, action);
}
/**
@@ -5168,7 +5239,7 @@ public class ConcurrentHashMapV8
if (transformer == null || action == null)
throw new NullPointerException();
return new ForEachTransformedEntryTask
- (map, null, -1, null, transformer, action);
+ (map, null, -1, transformer, action);
}
/**
@@ -5188,7 +5259,7 @@ public class ConcurrentHashMapV8
Fun, ? extends U> searchFunction) {
if (searchFunction == null) throw new NullPointerException();
return new SearchEntriesTask
- (map, null, -1, null, searchFunction,
+ (map, null, -1, searchFunction,
new AtomicReference());
}
@@ -5306,162 +5377,6 @@ public class ConcurrentHashMapV8
// -------------------------------------------------------
- /**
- * Base for FJ tasks for bulk operations. This adds a variant of
- * CountedCompleters and some split and merge bookkeeping to
- * iterator functionality. The forEach and reduce methods are
- * similar to those illustrated in CountedCompleter documentation,
- * except that bottom-up reduction completions perform them within
- * their compute methods. The search methods are like forEach
- * except they continually poll for success and exit early. Also,
- * exceptions are handled in a simpler manner, by just trying to
- * complete root task exceptionally.
- */
- @SuppressWarnings("serial") static abstract class BulkTask extends Traverser {
- final BulkTask parent; // completion target
- int batch; // split control; -1 for unknown
- int pending; // completion control
-
- BulkTask(ConcurrentHashMapV8 map, BulkTask parent,
- int batch) {
- super(map);
- this.parent = parent;
- this.batch = batch;
- if (parent != null && map != null) { // split parent
- Node[] t;
- if ((t = parent.tab) == null &&
- (t = parent.tab = map.table) != null)
- parent.baseLimit = parent.baseSize = t.length;
- this.tab = t;
- this.baseSize = parent.baseSize;
- int hi = this.baseLimit = parent.baseLimit;
- parent.baseLimit = this.index = this.baseIndex =
- (hi + parent.baseIndex + 1) >>> 1;
- }
- }
-
- /**
- * Forces root task to complete.
- * @param ex if null, complete normally, else exceptionally
- * @return false to simplify use
- */
- final boolean tryCompleteComputation(Throwable ex) {
- for (BulkTask a = this;;) {
- BulkTask p = a.parent;
- if (p == null) {
- if (ex != null)
- a.completeExceptionally(ex);
- else
- a.quietlyComplete();
- return false;
- }
- a = p;
- }
- }
-
- /**
- * Version of tryCompleteComputation for function screening checks
- */
- final boolean abortOnNullFunction() {
- return tryCompleteComputation(new Error("Unexpected null function"));
- }
-
- // utilities
-
- /** CompareAndSet pending count */
- final boolean casPending(int cmp, int val) {
- return U.compareAndSwapInt(this, PENDING, cmp, val);
- }
-
- /**
- * Returns approx exp2 of the number of times (minus one) to
- * split task by two before executing leaf action. This value
- * is faster to compute and more convenient to use as a guide
- * to splitting than is the depth, since it is used while
- * dividing by two anyway.
- */
- final int batch() {
- ConcurrentHashMapV8 m; int b; Node[] t; ForkJoinPool pool;
- if ((b = batch) < 0 && (m = map) != null) { // force initialization
- if ((t = tab) == null && (t = tab = m.table) != null)
- baseLimit = baseSize = t.length;
- if (t != null) {
- long n = m.counter.sum();
- int par = ((pool = getPool()) == null) ?
- ForkJoinPool.getCommonPoolParallelism() :
- pool.getParallelism();
- int sp = par << 3; // slack of 8
- b = batch = (n <= 0L) ? 0 : (n < (long)sp) ? (int)n : sp;
- }
- }
- return b;
- }
-
- /**
- * Returns exportable snapshot entry.
- */
- static AbstractMap.SimpleEntry entryFor(K k, V v) {
- return new AbstractMap.SimpleEntry(k, v);
- }
-
- // Unsafe mechanics
- private static final sun.misc.Unsafe U;
- private static final long PENDING;
- static {
- try {
- U = getUnsafe();
- PENDING = U.objectFieldOffset
- (BulkTask.class.getDeclaredField("pending"));
- } catch (Exception e) {
- throw new Error(e);
- }
- }
- }
-
- /**
- * Base class for non-reductive actions
- */
- @SuppressWarnings("serial") static abstract class BulkAction extends BulkTask {
- BulkAction nextTask;
- BulkAction(ConcurrentHashMapV8 map, BulkTask parent,
- int batch, BulkAction nextTask) {
- super(map, parent, batch);
- this.nextTask = nextTask;
- }
-
- /**
- * Try to complete task and upward parents. Upon hitting
- * non-completed parent, if a non-FJ task, try to help out the
- * computation.
- */
- final void tryComplete(BulkAction subtasks) {
- BulkTask a = this, s = a;
- for (int c;;) {
- if ((c = a.pending) == 0) {
- if ((a = (s = a).parent) == null) {
- s.quietlyComplete();
- break;
- }
- }
- else if (a.casPending(c, c - 1)) {
- if (subtasks != null && !inForkJoinPool()) {
- while ((s = a.parent) != null)
- a = s;
- while (!a.isDone()) {
- BulkAction next = subtasks.nextTask;
- if (subtasks.tryUnfork())
- subtasks.exec();
- if ((subtasks = next) == null)
- break;
- }
- }
- break;
- }
- }
- }
-
- }
-
/*
* Task classes. Coded in a regular but ugly format/style to
* simplify checks that each variant differs in the right way from
@@ -5469,665 +5384,508 @@ public class ConcurrentHashMapV8
*/
@SuppressWarnings("serial") static final class ForEachKeyTask
- extends BulkAction {
+ extends Traverser {
final Action action;
ForEachKeyTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachKeyTask nextTask,
+ (ConcurrentHashMapV8 m, Traverser p, int b,
Action action) {
- super(m, p, b, nextTask);
+ super(m, p, b);
this.action = action;
}
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Action action = this.action;
- if (action == null)
- return abortOnNullFunction();
- ForEachKeyTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachKeyTask
- (map, this, b >>>= 1, subtasks, action)).fork();
- }
- while (advance() != null)
- action.apply((K)nextKey);
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
- }
- tryComplete(subtasks);
- return false;
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Action action;
+ if ((action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachKeyTask(map, this, b, action).fork();
+ while (advance() != null)
+ action.apply((K)nextKey);
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachValueTask
- extends BulkAction {
+ extends Traverser {
final Action action;
ForEachValueTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachValueTask nextTask,
+ (ConcurrentHashMapV8 m, Traverser p, int b,
Action action) {
- super(m, p, b, nextTask);
+ super(m, p, b);
this.action = action;
}
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Action action = this.action;
- if (action == null)
- return abortOnNullFunction();
- ForEachValueTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachValueTask
- (map, this, b >>>= 1, subtasks, action)).fork();
- }
- Object v;
- while ((v = advance()) != null)
- action.apply((V)v);
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
- }
- tryComplete(subtasks);
- return false;
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Action action;
+ if ((action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachValueTask(map, this, b, action).fork();
+ Object v;
+ while ((v = advance()) != null)
+ action.apply((V)v);
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachEntryTask
- extends BulkAction {
+ extends Traverser {
final Action> action;
ForEachEntryTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachEntryTask nextTask,
+ (ConcurrentHashMapV8 m, Traverser p, int b,
Action> action) {
- super(m, p, b, nextTask);
+ super(m, p, b);
this.action = action;
}
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Action> action = this.action;
- if (action == null)
- return abortOnNullFunction();
- ForEachEntryTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachEntryTask
- (map, this, b >>>= 1, subtasks, action)).fork();
- }
- Object v;
- while ((v = advance()) != null)
- action.apply(entryFor((K)nextKey, (V)v));
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
- }
- tryComplete(subtasks);
- return false;
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Action> action;
+ if ((action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachEntryTask(map, this, b, action).fork();
+ Object v;
+ while ((v = advance()) != null)
+ action.apply(entryFor((K)nextKey, (V)v));
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachMappingTask
- extends BulkAction {
+ extends Traverser {
final BiAction action;
ForEachMappingTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachMappingTask nextTask,
+ (ConcurrentHashMapV8 m, Traverser p, int b,
BiAction action) {
- super(m, p, b, nextTask);
+ super(m, p, b);
this.action = action;
}
- @SuppressWarnings("unchecked") public final boolean exec() {
- final BiAction action = this.action;
- if (action == null)
- return abortOnNullFunction();
- ForEachMappingTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachMappingTask
- (map, this, b >>>= 1, subtasks, action)).fork();
- }
- Object v;
- while ((v = advance()) != null)
- action.apply((K)nextKey, (V)v);
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
- }
- tryComplete(subtasks);
- return false;
+ @SuppressWarnings("unchecked") public final void compute() {
+ final BiAction action;
+ if ((action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachMappingTask(map, this, b, action).fork();
+ Object v;
+ while ((v = advance()) != null)
+ action.apply((K)nextKey, (V)v);
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachTransformedKeyTask
- extends BulkAction {
+ extends Traverser {
final Fun super K, ? extends U> transformer;
final Action action;
ForEachTransformedKeyTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachTransformedKeyTask nextTask,
- Fun super K, ? extends U> transformer,
- Action action) {
- super(m, p, b, nextTask);
- this.transformer = transformer;
- this.action = action;
-
- }
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Fun super K, ? extends U> transformer =
- this.transformer;
- final Action action = this.action;
- if (transformer == null || action == null)
- return abortOnNullFunction();
- ForEachTransformedKeyTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachTransformedKeyTask
- (map, this, b >>>= 1, subtasks, transformer, action)).fork();
- }
- U u;
- while (advance() != null) {
- if ((u = transformer.apply((K)nextKey)) != null)
- action.apply(u);
- }
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
+ (ConcurrentHashMapV8 m, Traverser p, int b,
+ Fun super K, ? extends U> transformer, Action action) {
+ super(m, p, b);
+ this.transformer = transformer; this.action = action;
+ }
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Fun super K, ? extends U> transformer;
+ final Action action;
+ if ((transformer = this.transformer) == null ||
+ (action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachTransformedKeyTask
+ (map, this, b, transformer, action).fork();
+ U u;
+ while (advance() != null) {
+ if ((u = transformer.apply((K)nextKey)) != null)
+ action.apply(u);
}
- tryComplete(subtasks);
- return false;
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachTransformedValueTask
- extends BulkAction {
+ extends Traverser {
final Fun super V, ? extends U> transformer;
final Action action;
ForEachTransformedValueTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachTransformedValueTask nextTask,
- Fun super V, ? extends U> transformer,
- Action action) {
- super(m, p, b, nextTask);
- this.transformer = transformer;
- this.action = action;
-
- }
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Fun super V, ? extends U> transformer =
- this.transformer;
- final Action action = this.action;
- if (transformer == null || action == null)
- return abortOnNullFunction();
- ForEachTransformedValueTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachTransformedValueTask
- (map, this, b >>>= 1, subtasks, transformer, action)).fork();
- }
- Object v; U u;
- while ((v = advance()) != null) {
- if ((u = transformer.apply((V)v)) != null)
- action.apply(u);
- }
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
+ (ConcurrentHashMapV8 m, Traverser p, int b,
+ Fun super V, ? extends U> transformer, Action action) {
+ super(m, p, b);
+ this.transformer = transformer; this.action = action;
+ }
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Fun super V, ? extends U> transformer;
+ final Action action;
+ if ((transformer = this.transformer) == null ||
+ (action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachTransformedValueTask
+ (map, this, b, transformer, action).fork();
+ Object v; U u;
+ while ((v = advance()) != null) {
+ if ((u = transformer.apply((V)v)) != null)
+ action.apply(u);
}
- tryComplete(subtasks);
- return false;
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachTransformedEntryTask
- extends BulkAction {
+ extends Traverser {
final Fun, ? extends U> transformer;
final Action action;
ForEachTransformedEntryTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachTransformedEntryTask nextTask,
- Fun, ? extends U> transformer,
- Action action) {
- super(m, p, b, nextTask);
- this.transformer = transformer;
- this.action = action;
-
- }
- @SuppressWarnings("unchecked") public final boolean exec() {
- final Fun, ? extends U> transformer =
- this.transformer;
- final Action action = this.action;
- if (transformer == null || action == null)
- return abortOnNullFunction();
- ForEachTransformedEntryTask subtasks = null;
- try {
- int b = batch(), c;
- while (b > 1 && baseIndex != baseLimit) {
- do {} while (!casPending(c = pending, c+1));
- (subtasks = new ForEachTransformedEntryTask
- (map, this, b >>>= 1, subtasks, transformer, action)).fork();
- }
- Object v; U u;
- while ((v = advance()) != null) {
- if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
- action.apply(u);
- }
- } catch (Throwable ex) {
- return tryCompleteComputation(ex);
+ (ConcurrentHashMapV8 m, Traverser p, int b,
+ Fun, ? extends U> transformer, Action action) {
+ super(m, p, b);
+ this.transformer = transformer; this.action = action;
+ }
+ @SuppressWarnings("unchecked") public final void compute() {
+ final Fun, ? extends U> transformer;
+ final Action action;
+ if ((transformer = this.transformer) == null ||
+ (action = this.action) == null)
+ throw new NullPointerException();
+ for (int b; (b = preSplit()) > 0;)
+ new ForEachTransformedEntryTask
+ (map, this, b, transformer, action).fork();
+ Object v; U u;
+ while ((v = advance()) != null) {
+ if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
+ action.apply(u);
}
- tryComplete(subtasks);
- return false;
+ propagateCompletion();
}
}
@SuppressWarnings("serial") static final class ForEachTransformedMappingTask
- extends BulkAction {
+ extends Traverser {
final BiFun super K, ? super V, ? extends U> transformer;
final Action action;
ForEachTransformedMappingTask
- (ConcurrentHashMapV8 m, BulkTask p, int b,
- ForEachTransformedMappingTask nextTask,
+ (ConcurrentHashMapV8 m, Traverser p, int b,
BiFun super K, ? super V, ? extends U> transformer,
Action action) {
- super(m, p, b, nextTask);
- this.transformer = transformer;
- this.action = action;
-
+ super(m, p, b);
+ this.transformer = transformer; this.action = action;
}
- @SuppressWarnings("unchecked") public final boolean exec() {
- final BiFun super K, ? super V, ? extends U> transformer =
- this.transformer;
- final Action action = this.action;
- if (transformer == null || action == null)
- return abortOnNullFunction();
- ForEachTransformedMappingTask