ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/ConcurrentHashMapV8.java
(Generate patch)

Comparing jsr166/src/jsr166e/ConcurrentHashMapV8.java (file contents):
Revision 1.78 by jsr166, Sun Nov 18 18:03:10 2012 UTC vs.
Revision 1.79 by dl, Fri Nov 23 17:50:51 2012 UTC

# Line 28 | Line 28 | import java.util.concurrent.atomic.Atomi
28  
29   import java.io.Serializable;
30  
31 + import java.util.Comparator;
32 + import java.util.Arrays;
33 + import java.util.Map;
34 + import java.util.Set;
35 + import java.util.Collection;
36 + import java.util.AbstractMap;
37 + import java.util.AbstractSet;
38 + import java.util.AbstractCollection;
39 + import java.util.Hashtable;
40 + import java.util.HashMap;
41 + import java.util.Iterator;
42 + import java.util.Enumeration;
43 + import java.util.ConcurrentModificationException;
44 + import java.util.NoSuchElementException;
45 + import java.util.concurrent.ConcurrentMap;
46 + import java.util.concurrent.ThreadLocalRandom;
47 + import java.util.concurrent.locks.LockSupport;
48 + import java.util.concurrent.locks.AbstractQueuedSynchronizer;
49 + import java.util.concurrent.atomic.AtomicReference;
50 +
51 + import java.io.Serializable;
52 +
53   /**
54   * A hash table supporting full concurrency of retrievals and
55   * high expected concurrency for updates. This class obeys the
# Line 671 | Line 693 | public class ConcurrentHashMapV8<K, V>
693                                  try {
694                                      wait();
695                                  } catch (InterruptedException ie) {
696 <                                    Thread.currentThread().interrupt();
696 >                                    try {
697 >                                        Thread.currentThread().interrupt();
698 >                                    } catch (SecurityException ignore) {
699 >                                    }
700                                  }
701                              }
702                              else
# Line 2380 | Line 2405 | public class ConcurrentHashMapV8<K, V>
2405       * across threads, iteration terminates if a bounds checks fails
2406       * for a table read.
2407       *
2408 <     * This class extends ForkJoinTask to streamline parallel
2409 <     * iteration in bulk operations (see BulkTask). This adds only an
2410 <     * int of space overhead, which is close enough to negligible in
2411 <     * cases where it is not needed to not worry about it.  Because
2412 <     * ForkJoinTask is Serializable, but iterators need not be, we
2413 <     * need to add warning suppressions.
2408 >     * This class extends CountedCompleter to streamline parallel
2409 >     * iteration in bulk operations. This adds only a few fields of
2410 >     * space overhead, which is small enough in cases where it is not
2411 >     * needed to not worry about it.  Because CountedCompleter is
2412 >     * Serializable, but iterators need not be, we need to add warning
2413 >     * suppressions.
2414       */
2415 <    @SuppressWarnings("serial") static class Traverser<K,V,R> extends ForkJoinTask<R> {
2415 >    @SuppressWarnings("serial") static class Traverser<K,V,R> extends CountedCompleter<R> {
2416          final ConcurrentHashMapV8<K, V> map;
2417          Node next;           // the next entry to use
2418          Object nextKey;      // cached key field of next
# Line 2397 | Line 2422 | public class ConcurrentHashMapV8<K, V>
2422          int baseIndex;       // current index of initial table
2423          int baseLimit;       // index bound for initial table
2424          int baseSize;        // initial table size
2425 +        int batch;           // split control
2426  
2427          /** Creates iterator for all entries in the table. */
2428          Traverser(ConcurrentHashMapV8<K, V> map) {
2429              this.map = map;
2430          }
2431  
2432 <        /** Creates iterator for split() methods */
2433 <        Traverser(Traverser<K,V,?> it) {
2434 <            ConcurrentHashMapV8<K, V> m; Node[] t;
2435 <            if ((m = this.map = it.map) == null)
2436 <                t = null;
2437 <            else if ((t = it.tab) == null && // force parent tab initialization
2438 <                     (t = it.tab = m.table) != null)
2439 <                it.baseLimit = it.baseSize = t.length;
2440 <            this.tab = t;
2441 <            this.baseSize = it.baseSize;
2442 <            it.baseLimit = this.index = this.baseIndex =
2443 <                ((this.baseLimit = it.baseLimit) + it.baseIndex + 1) >>> 1;
2432 >        /** Creates iterator for split() methods and task constructors */
2433 >        Traverser(ConcurrentHashMapV8<K,V> map, Traverser<K,V,?> it, int batch) {
2434 >            super(it);
2435 >            this.batch = batch;
2436 >            if ((this.map = map) != null && it != null) { // split parent
2437 >                Node[] t;
2438 >                if ((t = it.tab) == null &&
2439 >                    (t = it.tab = map.table) != null)
2440 >                    it.baseLimit = it.baseSize = t.length;
2441 >                this.tab = t;
2442 >                this.baseSize = it.baseSize;
2443 >                int hi = this.baseLimit = it.baseLimit;
2444 >                it.baseLimit = this.index = this.baseIndex =
2445 >                    (hi + it.baseIndex + 1) >>> 1;
2446 >            }
2447          }
2448  
2449          /**
# Line 2467 | Line 2496 | public class ConcurrentHashMapV8<K, V>
2496          }
2497  
2498          public final boolean hasMoreElements() { return hasNext(); }
2499 <        public final void setRawResult(Object x) { }
2500 <        public R getRawResult() { return null; }
2501 <        public boolean exec() { return true; }
2499 >
2500 >        public void compute() { } // default no-op CountedCompleter body
2501 >
2502 >        /**
2503 >         * Returns a batch value > 0 if this task should (and must) be
2504 >         * split, if so, adding to pending count, and in any case
2505 >         * updating batch value. The initial batch value is approx
2506 >         * exp2 of the number of times (minus one) to split task by
2507 >         * two before executing leaf action. This value is faster to
2508 >         * compute and more convenient to use as a guide to splitting
2509 >         * than is the depth, since it is used while dividing by two
2510 >         * anyway.
2511 >         */
2512 >        final int preSplit() {
2513 >            ConcurrentHashMapV8<K, V> m; int b; Node[] t;  ForkJoinPool pool;
2514 >            if ((b = batch) < 0 && (m = map) != null) { // force initialization
2515 >                if ((t = tab) == null && (t = tab = m.table) != null)
2516 >                    baseLimit = baseSize = t.length;
2517 >                if (t != null) {
2518 >                    long n = m.counter.sum();
2519 >                    int par = ((pool = getPool()) == null) ?
2520 >                        ForkJoinPool.getCommonPoolParallelism() :
2521 >                        pool.getParallelism();
2522 >                    int sp = par << 3; // slack of 8
2523 >                    b = (n <= 0L) ? 0 : (n < (long)sp) ? (int)n : sp;
2524 >                }
2525 >            }
2526 >            b = (b <= 1 || baseIndex == baseLimit)? 0 : (b >>> 1);
2527 >            if ((batch = b) > 0)
2528 >                addToPendingCount(1);
2529 >            return b;
2530 >        }
2531 >
2532      }
2533  
2534      /* ---------------- Public operations -------------- */
# Line 2609 | Line 2668 | public class ConcurrentHashMapV8<K, V>
2668       * Returns the number of mappings. This method should be used
2669       * instead of {@link #size} because a ConcurrentHashMapV8 may
2670       * contain more mappings than can be represented as an int. The
2671 <     * value returned is a snapshot; the actual count may differ if
2672 <     * there are ongoing concurrent insertions or removals.
2671 >     * value returned is an estimate; the actual count may differ if
2672 >     * there are concurrent insertions or removals.
2673       *
2674       * @return the number of mappings
2675       */
# Line 3171 | Line 3230 | public class ConcurrentHashMapV8<K, V>
3230      @SuppressWarnings("serial") static final class KeyIterator<K,V> extends Traverser<K,V,Object>
3231          implements Spliterator<K>, Enumeration<K> {
3232          KeyIterator(ConcurrentHashMapV8<K, V> map) { super(map); }
3233 <        KeyIterator(Traverser<K,V,Object> it) {
3234 <            super(it);
3233 >        KeyIterator(ConcurrentHashMapV8<K, V> map, Traverser<K,V,Object> it) {
3234 >            super(map, it, -1);
3235          }
3236          public KeyIterator<K,V> split() {
3237              if (nextKey != null)
3238                  throw new IllegalStateException();
3239 <            return new KeyIterator<K,V>(this);
3239 >            return new KeyIterator<K,V>(map, this);
3240          }
3241          @SuppressWarnings("unchecked") public final K next() {
3242              if (nextVal == null && advance() == null)
# Line 3193 | Line 3252 | public class ConcurrentHashMapV8<K, V>
3252      @SuppressWarnings("serial") static final class ValueIterator<K,V> extends Traverser<K,V,Object>
3253          implements Spliterator<V>, Enumeration<V> {
3254          ValueIterator(ConcurrentHashMapV8<K, V> map) { super(map); }
3255 <        ValueIterator(Traverser<K,V,Object> it) {
3256 <            super(it);
3255 >        ValueIterator(ConcurrentHashMapV8<K, V> map, Traverser<K,V,Object> it) {
3256 >            super(map, it, -1);
3257          }
3258          public ValueIterator<K,V> split() {
3259              if (nextKey != null)
3260                  throw new IllegalStateException();
3261 <            return new ValueIterator<K,V>(this);
3261 >            return new ValueIterator<K,V>(map, this);
3262          }
3263  
3264          @SuppressWarnings("unchecked") public final V next() {
# Line 3216 | Line 3275 | public class ConcurrentHashMapV8<K, V>
3275      @SuppressWarnings("serial") static final class EntryIterator<K,V> extends Traverser<K,V,Object>
3276          implements Spliterator<Map.Entry<K,V>> {
3277          EntryIterator(ConcurrentHashMapV8<K, V> map) { super(map); }
3278 <        EntryIterator(Traverser<K,V,Object> it) {
3279 <            super(it);
3278 >        EntryIterator(ConcurrentHashMapV8<K, V> map, Traverser<K,V,Object> it) {
3279 >            super(map, it, -1);
3280          }
3281          public EntryIterator<K,V> split() {
3282              if (nextKey != null)
3283                  throw new IllegalStateException();
3284 <            return new EntryIterator<K,V>(this);
3284 >            return new EntryIterator<K,V>(map, this);
3285          }
3286  
3287          @SuppressWarnings("unchecked") public final Map.Entry<K,V> next() {
# Line 3278 | Line 3337 | public class ConcurrentHashMapV8<K, V>
3337          }
3338      }
3339  
3340 +    /**
3341 +     * Returns exportable snapshot entry for the given key and value
3342 +     * when write-through can't or shouldn't be used.
3343 +     */
3344 +    static <K,V> AbstractMap.SimpleEntry<K,V> entryFor(K k, V v) {
3345 +        return new AbstractMap.SimpleEntry<K,V>(k, v);
3346 +    }
3347 +
3348      /* ---------------- Serialization Support -------------- */
3349  
3350      /**
# Line 4665 | Line 4732 | public class ConcurrentHashMapV8<K, V>
4732              (ConcurrentHashMapV8<K,V> map,
4733               BiAction<K,V> action) {
4734              if (action == null) throw new NullPointerException();
4735 <            return new ForEachMappingTask<K,V>(map, null, -1, null, action);
4735 >            return new ForEachMappingTask<K,V>(map, null, -1, action);
4736          }
4737  
4738          /**
# Line 4686 | Line 4753 | public class ConcurrentHashMapV8<K, V>
4753              if (transformer == null || action == null)
4754                  throw new NullPointerException();
4755              return new ForEachTransformedMappingTask<K,V,U>
4756 <                (map, null, -1, null, transformer, action);
4756 >                (map, null, -1, transformer, action);
4757          }
4758  
4759          /**
# Line 4706 | Line 4773 | public class ConcurrentHashMapV8<K, V>
4773               BiFun<? super K, ? super V, ? extends U> searchFunction) {
4774              if (searchFunction == null) throw new NullPointerException();
4775              return new SearchMappingsTask<K,V,U>
4776 <                (map, null, -1, null, searchFunction,
4776 >                (map, null, -1, searchFunction,
4777                   new AtomicReference<U>());
4778          }
4779  
# Line 4815 | Line 4882 | public class ConcurrentHashMapV8<K, V>
4882              (ConcurrentHashMapV8<K,V> map,
4883               Action<K> action) {
4884              if (action == null) throw new NullPointerException();
4885 <            return new ForEachKeyTask<K,V>(map, null, -1, null, action);
4885 >            return new ForEachKeyTask<K,V>(map, null, -1, action);
4886          }
4887  
4888          /**
# Line 4836 | Line 4903 | public class ConcurrentHashMapV8<K, V>
4903              if (transformer == null || action == null)
4904                  throw new NullPointerException();
4905              return new ForEachTransformedKeyTask<K,V,U>
4906 <                (map, null, -1, null, transformer, action);
4906 >                (map, null, -1, transformer, action);
4907          }
4908  
4909          /**
# Line 4856 | Line 4923 | public class ConcurrentHashMapV8<K, V>
4923               Fun<? super K, ? extends U> searchFunction) {
4924              if (searchFunction == null) throw new NullPointerException();
4925              return new SearchKeysTask<K,V,U>
4926 <                (map, null, -1, null, searchFunction,
4926 >                (map, null, -1, searchFunction,
4927                   new AtomicReference<U>());
4928          }
4929  
# Line 4982 | Line 5049 | public class ConcurrentHashMapV8<K, V>
5049              (ConcurrentHashMapV8<K,V> map,
5050               Action<V> action) {
5051              if (action == null) throw new NullPointerException();
5052 <            return new ForEachValueTask<K,V>(map, null, -1, null, action);
5052 >            return new ForEachValueTask<K,V>(map, null, -1, action);
5053          }
5054  
5055          /**
# Line 5002 | Line 5069 | public class ConcurrentHashMapV8<K, V>
5069              if (transformer == null || action == null)
5070                  throw new NullPointerException();
5071              return new ForEachTransformedValueTask<K,V,U>
5072 <                (map, null, -1, null, transformer, action);
5072 >                (map, null, -1, transformer, action);
5073          }
5074  
5075          /**
# Line 5022 | Line 5089 | public class ConcurrentHashMapV8<K, V>
5089               Fun<? super V, ? extends U> searchFunction) {
5090              if (searchFunction == null) throw new NullPointerException();
5091              return new SearchValuesTask<K,V,U>
5092 <                (map, null, -1, null, searchFunction,
5092 >                (map, null, -1, searchFunction,
5093                   new AtomicReference<U>());
5094          }
5095  
# Line 5148 | Line 5215 | public class ConcurrentHashMapV8<K, V>
5215              (ConcurrentHashMapV8<K,V> map,
5216               Action<Map.Entry<K,V>> action) {
5217              if (action == null) throw new NullPointerException();
5218 <            return new ForEachEntryTask<K,V>(map, null, -1, null, action);
5218 >            return new ForEachEntryTask<K,V>(map, null, -1, action);
5219          }
5220  
5221          /**
# Line 5168 | Line 5235 | public class ConcurrentHashMapV8<K, V>
5235              if (transformer == null || action == null)
5236                  throw new NullPointerException();
5237              return new ForEachTransformedEntryTask<K,V,U>
5238 <                (map, null, -1, null, transformer, action);
5238 >                (map, null, -1, transformer, action);
5239          }
5240  
5241          /**
# Line 5188 | Line 5255 | public class ConcurrentHashMapV8<K, V>
5255               Fun<Map.Entry<K,V>, ? extends U> searchFunction) {
5256              if (searchFunction == null) throw new NullPointerException();
5257              return new SearchEntriesTask<K,V,U>
5258 <                (map, null, -1, null, searchFunction,
5258 >                (map, null, -1, searchFunction,
5259                   new AtomicReference<U>());
5260          }
5261  
# Line 5306 | Line 5373 | public class ConcurrentHashMapV8<K, V>
5373  
5374      // -------------------------------------------------------
5375  
5309    /**
5310     * Base for FJ tasks for bulk operations. This adds a variant of
5311     * CountedCompleters and some split and merge bookkeeping to
5312     * iterator functionality. The forEach and reduce methods are
5313     * similar to those illustrated in CountedCompleter documentation,
5314     * except that bottom-up reduction completions perform them within
5315     * their compute methods. The search methods are like forEach
5316     * except they continually poll for success and exit early.  Also,
5317     * exceptions are handled in a simpler manner, by just trying to
5318     * complete root task exceptionally.
5319     */
5320    @SuppressWarnings("serial") static abstract class BulkTask<K,V,R> extends Traverser<K,V,R> {
5321        final BulkTask<K,V,?> parent;  // completion target
5322        int batch;                     // split control; -1 for unknown
5323        int pending;                   // completion control
5324
5325        BulkTask(ConcurrentHashMapV8<K,V> map, BulkTask<K,V,?> parent,
5326                 int batch) {
5327            super(map);
5328            this.parent = parent;
5329            this.batch = batch;
5330            if (parent != null && map != null) { // split parent
5331                Node[] t;
5332                if ((t = parent.tab) == null &&
5333                    (t = parent.tab = map.table) != null)
5334                    parent.baseLimit = parent.baseSize = t.length;
5335                this.tab = t;
5336                this.baseSize = parent.baseSize;
5337                int hi = this.baseLimit = parent.baseLimit;
5338                parent.baseLimit = this.index = this.baseIndex =
5339                    (hi + parent.baseIndex + 1) >>> 1;
5340            }
5341        }
5342
5343        /**
5344         * Forces root task to complete.
5345         * @param ex if null, complete normally, else exceptionally
5346         * @return false to simplify use
5347         */
5348        final boolean tryCompleteComputation(Throwable ex) {
5349            for (BulkTask<K,V,?> a = this;;) {
5350                BulkTask<K,V,?> p = a.parent;
5351                if (p == null) {
5352                    if (ex != null)
5353                        a.completeExceptionally(ex);
5354                    else
5355                        a.quietlyComplete();
5356                    return false;
5357                }
5358                a = p;
5359            }
5360        }
5361
5362        /**
5363         * Version of tryCompleteComputation for function screening checks
5364         */
5365        final boolean abortOnNullFunction() {
5366            return tryCompleteComputation(new Error("Unexpected null function"));
5367        }
5368
5369        // utilities
5370
5371        /** CompareAndSet pending count */
5372        final boolean casPending(int cmp, int val) {
5373            return U.compareAndSwapInt(this, PENDING, cmp, val);
5374        }
5375
5376        /**
5377         * Returns approx exp2 of the number of times (minus one) to
5378         * split task by two before executing leaf action. This value
5379         * is faster to compute and more convenient to use as a guide
5380         * to splitting than is the depth, since it is used while
5381         * dividing by two anyway.
5382         */
5383        final int batch() {
5384            ConcurrentHashMapV8<K, V> m; int b; Node[] t;  ForkJoinPool pool;
5385            if ((b = batch) < 0 && (m = map) != null) { // force initialization
5386                if ((t = tab) == null && (t = tab = m.table) != null)
5387                    baseLimit = baseSize = t.length;
5388                if (t != null) {
5389                    long n = m.counter.sum();
5390                    int par = ((pool = getPool()) == null) ?
5391                        ForkJoinPool.getCommonPoolParallelism() :
5392                        pool.getParallelism();
5393                    int sp = par << 3; // slack of 8
5394                    b = batch = (n <= 0L) ? 0 : (n < (long)sp) ? (int)n : sp;
5395                }
5396            }
5397            return b;
5398        }
5399
5400        /**
5401         * Returns exportable snapshot entry.
5402         */
5403        static <K,V> AbstractMap.SimpleEntry<K,V> entryFor(K k, V v) {
5404            return new AbstractMap.SimpleEntry<K,V>(k, v);
5405        }
5406
5407        // Unsafe mechanics
5408        private static final sun.misc.Unsafe U;
5409        private static final long PENDING;
5410        static {
5411            try {
5412                U = getUnsafe();
5413                PENDING = U.objectFieldOffset
5414                    (BulkTask.class.getDeclaredField("pending"));
5415            } catch (Exception e) {
5416                throw new Error(e);
5417            }
5418        }
5419    }
5420
5421    /**
5422     * Base class for non-reductive actions
5423     */
5424    @SuppressWarnings("serial") static abstract class BulkAction<K,V,R> extends BulkTask<K,V,R> {
5425        BulkAction<K,V,?> nextTask;
5426        BulkAction(ConcurrentHashMapV8<K,V> map, BulkTask<K,V,?> parent,
5427                   int batch, BulkAction<K,V,?> nextTask) {
5428            super(map, parent, batch);
5429            this.nextTask = nextTask;
5430        }
5431
5432        /**
5433         * Try to complete task and upward parents. Upon hitting
5434         * non-completed parent, if a non-FJ task, try to help out the
5435         * computation.
5436         */
5437        final void tryComplete(BulkAction<K,V,?> subtasks) {
5438            BulkTask<K,V,?> a = this, s = a;
5439            for (int c;;) {
5440                if ((c = a.pending) == 0) {
5441                    if ((a = (s = a).parent) == null) {
5442                        s.quietlyComplete();
5443                        break;
5444                    }
5445                }
5446                else if (a.casPending(c, c - 1)) {
5447                    if (subtasks != null && !inForkJoinPool()) {
5448                        while ((s = a.parent) != null)
5449                            a = s;
5450                        while (!a.isDone()) {
5451                            BulkAction<K,V,?> next = subtasks.nextTask;
5452                            if (subtasks.tryUnfork())
5453                                subtasks.exec();
5454                            if ((subtasks = next) == null)
5455                                break;
5456                        }
5457                    }
5458                    break;
5459                }
5460            }
5461        }
5462
5463    }
5464
5376      /*
5377       * Task classes. Coded in a regular but ugly format/style to
5378       * simplify checks that each variant differs in the right way from
# Line 5469 | Line 5380 | public class ConcurrentHashMapV8<K, V>
5380       */
5381  
5382      @SuppressWarnings("serial") static final class ForEachKeyTask<K,V>
5383 <        extends BulkAction<K,V,Void> {
5383 >        extends Traverser<K,V,Void> {
5384          final Action<K> action;
5385          ForEachKeyTask
5386 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5476 <             ForEachKeyTask<K,V> nextTask,
5386 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5387               Action<K> action) {
5388 <            super(m, p, b, nextTask);
5388 >            super(m, p, b);
5389              this.action = action;
5390          }
5391 <        @SuppressWarnings("unchecked") public final boolean exec() {
5392 <            final Action<K> action = this.action;
5393 <            if (action == null)
5394 <                return abortOnNullFunction();
5395 <            ForEachKeyTask<K,V> subtasks = null;
5396 <            try {
5397 <                int b = batch(), c;
5398 <                while (b > 1 && baseIndex != baseLimit) {
5399 <                    do {} while (!casPending(c = pending, c+1));
5490 <                    (subtasks = new ForEachKeyTask<K,V>
5491 <                     (map, this, b >>>= 1, subtasks, action)).fork();
5492 <                }
5493 <                while (advance() != null)
5494 <                    action.apply((K)nextKey);
5495 <            } catch (Throwable ex) {
5496 <                return tryCompleteComputation(ex);
5497 <            }
5498 <            tryComplete(subtasks);
5499 <            return false;
5391 >        @SuppressWarnings("unchecked") public final void compute() {
5392 >            final Action<K> action;
5393 >            if ((action = this.action) == null)
5394 >                throw new NullPointerException();
5395 >            for (int b; (b = preSplit()) > 0;)
5396 >                new ForEachKeyTask<K,V>(map, this, b, action).fork();
5397 >            while (advance() != null)
5398 >                action.apply((K)nextKey);
5399 >            propagateCompletion();
5400          }
5401      }
5402  
5403      @SuppressWarnings("serial") static final class ForEachValueTask<K,V>
5404 <        extends BulkAction<K,V,Void> {
5404 >        extends Traverser<K,V,Void> {
5405          final Action<V> action;
5406          ForEachValueTask
5407 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5508 <             ForEachValueTask<K,V> nextTask,
5407 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5408               Action<V> action) {
5409 <            super(m, p, b, nextTask);
5409 >            super(m, p, b);
5410              this.action = action;
5411          }
5412 <        @SuppressWarnings("unchecked") public final boolean exec() {
5413 <            final Action<V> action = this.action;
5414 <            if (action == null)
5415 <                return abortOnNullFunction();
5416 <            ForEachValueTask<K,V> subtasks = null;
5417 <            try {
5418 <                int b = batch(), c;
5419 <                while (b > 1 && baseIndex != baseLimit) {
5420 <                    do {} while (!casPending(c = pending, c+1));
5421 <                    (subtasks = new ForEachValueTask<K,V>
5523 <                     (map, this, b >>>= 1, subtasks, action)).fork();
5524 <                }
5525 <                Object v;
5526 <                while ((v = advance()) != null)
5527 <                    action.apply((V)v);
5528 <            } catch (Throwable ex) {
5529 <                return tryCompleteComputation(ex);
5530 <            }
5531 <            tryComplete(subtasks);
5532 <            return false;
5412 >        @SuppressWarnings("unchecked") public final void compute() {
5413 >            final Action<V> action;
5414 >            if ((action = this.action) == null)
5415 >                throw new NullPointerException();
5416 >            for (int b; (b = preSplit()) > 0;)
5417 >                new ForEachValueTask<K,V>(map, this, b, action).fork();
5418 >            Object v;
5419 >            while ((v = advance()) != null)
5420 >                action.apply((V)v);
5421 >            propagateCompletion();
5422          }
5423      }
5424  
5425      @SuppressWarnings("serial") static final class ForEachEntryTask<K,V>
5426 <        extends BulkAction<K,V,Void> {
5426 >        extends Traverser<K,V,Void> {
5427          final Action<Entry<K,V>> action;
5428          ForEachEntryTask
5429 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5541 <             ForEachEntryTask<K,V> nextTask,
5429 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5430               Action<Entry<K,V>> action) {
5431 <            super(m, p, b, nextTask);
5431 >            super(m, p, b);
5432              this.action = action;
5433          }
5434 <        @SuppressWarnings("unchecked") public final boolean exec() {
5435 <            final Action<Entry<K,V>> action = this.action;
5436 <            if (action == null)
5437 <                return abortOnNullFunction();
5438 <            ForEachEntryTask<K,V> subtasks = null;
5439 <            try {
5440 <                int b = batch(), c;
5441 <                while (b > 1 && baseIndex != baseLimit) {
5442 <                    do {} while (!casPending(c = pending, c+1));
5443 <                    (subtasks = new ForEachEntryTask<K,V>
5556 <                     (map, this, b >>>= 1, subtasks, action)).fork();
5557 <                }
5558 <                Object v;
5559 <                while ((v = advance()) != null)
5560 <                    action.apply(entryFor((K)nextKey, (V)v));
5561 <            } catch (Throwable ex) {
5562 <                return tryCompleteComputation(ex);
5563 <            }
5564 <            tryComplete(subtasks);
5565 <            return false;
5434 >        @SuppressWarnings("unchecked") public final void compute() {
5435 >            final Action<Entry<K,V>> action;
5436 >            if ((action = this.action) == null)
5437 >                throw new NullPointerException();
5438 >            for (int b; (b = preSplit()) > 0;)
5439 >                new ForEachEntryTask<K,V>(map, this, b, action).fork();
5440 >            Object v;
5441 >            while ((v = advance()) != null)
5442 >                action.apply(entryFor((K)nextKey, (V)v));
5443 >            propagateCompletion();
5444          }
5445      }
5446  
5447      @SuppressWarnings("serial") static final class ForEachMappingTask<K,V>
5448 <        extends BulkAction<K,V,Void> {
5448 >        extends Traverser<K,V,Void> {
5449          final BiAction<K,V> action;
5450          ForEachMappingTask
5451 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5574 <             ForEachMappingTask<K,V> nextTask,
5451 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5452               BiAction<K,V> action) {
5453 <            super(m, p, b, nextTask);
5453 >            super(m, p, b);
5454              this.action = action;
5455          }
5456 <        @SuppressWarnings("unchecked") public final boolean exec() {
5457 <            final BiAction<K,V> action = this.action;
5458 <            if (action == null)
5459 <                return abortOnNullFunction();
5460 <            ForEachMappingTask<K,V> subtasks = null;
5461 <            try {
5462 <                int b = batch(), c;
5463 <                while (b > 1 && baseIndex != baseLimit) {
5464 <                    do {} while (!casPending(c = pending, c+1));
5465 <                    (subtasks = new ForEachMappingTask<K,V>
5589 <                     (map, this, b >>>= 1, subtasks, action)).fork();
5590 <                }
5591 <                Object v;
5592 <                while ((v = advance()) != null)
5593 <                    action.apply((K)nextKey, (V)v);
5594 <            } catch (Throwable ex) {
5595 <                return tryCompleteComputation(ex);
5596 <            }
5597 <            tryComplete(subtasks);
5598 <            return false;
5456 >        @SuppressWarnings("unchecked") public final void compute() {
5457 >            final BiAction<K,V> action;
5458 >            if ((action = this.action) == null)
5459 >                throw new NullPointerException();
5460 >            for (int b; (b = preSplit()) > 0;)
5461 >                new ForEachMappingTask<K,V>(map, this, b, action).fork();
5462 >            Object v;
5463 >            while ((v = advance()) != null)
5464 >                action.apply((K)nextKey, (V)v);
5465 >            propagateCompletion();
5466          }
5467      }
5468  
5469      @SuppressWarnings("serial") static final class ForEachTransformedKeyTask<K,V,U>
5470 <        extends BulkAction<K,V,Void> {
5470 >        extends Traverser<K,V,Void> {
5471          final Fun<? super K, ? extends U> transformer;
5472          final Action<U> action;
5473          ForEachTransformedKeyTask
5474 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5475 <             ForEachTransformedKeyTask<K,V,U> nextTask,
5476 <             Fun<? super K, ? extends U> transformer,
5477 <             Action<U> action) {
5478 <            super(m, p, b, nextTask);
5479 <            this.transformer = transformer;
5480 <            this.action = action;
5481 <
5482 <        }
5483 <        @SuppressWarnings("unchecked") public final boolean exec() {
5484 <            final Fun<? super K, ? extends U> transformer =
5485 <                this.transformer;
5486 <            final Action<U> action = this.action;
5487 <            if (transformer == null || action == null)
5488 <                return abortOnNullFunction();
5489 <            ForEachTransformedKeyTask<K,V,U> subtasks = null;
5490 <            try {
5491 <                int b = batch(), c;
5625 <                while (b > 1 && baseIndex != baseLimit) {
5626 <                    do {} while (!casPending(c = pending, c+1));
5627 <                    (subtasks = new ForEachTransformedKeyTask<K,V,U>
5628 <                     (map, this, b >>>= 1, subtasks, transformer, action)).fork();
5629 <                }
5630 <                U u;
5631 <                while (advance() != null) {
5632 <                    if ((u = transformer.apply((K)nextKey)) != null)
5633 <                        action.apply(u);
5634 <                }
5635 <            } catch (Throwable ex) {
5636 <                return tryCompleteComputation(ex);
5474 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5475 >             Fun<? super K, ? extends U> transformer, Action<U> action) {
5476 >            super(m, p, b);
5477 >            this.transformer = transformer; this.action = action;
5478 >        }
5479 >        @SuppressWarnings("unchecked") public final void compute() {
5480 >            final Fun<? super K, ? extends U> transformer;
5481 >            final Action<U> action;
5482 >            if ((transformer = this.transformer) == null ||
5483 >                (action = this.action) == null)
5484 >                throw new NullPointerException();
5485 >            for (int b; (b = preSplit()) > 0;)
5486 >                new ForEachTransformedKeyTask<K,V,U>
5487 >                     (map, this, b, transformer, action).fork();
5488 >            U u;
5489 >            while (advance() != null) {
5490 >                if ((u = transformer.apply((K)nextKey)) != null)
5491 >                    action.apply(u);
5492              }
5493 <            tryComplete(subtasks);
5639 <            return false;
5493 >            propagateCompletion();
5494          }
5495      }
5496  
5497      @SuppressWarnings("serial") static final class ForEachTransformedValueTask<K,V,U>
5498 <        extends BulkAction<K,V,Void> {
5498 >        extends Traverser<K,V,Void> {
5499          final Fun<? super V, ? extends U> transformer;
5500          final Action<U> action;
5501          ForEachTransformedValueTask
5502 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5503 <             ForEachTransformedValueTask<K,V,U> nextTask,
5504 <             Fun<? super V, ? extends U> transformer,
5505 <             Action<U> action) {
5506 <            super(m, p, b, nextTask);
5507 <            this.transformer = transformer;
5508 <            this.action = action;
5509 <
5510 <        }
5511 <        @SuppressWarnings("unchecked") public final boolean exec() {
5512 <            final Fun<? super V, ? extends U> transformer =
5513 <                this.transformer;
5514 <            final Action<U> action = this.action;
5515 <            if (transformer == null || action == null)
5516 <                return abortOnNullFunction();
5517 <            ForEachTransformedValueTask<K,V,U> subtasks = null;
5518 <            try {
5519 <                int b = batch(), c;
5666 <                while (b > 1 && baseIndex != baseLimit) {
5667 <                    do {} while (!casPending(c = pending, c+1));
5668 <                    (subtasks = new ForEachTransformedValueTask<K,V,U>
5669 <                     (map, this, b >>>= 1, subtasks, transformer, action)).fork();
5670 <                }
5671 <                Object v; U u;
5672 <                while ((v = advance()) != null) {
5673 <                    if ((u = transformer.apply((V)v)) != null)
5674 <                        action.apply(u);
5675 <                }
5676 <            } catch (Throwable ex) {
5677 <                return tryCompleteComputation(ex);
5502 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5503 >             Fun<? super V, ? extends U> transformer, Action<U> action) {
5504 >            super(m, p, b);
5505 >            this.transformer = transformer; this.action = action;
5506 >        }
5507 >        @SuppressWarnings("unchecked") public final void compute() {
5508 >            final Fun<? super V, ? extends U> transformer;
5509 >            final Action<U> action;
5510 >            if ((transformer = this.transformer) == null ||
5511 >                (action = this.action) == null)
5512 >                throw new NullPointerException();
5513 >            for (int b; (b = preSplit()) > 0;)
5514 >                new ForEachTransformedValueTask<K,V,U>
5515 >                    (map, this, b, transformer, action).fork();
5516 >            Object v; U u;
5517 >            while ((v = advance()) != null) {
5518 >                if ((u = transformer.apply((V)v)) != null)
5519 >                    action.apply(u);
5520              }
5521 <            tryComplete(subtasks);
5680 <            return false;
5521 >            propagateCompletion();
5522          }
5523      }
5524  
5525      @SuppressWarnings("serial") static final class ForEachTransformedEntryTask<K,V,U>
5526 <        extends BulkAction<K,V,Void> {
5526 >        extends Traverser<K,V,Void> {
5527          final Fun<Map.Entry<K,V>, ? extends U> transformer;
5528          final Action<U> action;
5529          ForEachTransformedEntryTask
5530 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5531 <             ForEachTransformedEntryTask<K,V,U> nextTask,
5532 <             Fun<Map.Entry<K,V>, ? extends U> transformer,
5533 <             Action<U> action) {
5534 <            super(m, p, b, nextTask);
5535 <            this.transformer = transformer;
5536 <            this.action = action;
5537 <
5538 <        }
5539 <        @SuppressWarnings("unchecked") public final boolean exec() {
5540 <            final Fun<Map.Entry<K,V>, ? extends U> transformer =
5541 <                this.transformer;
5542 <            final Action<U> action = this.action;
5543 <            if (transformer == null || action == null)
5544 <                return abortOnNullFunction();
5545 <            ForEachTransformedEntryTask<K,V,U> subtasks = null;
5546 <            try {
5547 <                int b = batch(), c;
5707 <                while (b > 1 && baseIndex != baseLimit) {
5708 <                    do {} while (!casPending(c = pending, c+1));
5709 <                    (subtasks = new ForEachTransformedEntryTask<K,V,U>
5710 <                     (map, this, b >>>= 1, subtasks, transformer, action)).fork();
5711 <                }
5712 <                Object v; U u;
5713 <                while ((v = advance()) != null) {
5714 <                    if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
5715 <                        action.apply(u);
5716 <                }
5717 <            } catch (Throwable ex) {
5718 <                return tryCompleteComputation(ex);
5530 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5531 >             Fun<Map.Entry<K,V>, ? extends U> transformer, Action<U> action) {
5532 >            super(m, p, b);
5533 >            this.transformer = transformer; this.action = action;
5534 >        }
5535 >        @SuppressWarnings("unchecked") public final void compute() {
5536 >            final Fun<Map.Entry<K,V>, ? extends U> transformer;
5537 >            final Action<U> action;
5538 >            if ((transformer = this.transformer) == null ||
5539 >                (action = this.action) == null)
5540 >                throw new NullPointerException();
5541 >            for (int b; (b = preSplit()) > 0;)
5542 >                new ForEachTransformedEntryTask<K,V,U>
5543 >                    (map, this, b, transformer, action).fork();
5544 >            Object v; U u;
5545 >            while ((v = advance()) != null) {
5546 >                if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
5547 >                    action.apply(u);
5548              }
5549 <            tryComplete(subtasks);
5721 <            return false;
5549 >            propagateCompletion();
5550          }
5551      }
5552  
5553      @SuppressWarnings("serial") static final class ForEachTransformedMappingTask<K,V,U>
5554 <        extends BulkAction<K,V,Void> {
5554 >        extends Traverser<K,V,Void> {
5555          final BiFun<? super K, ? super V, ? extends U> transformer;
5556          final Action<U> action;
5557          ForEachTransformedMappingTask
5558 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5731 <             ForEachTransformedMappingTask<K,V,U> nextTask,
5558 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5559               BiFun<? super K, ? super V, ? extends U> transformer,
5560               Action<U> action) {
5561 <            super(m, p, b, nextTask);
5562 <            this.transformer = transformer;
5736 <            this.action = action;
5737 <
5561 >            super(m, p, b);
5562 >            this.transformer = transformer; this.action = action;
5563          }
5564 <        @SuppressWarnings("unchecked") public final boolean exec() {
5565 <            final BiFun<? super K, ? super V, ? extends U> transformer =
5566 <                this.transformer;
5567 <            final Action<U> action = this.action;
5568 <            if (transformer == null || action == null)
5569 <                return abortOnNullFunction();
5570 <            ForEachTransformedMappingTask<K,V,U> subtasks = null;
5571 <            try {
5572 <                int b = batch(), c;
5573 <                while (b > 1 && baseIndex != baseLimit) {
5574 <                    do {} while (!casPending(c = pending, c+1));
5575 <                    (subtasks = new ForEachTransformedMappingTask<K,V,U>
5576 <                     (map, this, b >>>= 1, subtasks, transformer, action)).fork();
5752 <                }
5753 <                Object v; U u;
5754 <                while ((v = advance()) != null) {
5755 <                    if ((u = transformer.apply((K)nextKey, (V)v)) != null)
5756 <                        action.apply(u);
5757 <                }
5758 <            } catch (Throwable ex) {
5759 <                return tryCompleteComputation(ex);
5564 >        @SuppressWarnings("unchecked") public final void compute() {
5565 >            final BiFun<? super K, ? super V, ? extends U> transformer;
5566 >            final Action<U> action;
5567 >            if ((transformer = this.transformer) == null ||
5568 >                (action = this.action) == null)
5569 >                throw new NullPointerException();
5570 >            for (int b; (b = preSplit()) > 0;)
5571 >                new ForEachTransformedMappingTask<K,V,U>
5572 >                    (map, this, b, transformer, action).fork();
5573 >            Object v; U u;
5574 >            while ((v = advance()) != null) {
5575 >                if ((u = transformer.apply((K)nextKey, (V)v)) != null)
5576 >                    action.apply(u);
5577              }
5578 <            tryComplete(subtasks);
5762 <            return false;
5578 >            propagateCompletion();
5579          }
5580      }
5581  
5582      @SuppressWarnings("serial") static final class SearchKeysTask<K,V,U>
5583 <        extends BulkAction<K,V,U> {
5583 >        extends Traverser<K,V,U> {
5584          final Fun<? super K, ? extends U> searchFunction;
5585          final AtomicReference<U> result;
5586          SearchKeysTask
5587 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5772 <             SearchKeysTask<K,V,U> nextTask,
5587 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5588               Fun<? super K, ? extends U> searchFunction,
5589               AtomicReference<U> result) {
5590 <            super(m, p, b, nextTask);
5590 >            super(m, p, b);
5591              this.searchFunction = searchFunction; this.result = result;
5592          }
5593 <        @SuppressWarnings("unchecked") public final boolean exec() {
5594 <            AtomicReference<U> result = this.result;
5595 <            final Fun<? super K, ? extends U> searchFunction =
5596 <                this.searchFunction;
5597 <            if (searchFunction == null || result == null)
5598 <                return abortOnNullFunction();
5599 <            SearchKeysTask<K,V,U> subtasks = null;
5600 <            try {
5601 <                int b = batch(), c;
5602 <                while (b > 1 && baseIndex != baseLimit && result.get() == null) {
5603 <                    do {} while (!casPending(c = pending, c+1));
5604 <                    (subtasks = new SearchKeysTask<K,V,U>
5605 <                     (map, this, b >>>= 1, subtasks, searchFunction, result)).fork();
5606 <                }
5593 >        public final U getRawResult() { return result.get(); }
5594 >        @SuppressWarnings("unchecked") public final void compute() {
5595 >            final Fun<? super K, ? extends U> searchFunction;
5596 >            final AtomicReference<U> result;
5597 >            if ((searchFunction = this.searchFunction) == null ||
5598 >                (result = this.result) == null)
5599 >                throw new NullPointerException();
5600 >            for (int b;;) {
5601 >                if (result.get() != null)
5602 >                    return;
5603 >                if ((b = preSplit()) <= 0)
5604 >                    break;
5605 >                new SearchKeysTask<K,V,U>
5606 >                    (map, this, b, searchFunction, result).fork();
5607 >            }
5608 >            while (result.get() == null) {
5609                  U u;
5610 <                while (result.get() == null && advance() != null) {
5611 <                    if ((u = searchFunction.apply((K)nextKey)) != null) {
5612 <                        if (result.compareAndSet(null, u))
5613 <                            tryCompleteComputation(null);
5614 <                        break;
5615 <                    }
5610 >                if (advance() == null) {
5611 >                    propagateCompletion();
5612 >                    break;
5613 >                }
5614 >                if ((u = searchFunction.apply((K)nextKey)) != null) {
5615 >                    if (result.compareAndSet(null, u))
5616 >                        quietlyCompleteRoot();
5617 >                    break;
5618                  }
5800            } catch (Throwable ex) {
5801                return tryCompleteComputation(ex);
5619              }
5803            tryComplete(subtasks);
5804            return false;
5620          }
5806        public final U getRawResult() { return result.get(); }
5621      }
5622  
5623      @SuppressWarnings("serial") static final class SearchValuesTask<K,V,U>
5624 <        extends BulkAction<K,V,U> {
5624 >        extends Traverser<K,V,U> {
5625          final Fun<? super V, ? extends U> searchFunction;
5626          final AtomicReference<U> result;
5627          SearchValuesTask
5628 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5815 <             SearchValuesTask<K,V,U> nextTask,
5628 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5629               Fun<? super V, ? extends U> searchFunction,
5630               AtomicReference<U> result) {
5631 <            super(m, p, b, nextTask);
5631 >            super(m, p, b);
5632              this.searchFunction = searchFunction; this.result = result;
5633          }
5634 <        @SuppressWarnings("unchecked") public final boolean exec() {
5635 <            AtomicReference<U> result = this.result;
5636 <            final Fun<? super V, ? extends U> searchFunction =
5637 <                this.searchFunction;
5638 <            if (searchFunction == null || result == null)
5639 <                return abortOnNullFunction();
5640 <            SearchValuesTask<K,V,U> subtasks = null;
5641 <            try {
5642 <                int b = batch(), c;
5643 <                while (b > 1 && baseIndex != baseLimit && result.get() == null) {
5644 <                    do {} while (!casPending(c = pending, c+1));
5645 <                    (subtasks = new SearchValuesTask<K,V,U>
5646 <                     (map, this, b >>>= 1, subtasks, searchFunction, result)).fork();
5647 <                }
5634 >        public final U getRawResult() { return result.get(); }
5635 >        @SuppressWarnings("unchecked") public final void compute() {
5636 >            final Fun<? super V, ? extends U> searchFunction;
5637 >            final AtomicReference<U> result;
5638 >            if ((searchFunction = this.searchFunction) == null ||
5639 >                (result = this.result) == null)
5640 >                throw new NullPointerException();
5641 >            for (int b;;) {
5642 >                if (result.get() != null)
5643 >                    return;
5644 >                if ((b = preSplit()) <= 0)
5645 >                    break;
5646 >                new SearchValuesTask<K,V,U>
5647 >                    (map, this, b, searchFunction, result).fork();
5648 >            }
5649 >            while (result.get() == null) {
5650                  Object v; U u;
5651 <                while (result.get() == null && (v = advance()) != null) {
5652 <                    if ((u = searchFunction.apply((V)v)) != null) {
5653 <                        if (result.compareAndSet(null, u))
5654 <                            tryCompleteComputation(null);
5655 <                        break;
5656 <                    }
5651 >                if ((v = advance()) == null) {
5652 >                    propagateCompletion();
5653 >                    break;
5654 >                }
5655 >                if ((u = searchFunction.apply((V)v)) != null) {
5656 >                    if (result.compareAndSet(null, u))
5657 >                        quietlyCompleteRoot();
5658 >                    break;
5659                  }
5843            } catch (Throwable ex) {
5844                return tryCompleteComputation(ex);
5660              }
5846            tryComplete(subtasks);
5847            return false;
5661          }
5849        public final U getRawResult() { return result.get(); }
5662      }
5663  
5664      @SuppressWarnings("serial") static final class SearchEntriesTask<K,V,U>
5665 <        extends BulkAction<K,V,U> {
5665 >        extends Traverser<K,V,U> {
5666          final Fun<Entry<K,V>, ? extends U> searchFunction;
5667          final AtomicReference<U> result;
5668          SearchEntriesTask
5669 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5858 <             SearchEntriesTask<K,V,U> nextTask,
5669 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5670               Fun<Entry<K,V>, ? extends U> searchFunction,
5671               AtomicReference<U> result) {
5672 <            super(m, p, b, nextTask);
5672 >            super(m, p, b);
5673              this.searchFunction = searchFunction; this.result = result;
5674          }
5675 <        @SuppressWarnings("unchecked") public final boolean exec() {
5676 <            AtomicReference<U> result = this.result;
5677 <            final Fun<Entry<K,V>, ? extends U> searchFunction =
5678 <                this.searchFunction;
5679 <            if (searchFunction == null || result == null)
5680 <                return abortOnNullFunction();
5681 <            SearchEntriesTask<K,V,U> subtasks = null;
5682 <            try {
5683 <                int b = batch(), c;
5684 <                while (b > 1 && baseIndex != baseLimit && result.get() == null) {
5685 <                    do {} while (!casPending(c = pending, c+1));
5686 <                    (subtasks = new SearchEntriesTask<K,V,U>
5687 <                     (map, this, b >>>= 1, subtasks, searchFunction, result)).fork();
5688 <                }
5675 >        public final U getRawResult() { return result.get(); }
5676 >        @SuppressWarnings("unchecked") public final void compute() {
5677 >            final Fun<Entry<K,V>, ? extends U> searchFunction;
5678 >            final AtomicReference<U> result;
5679 >            if ((searchFunction = this.searchFunction) == null ||
5680 >                (result = this.result) == null)
5681 >                throw new NullPointerException();
5682 >            for (int b;;) {
5683 >                if (result.get() != null)
5684 >                    return;
5685 >                if ((b = preSplit()) <= 0)
5686 >                    break;
5687 >                new SearchEntriesTask<K,V,U>
5688 >                    (map, this, b, searchFunction, result).fork();
5689 >            }
5690 >            while (result.get() == null) {
5691                  Object v; U u;
5692 <                while (result.get() == null && (v = advance()) != null) {
5693 <                    if ((u = searchFunction.apply(entryFor((K)nextKey, (V)v))) != null) {
5694 <                        if (result.compareAndSet(null, u))
5695 <                            tryCompleteComputation(null);
5696 <                        break;
5697 <                    }
5692 >                if ((v = advance()) == null) {
5693 >                    propagateCompletion();
5694 >                    break;
5695 >                }
5696 >                if ((u = searchFunction.apply(entryFor((K)nextKey, (V)v))) != null) {
5697 >                    if (result.compareAndSet(null, u))
5698 >                        quietlyCompleteRoot();
5699 >                    return;
5700                  }
5886            } catch (Throwable ex) {
5887                return tryCompleteComputation(ex);
5701              }
5889            tryComplete(subtasks);
5890            return false;
5702          }
5892        public final U getRawResult() { return result.get(); }
5703      }
5704  
5705      @SuppressWarnings("serial") static final class SearchMappingsTask<K,V,U>
5706 <        extends BulkAction<K,V,U> {
5706 >        extends Traverser<K,V,U> {
5707          final BiFun<? super K, ? super V, ? extends U> searchFunction;
5708          final AtomicReference<U> result;
5709          SearchMappingsTask
5710 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5901 <             SearchMappingsTask<K,V,U> nextTask,
5710 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5711               BiFun<? super K, ? super V, ? extends U> searchFunction,
5712               AtomicReference<U> result) {
5713 <            super(m, p, b, nextTask);
5713 >            super(m, p, b);
5714              this.searchFunction = searchFunction; this.result = result;
5715          }
5716 <        @SuppressWarnings("unchecked") public final boolean exec() {
5717 <            AtomicReference<U> result = this.result;
5718 <            final BiFun<? super K, ? super V, ? extends U> searchFunction =
5719 <                this.searchFunction;
5720 <            if (searchFunction == null || result == null)
5721 <                return abortOnNullFunction();
5722 <            SearchMappingsTask<K,V,U> subtasks = null;
5723 <            try {
5724 <                int b = batch(), c;
5725 <                while (b > 1 && baseIndex != baseLimit && result.get() == null) {
5726 <                    do {} while (!casPending(c = pending, c+1));
5727 <                    (subtasks = new SearchMappingsTask<K,V,U>
5728 <                     (map, this, b >>>= 1, subtasks, searchFunction, result)).fork();
5729 <                }
5716 >        public final U getRawResult() { return result.get(); }
5717 >        @SuppressWarnings("unchecked") public final void compute() {
5718 >            final BiFun<? super K, ? super V, ? extends U> searchFunction;
5719 >            final AtomicReference<U> result;
5720 >            if ((searchFunction = this.searchFunction) == null ||
5721 >                (result = this.result) == null)
5722 >                throw new NullPointerException();
5723 >            for (int b;;) {
5724 >                if (result.get() != null)
5725 >                    return;
5726 >                if ((b = preSplit()) <= 0)
5727 >                    break;
5728 >                new SearchMappingsTask<K,V,U>
5729 >                    (map, this, b, searchFunction, result).fork();
5730 >            }
5731 >            while (result.get() == null) {
5732                  Object v; U u;
5733 <                while (result.get() == null && (v = advance()) != null) {
5734 <                    if ((u = searchFunction.apply((K)nextKey, (V)v)) != null) {
5735 <                        if (result.compareAndSet(null, u))
5736 <                            tryCompleteComputation(null);
5737 <                        break;
5738 <                    }
5733 >                if ((v = advance()) == null) {
5734 >                    propagateCompletion();
5735 >                    break;
5736 >                }
5737 >                if ((u = searchFunction.apply((K)nextKey, (V)v)) != null) {
5738 >                    if (result.compareAndSet(null, u))
5739 >                        quietlyCompleteRoot();
5740 >                    break;
5741                  }
5929            } catch (Throwable ex) {
5930                return tryCompleteComputation(ex);
5742              }
5932            tryComplete(subtasks);
5933            return false;
5743          }
5935        public final U getRawResult() { return result.get(); }
5744      }
5745  
5746      @SuppressWarnings("serial") static final class ReduceKeysTask<K,V>
5747 <        extends BulkTask<K,V,K> {
5747 >        extends Traverser<K,V,K> {
5748          final BiFun<? super K, ? super K, ? extends K> reducer;
5749          K result;
5750          ReduceKeysTask<K,V> rights, nextRight;
5751          ReduceKeysTask
5752 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5752 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5753               ReduceKeysTask<K,V> nextRight,
5754               BiFun<? super K, ? super K, ? extends K> reducer) {
5755              super(m, p, b); this.nextRight = nextRight;
5756              this.reducer = reducer;
5757          }
5758 <        @SuppressWarnings("unchecked") public final boolean exec() {
5758 >        public final K getRawResult() { return result; }
5759 >        @SuppressWarnings("unchecked") public final void compute() {
5760              final BiFun<? super K, ? super K, ? extends K> reducer =
5761                  this.reducer;
5762              if (reducer == null)
5763 <                return abortOnNullFunction();
5764 <            try {
5765 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5766 <                    do {} while (!casPending(c = pending, c+1));
5767 <                    (rights = new ReduceKeysTask<K,V>
5768 <                     (map, this, b >>>= 1, rights, reducer)).fork();
5769 <                }
5770 <                K r = null;
5771 <                while (advance() != null) {
5772 <                    K u = (K)nextKey;
5773 <                    r = (r == null) ? u : reducer.apply(r, u);
5774 <                }
5775 <                result = r;
5776 <                for (ReduceKeysTask<K,V> t = this, s;;) {
5777 <                    int c; BulkTask<K,V,?> par; K tr, sr;
5778 <                    if ((c = t.pending) == 0) {
5779 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
5780 <                            if ((sr = s.result) != null)
5781 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
5782 <                        }
5783 <                        if ((par = t.parent) == null ||
5975 <                            !(par instanceof ReduceKeysTask)) {
5976 <                            t.quietlyComplete();
5977 <                            break;
5978 <                        }
5979 <                        t = (ReduceKeysTask<K,V>)par;
5980 <                    }
5981 <                    else if (t.casPending(c, c - 1))
5982 <                        break;
5763 >                throw new NullPointerException();
5764 >            for (int b; (b = preSplit()) > 0;)
5765 >                (rights = new ReduceKeysTask<K,V>
5766 >                 (map, this, b, rights, reducer)).fork();
5767 >            K r = null;
5768 >            while (advance() != null) {
5769 >                K u = (K)nextKey;
5770 >                r = (r == null) ? u : reducer.apply(r, u);
5771 >            }
5772 >            result = r;
5773 >            CountedCompleter<?> c;
5774 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
5775 >                ReduceKeysTask<K,V>
5776 >                    t = (ReduceKeysTask<K,V>)c,
5777 >                    s = t.rights;
5778 >                while (s != null) {
5779 >                    K tr, sr;
5780 >                    if ((sr = s.result) != null)
5781 >                        t.result = (((tr = t.result) == null) ? sr :
5782 >                                    reducer.apply(tr, sr));
5783 >                    s = t.rights = s.nextRight;
5784                  }
5984            } catch (Throwable ex) {
5985                return tryCompleteComputation(ex);
5986            }
5987            ReduceKeysTask<K,V> s = rights;
5988            if (s != null && !inForkJoinPool()) {
5989                do  {
5990                    if (s.tryUnfork())
5991                        s.exec();
5992                } while ((s = s.nextRight) != null);
5785              }
5994            return false;
5786          }
5996        public final K getRawResult() { return result; }
5787      }
5788  
5789      @SuppressWarnings("serial") static final class ReduceValuesTask<K,V>
5790 <        extends BulkTask<K,V,V> {
5790 >        extends Traverser<K,V,V> {
5791          final BiFun<? super V, ? super V, ? extends V> reducer;
5792          V result;
5793          ReduceValuesTask<K,V> rights, nextRight;
5794          ReduceValuesTask
5795 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5795 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5796               ReduceValuesTask<K,V> nextRight,
5797               BiFun<? super V, ? super V, ? extends V> reducer) {
5798              super(m, p, b); this.nextRight = nextRight;
5799              this.reducer = reducer;
5800          }
5801 <        @SuppressWarnings("unchecked") public final boolean exec() {
5801 >        public final V getRawResult() { return result; }
5802 >        @SuppressWarnings("unchecked") public final void compute() {
5803              final BiFun<? super V, ? super V, ? extends V> reducer =
5804                  this.reducer;
5805              if (reducer == null)
5806 <                return abortOnNullFunction();
5807 <            try {
5808 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5809 <                    do {} while (!casPending(c = pending, c+1));
5810 <                    (rights = new ReduceValuesTask<K,V>
5811 <                     (map, this, b >>>= 1, rights, reducer)).fork();
5812 <                }
5813 <                V r = null;
5814 <                Object v;
5815 <                while ((v = advance()) != null) {
5816 <                    V u = (V)v;
5817 <                    r = (r == null) ? u : reducer.apply(r, u);
5818 <                }
5819 <                result = r;
5820 <                for (ReduceValuesTask<K,V> t = this, s;;) {
5821 <                    int c; BulkTask<K,V,?> par; V tr, sr;
5822 <                    if ((c = t.pending) == 0) {
5823 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
5824 <                            if ((sr = s.result) != null)
5825 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
5826 <                        }
5827 <                        if ((par = t.parent) == null ||
6037 <                            !(par instanceof ReduceValuesTask)) {
6038 <                            t.quietlyComplete();
6039 <                            break;
6040 <                        }
6041 <                        t = (ReduceValuesTask<K,V>)par;
6042 <                    }
6043 <                    else if (t.casPending(c, c - 1))
6044 <                        break;
5806 >                throw new NullPointerException();
5807 >            for (int b; (b = preSplit()) > 0;)
5808 >                (rights = new ReduceValuesTask<K,V>
5809 >                 (map, this, b, rights, reducer)).fork();
5810 >            V r = null;
5811 >            Object v;
5812 >            while ((v = advance()) != null) {
5813 >                V u = (V)v;
5814 >                r = (r == null) ? u : reducer.apply(r, u);
5815 >            }
5816 >            result = r;
5817 >            CountedCompleter<?> c;
5818 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
5819 >                ReduceValuesTask<K,V>
5820 >                    t = (ReduceValuesTask<K,V>)c,
5821 >                    s = t.rights;
5822 >                while (s != null) {
5823 >                    V tr, sr;
5824 >                    if ((sr = s.result) != null)
5825 >                        t.result = (((tr = t.result) == null) ? sr :
5826 >                                    reducer.apply(tr, sr));
5827 >                    s = t.rights = s.nextRight;
5828                  }
6046            } catch (Throwable ex) {
6047                return tryCompleteComputation(ex);
5829              }
6049            ReduceValuesTask<K,V> s = rights;
6050            if (s != null && !inForkJoinPool()) {
6051                do  {
6052                    if (s.tryUnfork())
6053                        s.exec();
6054                } while ((s = s.nextRight) != null);
6055            }
6056            return false;
5830          }
6058        public final V getRawResult() { return result; }
5831      }
5832  
5833      @SuppressWarnings("serial") static final class ReduceEntriesTask<K,V>
5834 <        extends BulkTask<K,V,Map.Entry<K,V>> {
5834 >        extends Traverser<K,V,Map.Entry<K,V>> {
5835          final BiFun<Map.Entry<K,V>, Map.Entry<K,V>, ? extends Map.Entry<K,V>> reducer;
5836          Map.Entry<K,V> result;
5837          ReduceEntriesTask<K,V> rights, nextRight;
5838          ReduceEntriesTask
5839 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5839 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5840               ReduceEntriesTask<K,V> nextRight,
5841               BiFun<Entry<K,V>, Map.Entry<K,V>, ? extends Map.Entry<K,V>> reducer) {
5842              super(m, p, b); this.nextRight = nextRight;
5843              this.reducer = reducer;
5844          }
5845 <        @SuppressWarnings("unchecked") public final boolean exec() {
5845 >        public final Map.Entry<K,V> getRawResult() { return result; }
5846 >        @SuppressWarnings("unchecked") public final void compute() {
5847              final BiFun<Map.Entry<K,V>, Map.Entry<K,V>, ? extends Map.Entry<K,V>> reducer =
5848                  this.reducer;
5849              if (reducer == null)
5850 <                return abortOnNullFunction();
5851 <            try {
5852 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5853 <                    do {} while (!casPending(c = pending, c+1));
5854 <                    (rights = new ReduceEntriesTask<K,V>
5855 <                     (map, this, b >>>= 1, rights, reducer)).fork();
5856 <                }
5857 <                Map.Entry<K,V> r = null;
5858 <                Object v;
5859 <                while ((v = advance()) != null) {
5860 <                    Map.Entry<K,V> u = entryFor((K)nextKey, (V)v);
5861 <                    r = (r == null) ? u : reducer.apply(r, u);
5862 <                }
5863 <                result = r;
5864 <                for (ReduceEntriesTask<K,V> t = this, s;;) {
5865 <                    int c; BulkTask<K,V,?> par; Map.Entry<K,V> tr, sr;
5866 <                    if ((c = t.pending) == 0) {
5867 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
5868 <                            if ((sr = s.result) != null)
5869 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
5870 <                        }
5871 <                        if ((par = t.parent) == null ||
6099 <                            !(par instanceof ReduceEntriesTask)) {
6100 <                            t.quietlyComplete();
6101 <                            break;
6102 <                        }
6103 <                        t = (ReduceEntriesTask<K,V>)par;
6104 <                    }
6105 <                    else if (t.casPending(c, c - 1))
6106 <                        break;
5850 >                throw new NullPointerException();
5851 >            for (int b; (b = preSplit()) > 0;)
5852 >                (rights = new ReduceEntriesTask<K,V>
5853 >                 (map, this, b, rights, reducer)).fork();
5854 >            Map.Entry<K,V> r = null;
5855 >            Object v;
5856 >            while ((v = advance()) != null) {
5857 >                Map.Entry<K,V> u = entryFor((K)nextKey, (V)v);
5858 >                r = (r == null) ? u : reducer.apply(r, u);
5859 >            }
5860 >            result = r;
5861 >            CountedCompleter<?> c;
5862 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
5863 >                ReduceEntriesTask<K,V>
5864 >                    t = (ReduceEntriesTask<K,V>)c,
5865 >                    s = t.rights;
5866 >                while (s != null) {
5867 >                    Map.Entry<K,V> tr, sr;
5868 >                    if ((sr = s.result) != null)
5869 >                        t.result = (((tr = t.result) == null) ? sr :
5870 >                                    reducer.apply(tr, sr));
5871 >                    s = t.rights = s.nextRight;
5872                  }
6108            } catch (Throwable ex) {
6109                return tryCompleteComputation(ex);
5873              }
6111            ReduceEntriesTask<K,V> s = rights;
6112            if (s != null && !inForkJoinPool()) {
6113                do  {
6114                    if (s.tryUnfork())
6115                        s.exec();
6116                } while ((s = s.nextRight) != null);
6117            }
6118            return false;
5874          }
6120        public final Map.Entry<K,V> getRawResult() { return result; }
5875      }
5876  
5877      @SuppressWarnings("serial") static final class MapReduceKeysTask<K,V,U>
5878 <        extends BulkTask<K,V,U> {
5878 >        extends Traverser<K,V,U> {
5879          final Fun<? super K, ? extends U> transformer;
5880          final BiFun<? super U, ? super U, ? extends U> reducer;
5881          U result;
5882          MapReduceKeysTask<K,V,U> rights, nextRight;
5883          MapReduceKeysTask
5884 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5884 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5885               MapReduceKeysTask<K,V,U> nextRight,
5886               Fun<? super K, ? extends U> transformer,
5887               BiFun<? super U, ? super U, ? extends U> reducer) {
# Line 6135 | Line 5889 | public class ConcurrentHashMapV8<K, V>
5889              this.transformer = transformer;
5890              this.reducer = reducer;
5891          }
5892 <        @SuppressWarnings("unchecked") public final boolean exec() {
5892 >        public final U getRawResult() { return result; }
5893 >        @SuppressWarnings("unchecked") public final void compute() {
5894              final Fun<? super K, ? extends U> transformer =
5895                  this.transformer;
5896              final BiFun<? super U, ? super U, ? extends U> reducer =
5897                  this.reducer;
5898              if (transformer == null || reducer == null)
5899 <                return abortOnNullFunction();
5900 <            try {
5901 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5902 <                    do {} while (!casPending(c = pending, c+1));
5903 <                    (rights = new MapReduceKeysTask<K,V,U>
5904 <                     (map, this, b >>>= 1, rights, transformer, reducer)).fork();
5905 <                }
5906 <                U r = null, u;
6152 <                while (advance() != null) {
6153 <                    if ((u = transformer.apply((K)nextKey)) != null)
6154 <                        r = (r == null) ? u : reducer.apply(r, u);
6155 <                }
6156 <                result = r;
6157 <                for (MapReduceKeysTask<K,V,U> t = this, s;;) {
6158 <                    int c; BulkTask<K,V,?> par; U tr, sr;
6159 <                    if ((c = t.pending) == 0) {
6160 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6161 <                            if ((sr = s.result) != null)
6162 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
6163 <                        }
6164 <                        if ((par = t.parent) == null ||
6165 <                            !(par instanceof MapReduceKeysTask)) {
6166 <                            t.quietlyComplete();
6167 <                            break;
6168 <                        }
6169 <                        t = (MapReduceKeysTask<K,V,U>)par;
6170 <                    }
6171 <                    else if (t.casPending(c, c - 1))
6172 <                        break;
6173 <                }
6174 <            } catch (Throwable ex) {
6175 <                return tryCompleteComputation(ex);
5899 >                throw new NullPointerException();
5900 >            for (int b; (b = preSplit()) > 0;)
5901 >                (rights = new MapReduceKeysTask<K,V,U>
5902 >                 (map, this, b, rights, transformer, reducer)).fork();
5903 >            U r = null, u;
5904 >            while (advance() != null) {
5905 >                if ((u = transformer.apply((K)nextKey)) != null)
5906 >                    r = (r == null) ? u : reducer.apply(r, u);
5907              }
5908 <            MapReduceKeysTask<K,V,U> s = rights;
5909 <            if (s != null && !inForkJoinPool()) {
5910 <                do  {
5911 <                    if (s.tryUnfork())
5912 <                        s.exec();
5913 <                } while ((s = s.nextRight) != null);
5908 >            result = r;
5909 >            CountedCompleter<?> c;
5910 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
5911 >                MapReduceKeysTask<K,V,U>
5912 >                    t = (MapReduceKeysTask<K,V,U>)c,
5913 >                    s = t.rights;
5914 >                while (s != null) {
5915 >                    U tr, sr;
5916 >                    if ((sr = s.result) != null)
5917 >                        t.result = (((tr = t.result) == null) ? sr :
5918 >                                    reducer.apply(tr, sr));
5919 >                    s = t.rights = s.nextRight;
5920 >                }
5921              }
6184            return false;
5922          }
6186        public final U getRawResult() { return result; }
5923      }
5924  
5925      @SuppressWarnings("serial") static final class MapReduceValuesTask<K,V,U>
5926 <        extends BulkTask<K,V,U> {
5926 >        extends Traverser<K,V,U> {
5927          final Fun<? super V, ? extends U> transformer;
5928          final BiFun<? super U, ? super U, ? extends U> reducer;
5929          U result;
5930          MapReduceValuesTask<K,V,U> rights, nextRight;
5931          MapReduceValuesTask
5932 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5932 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5933               MapReduceValuesTask<K,V,U> nextRight,
5934               Fun<? super V, ? extends U> transformer,
5935               BiFun<? super U, ? super U, ? extends U> reducer) {
# Line 6201 | Line 5937 | public class ConcurrentHashMapV8<K, V>
5937              this.transformer = transformer;
5938              this.reducer = reducer;
5939          }
5940 <        @SuppressWarnings("unchecked") public final boolean exec() {
5940 >        public final U getRawResult() { return result; }
5941 >        @SuppressWarnings("unchecked") public final void compute() {
5942              final Fun<? super V, ? extends U> transformer =
5943                  this.transformer;
5944              final BiFun<? super U, ? super U, ? extends U> reducer =
5945                  this.reducer;
5946              if (transformer == null || reducer == null)
5947 <                return abortOnNullFunction();
5948 <            try {
5949 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5950 <                    do {} while (!casPending(c = pending, c+1));
5951 <                    (rights = new MapReduceValuesTask<K,V,U>
5952 <                     (map, this, b >>>= 1, rights, transformer, reducer)).fork();
5953 <                }
5954 <                U r = null, u;
5955 <                Object v;
6219 <                while ((v = advance()) != null) {
6220 <                    if ((u = transformer.apply((V)v)) != null)
6221 <                        r = (r == null) ? u : reducer.apply(r, u);
6222 <                }
6223 <                result = r;
6224 <                for (MapReduceValuesTask<K,V,U> t = this, s;;) {
6225 <                    int c; BulkTask<K,V,?> par; U tr, sr;
6226 <                    if ((c = t.pending) == 0) {
6227 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6228 <                            if ((sr = s.result) != null)
6229 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
6230 <                        }
6231 <                        if ((par = t.parent) == null ||
6232 <                            !(par instanceof MapReduceValuesTask)) {
6233 <                            t.quietlyComplete();
6234 <                            break;
6235 <                        }
6236 <                        t = (MapReduceValuesTask<K,V,U>)par;
6237 <                    }
6238 <                    else if (t.casPending(c, c - 1))
6239 <                        break;
6240 <                }
6241 <            } catch (Throwable ex) {
6242 <                return tryCompleteComputation(ex);
5947 >                throw new NullPointerException();
5948 >            for (int b; (b = preSplit()) > 0;)
5949 >                (rights = new MapReduceValuesTask<K,V,U>
5950 >                 (map, this, b, rights, transformer, reducer)).fork();
5951 >            U r = null, u;
5952 >            Object v;
5953 >            while ((v = advance()) != null) {
5954 >                if ((u = transformer.apply((V)v)) != null)
5955 >                    r = (r == null) ? u : reducer.apply(r, u);
5956              }
5957 <            MapReduceValuesTask<K,V,U> s = rights;
5958 <            if (s != null && !inForkJoinPool()) {
5959 <                do  {
5960 <                    if (s.tryUnfork())
5961 <                        s.exec();
5962 <                } while ((s = s.nextRight) != null);
5957 >            result = r;
5958 >            CountedCompleter<?> c;
5959 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
5960 >                MapReduceValuesTask<K,V,U>
5961 >                    t = (MapReduceValuesTask<K,V,U>)c,
5962 >                    s = t.rights;
5963 >                while (s != null) {
5964 >                    U tr, sr;
5965 >                    if ((sr = s.result) != null)
5966 >                        t.result = (((tr = t.result) == null) ? sr :
5967 >                                    reducer.apply(tr, sr));
5968 >                    s = t.rights = s.nextRight;
5969 >                }
5970              }
6251            return false;
5971          }
6253        public final U getRawResult() { return result; }
5972      }
5973  
5974      @SuppressWarnings("serial") static final class MapReduceEntriesTask<K,V,U>
5975 <        extends BulkTask<K,V,U> {
5975 >        extends Traverser<K,V,U> {
5976          final Fun<Map.Entry<K,V>, ? extends U> transformer;
5977          final BiFun<? super U, ? super U, ? extends U> reducer;
5978          U result;
5979          MapReduceEntriesTask<K,V,U> rights, nextRight;
5980          MapReduceEntriesTask
5981 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
5981 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
5982               MapReduceEntriesTask<K,V,U> nextRight,
5983               Fun<Map.Entry<K,V>, ? extends U> transformer,
5984               BiFun<? super U, ? super U, ? extends U> reducer) {
# Line 6268 | Line 5986 | public class ConcurrentHashMapV8<K, V>
5986              this.transformer = transformer;
5987              this.reducer = reducer;
5988          }
5989 <        @SuppressWarnings("unchecked") public final boolean exec() {
5989 >        public final U getRawResult() { return result; }
5990 >        @SuppressWarnings("unchecked") public final void compute() {
5991              final Fun<Map.Entry<K,V>, ? extends U> transformer =
5992                  this.transformer;
5993              final BiFun<? super U, ? super U, ? extends U> reducer =
5994                  this.reducer;
5995              if (transformer == null || reducer == null)
5996 <                return abortOnNullFunction();
5997 <            try {
5998 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5999 <                    do {} while (!casPending(c = pending, c+1));
6000 <                    (rights = new MapReduceEntriesTask<K,V,U>
6001 <                     (map, this, b >>>= 1, rights, transformer, reducer)).fork();
6002 <                }
6003 <                U r = null, u;
6004 <                Object v;
6286 <                while ((v = advance()) != null) {
6287 <                    if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
6288 <                        r = (r == null) ? u : reducer.apply(r, u);
6289 <                }
6290 <                result = r;
6291 <                for (MapReduceEntriesTask<K,V,U> t = this, s;;) {
6292 <                    int c; BulkTask<K,V,?> par; U tr, sr;
6293 <                    if ((c = t.pending) == 0) {
6294 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6295 <                            if ((sr = s.result) != null)
6296 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
6297 <                        }
6298 <                        if ((par = t.parent) == null ||
6299 <                            !(par instanceof MapReduceEntriesTask)) {
6300 <                            t.quietlyComplete();
6301 <                            break;
6302 <                        }
6303 <                        t = (MapReduceEntriesTask<K,V,U>)par;
6304 <                    }
6305 <                    else if (t.casPending(c, c - 1))
6306 <                        break;
6307 <                }
6308 <            } catch (Throwable ex) {
6309 <                return tryCompleteComputation(ex);
5996 >                throw new NullPointerException();
5997 >            for (int b; (b = preSplit()) > 0;)
5998 >                (rights = new MapReduceEntriesTask<K,V,U>
5999 >                 (map, this, b, rights, transformer, reducer)).fork();
6000 >            U r = null, u;
6001 >            Object v;
6002 >            while ((v = advance()) != null) {
6003 >                if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
6004 >                    r = (r == null) ? u : reducer.apply(r, u);
6005              }
6006 <            MapReduceEntriesTask<K,V,U> s = rights;
6007 <            if (s != null && !inForkJoinPool()) {
6008 <                do  {
6009 <                    if (s.tryUnfork())
6010 <                        s.exec();
6011 <                } while ((s = s.nextRight) != null);
6006 >            result = r;
6007 >            CountedCompleter<?> c;
6008 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6009 >                MapReduceEntriesTask<K,V,U>
6010 >                    t = (MapReduceEntriesTask<K,V,U>)c,
6011 >                    s = t.rights;
6012 >                while (s != null) {
6013 >                    U tr, sr;
6014 >                    if ((sr = s.result) != null)
6015 >                        t.result = (((tr = t.result) == null) ? sr :
6016 >                                    reducer.apply(tr, sr));
6017 >                    s = t.rights = s.nextRight;
6018 >                }
6019              }
6318            return false;
6020          }
6320        public final U getRawResult() { return result; }
6021      }
6022  
6023      @SuppressWarnings("serial") static final class MapReduceMappingsTask<K,V,U>
6024 <        extends BulkTask<K,V,U> {
6024 >        extends Traverser<K,V,U> {
6025          final BiFun<? super K, ? super V, ? extends U> transformer;
6026          final BiFun<? super U, ? super U, ? extends U> reducer;
6027          U result;
6028          MapReduceMappingsTask<K,V,U> rights, nextRight;
6029          MapReduceMappingsTask
6030 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6030 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6031               MapReduceMappingsTask<K,V,U> nextRight,
6032               BiFun<? super K, ? super V, ? extends U> transformer,
6033               BiFun<? super U, ? super U, ? extends U> reducer) {
# Line 6335 | Line 6035 | public class ConcurrentHashMapV8<K, V>
6035              this.transformer = transformer;
6036              this.reducer = reducer;
6037          }
6038 <        @SuppressWarnings("unchecked") public final boolean exec() {
6038 >        public final U getRawResult() { return result; }
6039 >        @SuppressWarnings("unchecked") public final void compute() {
6040              final BiFun<? super K, ? super V, ? extends U> transformer =
6041                  this.transformer;
6042              final BiFun<? super U, ? super U, ? extends U> reducer =
6043                  this.reducer;
6044              if (transformer == null || reducer == null)
6045 <                return abortOnNullFunction();
6046 <            try {
6047 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6048 <                    do {} while (!casPending(c = pending, c+1));
6049 <                    (rights = new MapReduceMappingsTask<K,V,U>
6050 <                     (map, this, b >>>= 1, rights, transformer, reducer)).fork();
6051 <                }
6052 <                U r = null, u;
6053 <                Object v;
6353 <                while ((v = advance()) != null) {
6354 <                    if ((u = transformer.apply((K)nextKey, (V)v)) != null)
6355 <                        r = (r == null) ? u : reducer.apply(r, u);
6356 <                }
6357 <                result = r;
6358 <                for (MapReduceMappingsTask<K,V,U> t = this, s;;) {
6359 <                    int c; BulkTask<K,V,?> par; U tr, sr;
6360 <                    if ((c = t.pending) == 0) {
6361 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6362 <                            if ((sr = s.result) != null)
6363 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
6364 <                        }
6365 <                        if ((par = t.parent) == null ||
6366 <                            !(par instanceof MapReduceMappingsTask)) {
6367 <                            t.quietlyComplete();
6368 <                            break;
6369 <                        }
6370 <                        t = (MapReduceMappingsTask<K,V,U>)par;
6371 <                    }
6372 <                    else if (t.casPending(c, c - 1))
6373 <                        break;
6374 <                }
6375 <            } catch (Throwable ex) {
6376 <                return tryCompleteComputation(ex);
6045 >                throw new NullPointerException();
6046 >            for (int b; (b = preSplit()) > 0;)
6047 >                (rights = new MapReduceMappingsTask<K,V,U>
6048 >                 (map, this, b, rights, transformer, reducer)).fork();
6049 >            U r = null, u;
6050 >            Object v;
6051 >            while ((v = advance()) != null) {
6052 >                if ((u = transformer.apply((K)nextKey, (V)v)) != null)
6053 >                    r = (r == null) ? u : reducer.apply(r, u);
6054              }
6055 <            MapReduceMappingsTask<K,V,U> s = rights;
6056 <            if (s != null && !inForkJoinPool()) {
6057 <                do  {
6058 <                    if (s.tryUnfork())
6059 <                        s.exec();
6060 <                } while ((s = s.nextRight) != null);
6055 >            result = r;
6056 >            CountedCompleter<?> c;
6057 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6058 >                MapReduceMappingsTask<K,V,U>
6059 >                    t = (MapReduceMappingsTask<K,V,U>)c,
6060 >                    s = t.rights;
6061 >                while (s != null) {
6062 >                    U tr, sr;
6063 >                    if ((sr = s.result) != null)
6064 >                        t.result = (((tr = t.result) == null) ? sr :
6065 >                                    reducer.apply(tr, sr));
6066 >                    s = t.rights = s.nextRight;
6067 >                }
6068              }
6385            return false;
6069          }
6387        public final U getRawResult() { return result; }
6070      }
6071  
6072      @SuppressWarnings("serial") static final class MapReduceKeysToDoubleTask<K,V>
6073 <        extends BulkTask<K,V,Double> {
6073 >        extends Traverser<K,V,Double> {
6074          final ObjectToDouble<? super K> transformer;
6075          final DoubleByDoubleToDouble reducer;
6076          final double basis;
6077          double result;
6078          MapReduceKeysToDoubleTask<K,V> rights, nextRight;
6079          MapReduceKeysToDoubleTask
6080 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6080 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6081               MapReduceKeysToDoubleTask<K,V> nextRight,
6082               ObjectToDouble<? super K> transformer,
6083               double basis,
# Line 6404 | Line 6086 | public class ConcurrentHashMapV8<K, V>
6086              this.transformer = transformer;
6087              this.basis = basis; this.reducer = reducer;
6088          }
6089 <        @SuppressWarnings("unchecked") public final boolean exec() {
6089 >        public final Double getRawResult() { return result; }
6090 >        @SuppressWarnings("unchecked") public final void compute() {
6091              final ObjectToDouble<? super K> transformer =
6092                  this.transformer;
6093              final DoubleByDoubleToDouble reducer = this.reducer;
6094              if (transformer == null || reducer == null)
6095 <                return abortOnNullFunction();
6096 <            try {
6097 <                final double id = this.basis;
6098 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6099 <                    do {} while (!casPending(c = pending, c+1));
6100 <                    (rights = new MapReduceKeysToDoubleTask<K,V>
6101 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6102 <                }
6103 <                double r = id;
6104 <                while (advance() != null)
6105 <                    r = reducer.apply(r, transformer.apply((K)nextKey));
6106 <                result = r;
6107 <                for (MapReduceKeysToDoubleTask<K,V> t = this, s;;) {
6108 <                    int c; BulkTask<K,V,?> par;
6109 <                    if ((c = t.pending) == 0) {
6110 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6428 <                            t.result = reducer.apply(t.result, s.result);
6429 <                        }
6430 <                        if ((par = t.parent) == null ||
6431 <                            !(par instanceof MapReduceKeysToDoubleTask)) {
6432 <                            t.quietlyComplete();
6433 <                            break;
6434 <                        }
6435 <                        t = (MapReduceKeysToDoubleTask<K,V>)par;
6436 <                    }
6437 <                    else if (t.casPending(c, c - 1))
6438 <                        break;
6095 >                throw new NullPointerException();
6096 >            double r = this.basis;
6097 >            for (int b; (b = preSplit()) > 0;)
6098 >                (rights = new MapReduceKeysToDoubleTask<K,V>
6099 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6100 >            while (advance() != null)
6101 >                r = reducer.apply(r, transformer.apply((K)nextKey));
6102 >            result = r;
6103 >            CountedCompleter<?> c;
6104 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6105 >                MapReduceKeysToDoubleTask<K,V>
6106 >                    t = (MapReduceKeysToDoubleTask<K,V>)c,
6107 >                    s = t.rights;
6108 >                while (s != null) {
6109 >                    t.result = reducer.apply(t.result, s.result);
6110 >                    s = t.rights = s.nextRight;
6111                  }
6440            } catch (Throwable ex) {
6441                return tryCompleteComputation(ex);
6442            }
6443            MapReduceKeysToDoubleTask<K,V> s = rights;
6444            if (s != null && !inForkJoinPool()) {
6445                do  {
6446                    if (s.tryUnfork())
6447                        s.exec();
6448                } while ((s = s.nextRight) != null);
6112              }
6450            return false;
6113          }
6452        public final Double getRawResult() { return result; }
6114      }
6115  
6116      @SuppressWarnings("serial") static final class MapReduceValuesToDoubleTask<K,V>
6117 <        extends BulkTask<K,V,Double> {
6117 >        extends Traverser<K,V,Double> {
6118          final ObjectToDouble<? super V> transformer;
6119          final DoubleByDoubleToDouble reducer;
6120          final double basis;
6121          double result;
6122          MapReduceValuesToDoubleTask<K,V> rights, nextRight;
6123          MapReduceValuesToDoubleTask
6124 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6124 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6125               MapReduceValuesToDoubleTask<K,V> nextRight,
6126               ObjectToDouble<? super V> transformer,
6127               double basis,
# Line 6469 | Line 6130 | public class ConcurrentHashMapV8<K, V>
6130              this.transformer = transformer;
6131              this.basis = basis; this.reducer = reducer;
6132          }
6133 <        @SuppressWarnings("unchecked") public final boolean exec() {
6133 >        public final Double getRawResult() { return result; }
6134 >        @SuppressWarnings("unchecked") public final void compute() {
6135              final ObjectToDouble<? super V> transformer =
6136                  this.transformer;
6137              final DoubleByDoubleToDouble reducer = this.reducer;
6138              if (transformer == null || reducer == null)
6139 <                return abortOnNullFunction();
6140 <            try {
6141 <                final double id = this.basis;
6142 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6143 <                    do {} while (!casPending(c = pending, c+1));
6144 <                    (rights = new MapReduceValuesToDoubleTask<K,V>
6145 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6146 <                }
6147 <                double r = id;
6148 <                Object v;
6149 <                while ((v = advance()) != null)
6150 <                    r = reducer.apply(r, transformer.apply((V)v));
6151 <                result = r;
6152 <                for (MapReduceValuesToDoubleTask<K,V> t = this, s;;) {
6153 <                    int c; BulkTask<K,V,?> par;
6154 <                    if ((c = t.pending) == 0) {
6155 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6494 <                            t.result = reducer.apply(t.result, s.result);
6495 <                        }
6496 <                        if ((par = t.parent) == null ||
6497 <                            !(par instanceof MapReduceValuesToDoubleTask)) {
6498 <                            t.quietlyComplete();
6499 <                            break;
6500 <                        }
6501 <                        t = (MapReduceValuesToDoubleTask<K,V>)par;
6502 <                    }
6503 <                    else if (t.casPending(c, c - 1))
6504 <                        break;
6139 >                throw new NullPointerException();
6140 >            double r = this.basis;
6141 >            for (int b; (b = preSplit()) > 0;)
6142 >                (rights = new MapReduceValuesToDoubleTask<K,V>
6143 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6144 >            Object v;
6145 >            while ((v = advance()) != null)
6146 >                r = reducer.apply(r, transformer.apply((V)v));
6147 >            result = r;
6148 >            CountedCompleter<?> c;
6149 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6150 >                MapReduceValuesToDoubleTask<K,V>
6151 >                    t = (MapReduceValuesToDoubleTask<K,V>)c,
6152 >                    s = t.rights;
6153 >                while (s != null) {
6154 >                    t.result = reducer.apply(t.result, s.result);
6155 >                    s = t.rights = s.nextRight;
6156                  }
6506            } catch (Throwable ex) {
6507                return tryCompleteComputation(ex);
6508            }
6509            MapReduceValuesToDoubleTask<K,V> s = rights;
6510            if (s != null && !inForkJoinPool()) {
6511                do  {
6512                    if (s.tryUnfork())
6513                        s.exec();
6514                } while ((s = s.nextRight) != null);
6157              }
6516            return false;
6158          }
6518        public final Double getRawResult() { return result; }
6159      }
6160  
6161      @SuppressWarnings("serial") static final class MapReduceEntriesToDoubleTask<K,V>
6162 <        extends BulkTask<K,V,Double> {
6162 >        extends Traverser<K,V,Double> {
6163          final ObjectToDouble<Map.Entry<K,V>> transformer;
6164          final DoubleByDoubleToDouble reducer;
6165          final double basis;
6166          double result;
6167          MapReduceEntriesToDoubleTask<K,V> rights, nextRight;
6168          MapReduceEntriesToDoubleTask
6169 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6169 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6170               MapReduceEntriesToDoubleTask<K,V> nextRight,
6171               ObjectToDouble<Map.Entry<K,V>> transformer,
6172               double basis,
# Line 6535 | Line 6175 | public class ConcurrentHashMapV8<K, V>
6175              this.transformer = transformer;
6176              this.basis = basis; this.reducer = reducer;
6177          }
6178 <        @SuppressWarnings("unchecked") public final boolean exec() {
6178 >        public final Double getRawResult() { return result; }
6179 >        @SuppressWarnings("unchecked") public final void compute() {
6180              final ObjectToDouble<Map.Entry<K,V>> transformer =
6181                  this.transformer;
6182              final DoubleByDoubleToDouble reducer = this.reducer;
6183              if (transformer == null || reducer == null)
6184 <                return abortOnNullFunction();
6185 <            try {
6186 <                final double id = this.basis;
6187 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6188 <                    do {} while (!casPending(c = pending, c+1));
6189 <                    (rights = new MapReduceEntriesToDoubleTask<K,V>
6190 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6191 <                }
6192 <                double r = id;
6193 <                Object v;
6194 <                while ((v = advance()) != null)
6195 <                    r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6196 <                result = r;
6197 <                for (MapReduceEntriesToDoubleTask<K,V> t = this, s;;) {
6198 <                    int c; BulkTask<K,V,?> par;
6199 <                    if ((c = t.pending) == 0) {
6200 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6560 <                            t.result = reducer.apply(t.result, s.result);
6561 <                        }
6562 <                        if ((par = t.parent) == null ||
6563 <                            !(par instanceof MapReduceEntriesToDoubleTask)) {
6564 <                            t.quietlyComplete();
6565 <                            break;
6566 <                        }
6567 <                        t = (MapReduceEntriesToDoubleTask<K,V>)par;
6568 <                    }
6569 <                    else if (t.casPending(c, c - 1))
6570 <                        break;
6184 >                throw new NullPointerException();
6185 >            double r = this.basis;
6186 >            for (int b; (b = preSplit()) > 0;)
6187 >                (rights = new MapReduceEntriesToDoubleTask<K,V>
6188 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6189 >            Object v;
6190 >            while ((v = advance()) != null)
6191 >                r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6192 >            result = r;
6193 >            CountedCompleter<?> c;
6194 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6195 >                MapReduceEntriesToDoubleTask<K,V>
6196 >                    t = (MapReduceEntriesToDoubleTask<K,V>)c,
6197 >                    s = t.rights;
6198 >                while (s != null) {
6199 >                    t.result = reducer.apply(t.result, s.result);
6200 >                    s = t.rights = s.nextRight;
6201                  }
6572            } catch (Throwable ex) {
6573                return tryCompleteComputation(ex);
6574            }
6575            MapReduceEntriesToDoubleTask<K,V> s = rights;
6576            if (s != null && !inForkJoinPool()) {
6577                do  {
6578                    if (s.tryUnfork())
6579                        s.exec();
6580                } while ((s = s.nextRight) != null);
6202              }
6582            return false;
6203          }
6584        public final Double getRawResult() { return result; }
6204      }
6205  
6206      @SuppressWarnings("serial") static final class MapReduceMappingsToDoubleTask<K,V>
6207 <        extends BulkTask<K,V,Double> {
6207 >        extends Traverser<K,V,Double> {
6208          final ObjectByObjectToDouble<? super K, ? super V> transformer;
6209          final DoubleByDoubleToDouble reducer;
6210          final double basis;
6211          double result;
6212          MapReduceMappingsToDoubleTask<K,V> rights, nextRight;
6213          MapReduceMappingsToDoubleTask
6214 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6214 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6215               MapReduceMappingsToDoubleTask<K,V> nextRight,
6216               ObjectByObjectToDouble<? super K, ? super V> transformer,
6217               double basis,
# Line 6601 | Line 6220 | public class ConcurrentHashMapV8<K, V>
6220              this.transformer = transformer;
6221              this.basis = basis; this.reducer = reducer;
6222          }
6223 <        @SuppressWarnings("unchecked") public final boolean exec() {
6223 >        public final Double getRawResult() { return result; }
6224 >        @SuppressWarnings("unchecked") public final void compute() {
6225              final ObjectByObjectToDouble<? super K, ? super V> transformer =
6226                  this.transformer;
6227              final DoubleByDoubleToDouble reducer = this.reducer;
6228              if (transformer == null || reducer == null)
6229 <                return abortOnNullFunction();
6230 <            try {
6231 <                final double id = this.basis;
6232 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6233 <                    do {} while (!casPending(c = pending, c+1));
6234 <                    (rights = new MapReduceMappingsToDoubleTask<K,V>
6235 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6236 <                }
6237 <                double r = id;
6238 <                Object v;
6239 <                while ((v = advance()) != null)
6240 <                    r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6241 <                result = r;
6242 <                for (MapReduceMappingsToDoubleTask<K,V> t = this, s;;) {
6243 <                    int c; BulkTask<K,V,?> par;
6244 <                    if ((c = t.pending) == 0) {
6245 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6626 <                            t.result = reducer.apply(t.result, s.result);
6627 <                        }
6628 <                        if ((par = t.parent) == null ||
6629 <                            !(par instanceof MapReduceMappingsToDoubleTask)) {
6630 <                            t.quietlyComplete();
6631 <                            break;
6632 <                        }
6633 <                        t = (MapReduceMappingsToDoubleTask<K,V>)par;
6634 <                    }
6635 <                    else if (t.casPending(c, c - 1))
6636 <                        break;
6229 >                throw new NullPointerException();
6230 >            double r = this.basis;
6231 >            for (int b; (b = preSplit()) > 0;)
6232 >                (rights = new MapReduceMappingsToDoubleTask<K,V>
6233 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6234 >            Object v;
6235 >            while ((v = advance()) != null)
6236 >                r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6237 >            result = r;
6238 >            CountedCompleter<?> c;
6239 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6240 >                MapReduceMappingsToDoubleTask<K,V>
6241 >                    t = (MapReduceMappingsToDoubleTask<K,V>)c,
6242 >                    s = t.rights;
6243 >                while (s != null) {
6244 >                    t.result = reducer.apply(t.result, s.result);
6245 >                    s = t.rights = s.nextRight;
6246                  }
6638            } catch (Throwable ex) {
6639                return tryCompleteComputation(ex);
6640            }
6641            MapReduceMappingsToDoubleTask<K,V> s = rights;
6642            if (s != null && !inForkJoinPool()) {
6643                do  {
6644                    if (s.tryUnfork())
6645                        s.exec();
6646                } while ((s = s.nextRight) != null);
6247              }
6648            return false;
6248          }
6650        public final Double getRawResult() { return result; }
6249      }
6250  
6251      @SuppressWarnings("serial") static final class MapReduceKeysToLongTask<K,V>
6252 <        extends BulkTask<K,V,Long> {
6252 >        extends Traverser<K,V,Long> {
6253          final ObjectToLong<? super K> transformer;
6254          final LongByLongToLong reducer;
6255          final long basis;
6256          long result;
6257          MapReduceKeysToLongTask<K,V> rights, nextRight;
6258          MapReduceKeysToLongTask
6259 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6259 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6260               MapReduceKeysToLongTask<K,V> nextRight,
6261               ObjectToLong<? super K> transformer,
6262               long basis,
# Line 6667 | Line 6265 | public class ConcurrentHashMapV8<K, V>
6265              this.transformer = transformer;
6266              this.basis = basis; this.reducer = reducer;
6267          }
6268 <        @SuppressWarnings("unchecked") public final boolean exec() {
6268 >        public final Long getRawResult() { return result; }
6269 >        @SuppressWarnings("unchecked") public final void compute() {
6270              final ObjectToLong<? super K> transformer =
6271                  this.transformer;
6272              final LongByLongToLong reducer = this.reducer;
6273              if (transformer == null || reducer == null)
6274 <                return abortOnNullFunction();
6275 <            try {
6276 <                final long id = this.basis;
6277 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6278 <                    do {} while (!casPending(c = pending, c+1));
6279 <                    (rights = new MapReduceKeysToLongTask<K,V>
6280 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6281 <                }
6282 <                long r = id;
6283 <                while (advance() != null)
6284 <                    r = reducer.apply(r, transformer.apply((K)nextKey));
6285 <                result = r;
6286 <                for (MapReduceKeysToLongTask<K,V> t = this, s;;) {
6287 <                    int c; BulkTask<K,V,?> par;
6288 <                    if ((c = t.pending) == 0) {
6289 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6691 <                            t.result = reducer.apply(t.result, s.result);
6692 <                        }
6693 <                        if ((par = t.parent) == null ||
6694 <                            !(par instanceof MapReduceKeysToLongTask)) {
6695 <                            t.quietlyComplete();
6696 <                            break;
6697 <                        }
6698 <                        t = (MapReduceKeysToLongTask<K,V>)par;
6699 <                    }
6700 <                    else if (t.casPending(c, c - 1))
6701 <                        break;
6274 >                throw new NullPointerException();
6275 >            long r = this.basis;
6276 >            for (int b; (b = preSplit()) > 0;)
6277 >                (rights = new MapReduceKeysToLongTask<K,V>
6278 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6279 >            while (advance() != null)
6280 >                r = reducer.apply(r, transformer.apply((K)nextKey));
6281 >            result = r;
6282 >            CountedCompleter<?> c;
6283 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6284 >                MapReduceKeysToLongTask<K,V>
6285 >                    t = (MapReduceKeysToLongTask<K,V>)c,
6286 >                    s = t.rights;
6287 >                while (s != null) {
6288 >                    t.result = reducer.apply(t.result, s.result);
6289 >                    s = t.rights = s.nextRight;
6290                  }
6703            } catch (Throwable ex) {
6704                return tryCompleteComputation(ex);
6705            }
6706            MapReduceKeysToLongTask<K,V> s = rights;
6707            if (s != null && !inForkJoinPool()) {
6708                do  {
6709                    if (s.tryUnfork())
6710                        s.exec();
6711                } while ((s = s.nextRight) != null);
6291              }
6713            return false;
6292          }
6715        public final Long getRawResult() { return result; }
6293      }
6294  
6295      @SuppressWarnings("serial") static final class MapReduceValuesToLongTask<K,V>
6296 <        extends BulkTask<K,V,Long> {
6296 >        extends Traverser<K,V,Long> {
6297          final ObjectToLong<? super V> transformer;
6298          final LongByLongToLong reducer;
6299          final long basis;
6300          long result;
6301          MapReduceValuesToLongTask<K,V> rights, nextRight;
6302          MapReduceValuesToLongTask
6303 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6303 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6304               MapReduceValuesToLongTask<K,V> nextRight,
6305               ObjectToLong<? super V> transformer,
6306               long basis,
# Line 6732 | Line 6309 | public class ConcurrentHashMapV8<K, V>
6309              this.transformer = transformer;
6310              this.basis = basis; this.reducer = reducer;
6311          }
6312 <        @SuppressWarnings("unchecked") public final boolean exec() {
6312 >        public final Long getRawResult() { return result; }
6313 >        @SuppressWarnings("unchecked") public final void compute() {
6314              final ObjectToLong<? super V> transformer =
6315                  this.transformer;
6316              final LongByLongToLong reducer = this.reducer;
6317              if (transformer == null || reducer == null)
6318 <                return abortOnNullFunction();
6319 <            try {
6320 <                final long id = this.basis;
6321 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6322 <                    do {} while (!casPending(c = pending, c+1));
6323 <                    (rights = new MapReduceValuesToLongTask<K,V>
6324 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6325 <                }
6326 <                long r = id;
6327 <                Object v;
6328 <                while ((v = advance()) != null)
6329 <                    r = reducer.apply(r, transformer.apply((V)v));
6330 <                result = r;
6331 <                for (MapReduceValuesToLongTask<K,V> t = this, s;;) {
6332 <                    int c; BulkTask<K,V,?> par;
6333 <                    if ((c = t.pending) == 0) {
6334 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6757 <                            t.result = reducer.apply(t.result, s.result);
6758 <                        }
6759 <                        if ((par = t.parent) == null ||
6760 <                            !(par instanceof MapReduceValuesToLongTask)) {
6761 <                            t.quietlyComplete();
6762 <                            break;
6763 <                        }
6764 <                        t = (MapReduceValuesToLongTask<K,V>)par;
6765 <                    }
6766 <                    else if (t.casPending(c, c - 1))
6767 <                        break;
6318 >                throw new NullPointerException();
6319 >            long r = this.basis;
6320 >            for (int b; (b = preSplit()) > 0;)
6321 >                (rights = new MapReduceValuesToLongTask<K,V>
6322 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6323 >            Object v;
6324 >            while ((v = advance()) != null)
6325 >                r = reducer.apply(r, transformer.apply((V)v));
6326 >            result = r;
6327 >            CountedCompleter<?> c;
6328 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6329 >                MapReduceValuesToLongTask<K,V>
6330 >                    t = (MapReduceValuesToLongTask<K,V>)c,
6331 >                    s = t.rights;
6332 >                while (s != null) {
6333 >                    t.result = reducer.apply(t.result, s.result);
6334 >                    s = t.rights = s.nextRight;
6335                  }
6769            } catch (Throwable ex) {
6770                return tryCompleteComputation(ex);
6771            }
6772            MapReduceValuesToLongTask<K,V> s = rights;
6773            if (s != null && !inForkJoinPool()) {
6774                do  {
6775                    if (s.tryUnfork())
6776                        s.exec();
6777                } while ((s = s.nextRight) != null);
6336              }
6779            return false;
6337          }
6781        public final Long getRawResult() { return result; }
6338      }
6339  
6340      @SuppressWarnings("serial") static final class MapReduceEntriesToLongTask<K,V>
6341 <        extends BulkTask<K,V,Long> {
6341 >        extends Traverser<K,V,Long> {
6342          final ObjectToLong<Map.Entry<K,V>> transformer;
6343          final LongByLongToLong reducer;
6344          final long basis;
6345          long result;
6346          MapReduceEntriesToLongTask<K,V> rights, nextRight;
6347          MapReduceEntriesToLongTask
6348 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6348 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6349               MapReduceEntriesToLongTask<K,V> nextRight,
6350               ObjectToLong<Map.Entry<K,V>> transformer,
6351               long basis,
# Line 6798 | Line 6354 | public class ConcurrentHashMapV8<K, V>
6354              this.transformer = transformer;
6355              this.basis = basis; this.reducer = reducer;
6356          }
6357 <        @SuppressWarnings("unchecked") public final boolean exec() {
6357 >        public final Long getRawResult() { return result; }
6358 >        @SuppressWarnings("unchecked") public final void compute() {
6359              final ObjectToLong<Map.Entry<K,V>> transformer =
6360                  this.transformer;
6361              final LongByLongToLong reducer = this.reducer;
6362              if (transformer == null || reducer == null)
6363 <                return abortOnNullFunction();
6364 <            try {
6365 <                final long id = this.basis;
6366 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6367 <                    do {} while (!casPending(c = pending, c+1));
6368 <                    (rights = new MapReduceEntriesToLongTask<K,V>
6369 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6370 <                }
6371 <                long r = id;
6372 <                Object v;
6373 <                while ((v = advance()) != null)
6374 <                    r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6375 <                result = r;
6376 <                for (MapReduceEntriesToLongTask<K,V> t = this, s;;) {
6377 <                    int c; BulkTask<K,V,?> par;
6378 <                    if ((c = t.pending) == 0) {
6379 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6823 <                            t.result = reducer.apply(t.result, s.result);
6824 <                        }
6825 <                        if ((par = t.parent) == null ||
6826 <                            !(par instanceof MapReduceEntriesToLongTask)) {
6827 <                            t.quietlyComplete();
6828 <                            break;
6829 <                        }
6830 <                        t = (MapReduceEntriesToLongTask<K,V>)par;
6831 <                    }
6832 <                    else if (t.casPending(c, c - 1))
6833 <                        break;
6363 >                throw new NullPointerException();
6364 >            long r = this.basis;
6365 >            for (int b; (b = preSplit()) > 0;)
6366 >                (rights = new MapReduceEntriesToLongTask<K,V>
6367 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6368 >            Object v;
6369 >            while ((v = advance()) != null)
6370 >                r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6371 >            result = r;
6372 >            CountedCompleter<?> c;
6373 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6374 >                MapReduceEntriesToLongTask<K,V>
6375 >                    t = (MapReduceEntriesToLongTask<K,V>)c,
6376 >                    s = t.rights;
6377 >                while (s != null) {
6378 >                    t.result = reducer.apply(t.result, s.result);
6379 >                    s = t.rights = s.nextRight;
6380                  }
6835            } catch (Throwable ex) {
6836                return tryCompleteComputation(ex);
6381              }
6838            MapReduceEntriesToLongTask<K,V> s = rights;
6839            if (s != null && !inForkJoinPool()) {
6840                do  {
6841                    if (s.tryUnfork())
6842                        s.exec();
6843                } while ((s = s.nextRight) != null);
6844            }
6845            return false;
6382          }
6847        public final Long getRawResult() { return result; }
6383      }
6384  
6385      @SuppressWarnings("serial") static final class MapReduceMappingsToLongTask<K,V>
6386 <        extends BulkTask<K,V,Long> {
6386 >        extends Traverser<K,V,Long> {
6387          final ObjectByObjectToLong<? super K, ? super V> transformer;
6388          final LongByLongToLong reducer;
6389          final long basis;
6390          long result;
6391          MapReduceMappingsToLongTask<K,V> rights, nextRight;
6392          MapReduceMappingsToLongTask
6393 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6393 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6394               MapReduceMappingsToLongTask<K,V> nextRight,
6395               ObjectByObjectToLong<? super K, ? super V> transformer,
6396               long basis,
# Line 6864 | Line 6399 | public class ConcurrentHashMapV8<K, V>
6399              this.transformer = transformer;
6400              this.basis = basis; this.reducer = reducer;
6401          }
6402 <        @SuppressWarnings("unchecked") public final boolean exec() {
6402 >        public final Long getRawResult() { return result; }
6403 >        @SuppressWarnings("unchecked") public final void compute() {
6404              final ObjectByObjectToLong<? super K, ? super V> transformer =
6405                  this.transformer;
6406              final LongByLongToLong reducer = this.reducer;
6407              if (transformer == null || reducer == null)
6408 <                return abortOnNullFunction();
6409 <            try {
6410 <                final long id = this.basis;
6411 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6412 <                    do {} while (!casPending(c = pending, c+1));
6413 <                    (rights = new MapReduceMappingsToLongTask<K,V>
6414 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6415 <                }
6416 <                long r = id;
6417 <                Object v;
6418 <                while ((v = advance()) != null)
6419 <                    r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6420 <                result = r;
6421 <                for (MapReduceMappingsToLongTask<K,V> t = this, s;;) {
6422 <                    int c; BulkTask<K,V,?> par;
6423 <                    if ((c = t.pending) == 0) {
6424 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6889 <                            t.result = reducer.apply(t.result, s.result);
6890 <                        }
6891 <                        if ((par = t.parent) == null ||
6892 <                            !(par instanceof MapReduceMappingsToLongTask)) {
6893 <                            t.quietlyComplete();
6894 <                            break;
6895 <                        }
6896 <                        t = (MapReduceMappingsToLongTask<K,V>)par;
6897 <                    }
6898 <                    else if (t.casPending(c, c - 1))
6899 <                        break;
6408 >                throw new NullPointerException();
6409 >            long r = this.basis;
6410 >            for (int b; (b = preSplit()) > 0;)
6411 >                (rights = new MapReduceMappingsToLongTask<K,V>
6412 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6413 >            Object v;
6414 >            while ((v = advance()) != null)
6415 >                r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6416 >            result = r;
6417 >            CountedCompleter<?> c;
6418 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6419 >                MapReduceMappingsToLongTask<K,V>
6420 >                    t = (MapReduceMappingsToLongTask<K,V>)c,
6421 >                    s = t.rights;
6422 >                while (s != null) {
6423 >                    t.result = reducer.apply(t.result, s.result);
6424 >                    s = t.rights = s.nextRight;
6425                  }
6901            } catch (Throwable ex) {
6902                return tryCompleteComputation(ex);
6903            }
6904            MapReduceMappingsToLongTask<K,V> s = rights;
6905            if (s != null && !inForkJoinPool()) {
6906                do  {
6907                    if (s.tryUnfork())
6908                        s.exec();
6909                } while ((s = s.nextRight) != null);
6426              }
6911            return false;
6427          }
6913        public final Long getRawResult() { return result; }
6428      }
6429  
6430      @SuppressWarnings("serial") static final class MapReduceKeysToIntTask<K,V>
6431 <        extends BulkTask<K,V,Integer> {
6431 >        extends Traverser<K,V,Integer> {
6432          final ObjectToInt<? super K> transformer;
6433          final IntByIntToInt reducer;
6434          final int basis;
6435          int result;
6436          MapReduceKeysToIntTask<K,V> rights, nextRight;
6437          MapReduceKeysToIntTask
6438 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6438 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6439               MapReduceKeysToIntTask<K,V> nextRight,
6440               ObjectToInt<? super K> transformer,
6441               int basis,
# Line 6930 | Line 6444 | public class ConcurrentHashMapV8<K, V>
6444              this.transformer = transformer;
6445              this.basis = basis; this.reducer = reducer;
6446          }
6447 <        @SuppressWarnings("unchecked") public final boolean exec() {
6447 >        public final Integer getRawResult() { return result; }
6448 >        @SuppressWarnings("unchecked") public final void compute() {
6449              final ObjectToInt<? super K> transformer =
6450                  this.transformer;
6451              final IntByIntToInt reducer = this.reducer;
6452              if (transformer == null || reducer == null)
6453 <                return abortOnNullFunction();
6454 <            try {
6455 <                final int id = this.basis;
6456 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6457 <                    do {} while (!casPending(c = pending, c+1));
6458 <                    (rights = new MapReduceKeysToIntTask<K,V>
6459 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6460 <                }
6461 <                int r = id;
6462 <                while (advance() != null)
6463 <                    r = reducer.apply(r, transformer.apply((K)nextKey));
6464 <                result = r;
6465 <                for (MapReduceKeysToIntTask<K,V> t = this, s;;) {
6466 <                    int c; BulkTask<K,V,?> par;
6467 <                    if ((c = t.pending) == 0) {
6468 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6954 <                            t.result = reducer.apply(t.result, s.result);
6955 <                        }
6956 <                        if ((par = t.parent) == null ||
6957 <                            !(par instanceof MapReduceKeysToIntTask)) {
6958 <                            t.quietlyComplete();
6959 <                            break;
6960 <                        }
6961 <                        t = (MapReduceKeysToIntTask<K,V>)par;
6962 <                    }
6963 <                    else if (t.casPending(c, c - 1))
6964 <                        break;
6453 >                throw new NullPointerException();
6454 >            int r = this.basis;
6455 >            for (int b; (b = preSplit()) > 0;)
6456 >                (rights = new MapReduceKeysToIntTask<K,V>
6457 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6458 >            while (advance() != null)
6459 >                r = reducer.apply(r, transformer.apply((K)nextKey));
6460 >            result = r;
6461 >            CountedCompleter<?> c;
6462 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6463 >                MapReduceKeysToIntTask<K,V>
6464 >                    t = (MapReduceKeysToIntTask<K,V>)c,
6465 >                    s = t.rights;
6466 >                while (s != null) {
6467 >                    t.result = reducer.apply(t.result, s.result);
6468 >                    s = t.rights = s.nextRight;
6469                  }
6966            } catch (Throwable ex) {
6967                return tryCompleteComputation(ex);
6968            }
6969            MapReduceKeysToIntTask<K,V> s = rights;
6970            if (s != null && !inForkJoinPool()) {
6971                do  {
6972                    if (s.tryUnfork())
6973                        s.exec();
6974                } while ((s = s.nextRight) != null);
6470              }
6976            return false;
6471          }
6978        public final Integer getRawResult() { return result; }
6472      }
6473  
6474      @SuppressWarnings("serial") static final class MapReduceValuesToIntTask<K,V>
6475 <        extends BulkTask<K,V,Integer> {
6475 >        extends Traverser<K,V,Integer> {
6476          final ObjectToInt<? super V> transformer;
6477          final IntByIntToInt reducer;
6478          final int basis;
6479          int result;
6480          MapReduceValuesToIntTask<K,V> rights, nextRight;
6481          MapReduceValuesToIntTask
6482 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6482 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6483               MapReduceValuesToIntTask<K,V> nextRight,
6484               ObjectToInt<? super V> transformer,
6485               int basis,
# Line 6995 | Line 6488 | public class ConcurrentHashMapV8<K, V>
6488              this.transformer = transformer;
6489              this.basis = basis; this.reducer = reducer;
6490          }
6491 <        @SuppressWarnings("unchecked") public final boolean exec() {
6491 >        public final Integer getRawResult() { return result; }
6492 >        @SuppressWarnings("unchecked") public final void compute() {
6493              final ObjectToInt<? super V> transformer =
6494                  this.transformer;
6495              final IntByIntToInt reducer = this.reducer;
6496              if (transformer == null || reducer == null)
6497 <                return abortOnNullFunction();
6498 <            try {
6499 <                final int id = this.basis;
6500 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6501 <                    do {} while (!casPending(c = pending, c+1));
6502 <                    (rights = new MapReduceValuesToIntTask<K,V>
6503 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6504 <                }
6505 <                int r = id;
6506 <                Object v;
6507 <                while ((v = advance()) != null)
6508 <                    r = reducer.apply(r, transformer.apply((V)v));
6509 <                result = r;
6510 <                for (MapReduceValuesToIntTask<K,V> t = this, s;;) {
6511 <                    int c; BulkTask<K,V,?> par;
6512 <                    if ((c = t.pending) == 0) {
6513 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
7020 <                            t.result = reducer.apply(t.result, s.result);
7021 <                        }
7022 <                        if ((par = t.parent) == null ||
7023 <                            !(par instanceof MapReduceValuesToIntTask)) {
7024 <                            t.quietlyComplete();
7025 <                            break;
7026 <                        }
7027 <                        t = (MapReduceValuesToIntTask<K,V>)par;
7028 <                    }
7029 <                    else if (t.casPending(c, c - 1))
7030 <                        break;
6497 >                throw new NullPointerException();
6498 >            int r = this.basis;
6499 >            for (int b; (b = preSplit()) > 0;)
6500 >                (rights = new MapReduceValuesToIntTask<K,V>
6501 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6502 >            Object v;
6503 >            while ((v = advance()) != null)
6504 >                r = reducer.apply(r, transformer.apply((V)v));
6505 >            result = r;
6506 >            CountedCompleter<?> c;
6507 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6508 >                MapReduceValuesToIntTask<K,V>
6509 >                    t = (MapReduceValuesToIntTask<K,V>)c,
6510 >                    s = t.rights;
6511 >                while (s != null) {
6512 >                    t.result = reducer.apply(t.result, s.result);
6513 >                    s = t.rights = s.nextRight;
6514                  }
7032            } catch (Throwable ex) {
7033                return tryCompleteComputation(ex);
7034            }
7035            MapReduceValuesToIntTask<K,V> s = rights;
7036            if (s != null && !inForkJoinPool()) {
7037                do  {
7038                    if (s.tryUnfork())
7039                        s.exec();
7040                } while ((s = s.nextRight) != null);
6515              }
7042            return false;
6516          }
7044        public final Integer getRawResult() { return result; }
6517      }
6518  
6519      @SuppressWarnings("serial") static final class MapReduceEntriesToIntTask<K,V>
6520 <        extends BulkTask<K,V,Integer> {
6520 >        extends Traverser<K,V,Integer> {
6521          final ObjectToInt<Map.Entry<K,V>> transformer;
6522          final IntByIntToInt reducer;
6523          final int basis;
6524          int result;
6525          MapReduceEntriesToIntTask<K,V> rights, nextRight;
6526          MapReduceEntriesToIntTask
6527 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6527 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6528               MapReduceEntriesToIntTask<K,V> nextRight,
6529               ObjectToInt<Map.Entry<K,V>> transformer,
6530               int basis,
# Line 7061 | Line 6533 | public class ConcurrentHashMapV8<K, V>
6533              this.transformer = transformer;
6534              this.basis = basis; this.reducer = reducer;
6535          }
6536 <        @SuppressWarnings("unchecked") public final boolean exec() {
6536 >        public final Integer getRawResult() { return result; }
6537 >        @SuppressWarnings("unchecked") public final void compute() {
6538              final ObjectToInt<Map.Entry<K,V>> transformer =
6539                  this.transformer;
6540              final IntByIntToInt reducer = this.reducer;
6541              if (transformer == null || reducer == null)
6542 <                return abortOnNullFunction();
6543 <            try {
6544 <                final int id = this.basis;
6545 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6546 <                    do {} while (!casPending(c = pending, c+1));
6547 <                    (rights = new MapReduceEntriesToIntTask<K,V>
6548 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6549 <                }
6550 <                int r = id;
6551 <                Object v;
6552 <                while ((v = advance()) != null)
6553 <                    r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6554 <                result = r;
6555 <                for (MapReduceEntriesToIntTask<K,V> t = this, s;;) {
6556 <                    int c; BulkTask<K,V,?> par;
6557 <                    if ((c = t.pending) == 0) {
6558 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
7086 <                            t.result = reducer.apply(t.result, s.result);
7087 <                        }
7088 <                        if ((par = t.parent) == null ||
7089 <                            !(par instanceof MapReduceEntriesToIntTask)) {
7090 <                            t.quietlyComplete();
7091 <                            break;
7092 <                        }
7093 <                        t = (MapReduceEntriesToIntTask<K,V>)par;
7094 <                    }
7095 <                    else if (t.casPending(c, c - 1))
7096 <                        break;
6542 >                throw new NullPointerException();
6543 >            int r = this.basis;
6544 >            for (int b; (b = preSplit()) > 0;)
6545 >                (rights = new MapReduceEntriesToIntTask<K,V>
6546 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6547 >            Object v;
6548 >            while ((v = advance()) != null)
6549 >                r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6550 >            result = r;
6551 >            CountedCompleter<?> c;
6552 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6553 >                MapReduceEntriesToIntTask<K,V>
6554 >                    t = (MapReduceEntriesToIntTask<K,V>)c,
6555 >                    s = t.rights;
6556 >                while (s != null) {
6557 >                    t.result = reducer.apply(t.result, s.result);
6558 >                    s = t.rights = s.nextRight;
6559                  }
7098            } catch (Throwable ex) {
7099                return tryCompleteComputation(ex);
6560              }
7101            MapReduceEntriesToIntTask<K,V> s = rights;
7102            if (s != null && !inForkJoinPool()) {
7103                do  {
7104                    if (s.tryUnfork())
7105                        s.exec();
7106                } while ((s = s.nextRight) != null);
7107            }
7108            return false;
6561          }
7110        public final Integer getRawResult() { return result; }
6562      }
6563  
6564      @SuppressWarnings("serial") static final class MapReduceMappingsToIntTask<K,V>
6565 <        extends BulkTask<K,V,Integer> {
6565 >        extends Traverser<K,V,Integer> {
6566          final ObjectByObjectToInt<? super K, ? super V> transformer;
6567          final IntByIntToInt reducer;
6568          final int basis;
6569          int result;
6570          MapReduceMappingsToIntTask<K,V> rights, nextRight;
6571          MapReduceMappingsToIntTask
6572 <            (ConcurrentHashMapV8<K,V> m, BulkTask<K,V,?> p, int b,
6573 <             MapReduceMappingsToIntTask<K,V> rights,
6572 >            (ConcurrentHashMapV8<K,V> m, Traverser<K,V,?> p, int b,
6573 >             MapReduceMappingsToIntTask<K,V> nextRight,
6574               ObjectByObjectToInt<? super K, ? super V> transformer,
6575               int basis,
6576               IntByIntToInt reducer) {
# Line 7127 | Line 6578 | public class ConcurrentHashMapV8<K, V>
6578              this.transformer = transformer;
6579              this.basis = basis; this.reducer = reducer;
6580          }
6581 <        @SuppressWarnings("unchecked") public final boolean exec() {
6581 >        public final Integer getRawResult() { return result; }
6582 >        @SuppressWarnings("unchecked") public final void compute() {
6583              final ObjectByObjectToInt<? super K, ? super V> transformer =
6584                  this.transformer;
6585              final IntByIntToInt reducer = this.reducer;
6586              if (transformer == null || reducer == null)
6587 <                return abortOnNullFunction();
6588 <            try {
6589 <                final int id = this.basis;
6590 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6591 <                    do {} while (!casPending(c = pending, c+1));
6592 <                    (rights = new MapReduceMappingsToIntTask<K,V>
6593 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6594 <                }
6595 <                int r = id;
6596 <                Object v;
6597 <                while ((v = advance()) != null)
6598 <                    r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6599 <                result = r;
6600 <                for (MapReduceMappingsToIntTask<K,V> t = this, s;;) {
6601 <                    int c; BulkTask<K,V,?> par;
6602 <                    if ((c = t.pending) == 0) {
6603 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
7152 <                            t.result = reducer.apply(t.result, s.result);
7153 <                        }
7154 <                        if ((par = t.parent) == null ||
7155 <                            !(par instanceof MapReduceMappingsToIntTask)) {
7156 <                            t.quietlyComplete();
7157 <                            break;
7158 <                        }
7159 <                        t = (MapReduceMappingsToIntTask<K,V>)par;
7160 <                    }
7161 <                    else if (t.casPending(c, c - 1))
7162 <                        break;
6587 >                throw new NullPointerException();
6588 >            int r = this.basis;
6589 >            for (int b; (b = preSplit()) > 0;)
6590 >                (rights = new MapReduceMappingsToIntTask<K,V>
6591 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6592 >            Object v;
6593 >            while ((v = advance()) != null)
6594 >                r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6595 >            result = r;
6596 >            CountedCompleter<?> c;
6597 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6598 >                MapReduceMappingsToIntTask<K,V>
6599 >                    t = (MapReduceMappingsToIntTask<K,V>)c,
6600 >                    s = t.rights;
6601 >                while (s != null) {
6602 >                    t.result = reducer.apply(t.result, s.result);
6603 >                    s = t.rights = s.nextRight;
6604                  }
7164            } catch (Throwable ex) {
7165                return tryCompleteComputation(ex);
6605              }
7167            MapReduceMappingsToIntTask<K,V> s = rights;
7168            if (s != null && !inForkJoinPool()) {
7169                do  {
7170                    if (s.tryUnfork())
7171                        s.exec();
7172                } while ((s = s.nextRight) != null);
7173            }
7174            return false;
6606          }
7176        public final Integer getRawResult() { return result; }
6607      }
6608  
6609      // Unsafe mechanics

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines