ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ConcurrentHashMap.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ConcurrentHashMap.java (file contents):
Revision 1.145 by jsr166, Sun Nov 18 18:03:10 2012 UTC vs.
Revision 1.146 by dl, Fri Nov 23 17:50:56 2012 UTC

# Line 7 | Line 7
7   package java.util.concurrent;
8   import java.util.concurrent.atomic.LongAdder;
9   import java.util.concurrent.ForkJoinPool;
10 < import java.util.concurrent.ForkJoinTask;
10 > import java.util.concurrent.CountedCompleter;
11  
12   import java.util.Comparator;
13   import java.util.Arrays;
# Line 2386 | Line 2386 | public class ConcurrentHashMap<K, V>
2386       * across threads, iteration terminates if a bounds checks fails
2387       * for a table read.
2388       *
2389 <     * This class extends ForkJoinTask to streamline parallel
2390 <     * iteration in bulk operations (see BulkTask). This adds only an
2391 <     * int of space overhead, which is close enough to negligible in
2392 <     * cases where it is not needed to not worry about it.  Because
2393 <     * ForkJoinTask is Serializable, but iterators need not be, we
2394 <     * need to add warning suppressions.
2389 >     * This class extends CountedCompleter to streamline parallel
2390 >     * iteration in bulk operations. This adds only a few fields of
2391 >     * space overhead, which is small enough in cases where it is not
2392 >     * needed to not worry about it.  Because CountedCompleter is
2393 >     * Serializable, but iterators need not be, we need to add warning
2394 >     * suppressions.
2395       */
2396 <    @SuppressWarnings("serial") static class Traverser<K,V,R> extends ForkJoinTask<R> {
2396 >    @SuppressWarnings("serial") static class Traverser<K,V,R> extends CountedCompleter<R> {
2397          final ConcurrentHashMap<K, V> map;
2398          Node next;           // the next entry to use
2399          Object nextKey;      // cached key field of next
# Line 2403 | Line 2403 | public class ConcurrentHashMap<K, V>
2403          int baseIndex;       // current index of initial table
2404          int baseLimit;       // index bound for initial table
2405          int baseSize;        // initial table size
2406 +        int batch;           // split control
2407  
2408          /** Creates iterator for all entries in the table. */
2409          Traverser(ConcurrentHashMap<K, V> map) {
2410              this.map = map;
2411          }
2412  
2413 <        /** Creates iterator for split() methods */
2414 <        Traverser(Traverser<K,V,?> it) {
2415 <            ConcurrentHashMap<K, V> m; Node[] t;
2416 <            if ((m = this.map = it.map) == null)
2417 <                t = null;
2418 <            else if ((t = it.tab) == null && // force parent tab initialization
2419 <                     (t = it.tab = m.table) != null)
2420 <                it.baseLimit = it.baseSize = t.length;
2421 <            this.tab = t;
2422 <            this.baseSize = it.baseSize;
2423 <            it.baseLimit = this.index = this.baseIndex =
2424 <                ((this.baseLimit = it.baseLimit) + it.baseIndex + 1) >>> 1;
2413 >        /** Creates iterator for split() methods and task constructors */
2414 >        Traverser(ConcurrentHashMap<K,V> map, Traverser<K,V,?> it, int batch) {
2415 >            super(it);
2416 >            this.batch = batch;
2417 >            if ((this.map = map) != null && it != null) { // split parent
2418 >                Node[] t;
2419 >                if ((t = it.tab) == null &&
2420 >                    (t = it.tab = map.table) != null)
2421 >                    it.baseLimit = it.baseSize = t.length;
2422 >                this.tab = t;
2423 >                this.baseSize = it.baseSize;
2424 >                int hi = this.baseLimit = it.baseLimit;
2425 >                it.baseLimit = this.index = this.baseIndex =
2426 >                    (hi + it.baseIndex + 1) >>> 1;
2427 >            }
2428          }
2429  
2430          /**
# Line 2473 | Line 2477 | public class ConcurrentHashMap<K, V>
2477          }
2478  
2479          public final boolean hasMoreElements() { return hasNext(); }
2480 <        public final void setRawResult(Object x) { }
2481 <        public R getRawResult() { return null; }
2482 <        public boolean exec() { return true; }
2480 >
2481 >        public void compute() { } // default no-op CountedCompleter body
2482 >
2483 >        /**
2484 >         * Returns a batch value > 0 if this task should (and must) be
2485 >         * split, if so, adding to pending count, and in any case
2486 >         * updating batch value. The initial batch value is approx
2487 >         * exp2 of the number of times (minus one) to split task by
2488 >         * two before executing leaf action. This value is faster to
2489 >         * compute and more convenient to use as a guide to splitting
2490 >         * than is the depth, since it is used while dividing by two
2491 >         * anyway.
2492 >         */
2493 >        final int preSplit() {
2494 >            ConcurrentHashMap<K, V> m; int b; Node[] t;  ForkJoinPool pool;
2495 >            if ((b = batch) < 0 && (m = map) != null) { // force initialization
2496 >                if ((t = tab) == null && (t = tab = m.table) != null)
2497 >                    baseLimit = baseSize = t.length;
2498 >                if (t != null) {
2499 >                    long n = m.counter.sum();
2500 >                    int par = ((pool = getPool()) == null) ?
2501 >                        ForkJoinPool.getCommonPoolParallelism() :
2502 >                        pool.getParallelism();
2503 >                    int sp = par << 3; // slack of 8
2504 >                    b = (n <= 0L) ? 0 : (n < (long)sp) ? (int)n : sp;
2505 >                }
2506 >            }
2507 >            b = (b <= 1 || baseIndex == baseLimit)? 0 : (b >>> 1);
2508 >            if ((batch = b) > 0)
2509 >                addToPendingCount(1);
2510 >            return b;
2511 >        }
2512 >
2513      }
2514  
2515      /* ---------------- Public operations -------------- */
# Line 2615 | Line 2649 | public class ConcurrentHashMap<K, V>
2649       * Returns the number of mappings. This method should be used
2650       * instead of {@link #size} because a ConcurrentHashMap may
2651       * contain more mappings than can be represented as an int. The
2652 <     * value returned is a snapshot; the actual count may differ if
2653 <     * there are ongoing concurrent insertions or removals.
2652 >     * value returned is an estimate; the actual count may differ if
2653 >     * there are concurrent insertions or removals.
2654       *
2655       * @return the number of mappings
2656       */
# Line 3177 | Line 3211 | public class ConcurrentHashMap<K, V>
3211      @SuppressWarnings("serial") static final class KeyIterator<K,V> extends Traverser<K,V,Object>
3212          implements Spliterator<K>, Enumeration<K> {
3213          KeyIterator(ConcurrentHashMap<K, V> map) { super(map); }
3214 <        KeyIterator(Traverser<K,V,Object> it) {
3215 <            super(it);
3214 >        KeyIterator(ConcurrentHashMap<K, V> map, Traverser<K,V,Object> it) {
3215 >            super(map, it, -1);
3216          }
3217          public KeyIterator<K,V> split() {
3218              if (nextKey != null)
3219                  throw new IllegalStateException();
3220 <            return new KeyIterator<K,V>(this);
3220 >            return new KeyIterator<K,V>(map, this);
3221          }
3222          @SuppressWarnings("unchecked") public final K next() {
3223              if (nextVal == null && advance() == null)
# Line 3199 | Line 3233 | public class ConcurrentHashMap<K, V>
3233      @SuppressWarnings("serial") static final class ValueIterator<K,V> extends Traverser<K,V,Object>
3234          implements Spliterator<V>, Enumeration<V> {
3235          ValueIterator(ConcurrentHashMap<K, V> map) { super(map); }
3236 <        ValueIterator(Traverser<K,V,Object> it) {
3237 <            super(it);
3236 >        ValueIterator(ConcurrentHashMap<K, V> map, Traverser<K,V,Object> it) {
3237 >            super(map, it, -1);
3238          }
3239          public ValueIterator<K,V> split() {
3240              if (nextKey != null)
3241                  throw new IllegalStateException();
3242 <            return new ValueIterator<K,V>(this);
3242 >            return new ValueIterator<K,V>(map, this);
3243          }
3244  
3245          @SuppressWarnings("unchecked") public final V next() {
# Line 3222 | Line 3256 | public class ConcurrentHashMap<K, V>
3256      @SuppressWarnings("serial") static final class EntryIterator<K,V> extends Traverser<K,V,Object>
3257          implements Spliterator<Map.Entry<K,V>> {
3258          EntryIterator(ConcurrentHashMap<K, V> map) { super(map); }
3259 <        EntryIterator(Traverser<K,V,Object> it) {
3260 <            super(it);
3259 >        EntryIterator(ConcurrentHashMap<K, V> map, Traverser<K,V,Object> it) {
3260 >            super(map, it, -1);
3261          }
3262          public EntryIterator<K,V> split() {
3263              if (nextKey != null)
3264                  throw new IllegalStateException();
3265 <            return new EntryIterator<K,V>(this);
3265 >            return new EntryIterator<K,V>(map, this);
3266          }
3267  
3268          @SuppressWarnings("unchecked") public final Map.Entry<K,V> next() {
# Line 3284 | Line 3318 | public class ConcurrentHashMap<K, V>
3318          }
3319      }
3320  
3321 +    /**
3322 +     * Returns exportable snapshot entry for the given key and value
3323 +     * when write-through can't or shouldn't be used.
3324 +     */
3325 +    static <K,V> AbstractMap.SimpleEntry<K,V> entryFor(K k, V v) {
3326 +        return new AbstractMap.SimpleEntry<K,V>(k, v);
3327 +    }
3328 +
3329      /* ---------------- Serialization Support -------------- */
3330  
3331      /**
# Line 4671 | Line 4713 | public class ConcurrentHashMap<K, V>
4713              (ConcurrentHashMap<K,V> map,
4714               BiAction<K,V> action) {
4715              if (action == null) throw new NullPointerException();
4716 <            return new ForEachMappingTask<K,V>(map, null, -1, null, action);
4716 >            return new ForEachMappingTask<K,V>(map, null, -1, action);
4717          }
4718  
4719          /**
# Line 4692 | Line 4734 | public class ConcurrentHashMap<K, V>
4734              if (transformer == null || action == null)
4735                  throw new NullPointerException();
4736              return new ForEachTransformedMappingTask<K,V,U>
4737 <                (map, null, -1, null, transformer, action);
4737 >                (map, null, -1, transformer, action);
4738          }
4739  
4740          /**
# Line 4712 | Line 4754 | public class ConcurrentHashMap<K, V>
4754               BiFun<? super K, ? super V, ? extends U> searchFunction) {
4755              if (searchFunction == null) throw new NullPointerException();
4756              return new SearchMappingsTask<K,V,U>
4757 <                (map, null, -1, null, searchFunction,
4757 >                (map, null, -1, searchFunction,
4758                   new AtomicReference<U>());
4759          }
4760  
# Line 4821 | Line 4863 | public class ConcurrentHashMap<K, V>
4863              (ConcurrentHashMap<K,V> map,
4864               Action<K> action) {
4865              if (action == null) throw new NullPointerException();
4866 <            return new ForEachKeyTask<K,V>(map, null, -1, null, action);
4866 >            return new ForEachKeyTask<K,V>(map, null, -1, action);
4867          }
4868  
4869          /**
# Line 4842 | Line 4884 | public class ConcurrentHashMap<K, V>
4884              if (transformer == null || action == null)
4885                  throw new NullPointerException();
4886              return new ForEachTransformedKeyTask<K,V,U>
4887 <                (map, null, -1, null, transformer, action);
4887 >                (map, null, -1, transformer, action);
4888          }
4889  
4890          /**
# Line 4862 | Line 4904 | public class ConcurrentHashMap<K, V>
4904               Fun<? super K, ? extends U> searchFunction) {
4905              if (searchFunction == null) throw new NullPointerException();
4906              return new SearchKeysTask<K,V,U>
4907 <                (map, null, -1, null, searchFunction,
4907 >                (map, null, -1, searchFunction,
4908                   new AtomicReference<U>());
4909          }
4910  
# Line 4988 | Line 5030 | public class ConcurrentHashMap<K, V>
5030              (ConcurrentHashMap<K,V> map,
5031               Action<V> action) {
5032              if (action == null) throw new NullPointerException();
5033 <            return new ForEachValueTask<K,V>(map, null, -1, null, action);
5033 >            return new ForEachValueTask<K,V>(map, null, -1, action);
5034          }
5035  
5036          /**
# Line 5008 | Line 5050 | public class ConcurrentHashMap<K, V>
5050              if (transformer == null || action == null)
5051                  throw new NullPointerException();
5052              return new ForEachTransformedValueTask<K,V,U>
5053 <                (map, null, -1, null, transformer, action);
5053 >                (map, null, -1, transformer, action);
5054          }
5055  
5056          /**
# Line 5028 | Line 5070 | public class ConcurrentHashMap<K, V>
5070               Fun<? super V, ? extends U> searchFunction) {
5071              if (searchFunction == null) throw new NullPointerException();
5072              return new SearchValuesTask<K,V,U>
5073 <                (map, null, -1, null, searchFunction,
5073 >                (map, null, -1, searchFunction,
5074                   new AtomicReference<U>());
5075          }
5076  
# Line 5154 | Line 5196 | public class ConcurrentHashMap<K, V>
5196              (ConcurrentHashMap<K,V> map,
5197               Action<Map.Entry<K,V>> action) {
5198              if (action == null) throw new NullPointerException();
5199 <            return new ForEachEntryTask<K,V>(map, null, -1, null, action);
5199 >            return new ForEachEntryTask<K,V>(map, null, -1, action);
5200          }
5201  
5202          /**
# Line 5174 | Line 5216 | public class ConcurrentHashMap<K, V>
5216              if (transformer == null || action == null)
5217                  throw new NullPointerException();
5218              return new ForEachTransformedEntryTask<K,V,U>
5219 <                (map, null, -1, null, transformer, action);
5219 >                (map, null, -1, transformer, action);
5220          }
5221  
5222          /**
# Line 5194 | Line 5236 | public class ConcurrentHashMap<K, V>
5236               Fun<Map.Entry<K,V>, ? extends U> searchFunction) {
5237              if (searchFunction == null) throw new NullPointerException();
5238              return new SearchEntriesTask<K,V,U>
5239 <                (map, null, -1, null, searchFunction,
5239 >                (map, null, -1, searchFunction,
5240                   new AtomicReference<U>());
5241          }
5242  
# Line 5312 | Line 5354 | public class ConcurrentHashMap<K, V>
5354  
5355      // -------------------------------------------------------
5356  
5315    /**
5316     * Base for FJ tasks for bulk operations. This adds a variant of
5317     * CountedCompleters and some split and merge bookkeeping to
5318     * iterator functionality. The forEach and reduce methods are
5319     * similar to those illustrated in CountedCompleter documentation,
5320     * except that bottom-up reduction completions perform them within
5321     * their compute methods. The search methods are like forEach
5322     * except they continually poll for success and exit early.  Also,
5323     * exceptions are handled in a simpler manner, by just trying to
5324     * complete root task exceptionally.
5325     */
5326    @SuppressWarnings("serial") static abstract class BulkTask<K,V,R> extends Traverser<K,V,R> {
5327        final BulkTask<K,V,?> parent;  // completion target
5328        int batch;                     // split control; -1 for unknown
5329        int pending;                   // completion control
5330
5331        BulkTask(ConcurrentHashMap<K,V> map, BulkTask<K,V,?> parent,
5332                 int batch) {
5333            super(map);
5334            this.parent = parent;
5335            this.batch = batch;
5336            if (parent != null && map != null) { // split parent
5337                Node[] t;
5338                if ((t = parent.tab) == null &&
5339                    (t = parent.tab = map.table) != null)
5340                    parent.baseLimit = parent.baseSize = t.length;
5341                this.tab = t;
5342                this.baseSize = parent.baseSize;
5343                int hi = this.baseLimit = parent.baseLimit;
5344                parent.baseLimit = this.index = this.baseIndex =
5345                    (hi + parent.baseIndex + 1) >>> 1;
5346            }
5347        }
5348
5349        /**
5350         * Forces root task to complete.
5351         * @param ex if null, complete normally, else exceptionally
5352         * @return false to simplify use
5353         */
5354        final boolean tryCompleteComputation(Throwable ex) {
5355            for (BulkTask<K,V,?> a = this;;) {
5356                BulkTask<K,V,?> p = a.parent;
5357                if (p == null) {
5358                    if (ex != null)
5359                        a.completeExceptionally(ex);
5360                    else
5361                        a.quietlyComplete();
5362                    return false;
5363                }
5364                a = p;
5365            }
5366        }
5367
5368        /**
5369         * Version of tryCompleteComputation for function screening checks
5370         */
5371        final boolean abortOnNullFunction() {
5372            return tryCompleteComputation(new Error("Unexpected null function"));
5373        }
5374
5375        // utilities
5376
5377        /** CompareAndSet pending count */
5378        final boolean casPending(int cmp, int val) {
5379            return U.compareAndSwapInt(this, PENDING, cmp, val);
5380        }
5381
5382        /**
5383         * Returns approx exp2 of the number of times (minus one) to
5384         * split task by two before executing leaf action. This value
5385         * is faster to compute and more convenient to use as a guide
5386         * to splitting than is the depth, since it is used while
5387         * dividing by two anyway.
5388         */
5389        final int batch() {
5390            ConcurrentHashMap<K, V> m; int b; Node[] t;  ForkJoinPool pool;
5391            if ((b = batch) < 0 && (m = map) != null) { // force initialization
5392                if ((t = tab) == null && (t = tab = m.table) != null)
5393                    baseLimit = baseSize = t.length;
5394                if (t != null) {
5395                    long n = m.counter.sum();
5396                    int par = ((pool = getPool()) == null) ?
5397                        ForkJoinPool.getCommonPoolParallelism() :
5398                        pool.getParallelism();
5399                    int sp = par << 3; // slack of 8
5400                    b = batch = (n <= 0L) ? 0 : (n < (long)sp) ? (int)n : sp;
5401                }
5402            }
5403            return b;
5404        }
5405
5406        /**
5407         * Returns exportable snapshot entry.
5408         */
5409        static <K,V> AbstractMap.SimpleEntry<K,V> entryFor(K k, V v) {
5410            return new AbstractMap.SimpleEntry<K,V>(k, v);
5411        }
5412
5413        // Unsafe mechanics
5414        private static final sun.misc.Unsafe U;
5415        private static final long PENDING;
5416        static {
5417            try {
5418                U = sun.misc.Unsafe.getUnsafe();
5419                PENDING = U.objectFieldOffset
5420                    (BulkTask.class.getDeclaredField("pending"));
5421            } catch (Exception e) {
5422                throw new Error(e);
5423            }
5424        }
5425    }
5426
5427    /**
5428     * Base class for non-reductive actions
5429     */
5430    @SuppressWarnings("serial") static abstract class BulkAction<K,V,R> extends BulkTask<K,V,R> {
5431        BulkAction<K,V,?> nextTask;
5432        BulkAction(ConcurrentHashMap<K,V> map, BulkTask<K,V,?> parent,
5433                   int batch, BulkAction<K,V,?> nextTask) {
5434            super(map, parent, batch);
5435            this.nextTask = nextTask;
5436        }
5437
5438        /**
5439         * Try to complete task and upward parents. Upon hitting
5440         * non-completed parent, if a non-FJ task, try to help out the
5441         * computation.
5442         */
5443        final void tryComplete(BulkAction<K,V,?> subtasks) {
5444            BulkTask<K,V,?> a = this, s = a;
5445            for (int c;;) {
5446                if ((c = a.pending) == 0) {
5447                    if ((a = (s = a).parent) == null) {
5448                        s.quietlyComplete();
5449                        break;
5450                    }
5451                }
5452                else if (a.casPending(c, c - 1)) {
5453                    if (subtasks != null && !inForkJoinPool()) {
5454                        while ((s = a.parent) != null)
5455                            a = s;
5456                        while (!a.isDone()) {
5457                            BulkAction<K,V,?> next = subtasks.nextTask;
5458                            if (subtasks.tryUnfork())
5459                                subtasks.exec();
5460                            if ((subtasks = next) == null)
5461                                break;
5462                        }
5463                    }
5464                    break;
5465                }
5466            }
5467        }
5468
5469    }
5470
5357      /*
5358       * Task classes. Coded in a regular but ugly format/style to
5359       * simplify checks that each variant differs in the right way from
# Line 5475 | Line 5361 | public class ConcurrentHashMap<K, V>
5361       */
5362  
5363      @SuppressWarnings("serial") static final class ForEachKeyTask<K,V>
5364 <        extends BulkAction<K,V,Void> {
5364 >        extends Traverser<K,V,Void> {
5365          final Action<K> action;
5366          ForEachKeyTask
5367 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5482 <             ForEachKeyTask<K,V> nextTask,
5367 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5368               Action<K> action) {
5369 <            super(m, p, b, nextTask);
5369 >            super(m, p, b);
5370              this.action = action;
5371          }
5372 <        @SuppressWarnings("unchecked") public final boolean exec() {
5373 <            final Action<K> action = this.action;
5374 <            if (action == null)
5375 <                return abortOnNullFunction();
5376 <            ForEachKeyTask<K,V> subtasks = null;
5377 <            try {
5378 <                int b = batch(), c;
5379 <                while (b > 1 && baseIndex != baseLimit) {
5380 <                    do {} while (!casPending(c = pending, c+1));
5496 <                    (subtasks = new ForEachKeyTask<K,V>
5497 <                     (map, this, b >>>= 1, subtasks, action)).fork();
5498 <                }
5499 <                while (advance() != null)
5500 <                    action.apply((K)nextKey);
5501 <            } catch (Throwable ex) {
5502 <                return tryCompleteComputation(ex);
5503 <            }
5504 <            tryComplete(subtasks);
5505 <            return false;
5372 >        @SuppressWarnings("unchecked") public final void compute() {
5373 >            final Action<K> action;
5374 >            if ((action = this.action) == null)
5375 >                throw new NullPointerException();
5376 >            for (int b; (b = preSplit()) > 0;)
5377 >                new ForEachKeyTask<K,V>(map, this, b, action).fork();
5378 >            while (advance() != null)
5379 >                action.apply((K)nextKey);
5380 >            propagateCompletion();
5381          }
5382      }
5383  
5384      @SuppressWarnings("serial") static final class ForEachValueTask<K,V>
5385 <        extends BulkAction<K,V,Void> {
5385 >        extends Traverser<K,V,Void> {
5386          final Action<V> action;
5387          ForEachValueTask
5388 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5514 <             ForEachValueTask<K,V> nextTask,
5388 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5389               Action<V> action) {
5390 <            super(m, p, b, nextTask);
5390 >            super(m, p, b);
5391              this.action = action;
5392          }
5393 <        @SuppressWarnings("unchecked") public final boolean exec() {
5394 <            final Action<V> action = this.action;
5395 <            if (action == null)
5396 <                return abortOnNullFunction();
5397 <            ForEachValueTask<K,V> subtasks = null;
5398 <            try {
5399 <                int b = batch(), c;
5400 <                while (b > 1 && baseIndex != baseLimit) {
5401 <                    do {} while (!casPending(c = pending, c+1));
5402 <                    (subtasks = new ForEachValueTask<K,V>
5529 <                     (map, this, b >>>= 1, subtasks, action)).fork();
5530 <                }
5531 <                Object v;
5532 <                while ((v = advance()) != null)
5533 <                    action.apply((V)v);
5534 <            } catch (Throwable ex) {
5535 <                return tryCompleteComputation(ex);
5536 <            }
5537 <            tryComplete(subtasks);
5538 <            return false;
5393 >        @SuppressWarnings("unchecked") public final void compute() {
5394 >            final Action<V> action;
5395 >            if ((action = this.action) == null)
5396 >                throw new NullPointerException();
5397 >            for (int b; (b = preSplit()) > 0;)
5398 >                new ForEachValueTask<K,V>(map, this, b, action).fork();
5399 >            Object v;
5400 >            while ((v = advance()) != null)
5401 >                action.apply((V)v);
5402 >            propagateCompletion();
5403          }
5404      }
5405  
5406      @SuppressWarnings("serial") static final class ForEachEntryTask<K,V>
5407 <        extends BulkAction<K,V,Void> {
5407 >        extends Traverser<K,V,Void> {
5408          final Action<Entry<K,V>> action;
5409          ForEachEntryTask
5410 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5547 <             ForEachEntryTask<K,V> nextTask,
5410 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5411               Action<Entry<K,V>> action) {
5412 <            super(m, p, b, nextTask);
5412 >            super(m, p, b);
5413              this.action = action;
5414          }
5415 <        @SuppressWarnings("unchecked") public final boolean exec() {
5416 <            final Action<Entry<K,V>> action = this.action;
5417 <            if (action == null)
5418 <                return abortOnNullFunction();
5419 <            ForEachEntryTask<K,V> subtasks = null;
5420 <            try {
5421 <                int b = batch(), c;
5422 <                while (b > 1 && baseIndex != baseLimit) {
5423 <                    do {} while (!casPending(c = pending, c+1));
5424 <                    (subtasks = new ForEachEntryTask<K,V>
5562 <                     (map, this, b >>>= 1, subtasks, action)).fork();
5563 <                }
5564 <                Object v;
5565 <                while ((v = advance()) != null)
5566 <                    action.apply(entryFor((K)nextKey, (V)v));
5567 <            } catch (Throwable ex) {
5568 <                return tryCompleteComputation(ex);
5569 <            }
5570 <            tryComplete(subtasks);
5571 <            return false;
5415 >        @SuppressWarnings("unchecked") public final void compute() {
5416 >            final Action<Entry<K,V>> action;
5417 >            if ((action = this.action) == null)
5418 >                throw new NullPointerException();
5419 >            for (int b; (b = preSplit()) > 0;)
5420 >                new ForEachEntryTask<K,V>(map, this, b, action).fork();
5421 >            Object v;
5422 >            while ((v = advance()) != null)
5423 >                action.apply(entryFor((K)nextKey, (V)v));
5424 >            propagateCompletion();
5425          }
5426      }
5427  
5428      @SuppressWarnings("serial") static final class ForEachMappingTask<K,V>
5429 <        extends BulkAction<K,V,Void> {
5429 >        extends Traverser<K,V,Void> {
5430          final BiAction<K,V> action;
5431          ForEachMappingTask
5432 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5580 <             ForEachMappingTask<K,V> nextTask,
5432 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5433               BiAction<K,V> action) {
5434 <            super(m, p, b, nextTask);
5434 >            super(m, p, b);
5435              this.action = action;
5436          }
5437 <        @SuppressWarnings("unchecked") public final boolean exec() {
5438 <            final BiAction<K,V> action = this.action;
5439 <            if (action == null)
5440 <                return abortOnNullFunction();
5441 <            ForEachMappingTask<K,V> subtasks = null;
5442 <            try {
5443 <                int b = batch(), c;
5444 <                while (b > 1 && baseIndex != baseLimit) {
5445 <                    do {} while (!casPending(c = pending, c+1));
5446 <                    (subtasks = new ForEachMappingTask<K,V>
5595 <                     (map, this, b >>>= 1, subtasks, action)).fork();
5596 <                }
5597 <                Object v;
5598 <                while ((v = advance()) != null)
5599 <                    action.apply((K)nextKey, (V)v);
5600 <            } catch (Throwable ex) {
5601 <                return tryCompleteComputation(ex);
5602 <            }
5603 <            tryComplete(subtasks);
5604 <            return false;
5437 >        @SuppressWarnings("unchecked") public final void compute() {
5438 >            final BiAction<K,V> action;
5439 >            if ((action = this.action) == null)
5440 >                throw new NullPointerException();
5441 >            for (int b; (b = preSplit()) > 0;)
5442 >                new ForEachMappingTask<K,V>(map, this, b, action).fork();
5443 >            Object v;
5444 >            while ((v = advance()) != null)
5445 >                action.apply((K)nextKey, (V)v);
5446 >            propagateCompletion();
5447          }
5448      }
5449  
5450      @SuppressWarnings("serial") static final class ForEachTransformedKeyTask<K,V,U>
5451 <        extends BulkAction<K,V,Void> {
5451 >        extends Traverser<K,V,Void> {
5452          final Fun<? super K, ? extends U> transformer;
5453          final Action<U> action;
5454          ForEachTransformedKeyTask
5455 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5456 <             ForEachTransformedKeyTask<K,V,U> nextTask,
5457 <             Fun<? super K, ? extends U> transformer,
5458 <             Action<U> action) {
5459 <            super(m, p, b, nextTask);
5460 <            this.transformer = transformer;
5461 <            this.action = action;
5462 <
5463 <        }
5464 <        @SuppressWarnings("unchecked") public final boolean exec() {
5465 <            final Fun<? super K, ? extends U> transformer =
5466 <                this.transformer;
5467 <            final Action<U> action = this.action;
5468 <            if (transformer == null || action == null)
5469 <                return abortOnNullFunction();
5470 <            ForEachTransformedKeyTask<K,V,U> subtasks = null;
5471 <            try {
5472 <                int b = batch(), c;
5631 <                while (b > 1 && baseIndex != baseLimit) {
5632 <                    do {} while (!casPending(c = pending, c+1));
5633 <                    (subtasks = new ForEachTransformedKeyTask<K,V,U>
5634 <                     (map, this, b >>>= 1, subtasks, transformer, action)).fork();
5635 <                }
5636 <                U u;
5637 <                while (advance() != null) {
5638 <                    if ((u = transformer.apply((K)nextKey)) != null)
5639 <                        action.apply(u);
5640 <                }
5641 <            } catch (Throwable ex) {
5642 <                return tryCompleteComputation(ex);
5455 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5456 >             Fun<? super K, ? extends U> transformer, Action<U> action) {
5457 >            super(m, p, b);
5458 >            this.transformer = transformer; this.action = action;
5459 >        }
5460 >        @SuppressWarnings("unchecked") public final void compute() {
5461 >            final Fun<? super K, ? extends U> transformer;
5462 >            final Action<U> action;
5463 >            if ((transformer = this.transformer) == null ||
5464 >                (action = this.action) == null)
5465 >                throw new NullPointerException();
5466 >            for (int b; (b = preSplit()) > 0;)
5467 >                new ForEachTransformedKeyTask<K,V,U>
5468 >                     (map, this, b, transformer, action).fork();
5469 >            U u;
5470 >            while (advance() != null) {
5471 >                if ((u = transformer.apply((K)nextKey)) != null)
5472 >                    action.apply(u);
5473              }
5474 <            tryComplete(subtasks);
5645 <            return false;
5474 >            propagateCompletion();
5475          }
5476      }
5477  
5478      @SuppressWarnings("serial") static final class ForEachTransformedValueTask<K,V,U>
5479 <        extends BulkAction<K,V,Void> {
5479 >        extends Traverser<K,V,Void> {
5480          final Fun<? super V, ? extends U> transformer;
5481          final Action<U> action;
5482          ForEachTransformedValueTask
5483 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5484 <             ForEachTransformedValueTask<K,V,U> nextTask,
5485 <             Fun<? super V, ? extends U> transformer,
5486 <             Action<U> action) {
5487 <            super(m, p, b, nextTask);
5488 <            this.transformer = transformer;
5489 <            this.action = action;
5490 <
5491 <        }
5492 <        @SuppressWarnings("unchecked") public final boolean exec() {
5493 <            final Fun<? super V, ? extends U> transformer =
5494 <                this.transformer;
5495 <            final Action<U> action = this.action;
5496 <            if (transformer == null || action == null)
5497 <                return abortOnNullFunction();
5498 <            ForEachTransformedValueTask<K,V,U> subtasks = null;
5499 <            try {
5500 <                int b = batch(), c;
5672 <                while (b > 1 && baseIndex != baseLimit) {
5673 <                    do {} while (!casPending(c = pending, c+1));
5674 <                    (subtasks = new ForEachTransformedValueTask<K,V,U>
5675 <                     (map, this, b >>>= 1, subtasks, transformer, action)).fork();
5676 <                }
5677 <                Object v; U u;
5678 <                while ((v = advance()) != null) {
5679 <                    if ((u = transformer.apply((V)v)) != null)
5680 <                        action.apply(u);
5681 <                }
5682 <            } catch (Throwable ex) {
5683 <                return tryCompleteComputation(ex);
5483 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5484 >             Fun<? super V, ? extends U> transformer, Action<U> action) {
5485 >            super(m, p, b);
5486 >            this.transformer = transformer; this.action = action;
5487 >        }
5488 >        @SuppressWarnings("unchecked") public final void compute() {
5489 >            final Fun<? super V, ? extends U> transformer;
5490 >            final Action<U> action;
5491 >            if ((transformer = this.transformer) == null ||
5492 >                (action = this.action) == null)
5493 >                throw new NullPointerException();
5494 >            for (int b; (b = preSplit()) > 0;)
5495 >                new ForEachTransformedValueTask<K,V,U>
5496 >                    (map, this, b, transformer, action).fork();
5497 >            Object v; U u;
5498 >            while ((v = advance()) != null) {
5499 >                if ((u = transformer.apply((V)v)) != null)
5500 >                    action.apply(u);
5501              }
5502 <            tryComplete(subtasks);
5686 <            return false;
5502 >            propagateCompletion();
5503          }
5504      }
5505  
5506      @SuppressWarnings("serial") static final class ForEachTransformedEntryTask<K,V,U>
5507 <        extends BulkAction<K,V,Void> {
5507 >        extends Traverser<K,V,Void> {
5508          final Fun<Map.Entry<K,V>, ? extends U> transformer;
5509          final Action<U> action;
5510          ForEachTransformedEntryTask
5511 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5512 <             ForEachTransformedEntryTask<K,V,U> nextTask,
5513 <             Fun<Map.Entry<K,V>, ? extends U> transformer,
5514 <             Action<U> action) {
5515 <            super(m, p, b, nextTask);
5516 <            this.transformer = transformer;
5517 <            this.action = action;
5518 <
5519 <        }
5520 <        @SuppressWarnings("unchecked") public final boolean exec() {
5521 <            final Fun<Map.Entry<K,V>, ? extends U> transformer =
5522 <                this.transformer;
5523 <            final Action<U> action = this.action;
5524 <            if (transformer == null || action == null)
5525 <                return abortOnNullFunction();
5526 <            ForEachTransformedEntryTask<K,V,U> subtasks = null;
5527 <            try {
5528 <                int b = batch(), c;
5713 <                while (b > 1 && baseIndex != baseLimit) {
5714 <                    do {} while (!casPending(c = pending, c+1));
5715 <                    (subtasks = new ForEachTransformedEntryTask<K,V,U>
5716 <                     (map, this, b >>>= 1, subtasks, transformer, action)).fork();
5717 <                }
5718 <                Object v; U u;
5719 <                while ((v = advance()) != null) {
5720 <                    if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
5721 <                        action.apply(u);
5722 <                }
5723 <            } catch (Throwable ex) {
5724 <                return tryCompleteComputation(ex);
5511 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5512 >             Fun<Map.Entry<K,V>, ? extends U> transformer, Action<U> action) {
5513 >            super(m, p, b);
5514 >            this.transformer = transformer; this.action = action;
5515 >        }
5516 >        @SuppressWarnings("unchecked") public final void compute() {
5517 >            final Fun<Map.Entry<K,V>, ? extends U> transformer;
5518 >            final Action<U> action;
5519 >            if ((transformer = this.transformer) == null ||
5520 >                (action = this.action) == null)
5521 >                throw new NullPointerException();
5522 >            for (int b; (b = preSplit()) > 0;)
5523 >                new ForEachTransformedEntryTask<K,V,U>
5524 >                    (map, this, b, transformer, action).fork();
5525 >            Object v; U u;
5526 >            while ((v = advance()) != null) {
5527 >                if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
5528 >                    action.apply(u);
5529              }
5530 <            tryComplete(subtasks);
5727 <            return false;
5530 >            propagateCompletion();
5531          }
5532      }
5533  
5534      @SuppressWarnings("serial") static final class ForEachTransformedMappingTask<K,V,U>
5535 <        extends BulkAction<K,V,Void> {
5535 >        extends Traverser<K,V,Void> {
5536          final BiFun<? super K, ? super V, ? extends U> transformer;
5537          final Action<U> action;
5538          ForEachTransformedMappingTask
5539 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5737 <             ForEachTransformedMappingTask<K,V,U> nextTask,
5539 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5540               BiFun<? super K, ? super V, ? extends U> transformer,
5541               Action<U> action) {
5542 <            super(m, p, b, nextTask);
5543 <            this.transformer = transformer;
5742 <            this.action = action;
5743 <
5542 >            super(m, p, b);
5543 >            this.transformer = transformer; this.action = action;
5544          }
5545 <        @SuppressWarnings("unchecked") public final boolean exec() {
5546 <            final BiFun<? super K, ? super V, ? extends U> transformer =
5547 <                this.transformer;
5548 <            final Action<U> action = this.action;
5549 <            if (transformer == null || action == null)
5550 <                return abortOnNullFunction();
5551 <            ForEachTransformedMappingTask<K,V,U> subtasks = null;
5552 <            try {
5553 <                int b = batch(), c;
5554 <                while (b > 1 && baseIndex != baseLimit) {
5555 <                    do {} while (!casPending(c = pending, c+1));
5556 <                    (subtasks = new ForEachTransformedMappingTask<K,V,U>
5557 <                     (map, this, b >>>= 1, subtasks, transformer, action)).fork();
5758 <                }
5759 <                Object v; U u;
5760 <                while ((v = advance()) != null) {
5761 <                    if ((u = transformer.apply((K)nextKey, (V)v)) != null)
5762 <                        action.apply(u);
5763 <                }
5764 <            } catch (Throwable ex) {
5765 <                return tryCompleteComputation(ex);
5545 >        @SuppressWarnings("unchecked") public final void compute() {
5546 >            final BiFun<? super K, ? super V, ? extends U> transformer;
5547 >            final Action<U> action;
5548 >            if ((transformer = this.transformer) == null ||
5549 >                (action = this.action) == null)
5550 >                throw new NullPointerException();
5551 >            for (int b; (b = preSplit()) > 0;)
5552 >                new ForEachTransformedMappingTask<K,V,U>
5553 >                    (map, this, b, transformer, action).fork();
5554 >            Object v; U u;
5555 >            while ((v = advance()) != null) {
5556 >                if ((u = transformer.apply((K)nextKey, (V)v)) != null)
5557 >                    action.apply(u);
5558              }
5559 <            tryComplete(subtasks);
5768 <            return false;
5559 >            propagateCompletion();
5560          }
5561      }
5562  
5563      @SuppressWarnings("serial") static final class SearchKeysTask<K,V,U>
5564 <        extends BulkAction<K,V,U> {
5564 >        extends Traverser<K,V,U> {
5565          final Fun<? super K, ? extends U> searchFunction;
5566          final AtomicReference<U> result;
5567          SearchKeysTask
5568 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5778 <             SearchKeysTask<K,V,U> nextTask,
5568 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5569               Fun<? super K, ? extends U> searchFunction,
5570               AtomicReference<U> result) {
5571 <            super(m, p, b, nextTask);
5571 >            super(m, p, b);
5572              this.searchFunction = searchFunction; this.result = result;
5573          }
5574 <        @SuppressWarnings("unchecked") public final boolean exec() {
5575 <            AtomicReference<U> result = this.result;
5576 <            final Fun<? super K, ? extends U> searchFunction =
5577 <                this.searchFunction;
5578 <            if (searchFunction == null || result == null)
5579 <                return abortOnNullFunction();
5580 <            SearchKeysTask<K,V,U> subtasks = null;
5581 <            try {
5582 <                int b = batch(), c;
5583 <                while (b > 1 && baseIndex != baseLimit && result.get() == null) {
5584 <                    do {} while (!casPending(c = pending, c+1));
5585 <                    (subtasks = new SearchKeysTask<K,V,U>
5586 <                     (map, this, b >>>= 1, subtasks, searchFunction, result)).fork();
5587 <                }
5574 >        public final U getRawResult() { return result.get(); }
5575 >        @SuppressWarnings("unchecked") public final void compute() {
5576 >            final Fun<? super K, ? extends U> searchFunction;
5577 >            final AtomicReference<U> result;
5578 >            if ((searchFunction = this.searchFunction) == null ||
5579 >                (result = this.result) == null)
5580 >                throw new NullPointerException();
5581 >            for (int b;;) {
5582 >                if (result.get() != null)
5583 >                    return;
5584 >                if ((b = preSplit()) <= 0)
5585 >                    break;
5586 >                new SearchKeysTask<K,V,U>
5587 >                    (map, this, b, searchFunction, result).fork();
5588 >            }
5589 >            while (result.get() == null) {
5590                  U u;
5591 <                while (result.get() == null && advance() != null) {
5592 <                    if ((u = searchFunction.apply((K)nextKey)) != null) {
5593 <                        if (result.compareAndSet(null, u))
5594 <                            tryCompleteComputation(null);
5595 <                        break;
5596 <                    }
5591 >                if (advance() == null) {
5592 >                    propagateCompletion();
5593 >                    break;
5594 >                }
5595 >                if ((u = searchFunction.apply((K)nextKey)) != null) {
5596 >                    if (result.compareAndSet(null, u))
5597 >                        quietlyCompleteRoot();
5598 >                    break;
5599                  }
5806            } catch (Throwable ex) {
5807                return tryCompleteComputation(ex);
5600              }
5809            tryComplete(subtasks);
5810            return false;
5601          }
5812        public final U getRawResult() { return result.get(); }
5602      }
5603  
5604      @SuppressWarnings("serial") static final class SearchValuesTask<K,V,U>
5605 <        extends BulkAction<K,V,U> {
5605 >        extends Traverser<K,V,U> {
5606          final Fun<? super V, ? extends U> searchFunction;
5607          final AtomicReference<U> result;
5608          SearchValuesTask
5609 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5821 <             SearchValuesTask<K,V,U> nextTask,
5609 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5610               Fun<? super V, ? extends U> searchFunction,
5611               AtomicReference<U> result) {
5612 <            super(m, p, b, nextTask);
5612 >            super(m, p, b);
5613              this.searchFunction = searchFunction; this.result = result;
5614          }
5615 <        @SuppressWarnings("unchecked") public final boolean exec() {
5616 <            AtomicReference<U> result = this.result;
5617 <            final Fun<? super V, ? extends U> searchFunction =
5618 <                this.searchFunction;
5619 <            if (searchFunction == null || result == null)
5620 <                return abortOnNullFunction();
5621 <            SearchValuesTask<K,V,U> subtasks = null;
5622 <            try {
5623 <                int b = batch(), c;
5624 <                while (b > 1 && baseIndex != baseLimit && result.get() == null) {
5625 <                    do {} while (!casPending(c = pending, c+1));
5626 <                    (subtasks = new SearchValuesTask<K,V,U>
5627 <                     (map, this, b >>>= 1, subtasks, searchFunction, result)).fork();
5628 <                }
5615 >        public final U getRawResult() { return result.get(); }
5616 >        @SuppressWarnings("unchecked") public final void compute() {
5617 >            final Fun<? super V, ? extends U> searchFunction;
5618 >            final AtomicReference<U> result;
5619 >            if ((searchFunction = this.searchFunction) == null ||
5620 >                (result = this.result) == null)
5621 >                throw new NullPointerException();
5622 >            for (int b;;) {
5623 >                if (result.get() != null)
5624 >                    return;
5625 >                if ((b = preSplit()) <= 0)
5626 >                    break;
5627 >                new SearchValuesTask<K,V,U>
5628 >                    (map, this, b, searchFunction, result).fork();
5629 >            }
5630 >            while (result.get() == null) {
5631                  Object v; U u;
5632 <                while (result.get() == null && (v = advance()) != null) {
5633 <                    if ((u = searchFunction.apply((V)v)) != null) {
5634 <                        if (result.compareAndSet(null, u))
5635 <                            tryCompleteComputation(null);
5636 <                        break;
5637 <                    }
5632 >                if ((v = advance()) == null) {
5633 >                    propagateCompletion();
5634 >                    break;
5635 >                }
5636 >                if ((u = searchFunction.apply((V)v)) != null) {
5637 >                    if (result.compareAndSet(null, u))
5638 >                        quietlyCompleteRoot();
5639 >                    break;
5640                  }
5849            } catch (Throwable ex) {
5850                return tryCompleteComputation(ex);
5641              }
5852            tryComplete(subtasks);
5853            return false;
5642          }
5855        public final U getRawResult() { return result.get(); }
5643      }
5644  
5645      @SuppressWarnings("serial") static final class SearchEntriesTask<K,V,U>
5646 <        extends BulkAction<K,V,U> {
5646 >        extends Traverser<K,V,U> {
5647          final Fun<Entry<K,V>, ? extends U> searchFunction;
5648          final AtomicReference<U> result;
5649          SearchEntriesTask
5650 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5864 <             SearchEntriesTask<K,V,U> nextTask,
5650 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5651               Fun<Entry<K,V>, ? extends U> searchFunction,
5652               AtomicReference<U> result) {
5653 <            super(m, p, b, nextTask);
5653 >            super(m, p, b);
5654              this.searchFunction = searchFunction; this.result = result;
5655          }
5656 <        @SuppressWarnings("unchecked") public final boolean exec() {
5657 <            AtomicReference<U> result = this.result;
5658 <            final Fun<Entry<K,V>, ? extends U> searchFunction =
5659 <                this.searchFunction;
5660 <            if (searchFunction == null || result == null)
5661 <                return abortOnNullFunction();
5662 <            SearchEntriesTask<K,V,U> subtasks = null;
5663 <            try {
5664 <                int b = batch(), c;
5665 <                while (b > 1 && baseIndex != baseLimit && result.get() == null) {
5666 <                    do {} while (!casPending(c = pending, c+1));
5667 <                    (subtasks = new SearchEntriesTask<K,V,U>
5668 <                     (map, this, b >>>= 1, subtasks, searchFunction, result)).fork();
5669 <                }
5656 >        public final U getRawResult() { return result.get(); }
5657 >        @SuppressWarnings("unchecked") public final void compute() {
5658 >            final Fun<Entry<K,V>, ? extends U> searchFunction;
5659 >            final AtomicReference<U> result;
5660 >            if ((searchFunction = this.searchFunction) == null ||
5661 >                (result = this.result) == null)
5662 >                throw new NullPointerException();
5663 >            for (int b;;) {
5664 >                if (result.get() != null)
5665 >                    return;
5666 >                if ((b = preSplit()) <= 0)
5667 >                    break;
5668 >                new SearchEntriesTask<K,V,U>
5669 >                    (map, this, b, searchFunction, result).fork();
5670 >            }
5671 >            while (result.get() == null) {
5672                  Object v; U u;
5673 <                while (result.get() == null && (v = advance()) != null) {
5674 <                    if ((u = searchFunction.apply(entryFor((K)nextKey, (V)v))) != null) {
5675 <                        if (result.compareAndSet(null, u))
5676 <                            tryCompleteComputation(null);
5677 <                        break;
5678 <                    }
5673 >                if ((v = advance()) == null) {
5674 >                    propagateCompletion();
5675 >                    break;
5676 >                }
5677 >                if ((u = searchFunction.apply(entryFor((K)nextKey, (V)v))) != null) {
5678 >                    if (result.compareAndSet(null, u))
5679 >                        quietlyCompleteRoot();
5680 >                    return;
5681                  }
5892            } catch (Throwable ex) {
5893                return tryCompleteComputation(ex);
5682              }
5895            tryComplete(subtasks);
5896            return false;
5683          }
5898        public final U getRawResult() { return result.get(); }
5684      }
5685  
5686      @SuppressWarnings("serial") static final class SearchMappingsTask<K,V,U>
5687 <        extends BulkAction<K,V,U> {
5687 >        extends Traverser<K,V,U> {
5688          final BiFun<? super K, ? super V, ? extends U> searchFunction;
5689          final AtomicReference<U> result;
5690          SearchMappingsTask
5691 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5907 <             SearchMappingsTask<K,V,U> nextTask,
5691 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5692               BiFun<? super K, ? super V, ? extends U> searchFunction,
5693               AtomicReference<U> result) {
5694 <            super(m, p, b, nextTask);
5694 >            super(m, p, b);
5695              this.searchFunction = searchFunction; this.result = result;
5696          }
5697 <        @SuppressWarnings("unchecked") public final boolean exec() {
5698 <            AtomicReference<U> result = this.result;
5699 <            final BiFun<? super K, ? super V, ? extends U> searchFunction =
5700 <                this.searchFunction;
5701 <            if (searchFunction == null || result == null)
5702 <                return abortOnNullFunction();
5703 <            SearchMappingsTask<K,V,U> subtasks = null;
5704 <            try {
5705 <                int b = batch(), c;
5706 <                while (b > 1 && baseIndex != baseLimit && result.get() == null) {
5707 <                    do {} while (!casPending(c = pending, c+1));
5708 <                    (subtasks = new SearchMappingsTask<K,V,U>
5709 <                     (map, this, b >>>= 1, subtasks, searchFunction, result)).fork();
5710 <                }
5697 >        public final U getRawResult() { return result.get(); }
5698 >        @SuppressWarnings("unchecked") public final void compute() {
5699 >            final BiFun<? super K, ? super V, ? extends U> searchFunction;
5700 >            final AtomicReference<U> result;
5701 >            if ((searchFunction = this.searchFunction) == null ||
5702 >                (result = this.result) == null)
5703 >                throw new NullPointerException();
5704 >            for (int b;;) {
5705 >                if (result.get() != null)
5706 >                    return;
5707 >                if ((b = preSplit()) <= 0)
5708 >                    break;
5709 >                new SearchMappingsTask<K,V,U>
5710 >                    (map, this, b, searchFunction, result).fork();
5711 >            }
5712 >            while (result.get() == null) {
5713                  Object v; U u;
5714 <                while (result.get() == null && (v = advance()) != null) {
5715 <                    if ((u = searchFunction.apply((K)nextKey, (V)v)) != null) {
5716 <                        if (result.compareAndSet(null, u))
5717 <                            tryCompleteComputation(null);
5718 <                        break;
5719 <                    }
5714 >                if ((v = advance()) == null) {
5715 >                    propagateCompletion();
5716 >                    break;
5717 >                }
5718 >                if ((u = searchFunction.apply((K)nextKey, (V)v)) != null) {
5719 >                    if (result.compareAndSet(null, u))
5720 >                        quietlyCompleteRoot();
5721 >                    break;
5722                  }
5935            } catch (Throwable ex) {
5936                return tryCompleteComputation(ex);
5723              }
5938            tryComplete(subtasks);
5939            return false;
5724          }
5941        public final U getRawResult() { return result.get(); }
5725      }
5726  
5727      @SuppressWarnings("serial") static final class ReduceKeysTask<K,V>
5728 <        extends BulkTask<K,V,K> {
5728 >        extends Traverser<K,V,K> {
5729          final BiFun<? super K, ? super K, ? extends K> reducer;
5730          K result;
5731          ReduceKeysTask<K,V> rights, nextRight;
5732          ReduceKeysTask
5733 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5733 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5734               ReduceKeysTask<K,V> nextRight,
5735               BiFun<? super K, ? super K, ? extends K> reducer) {
5736              super(m, p, b); this.nextRight = nextRight;
5737              this.reducer = reducer;
5738          }
5739 <        @SuppressWarnings("unchecked") public final boolean exec() {
5739 >        public final K getRawResult() { return result; }
5740 >        @SuppressWarnings("unchecked") public final void compute() {
5741              final BiFun<? super K, ? super K, ? extends K> reducer =
5742                  this.reducer;
5743              if (reducer == null)
5744 <                return abortOnNullFunction();
5745 <            try {
5746 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5747 <                    do {} while (!casPending(c = pending, c+1));
5748 <                    (rights = new ReduceKeysTask<K,V>
5749 <                     (map, this, b >>>= 1, rights, reducer)).fork();
5750 <                }
5751 <                K r = null;
5752 <                while (advance() != null) {
5753 <                    K u = (K)nextKey;
5754 <                    r = (r == null) ? u : reducer.apply(r, u);
5755 <                }
5756 <                result = r;
5757 <                for (ReduceKeysTask<K,V> t = this, s;;) {
5758 <                    int c; BulkTask<K,V,?> par; K tr, sr;
5759 <                    if ((c = t.pending) == 0) {
5760 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
5761 <                            if ((sr = s.result) != null)
5762 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
5763 <                        }
5764 <                        if ((par = t.parent) == null ||
5981 <                            !(par instanceof ReduceKeysTask)) {
5982 <                            t.quietlyComplete();
5983 <                            break;
5984 <                        }
5985 <                        t = (ReduceKeysTask<K,V>)par;
5986 <                    }
5987 <                    else if (t.casPending(c, c - 1))
5988 <                        break;
5744 >                throw new NullPointerException();
5745 >            for (int b; (b = preSplit()) > 0;)
5746 >                (rights = new ReduceKeysTask<K,V>
5747 >                 (map, this, b, rights, reducer)).fork();
5748 >            K r = null;
5749 >            while (advance() != null) {
5750 >                K u = (K)nextKey;
5751 >                r = (r == null) ? u : reducer.apply(r, u);
5752 >            }
5753 >            result = r;
5754 >            CountedCompleter<?> c;
5755 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
5756 >                ReduceKeysTask<K,V>
5757 >                    t = (ReduceKeysTask<K,V>)c,
5758 >                    s = t.rights;
5759 >                while (s != null) {
5760 >                    K tr, sr;
5761 >                    if ((sr = s.result) != null)
5762 >                        t.result = (((tr = t.result) == null) ? sr :
5763 >                                    reducer.apply(tr, sr));
5764 >                    s = t.rights = s.nextRight;
5765                  }
5990            } catch (Throwable ex) {
5991                return tryCompleteComputation(ex);
5992            }
5993            ReduceKeysTask<K,V> s = rights;
5994            if (s != null && !inForkJoinPool()) {
5995                do  {
5996                    if (s.tryUnfork())
5997                        s.exec();
5998                } while ((s = s.nextRight) != null);
5766              }
6000            return false;
5767          }
6002        public final K getRawResult() { return result; }
5768      }
5769  
5770      @SuppressWarnings("serial") static final class ReduceValuesTask<K,V>
5771 <        extends BulkTask<K,V,V> {
5771 >        extends Traverser<K,V,V> {
5772          final BiFun<? super V, ? super V, ? extends V> reducer;
5773          V result;
5774          ReduceValuesTask<K,V> rights, nextRight;
5775          ReduceValuesTask
5776 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5776 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5777               ReduceValuesTask<K,V> nextRight,
5778               BiFun<? super V, ? super V, ? extends V> reducer) {
5779              super(m, p, b); this.nextRight = nextRight;
5780              this.reducer = reducer;
5781          }
5782 <        @SuppressWarnings("unchecked") public final boolean exec() {
5782 >        public final V getRawResult() { return result; }
5783 >        @SuppressWarnings("unchecked") public final void compute() {
5784              final BiFun<? super V, ? super V, ? extends V> reducer =
5785                  this.reducer;
5786              if (reducer == null)
5787 <                return abortOnNullFunction();
5788 <            try {
5789 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5790 <                    do {} while (!casPending(c = pending, c+1));
5791 <                    (rights = new ReduceValuesTask<K,V>
5792 <                     (map, this, b >>>= 1, rights, reducer)).fork();
5793 <                }
5794 <                V r = null;
5795 <                Object v;
5796 <                while ((v = advance()) != null) {
5797 <                    V u = (V)v;
5798 <                    r = (r == null) ? u : reducer.apply(r, u);
5799 <                }
5800 <                result = r;
5801 <                for (ReduceValuesTask<K,V> t = this, s;;) {
5802 <                    int c; BulkTask<K,V,?> par; V tr, sr;
5803 <                    if ((c = t.pending) == 0) {
5804 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
5805 <                            if ((sr = s.result) != null)
5806 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
5807 <                        }
5808 <                        if ((par = t.parent) == null ||
6043 <                            !(par instanceof ReduceValuesTask)) {
6044 <                            t.quietlyComplete();
6045 <                            break;
6046 <                        }
6047 <                        t = (ReduceValuesTask<K,V>)par;
6048 <                    }
6049 <                    else if (t.casPending(c, c - 1))
6050 <                        break;
5787 >                throw new NullPointerException();
5788 >            for (int b; (b = preSplit()) > 0;)
5789 >                (rights = new ReduceValuesTask<K,V>
5790 >                 (map, this, b, rights, reducer)).fork();
5791 >            V r = null;
5792 >            Object v;
5793 >            while ((v = advance()) != null) {
5794 >                V u = (V)v;
5795 >                r = (r == null) ? u : reducer.apply(r, u);
5796 >            }
5797 >            result = r;
5798 >            CountedCompleter<?> c;
5799 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
5800 >                ReduceValuesTask<K,V>
5801 >                    t = (ReduceValuesTask<K,V>)c,
5802 >                    s = t.rights;
5803 >                while (s != null) {
5804 >                    V tr, sr;
5805 >                    if ((sr = s.result) != null)
5806 >                        t.result = (((tr = t.result) == null) ? sr :
5807 >                                    reducer.apply(tr, sr));
5808 >                    s = t.rights = s.nextRight;
5809                  }
6052            } catch (Throwable ex) {
6053                return tryCompleteComputation(ex);
5810              }
6055            ReduceValuesTask<K,V> s = rights;
6056            if (s != null && !inForkJoinPool()) {
6057                do  {
6058                    if (s.tryUnfork())
6059                        s.exec();
6060                } while ((s = s.nextRight) != null);
6061            }
6062            return false;
5811          }
6064        public final V getRawResult() { return result; }
5812      }
5813  
5814      @SuppressWarnings("serial") static final class ReduceEntriesTask<K,V>
5815 <        extends BulkTask<K,V,Map.Entry<K,V>> {
5815 >        extends Traverser<K,V,Map.Entry<K,V>> {
5816          final BiFun<Map.Entry<K,V>, Map.Entry<K,V>, ? extends Map.Entry<K,V>> reducer;
5817          Map.Entry<K,V> result;
5818          ReduceEntriesTask<K,V> rights, nextRight;
5819          ReduceEntriesTask
5820 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5820 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5821               ReduceEntriesTask<K,V> nextRight,
5822               BiFun<Entry<K,V>, Map.Entry<K,V>, ? extends Map.Entry<K,V>> reducer) {
5823              super(m, p, b); this.nextRight = nextRight;
5824              this.reducer = reducer;
5825          }
5826 <        @SuppressWarnings("unchecked") public final boolean exec() {
5826 >        public final Map.Entry<K,V> getRawResult() { return result; }
5827 >        @SuppressWarnings("unchecked") public final void compute() {
5828              final BiFun<Map.Entry<K,V>, Map.Entry<K,V>, ? extends Map.Entry<K,V>> reducer =
5829                  this.reducer;
5830              if (reducer == null)
5831 <                return abortOnNullFunction();
5832 <            try {
5833 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5834 <                    do {} while (!casPending(c = pending, c+1));
5835 <                    (rights = new ReduceEntriesTask<K,V>
5836 <                     (map, this, b >>>= 1, rights, reducer)).fork();
5837 <                }
5838 <                Map.Entry<K,V> r = null;
5839 <                Object v;
5840 <                while ((v = advance()) != null) {
5841 <                    Map.Entry<K,V> u = entryFor((K)nextKey, (V)v);
5842 <                    r = (r == null) ? u : reducer.apply(r, u);
5843 <                }
5844 <                result = r;
5845 <                for (ReduceEntriesTask<K,V> t = this, s;;) {
5846 <                    int c; BulkTask<K,V,?> par; Map.Entry<K,V> tr, sr;
5847 <                    if ((c = t.pending) == 0) {
5848 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
5849 <                            if ((sr = s.result) != null)
5850 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
5851 <                        }
5852 <                        if ((par = t.parent) == null ||
6105 <                            !(par instanceof ReduceEntriesTask)) {
6106 <                            t.quietlyComplete();
6107 <                            break;
6108 <                        }
6109 <                        t = (ReduceEntriesTask<K,V>)par;
6110 <                    }
6111 <                    else if (t.casPending(c, c - 1))
6112 <                        break;
5831 >                throw new NullPointerException();
5832 >            for (int b; (b = preSplit()) > 0;)
5833 >                (rights = new ReduceEntriesTask<K,V>
5834 >                 (map, this, b, rights, reducer)).fork();
5835 >            Map.Entry<K,V> r = null;
5836 >            Object v;
5837 >            while ((v = advance()) != null) {
5838 >                Map.Entry<K,V> u = entryFor((K)nextKey, (V)v);
5839 >                r = (r == null) ? u : reducer.apply(r, u);
5840 >            }
5841 >            result = r;
5842 >            CountedCompleter<?> c;
5843 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
5844 >                ReduceEntriesTask<K,V>
5845 >                    t = (ReduceEntriesTask<K,V>)c,
5846 >                    s = t.rights;
5847 >                while (s != null) {
5848 >                    Map.Entry<K,V> tr, sr;
5849 >                    if ((sr = s.result) != null)
5850 >                        t.result = (((tr = t.result) == null) ? sr :
5851 >                                    reducer.apply(tr, sr));
5852 >                    s = t.rights = s.nextRight;
5853                  }
6114            } catch (Throwable ex) {
6115                return tryCompleteComputation(ex);
6116            }
6117            ReduceEntriesTask<K,V> s = rights;
6118            if (s != null && !inForkJoinPool()) {
6119                do  {
6120                    if (s.tryUnfork())
6121                        s.exec();
6122                } while ((s = s.nextRight) != null);
5854              }
6124            return false;
5855          }
6126        public final Map.Entry<K,V> getRawResult() { return result; }
5856      }
5857  
5858      @SuppressWarnings("serial") static final class MapReduceKeysTask<K,V,U>
5859 <        extends BulkTask<K,V,U> {
5859 >        extends Traverser<K,V,U> {
5860          final Fun<? super K, ? extends U> transformer;
5861          final BiFun<? super U, ? super U, ? extends U> reducer;
5862          U result;
5863          MapReduceKeysTask<K,V,U> rights, nextRight;
5864          MapReduceKeysTask
5865 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5865 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5866               MapReduceKeysTask<K,V,U> nextRight,
5867               Fun<? super K, ? extends U> transformer,
5868               BiFun<? super U, ? super U, ? extends U> reducer) {
# Line 6141 | Line 5870 | public class ConcurrentHashMap<K, V>
5870              this.transformer = transformer;
5871              this.reducer = reducer;
5872          }
5873 <        @SuppressWarnings("unchecked") public final boolean exec() {
5873 >        public final U getRawResult() { return result; }
5874 >        @SuppressWarnings("unchecked") public final void compute() {
5875              final Fun<? super K, ? extends U> transformer =
5876                  this.transformer;
5877              final BiFun<? super U, ? super U, ? extends U> reducer =
5878                  this.reducer;
5879              if (transformer == null || reducer == null)
5880 <                return abortOnNullFunction();
5881 <            try {
5882 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5883 <                    do {} while (!casPending(c = pending, c+1));
5884 <                    (rights = new MapReduceKeysTask<K,V,U>
5885 <                     (map, this, b >>>= 1, rights, transformer, reducer)).fork();
5886 <                }
5887 <                U r = null, u;
6158 <                while (advance() != null) {
6159 <                    if ((u = transformer.apply((K)nextKey)) != null)
6160 <                        r = (r == null) ? u : reducer.apply(r, u);
6161 <                }
6162 <                result = r;
6163 <                for (MapReduceKeysTask<K,V,U> t = this, s;;) {
6164 <                    int c; BulkTask<K,V,?> par; U tr, sr;
6165 <                    if ((c = t.pending) == 0) {
6166 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6167 <                            if ((sr = s.result) != null)
6168 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
6169 <                        }
6170 <                        if ((par = t.parent) == null ||
6171 <                            !(par instanceof MapReduceKeysTask)) {
6172 <                            t.quietlyComplete();
6173 <                            break;
6174 <                        }
6175 <                        t = (MapReduceKeysTask<K,V,U>)par;
6176 <                    }
6177 <                    else if (t.casPending(c, c - 1))
6178 <                        break;
6179 <                }
6180 <            } catch (Throwable ex) {
6181 <                return tryCompleteComputation(ex);
5880 >                throw new NullPointerException();
5881 >            for (int b; (b = preSplit()) > 0;)
5882 >                (rights = new MapReduceKeysTask<K,V,U>
5883 >                 (map, this, b, rights, transformer, reducer)).fork();
5884 >            U r = null, u;
5885 >            while (advance() != null) {
5886 >                if ((u = transformer.apply((K)nextKey)) != null)
5887 >                    r = (r == null) ? u : reducer.apply(r, u);
5888              }
5889 <            MapReduceKeysTask<K,V,U> s = rights;
5890 <            if (s != null && !inForkJoinPool()) {
5891 <                do  {
5892 <                    if (s.tryUnfork())
5893 <                        s.exec();
5894 <                } while ((s = s.nextRight) != null);
5889 >            result = r;
5890 >            CountedCompleter<?> c;
5891 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
5892 >                MapReduceKeysTask<K,V,U>
5893 >                    t = (MapReduceKeysTask<K,V,U>)c,
5894 >                    s = t.rights;
5895 >                while (s != null) {
5896 >                    U tr, sr;
5897 >                    if ((sr = s.result) != null)
5898 >                        t.result = (((tr = t.result) == null) ? sr :
5899 >                                    reducer.apply(tr, sr));
5900 >                    s = t.rights = s.nextRight;
5901 >                }
5902              }
6190            return false;
5903          }
6192        public final U getRawResult() { return result; }
5904      }
5905  
5906      @SuppressWarnings("serial") static final class MapReduceValuesTask<K,V,U>
5907 <        extends BulkTask<K,V,U> {
5907 >        extends Traverser<K,V,U> {
5908          final Fun<? super V, ? extends U> transformer;
5909          final BiFun<? super U, ? super U, ? extends U> reducer;
5910          U result;
5911          MapReduceValuesTask<K,V,U> rights, nextRight;
5912          MapReduceValuesTask
5913 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5913 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5914               MapReduceValuesTask<K,V,U> nextRight,
5915               Fun<? super V, ? extends U> transformer,
5916               BiFun<? super U, ? super U, ? extends U> reducer) {
# Line 6207 | Line 5918 | public class ConcurrentHashMap<K, V>
5918              this.transformer = transformer;
5919              this.reducer = reducer;
5920          }
5921 <        @SuppressWarnings("unchecked") public final boolean exec() {
5921 >        public final U getRawResult() { return result; }
5922 >        @SuppressWarnings("unchecked") public final void compute() {
5923              final Fun<? super V, ? extends U> transformer =
5924                  this.transformer;
5925              final BiFun<? super U, ? super U, ? extends U> reducer =
5926                  this.reducer;
5927              if (transformer == null || reducer == null)
5928 <                return abortOnNullFunction();
5929 <            try {
5930 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5931 <                    do {} while (!casPending(c = pending, c+1));
5932 <                    (rights = new MapReduceValuesTask<K,V,U>
5933 <                     (map, this, b >>>= 1, rights, transformer, reducer)).fork();
5934 <                }
5935 <                U r = null, u;
5936 <                Object v;
6225 <                while ((v = advance()) != null) {
6226 <                    if ((u = transformer.apply((V)v)) != null)
6227 <                        r = (r == null) ? u : reducer.apply(r, u);
6228 <                }
6229 <                result = r;
6230 <                for (MapReduceValuesTask<K,V,U> t = this, s;;) {
6231 <                    int c; BulkTask<K,V,?> par; U tr, sr;
6232 <                    if ((c = t.pending) == 0) {
6233 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6234 <                            if ((sr = s.result) != null)
6235 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
6236 <                        }
6237 <                        if ((par = t.parent) == null ||
6238 <                            !(par instanceof MapReduceValuesTask)) {
6239 <                            t.quietlyComplete();
6240 <                            break;
6241 <                        }
6242 <                        t = (MapReduceValuesTask<K,V,U>)par;
6243 <                    }
6244 <                    else if (t.casPending(c, c - 1))
6245 <                        break;
6246 <                }
6247 <            } catch (Throwable ex) {
6248 <                return tryCompleteComputation(ex);
5928 >                throw new NullPointerException();
5929 >            for (int b; (b = preSplit()) > 0;)
5930 >                (rights = new MapReduceValuesTask<K,V,U>
5931 >                 (map, this, b, rights, transformer, reducer)).fork();
5932 >            U r = null, u;
5933 >            Object v;
5934 >            while ((v = advance()) != null) {
5935 >                if ((u = transformer.apply((V)v)) != null)
5936 >                    r = (r == null) ? u : reducer.apply(r, u);
5937              }
5938 <            MapReduceValuesTask<K,V,U> s = rights;
5939 <            if (s != null && !inForkJoinPool()) {
5940 <                do  {
5941 <                    if (s.tryUnfork())
5942 <                        s.exec();
5943 <                } while ((s = s.nextRight) != null);
5938 >            result = r;
5939 >            CountedCompleter<?> c;
5940 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
5941 >                MapReduceValuesTask<K,V,U>
5942 >                    t = (MapReduceValuesTask<K,V,U>)c,
5943 >                    s = t.rights;
5944 >                while (s != null) {
5945 >                    U tr, sr;
5946 >                    if ((sr = s.result) != null)
5947 >                        t.result = (((tr = t.result) == null) ? sr :
5948 >                                    reducer.apply(tr, sr));
5949 >                    s = t.rights = s.nextRight;
5950 >                }
5951              }
6257            return false;
5952          }
6259        public final U getRawResult() { return result; }
5953      }
5954  
5955      @SuppressWarnings("serial") static final class MapReduceEntriesTask<K,V,U>
5956 <        extends BulkTask<K,V,U> {
5956 >        extends Traverser<K,V,U> {
5957          final Fun<Map.Entry<K,V>, ? extends U> transformer;
5958          final BiFun<? super U, ? super U, ? extends U> reducer;
5959          U result;
5960          MapReduceEntriesTask<K,V,U> rights, nextRight;
5961          MapReduceEntriesTask
5962 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
5962 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
5963               MapReduceEntriesTask<K,V,U> nextRight,
5964               Fun<Map.Entry<K,V>, ? extends U> transformer,
5965               BiFun<? super U, ? super U, ? extends U> reducer) {
# Line 6274 | Line 5967 | public class ConcurrentHashMap<K, V>
5967              this.transformer = transformer;
5968              this.reducer = reducer;
5969          }
5970 <        @SuppressWarnings("unchecked") public final boolean exec() {
5970 >        public final U getRawResult() { return result; }
5971 >        @SuppressWarnings("unchecked") public final void compute() {
5972              final Fun<Map.Entry<K,V>, ? extends U> transformer =
5973                  this.transformer;
5974              final BiFun<? super U, ? super U, ? extends U> reducer =
5975                  this.reducer;
5976              if (transformer == null || reducer == null)
5977 <                return abortOnNullFunction();
5978 <            try {
5979 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
5980 <                    do {} while (!casPending(c = pending, c+1));
5981 <                    (rights = new MapReduceEntriesTask<K,V,U>
5982 <                     (map, this, b >>>= 1, rights, transformer, reducer)).fork();
5983 <                }
5984 <                U r = null, u;
5985 <                Object v;
6292 <                while ((v = advance()) != null) {
6293 <                    if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
6294 <                        r = (r == null) ? u : reducer.apply(r, u);
6295 <                }
6296 <                result = r;
6297 <                for (MapReduceEntriesTask<K,V,U> t = this, s;;) {
6298 <                    int c; BulkTask<K,V,?> par; U tr, sr;
6299 <                    if ((c = t.pending) == 0) {
6300 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6301 <                            if ((sr = s.result) != null)
6302 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
6303 <                        }
6304 <                        if ((par = t.parent) == null ||
6305 <                            !(par instanceof MapReduceEntriesTask)) {
6306 <                            t.quietlyComplete();
6307 <                            break;
6308 <                        }
6309 <                        t = (MapReduceEntriesTask<K,V,U>)par;
6310 <                    }
6311 <                    else if (t.casPending(c, c - 1))
6312 <                        break;
6313 <                }
6314 <            } catch (Throwable ex) {
6315 <                return tryCompleteComputation(ex);
5977 >                throw new NullPointerException();
5978 >            for (int b; (b = preSplit()) > 0;)
5979 >                (rights = new MapReduceEntriesTask<K,V,U>
5980 >                 (map, this, b, rights, transformer, reducer)).fork();
5981 >            U r = null, u;
5982 >            Object v;
5983 >            while ((v = advance()) != null) {
5984 >                if ((u = transformer.apply(entryFor((K)nextKey, (V)v))) != null)
5985 >                    r = (r == null) ? u : reducer.apply(r, u);
5986              }
5987 <            MapReduceEntriesTask<K,V,U> s = rights;
5988 <            if (s != null && !inForkJoinPool()) {
5989 <                do  {
5990 <                    if (s.tryUnfork())
5991 <                        s.exec();
5992 <                } while ((s = s.nextRight) != null);
5987 >            result = r;
5988 >            CountedCompleter<?> c;
5989 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
5990 >                MapReduceEntriesTask<K,V,U>
5991 >                    t = (MapReduceEntriesTask<K,V,U>)c,
5992 >                    s = t.rights;
5993 >                while (s != null) {
5994 >                    U tr, sr;
5995 >                    if ((sr = s.result) != null)
5996 >                        t.result = (((tr = t.result) == null) ? sr :
5997 >                                    reducer.apply(tr, sr));
5998 >                    s = t.rights = s.nextRight;
5999 >                }
6000              }
6324            return false;
6001          }
6326        public final U getRawResult() { return result; }
6002      }
6003  
6004      @SuppressWarnings("serial") static final class MapReduceMappingsTask<K,V,U>
6005 <        extends BulkTask<K,V,U> {
6005 >        extends Traverser<K,V,U> {
6006          final BiFun<? super K, ? super V, ? extends U> transformer;
6007          final BiFun<? super U, ? super U, ? extends U> reducer;
6008          U result;
6009          MapReduceMappingsTask<K,V,U> rights, nextRight;
6010          MapReduceMappingsTask
6011 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6011 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6012               MapReduceMappingsTask<K,V,U> nextRight,
6013               BiFun<? super K, ? super V, ? extends U> transformer,
6014               BiFun<? super U, ? super U, ? extends U> reducer) {
# Line 6341 | Line 6016 | public class ConcurrentHashMap<K, V>
6016              this.transformer = transformer;
6017              this.reducer = reducer;
6018          }
6019 <        @SuppressWarnings("unchecked") public final boolean exec() {
6019 >        public final U getRawResult() { return result; }
6020 >        @SuppressWarnings("unchecked") public final void compute() {
6021              final BiFun<? super K, ? super V, ? extends U> transformer =
6022                  this.transformer;
6023              final BiFun<? super U, ? super U, ? extends U> reducer =
6024                  this.reducer;
6025              if (transformer == null || reducer == null)
6026 <                return abortOnNullFunction();
6027 <            try {
6028 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6029 <                    do {} while (!casPending(c = pending, c+1));
6030 <                    (rights = new MapReduceMappingsTask<K,V,U>
6031 <                     (map, this, b >>>= 1, rights, transformer, reducer)).fork();
6032 <                }
6033 <                U r = null, u;
6034 <                Object v;
6359 <                while ((v = advance()) != null) {
6360 <                    if ((u = transformer.apply((K)nextKey, (V)v)) != null)
6361 <                        r = (r == null) ? u : reducer.apply(r, u);
6362 <                }
6363 <                result = r;
6364 <                for (MapReduceMappingsTask<K,V,U> t = this, s;;) {
6365 <                    int c; BulkTask<K,V,?> par; U tr, sr;
6366 <                    if ((c = t.pending) == 0) {
6367 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6368 <                            if ((sr = s.result) != null)
6369 <                                t.result = ((tr = t.result) == null) ? sr : reducer.apply(tr, sr);
6370 <                        }
6371 <                        if ((par = t.parent) == null ||
6372 <                            !(par instanceof MapReduceMappingsTask)) {
6373 <                            t.quietlyComplete();
6374 <                            break;
6375 <                        }
6376 <                        t = (MapReduceMappingsTask<K,V,U>)par;
6377 <                    }
6378 <                    else if (t.casPending(c, c - 1))
6379 <                        break;
6380 <                }
6381 <            } catch (Throwable ex) {
6382 <                return tryCompleteComputation(ex);
6026 >                throw new NullPointerException();
6027 >            for (int b; (b = preSplit()) > 0;)
6028 >                (rights = new MapReduceMappingsTask<K,V,U>
6029 >                 (map, this, b, rights, transformer, reducer)).fork();
6030 >            U r = null, u;
6031 >            Object v;
6032 >            while ((v = advance()) != null) {
6033 >                if ((u = transformer.apply((K)nextKey, (V)v)) != null)
6034 >                    r = (r == null) ? u : reducer.apply(r, u);
6035              }
6036 <            MapReduceMappingsTask<K,V,U> s = rights;
6037 <            if (s != null && !inForkJoinPool()) {
6038 <                do  {
6039 <                    if (s.tryUnfork())
6040 <                        s.exec();
6041 <                } while ((s = s.nextRight) != null);
6036 >            result = r;
6037 >            CountedCompleter<?> c;
6038 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6039 >                MapReduceMappingsTask<K,V,U>
6040 >                    t = (MapReduceMappingsTask<K,V,U>)c,
6041 >                    s = t.rights;
6042 >                while (s != null) {
6043 >                    U tr, sr;
6044 >                    if ((sr = s.result) != null)
6045 >                        t.result = (((tr = t.result) == null) ? sr :
6046 >                                    reducer.apply(tr, sr));
6047 >                    s = t.rights = s.nextRight;
6048 >                }
6049              }
6391            return false;
6050          }
6393        public final U getRawResult() { return result; }
6051      }
6052  
6053      @SuppressWarnings("serial") static final class MapReduceKeysToDoubleTask<K,V>
6054 <        extends BulkTask<K,V,Double> {
6054 >        extends Traverser<K,V,Double> {
6055          final ObjectToDouble<? super K> transformer;
6056          final DoubleByDoubleToDouble reducer;
6057          final double basis;
6058          double result;
6059          MapReduceKeysToDoubleTask<K,V> rights, nextRight;
6060          MapReduceKeysToDoubleTask
6061 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6061 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6062               MapReduceKeysToDoubleTask<K,V> nextRight,
6063               ObjectToDouble<? super K> transformer,
6064               double basis,
# Line 6410 | Line 6067 | public class ConcurrentHashMap<K, V>
6067              this.transformer = transformer;
6068              this.basis = basis; this.reducer = reducer;
6069          }
6070 <        @SuppressWarnings("unchecked") public final boolean exec() {
6070 >        public final Double getRawResult() { return result; }
6071 >        @SuppressWarnings("unchecked") public final void compute() {
6072              final ObjectToDouble<? super K> transformer =
6073                  this.transformer;
6074              final DoubleByDoubleToDouble reducer = this.reducer;
6075              if (transformer == null || reducer == null)
6076 <                return abortOnNullFunction();
6077 <            try {
6078 <                final double id = this.basis;
6079 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6080 <                    do {} while (!casPending(c = pending, c+1));
6081 <                    (rights = new MapReduceKeysToDoubleTask<K,V>
6082 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6083 <                }
6084 <                double r = id;
6085 <                while (advance() != null)
6086 <                    r = reducer.apply(r, transformer.apply((K)nextKey));
6087 <                result = r;
6088 <                for (MapReduceKeysToDoubleTask<K,V> t = this, s;;) {
6089 <                    int c; BulkTask<K,V,?> par;
6090 <                    if ((c = t.pending) == 0) {
6091 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6434 <                            t.result = reducer.apply(t.result, s.result);
6435 <                        }
6436 <                        if ((par = t.parent) == null ||
6437 <                            !(par instanceof MapReduceKeysToDoubleTask)) {
6438 <                            t.quietlyComplete();
6439 <                            break;
6440 <                        }
6441 <                        t = (MapReduceKeysToDoubleTask<K,V>)par;
6442 <                    }
6443 <                    else if (t.casPending(c, c - 1))
6444 <                        break;
6076 >                throw new NullPointerException();
6077 >            double r = this.basis;
6078 >            for (int b; (b = preSplit()) > 0;)
6079 >                (rights = new MapReduceKeysToDoubleTask<K,V>
6080 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6081 >            while (advance() != null)
6082 >                r = reducer.apply(r, transformer.apply((K)nextKey));
6083 >            result = r;
6084 >            CountedCompleter<?> c;
6085 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6086 >                MapReduceKeysToDoubleTask<K,V>
6087 >                    t = (MapReduceKeysToDoubleTask<K,V>)c,
6088 >                    s = t.rights;
6089 >                while (s != null) {
6090 >                    t.result = reducer.apply(t.result, s.result);
6091 >                    s = t.rights = s.nextRight;
6092                  }
6446            } catch (Throwable ex) {
6447                return tryCompleteComputation(ex);
6448            }
6449            MapReduceKeysToDoubleTask<K,V> s = rights;
6450            if (s != null && !inForkJoinPool()) {
6451                do  {
6452                    if (s.tryUnfork())
6453                        s.exec();
6454                } while ((s = s.nextRight) != null);
6093              }
6456            return false;
6094          }
6458        public final Double getRawResult() { return result; }
6095      }
6096  
6097      @SuppressWarnings("serial") static final class MapReduceValuesToDoubleTask<K,V>
6098 <        extends BulkTask<K,V,Double> {
6098 >        extends Traverser<K,V,Double> {
6099          final ObjectToDouble<? super V> transformer;
6100          final DoubleByDoubleToDouble reducer;
6101          final double basis;
6102          double result;
6103          MapReduceValuesToDoubleTask<K,V> rights, nextRight;
6104          MapReduceValuesToDoubleTask
6105 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6105 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6106               MapReduceValuesToDoubleTask<K,V> nextRight,
6107               ObjectToDouble<? super V> transformer,
6108               double basis,
# Line 6475 | Line 6111 | public class ConcurrentHashMap<K, V>
6111              this.transformer = transformer;
6112              this.basis = basis; this.reducer = reducer;
6113          }
6114 <        @SuppressWarnings("unchecked") public final boolean exec() {
6114 >        public final Double getRawResult() { return result; }
6115 >        @SuppressWarnings("unchecked") public final void compute() {
6116              final ObjectToDouble<? super V> transformer =
6117                  this.transformer;
6118              final DoubleByDoubleToDouble reducer = this.reducer;
6119              if (transformer == null || reducer == null)
6120 <                return abortOnNullFunction();
6121 <            try {
6122 <                final double id = this.basis;
6123 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6124 <                    do {} while (!casPending(c = pending, c+1));
6125 <                    (rights = new MapReduceValuesToDoubleTask<K,V>
6126 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6127 <                }
6128 <                double r = id;
6129 <                Object v;
6130 <                while ((v = advance()) != null)
6131 <                    r = reducer.apply(r, transformer.apply((V)v));
6132 <                result = r;
6133 <                for (MapReduceValuesToDoubleTask<K,V> t = this, s;;) {
6134 <                    int c; BulkTask<K,V,?> par;
6135 <                    if ((c = t.pending) == 0) {
6136 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6500 <                            t.result = reducer.apply(t.result, s.result);
6501 <                        }
6502 <                        if ((par = t.parent) == null ||
6503 <                            !(par instanceof MapReduceValuesToDoubleTask)) {
6504 <                            t.quietlyComplete();
6505 <                            break;
6506 <                        }
6507 <                        t = (MapReduceValuesToDoubleTask<K,V>)par;
6508 <                    }
6509 <                    else if (t.casPending(c, c - 1))
6510 <                        break;
6120 >                throw new NullPointerException();
6121 >            double r = this.basis;
6122 >            for (int b; (b = preSplit()) > 0;)
6123 >                (rights = new MapReduceValuesToDoubleTask<K,V>
6124 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6125 >            Object v;
6126 >            while ((v = advance()) != null)
6127 >                r = reducer.apply(r, transformer.apply((V)v));
6128 >            result = r;
6129 >            CountedCompleter<?> c;
6130 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6131 >                MapReduceValuesToDoubleTask<K,V>
6132 >                    t = (MapReduceValuesToDoubleTask<K,V>)c,
6133 >                    s = t.rights;
6134 >                while (s != null) {
6135 >                    t.result = reducer.apply(t.result, s.result);
6136 >                    s = t.rights = s.nextRight;
6137                  }
6512            } catch (Throwable ex) {
6513                return tryCompleteComputation(ex);
6138              }
6515            MapReduceValuesToDoubleTask<K,V> s = rights;
6516            if (s != null && !inForkJoinPool()) {
6517                do  {
6518                    if (s.tryUnfork())
6519                        s.exec();
6520                } while ((s = s.nextRight) != null);
6521            }
6522            return false;
6139          }
6524        public final Double getRawResult() { return result; }
6140      }
6141  
6142      @SuppressWarnings("serial") static final class MapReduceEntriesToDoubleTask<K,V>
6143 <        extends BulkTask<K,V,Double> {
6143 >        extends Traverser<K,V,Double> {
6144          final ObjectToDouble<Map.Entry<K,V>> transformer;
6145          final DoubleByDoubleToDouble reducer;
6146          final double basis;
6147          double result;
6148          MapReduceEntriesToDoubleTask<K,V> rights, nextRight;
6149          MapReduceEntriesToDoubleTask
6150 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6150 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6151               MapReduceEntriesToDoubleTask<K,V> nextRight,
6152               ObjectToDouble<Map.Entry<K,V>> transformer,
6153               double basis,
# Line 6541 | Line 6156 | public class ConcurrentHashMap<K, V>
6156              this.transformer = transformer;
6157              this.basis = basis; this.reducer = reducer;
6158          }
6159 <        @SuppressWarnings("unchecked") public final boolean exec() {
6159 >        public final Double getRawResult() { return result; }
6160 >        @SuppressWarnings("unchecked") public final void compute() {
6161              final ObjectToDouble<Map.Entry<K,V>> transformer =
6162                  this.transformer;
6163              final DoubleByDoubleToDouble reducer = this.reducer;
6164              if (transformer == null || reducer == null)
6165 <                return abortOnNullFunction();
6166 <            try {
6167 <                final double id = this.basis;
6168 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6169 <                    do {} while (!casPending(c = pending, c+1));
6170 <                    (rights = new MapReduceEntriesToDoubleTask<K,V>
6171 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6172 <                }
6173 <                double r = id;
6174 <                Object v;
6175 <                while ((v = advance()) != null)
6176 <                    r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6177 <                result = r;
6178 <                for (MapReduceEntriesToDoubleTask<K,V> t = this, s;;) {
6179 <                    int c; BulkTask<K,V,?> par;
6180 <                    if ((c = t.pending) == 0) {
6181 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6566 <                            t.result = reducer.apply(t.result, s.result);
6567 <                        }
6568 <                        if ((par = t.parent) == null ||
6569 <                            !(par instanceof MapReduceEntriesToDoubleTask)) {
6570 <                            t.quietlyComplete();
6571 <                            break;
6572 <                        }
6573 <                        t = (MapReduceEntriesToDoubleTask<K,V>)par;
6574 <                    }
6575 <                    else if (t.casPending(c, c - 1))
6576 <                        break;
6165 >                throw new NullPointerException();
6166 >            double r = this.basis;
6167 >            for (int b; (b = preSplit()) > 0;)
6168 >                (rights = new MapReduceEntriesToDoubleTask<K,V>
6169 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6170 >            Object v;
6171 >            while ((v = advance()) != null)
6172 >                r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6173 >            result = r;
6174 >            CountedCompleter<?> c;
6175 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6176 >                MapReduceEntriesToDoubleTask<K,V>
6177 >                    t = (MapReduceEntriesToDoubleTask<K,V>)c,
6178 >                    s = t.rights;
6179 >                while (s != null) {
6180 >                    t.result = reducer.apply(t.result, s.result);
6181 >                    s = t.rights = s.nextRight;
6182                  }
6578            } catch (Throwable ex) {
6579                return tryCompleteComputation(ex);
6580            }
6581            MapReduceEntriesToDoubleTask<K,V> s = rights;
6582            if (s != null && !inForkJoinPool()) {
6583                do  {
6584                    if (s.tryUnfork())
6585                        s.exec();
6586                } while ((s = s.nextRight) != null);
6183              }
6588            return false;
6184          }
6590        public final Double getRawResult() { return result; }
6185      }
6186  
6187      @SuppressWarnings("serial") static final class MapReduceMappingsToDoubleTask<K,V>
6188 <        extends BulkTask<K,V,Double> {
6188 >        extends Traverser<K,V,Double> {
6189          final ObjectByObjectToDouble<? super K, ? super V> transformer;
6190          final DoubleByDoubleToDouble reducer;
6191          final double basis;
6192          double result;
6193          MapReduceMappingsToDoubleTask<K,V> rights, nextRight;
6194          MapReduceMappingsToDoubleTask
6195 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6195 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6196               MapReduceMappingsToDoubleTask<K,V> nextRight,
6197               ObjectByObjectToDouble<? super K, ? super V> transformer,
6198               double basis,
# Line 6607 | Line 6201 | public class ConcurrentHashMap<K, V>
6201              this.transformer = transformer;
6202              this.basis = basis; this.reducer = reducer;
6203          }
6204 <        @SuppressWarnings("unchecked") public final boolean exec() {
6204 >        public final Double getRawResult() { return result; }
6205 >        @SuppressWarnings("unchecked") public final void compute() {
6206              final ObjectByObjectToDouble<? super K, ? super V> transformer =
6207                  this.transformer;
6208              final DoubleByDoubleToDouble reducer = this.reducer;
6209              if (transformer == null || reducer == null)
6210 <                return abortOnNullFunction();
6211 <            try {
6212 <                final double id = this.basis;
6213 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6214 <                    do {} while (!casPending(c = pending, c+1));
6215 <                    (rights = new MapReduceMappingsToDoubleTask<K,V>
6216 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6217 <                }
6218 <                double r = id;
6219 <                Object v;
6220 <                while ((v = advance()) != null)
6221 <                    r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6222 <                result = r;
6223 <                for (MapReduceMappingsToDoubleTask<K,V> t = this, s;;) {
6224 <                    int c; BulkTask<K,V,?> par;
6225 <                    if ((c = t.pending) == 0) {
6226 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6632 <                            t.result = reducer.apply(t.result, s.result);
6633 <                        }
6634 <                        if ((par = t.parent) == null ||
6635 <                            !(par instanceof MapReduceMappingsToDoubleTask)) {
6636 <                            t.quietlyComplete();
6637 <                            break;
6638 <                        }
6639 <                        t = (MapReduceMappingsToDoubleTask<K,V>)par;
6640 <                    }
6641 <                    else if (t.casPending(c, c - 1))
6642 <                        break;
6210 >                throw new NullPointerException();
6211 >            double r = this.basis;
6212 >            for (int b; (b = preSplit()) > 0;)
6213 >                (rights = new MapReduceMappingsToDoubleTask<K,V>
6214 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6215 >            Object v;
6216 >            while ((v = advance()) != null)
6217 >                r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6218 >            result = r;
6219 >            CountedCompleter<?> c;
6220 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6221 >                MapReduceMappingsToDoubleTask<K,V>
6222 >                    t = (MapReduceMappingsToDoubleTask<K,V>)c,
6223 >                    s = t.rights;
6224 >                while (s != null) {
6225 >                    t.result = reducer.apply(t.result, s.result);
6226 >                    s = t.rights = s.nextRight;
6227                  }
6644            } catch (Throwable ex) {
6645                return tryCompleteComputation(ex);
6228              }
6647            MapReduceMappingsToDoubleTask<K,V> s = rights;
6648            if (s != null && !inForkJoinPool()) {
6649                do  {
6650                    if (s.tryUnfork())
6651                        s.exec();
6652                } while ((s = s.nextRight) != null);
6653            }
6654            return false;
6229          }
6656        public final Double getRawResult() { return result; }
6230      }
6231  
6232      @SuppressWarnings("serial") static final class MapReduceKeysToLongTask<K,V>
6233 <        extends BulkTask<K,V,Long> {
6233 >        extends Traverser<K,V,Long> {
6234          final ObjectToLong<? super K> transformer;
6235          final LongByLongToLong reducer;
6236          final long basis;
6237          long result;
6238          MapReduceKeysToLongTask<K,V> rights, nextRight;
6239          MapReduceKeysToLongTask
6240 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6240 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6241               MapReduceKeysToLongTask<K,V> nextRight,
6242               ObjectToLong<? super K> transformer,
6243               long basis,
# Line 6673 | Line 6246 | public class ConcurrentHashMap<K, V>
6246              this.transformer = transformer;
6247              this.basis = basis; this.reducer = reducer;
6248          }
6249 <        @SuppressWarnings("unchecked") public final boolean exec() {
6249 >        public final Long getRawResult() { return result; }
6250 >        @SuppressWarnings("unchecked") public final void compute() {
6251              final ObjectToLong<? super K> transformer =
6252                  this.transformer;
6253              final LongByLongToLong reducer = this.reducer;
6254              if (transformer == null || reducer == null)
6255 <                return abortOnNullFunction();
6256 <            try {
6257 <                final long id = this.basis;
6258 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6259 <                    do {} while (!casPending(c = pending, c+1));
6260 <                    (rights = new MapReduceKeysToLongTask<K,V>
6261 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6262 <                }
6263 <                long r = id;
6264 <                while (advance() != null)
6265 <                    r = reducer.apply(r, transformer.apply((K)nextKey));
6266 <                result = r;
6267 <                for (MapReduceKeysToLongTask<K,V> t = this, s;;) {
6268 <                    int c; BulkTask<K,V,?> par;
6269 <                    if ((c = t.pending) == 0) {
6270 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6697 <                            t.result = reducer.apply(t.result, s.result);
6698 <                        }
6699 <                        if ((par = t.parent) == null ||
6700 <                            !(par instanceof MapReduceKeysToLongTask)) {
6701 <                            t.quietlyComplete();
6702 <                            break;
6703 <                        }
6704 <                        t = (MapReduceKeysToLongTask<K,V>)par;
6705 <                    }
6706 <                    else if (t.casPending(c, c - 1))
6707 <                        break;
6255 >                throw new NullPointerException();
6256 >            long r = this.basis;
6257 >            for (int b; (b = preSplit()) > 0;)
6258 >                (rights = new MapReduceKeysToLongTask<K,V>
6259 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6260 >            while (advance() != null)
6261 >                r = reducer.apply(r, transformer.apply((K)nextKey));
6262 >            result = r;
6263 >            CountedCompleter<?> c;
6264 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6265 >                MapReduceKeysToLongTask<K,V>
6266 >                    t = (MapReduceKeysToLongTask<K,V>)c,
6267 >                    s = t.rights;
6268 >                while (s != null) {
6269 >                    t.result = reducer.apply(t.result, s.result);
6270 >                    s = t.rights = s.nextRight;
6271                  }
6709            } catch (Throwable ex) {
6710                return tryCompleteComputation(ex);
6711            }
6712            MapReduceKeysToLongTask<K,V> s = rights;
6713            if (s != null && !inForkJoinPool()) {
6714                do  {
6715                    if (s.tryUnfork())
6716                        s.exec();
6717                } while ((s = s.nextRight) != null);
6272              }
6719            return false;
6273          }
6721        public final Long getRawResult() { return result; }
6274      }
6275  
6276      @SuppressWarnings("serial") static final class MapReduceValuesToLongTask<K,V>
6277 <        extends BulkTask<K,V,Long> {
6277 >        extends Traverser<K,V,Long> {
6278          final ObjectToLong<? super V> transformer;
6279          final LongByLongToLong reducer;
6280          final long basis;
6281          long result;
6282          MapReduceValuesToLongTask<K,V> rights, nextRight;
6283          MapReduceValuesToLongTask
6284 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6284 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6285               MapReduceValuesToLongTask<K,V> nextRight,
6286               ObjectToLong<? super V> transformer,
6287               long basis,
# Line 6738 | Line 6290 | public class ConcurrentHashMap<K, V>
6290              this.transformer = transformer;
6291              this.basis = basis; this.reducer = reducer;
6292          }
6293 <        @SuppressWarnings("unchecked") public final boolean exec() {
6293 >        public final Long getRawResult() { return result; }
6294 >        @SuppressWarnings("unchecked") public final void compute() {
6295              final ObjectToLong<? super V> transformer =
6296                  this.transformer;
6297              final LongByLongToLong reducer = this.reducer;
6298              if (transformer == null || reducer == null)
6299 <                return abortOnNullFunction();
6300 <            try {
6301 <                final long id = this.basis;
6302 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6303 <                    do {} while (!casPending(c = pending, c+1));
6304 <                    (rights = new MapReduceValuesToLongTask<K,V>
6305 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6306 <                }
6307 <                long r = id;
6308 <                Object v;
6309 <                while ((v = advance()) != null)
6310 <                    r = reducer.apply(r, transformer.apply((V)v));
6311 <                result = r;
6312 <                for (MapReduceValuesToLongTask<K,V> t = this, s;;) {
6313 <                    int c; BulkTask<K,V,?> par;
6314 <                    if ((c = t.pending) == 0) {
6315 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6763 <                            t.result = reducer.apply(t.result, s.result);
6764 <                        }
6765 <                        if ((par = t.parent) == null ||
6766 <                            !(par instanceof MapReduceValuesToLongTask)) {
6767 <                            t.quietlyComplete();
6768 <                            break;
6769 <                        }
6770 <                        t = (MapReduceValuesToLongTask<K,V>)par;
6771 <                    }
6772 <                    else if (t.casPending(c, c - 1))
6773 <                        break;
6299 >                throw new NullPointerException();
6300 >            long r = this.basis;
6301 >            for (int b; (b = preSplit()) > 0;)
6302 >                (rights = new MapReduceValuesToLongTask<K,V>
6303 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6304 >            Object v;
6305 >            while ((v = advance()) != null)
6306 >                r = reducer.apply(r, transformer.apply((V)v));
6307 >            result = r;
6308 >            CountedCompleter<?> c;
6309 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6310 >                MapReduceValuesToLongTask<K,V>
6311 >                    t = (MapReduceValuesToLongTask<K,V>)c,
6312 >                    s = t.rights;
6313 >                while (s != null) {
6314 >                    t.result = reducer.apply(t.result, s.result);
6315 >                    s = t.rights = s.nextRight;
6316                  }
6775            } catch (Throwable ex) {
6776                return tryCompleteComputation(ex);
6317              }
6778            MapReduceValuesToLongTask<K,V> s = rights;
6779            if (s != null && !inForkJoinPool()) {
6780                do  {
6781                    if (s.tryUnfork())
6782                        s.exec();
6783                } while ((s = s.nextRight) != null);
6784            }
6785            return false;
6318          }
6787        public final Long getRawResult() { return result; }
6319      }
6320  
6321      @SuppressWarnings("serial") static final class MapReduceEntriesToLongTask<K,V>
6322 <        extends BulkTask<K,V,Long> {
6322 >        extends Traverser<K,V,Long> {
6323          final ObjectToLong<Map.Entry<K,V>> transformer;
6324          final LongByLongToLong reducer;
6325          final long basis;
6326          long result;
6327          MapReduceEntriesToLongTask<K,V> rights, nextRight;
6328          MapReduceEntriesToLongTask
6329 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6329 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6330               MapReduceEntriesToLongTask<K,V> nextRight,
6331               ObjectToLong<Map.Entry<K,V>> transformer,
6332               long basis,
# Line 6804 | Line 6335 | public class ConcurrentHashMap<K, V>
6335              this.transformer = transformer;
6336              this.basis = basis; this.reducer = reducer;
6337          }
6338 <        @SuppressWarnings("unchecked") public final boolean exec() {
6338 >        public final Long getRawResult() { return result; }
6339 >        @SuppressWarnings("unchecked") public final void compute() {
6340              final ObjectToLong<Map.Entry<K,V>> transformer =
6341                  this.transformer;
6342              final LongByLongToLong reducer = this.reducer;
6343              if (transformer == null || reducer == null)
6344 <                return abortOnNullFunction();
6345 <            try {
6346 <                final long id = this.basis;
6347 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6348 <                    do {} while (!casPending(c = pending, c+1));
6349 <                    (rights = new MapReduceEntriesToLongTask<K,V>
6350 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6351 <                }
6352 <                long r = id;
6353 <                Object v;
6354 <                while ((v = advance()) != null)
6355 <                    r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6356 <                result = r;
6357 <                for (MapReduceEntriesToLongTask<K,V> t = this, s;;) {
6358 <                    int c; BulkTask<K,V,?> par;
6359 <                    if ((c = t.pending) == 0) {
6360 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6829 <                            t.result = reducer.apply(t.result, s.result);
6830 <                        }
6831 <                        if ((par = t.parent) == null ||
6832 <                            !(par instanceof MapReduceEntriesToLongTask)) {
6833 <                            t.quietlyComplete();
6834 <                            break;
6835 <                        }
6836 <                        t = (MapReduceEntriesToLongTask<K,V>)par;
6837 <                    }
6838 <                    else if (t.casPending(c, c - 1))
6839 <                        break;
6344 >                throw new NullPointerException();
6345 >            long r = this.basis;
6346 >            for (int b; (b = preSplit()) > 0;)
6347 >                (rights = new MapReduceEntriesToLongTask<K,V>
6348 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6349 >            Object v;
6350 >            while ((v = advance()) != null)
6351 >                r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6352 >            result = r;
6353 >            CountedCompleter<?> c;
6354 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6355 >                MapReduceEntriesToLongTask<K,V>
6356 >                    t = (MapReduceEntriesToLongTask<K,V>)c,
6357 >                    s = t.rights;
6358 >                while (s != null) {
6359 >                    t.result = reducer.apply(t.result, s.result);
6360 >                    s = t.rights = s.nextRight;
6361                  }
6841            } catch (Throwable ex) {
6842                return tryCompleteComputation(ex);
6843            }
6844            MapReduceEntriesToLongTask<K,V> s = rights;
6845            if (s != null && !inForkJoinPool()) {
6846                do  {
6847                    if (s.tryUnfork())
6848                        s.exec();
6849                } while ((s = s.nextRight) != null);
6362              }
6851            return false;
6363          }
6853        public final Long getRawResult() { return result; }
6364      }
6365  
6366      @SuppressWarnings("serial") static final class MapReduceMappingsToLongTask<K,V>
6367 <        extends BulkTask<K,V,Long> {
6367 >        extends Traverser<K,V,Long> {
6368          final ObjectByObjectToLong<? super K, ? super V> transformer;
6369          final LongByLongToLong reducer;
6370          final long basis;
6371          long result;
6372          MapReduceMappingsToLongTask<K,V> rights, nextRight;
6373          MapReduceMappingsToLongTask
6374 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6374 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6375               MapReduceMappingsToLongTask<K,V> nextRight,
6376               ObjectByObjectToLong<? super K, ? super V> transformer,
6377               long basis,
# Line 6870 | Line 6380 | public class ConcurrentHashMap<K, V>
6380              this.transformer = transformer;
6381              this.basis = basis; this.reducer = reducer;
6382          }
6383 <        @SuppressWarnings("unchecked") public final boolean exec() {
6383 >        public final Long getRawResult() { return result; }
6384 >        @SuppressWarnings("unchecked") public final void compute() {
6385              final ObjectByObjectToLong<? super K, ? super V> transformer =
6386                  this.transformer;
6387              final LongByLongToLong reducer = this.reducer;
6388              if (transformer == null || reducer == null)
6389 <                return abortOnNullFunction();
6390 <            try {
6391 <                final long id = this.basis;
6392 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6393 <                    do {} while (!casPending(c = pending, c+1));
6394 <                    (rights = new MapReduceMappingsToLongTask<K,V>
6395 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6396 <                }
6397 <                long r = id;
6398 <                Object v;
6399 <                while ((v = advance()) != null)
6400 <                    r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6401 <                result = r;
6402 <                for (MapReduceMappingsToLongTask<K,V> t = this, s;;) {
6403 <                    int c; BulkTask<K,V,?> par;
6404 <                    if ((c = t.pending) == 0) {
6405 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6895 <                            t.result = reducer.apply(t.result, s.result);
6896 <                        }
6897 <                        if ((par = t.parent) == null ||
6898 <                            !(par instanceof MapReduceMappingsToLongTask)) {
6899 <                            t.quietlyComplete();
6900 <                            break;
6901 <                        }
6902 <                        t = (MapReduceMappingsToLongTask<K,V>)par;
6903 <                    }
6904 <                    else if (t.casPending(c, c - 1))
6905 <                        break;
6389 >                throw new NullPointerException();
6390 >            long r = this.basis;
6391 >            for (int b; (b = preSplit()) > 0;)
6392 >                (rights = new MapReduceMappingsToLongTask<K,V>
6393 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6394 >            Object v;
6395 >            while ((v = advance()) != null)
6396 >                r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6397 >            result = r;
6398 >            CountedCompleter<?> c;
6399 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6400 >                MapReduceMappingsToLongTask<K,V>
6401 >                    t = (MapReduceMappingsToLongTask<K,V>)c,
6402 >                    s = t.rights;
6403 >                while (s != null) {
6404 >                    t.result = reducer.apply(t.result, s.result);
6405 >                    s = t.rights = s.nextRight;
6406                  }
6907            } catch (Throwable ex) {
6908                return tryCompleteComputation(ex);
6407              }
6910            MapReduceMappingsToLongTask<K,V> s = rights;
6911            if (s != null && !inForkJoinPool()) {
6912                do  {
6913                    if (s.tryUnfork())
6914                        s.exec();
6915                } while ((s = s.nextRight) != null);
6916            }
6917            return false;
6408          }
6919        public final Long getRawResult() { return result; }
6409      }
6410  
6411      @SuppressWarnings("serial") static final class MapReduceKeysToIntTask<K,V>
6412 <        extends BulkTask<K,V,Integer> {
6412 >        extends Traverser<K,V,Integer> {
6413          final ObjectToInt<? super K> transformer;
6414          final IntByIntToInt reducer;
6415          final int basis;
6416          int result;
6417          MapReduceKeysToIntTask<K,V> rights, nextRight;
6418          MapReduceKeysToIntTask
6419 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6419 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6420               MapReduceKeysToIntTask<K,V> nextRight,
6421               ObjectToInt<? super K> transformer,
6422               int basis,
# Line 6936 | Line 6425 | public class ConcurrentHashMap<K, V>
6425              this.transformer = transformer;
6426              this.basis = basis; this.reducer = reducer;
6427          }
6428 <        @SuppressWarnings("unchecked") public final boolean exec() {
6428 >        public final Integer getRawResult() { return result; }
6429 >        @SuppressWarnings("unchecked") public final void compute() {
6430              final ObjectToInt<? super K> transformer =
6431                  this.transformer;
6432              final IntByIntToInt reducer = this.reducer;
6433              if (transformer == null || reducer == null)
6434 <                return abortOnNullFunction();
6435 <            try {
6436 <                final int id = this.basis;
6437 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6438 <                    do {} while (!casPending(c = pending, c+1));
6439 <                    (rights = new MapReduceKeysToIntTask<K,V>
6440 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6441 <                }
6442 <                int r = id;
6443 <                while (advance() != null)
6444 <                    r = reducer.apply(r, transformer.apply((K)nextKey));
6445 <                result = r;
6446 <                for (MapReduceKeysToIntTask<K,V> t = this, s;;) {
6447 <                    int c; BulkTask<K,V,?> par;
6448 <                    if ((c = t.pending) == 0) {
6449 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
6960 <                            t.result = reducer.apply(t.result, s.result);
6961 <                        }
6962 <                        if ((par = t.parent) == null ||
6963 <                            !(par instanceof MapReduceKeysToIntTask)) {
6964 <                            t.quietlyComplete();
6965 <                            break;
6966 <                        }
6967 <                        t = (MapReduceKeysToIntTask<K,V>)par;
6968 <                    }
6969 <                    else if (t.casPending(c, c - 1))
6970 <                        break;
6434 >                throw new NullPointerException();
6435 >            int r = this.basis;
6436 >            for (int b; (b = preSplit()) > 0;)
6437 >                (rights = new MapReduceKeysToIntTask<K,V>
6438 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6439 >            while (advance() != null)
6440 >                r = reducer.apply(r, transformer.apply((K)nextKey));
6441 >            result = r;
6442 >            CountedCompleter<?> c;
6443 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6444 >                MapReduceKeysToIntTask<K,V>
6445 >                    t = (MapReduceKeysToIntTask<K,V>)c,
6446 >                    s = t.rights;
6447 >                while (s != null) {
6448 >                    t.result = reducer.apply(t.result, s.result);
6449 >                    s = t.rights = s.nextRight;
6450                  }
6972            } catch (Throwable ex) {
6973                return tryCompleteComputation(ex);
6974            }
6975            MapReduceKeysToIntTask<K,V> s = rights;
6976            if (s != null && !inForkJoinPool()) {
6977                do  {
6978                    if (s.tryUnfork())
6979                        s.exec();
6980                } while ((s = s.nextRight) != null);
6451              }
6982            return false;
6452          }
6984        public final Integer getRawResult() { return result; }
6453      }
6454  
6455      @SuppressWarnings("serial") static final class MapReduceValuesToIntTask<K,V>
6456 <        extends BulkTask<K,V,Integer> {
6456 >        extends Traverser<K,V,Integer> {
6457          final ObjectToInt<? super V> transformer;
6458          final IntByIntToInt reducer;
6459          final int basis;
6460          int result;
6461          MapReduceValuesToIntTask<K,V> rights, nextRight;
6462          MapReduceValuesToIntTask
6463 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6463 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6464               MapReduceValuesToIntTask<K,V> nextRight,
6465               ObjectToInt<? super V> transformer,
6466               int basis,
# Line 7001 | Line 6469 | public class ConcurrentHashMap<K, V>
6469              this.transformer = transformer;
6470              this.basis = basis; this.reducer = reducer;
6471          }
6472 <        @SuppressWarnings("unchecked") public final boolean exec() {
6472 >        public final Integer getRawResult() { return result; }
6473 >        @SuppressWarnings("unchecked") public final void compute() {
6474              final ObjectToInt<? super V> transformer =
6475                  this.transformer;
6476              final IntByIntToInt reducer = this.reducer;
6477              if (transformer == null || reducer == null)
6478 <                return abortOnNullFunction();
6479 <            try {
6480 <                final int id = this.basis;
6481 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6482 <                    do {} while (!casPending(c = pending, c+1));
6483 <                    (rights = new MapReduceValuesToIntTask<K,V>
6484 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6485 <                }
6486 <                int r = id;
6487 <                Object v;
6488 <                while ((v = advance()) != null)
6489 <                    r = reducer.apply(r, transformer.apply((V)v));
6490 <                result = r;
6491 <                for (MapReduceValuesToIntTask<K,V> t = this, s;;) {
6492 <                    int c; BulkTask<K,V,?> par;
6493 <                    if ((c = t.pending) == 0) {
6494 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
7026 <                            t.result = reducer.apply(t.result, s.result);
7027 <                        }
7028 <                        if ((par = t.parent) == null ||
7029 <                            !(par instanceof MapReduceValuesToIntTask)) {
7030 <                            t.quietlyComplete();
7031 <                            break;
7032 <                        }
7033 <                        t = (MapReduceValuesToIntTask<K,V>)par;
7034 <                    }
7035 <                    else if (t.casPending(c, c - 1))
7036 <                        break;
6478 >                throw new NullPointerException();
6479 >            int r = this.basis;
6480 >            for (int b; (b = preSplit()) > 0;)
6481 >                (rights = new MapReduceValuesToIntTask<K,V>
6482 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6483 >            Object v;
6484 >            while ((v = advance()) != null)
6485 >                r = reducer.apply(r, transformer.apply((V)v));
6486 >            result = r;
6487 >            CountedCompleter<?> c;
6488 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6489 >                MapReduceValuesToIntTask<K,V>
6490 >                    t = (MapReduceValuesToIntTask<K,V>)c,
6491 >                    s = t.rights;
6492 >                while (s != null) {
6493 >                    t.result = reducer.apply(t.result, s.result);
6494 >                    s = t.rights = s.nextRight;
6495                  }
7038            } catch (Throwable ex) {
7039                return tryCompleteComputation(ex);
6496              }
7041            MapReduceValuesToIntTask<K,V> s = rights;
7042            if (s != null && !inForkJoinPool()) {
7043                do  {
7044                    if (s.tryUnfork())
7045                        s.exec();
7046                } while ((s = s.nextRight) != null);
7047            }
7048            return false;
6497          }
7050        public final Integer getRawResult() { return result; }
6498      }
6499  
6500      @SuppressWarnings("serial") static final class MapReduceEntriesToIntTask<K,V>
6501 <        extends BulkTask<K,V,Integer> {
6501 >        extends Traverser<K,V,Integer> {
6502          final ObjectToInt<Map.Entry<K,V>> transformer;
6503          final IntByIntToInt reducer;
6504          final int basis;
6505          int result;
6506          MapReduceEntriesToIntTask<K,V> rights, nextRight;
6507          MapReduceEntriesToIntTask
6508 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6508 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6509               MapReduceEntriesToIntTask<K,V> nextRight,
6510               ObjectToInt<Map.Entry<K,V>> transformer,
6511               int basis,
# Line 7067 | Line 6514 | public class ConcurrentHashMap<K, V>
6514              this.transformer = transformer;
6515              this.basis = basis; this.reducer = reducer;
6516          }
6517 <        @SuppressWarnings("unchecked") public final boolean exec() {
6517 >        public final Integer getRawResult() { return result; }
6518 >        @SuppressWarnings("unchecked") public final void compute() {
6519              final ObjectToInt<Map.Entry<K,V>> transformer =
6520                  this.transformer;
6521              final IntByIntToInt reducer = this.reducer;
6522              if (transformer == null || reducer == null)
6523 <                return abortOnNullFunction();
6524 <            try {
6525 <                final int id = this.basis;
6526 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6527 <                    do {} while (!casPending(c = pending, c+1));
6528 <                    (rights = new MapReduceEntriesToIntTask<K,V>
6529 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6530 <                }
6531 <                int r = id;
6532 <                Object v;
6533 <                while ((v = advance()) != null)
6534 <                    r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6535 <                result = r;
6536 <                for (MapReduceEntriesToIntTask<K,V> t = this, s;;) {
6537 <                    int c; BulkTask<K,V,?> par;
6538 <                    if ((c = t.pending) == 0) {
6539 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
7092 <                            t.result = reducer.apply(t.result, s.result);
7093 <                        }
7094 <                        if ((par = t.parent) == null ||
7095 <                            !(par instanceof MapReduceEntriesToIntTask)) {
7096 <                            t.quietlyComplete();
7097 <                            break;
7098 <                        }
7099 <                        t = (MapReduceEntriesToIntTask<K,V>)par;
7100 <                    }
7101 <                    else if (t.casPending(c, c - 1))
7102 <                        break;
6523 >                throw new NullPointerException();
6524 >            int r = this.basis;
6525 >            for (int b; (b = preSplit()) > 0;)
6526 >                (rights = new MapReduceEntriesToIntTask<K,V>
6527 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6528 >            Object v;
6529 >            while ((v = advance()) != null)
6530 >                r = reducer.apply(r, transformer.apply(entryFor((K)nextKey, (V)v)));
6531 >            result = r;
6532 >            CountedCompleter<?> c;
6533 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6534 >                MapReduceEntriesToIntTask<K,V>
6535 >                    t = (MapReduceEntriesToIntTask<K,V>)c,
6536 >                    s = t.rights;
6537 >                while (s != null) {
6538 >                    t.result = reducer.apply(t.result, s.result);
6539 >                    s = t.rights = s.nextRight;
6540                  }
7104            } catch (Throwable ex) {
7105                return tryCompleteComputation(ex);
7106            }
7107            MapReduceEntriesToIntTask<K,V> s = rights;
7108            if (s != null && !inForkJoinPool()) {
7109                do  {
7110                    if (s.tryUnfork())
7111                        s.exec();
7112                } while ((s = s.nextRight) != null);
6541              }
7114            return false;
6542          }
7116        public final Integer getRawResult() { return result; }
6543      }
6544  
6545      @SuppressWarnings("serial") static final class MapReduceMappingsToIntTask<K,V>
6546 <        extends BulkTask<K,V,Integer> {
6546 >        extends Traverser<K,V,Integer> {
6547          final ObjectByObjectToInt<? super K, ? super V> transformer;
6548          final IntByIntToInt reducer;
6549          final int basis;
6550          int result;
6551          MapReduceMappingsToIntTask<K,V> rights, nextRight;
6552          MapReduceMappingsToIntTask
6553 <            (ConcurrentHashMap<K,V> m, BulkTask<K,V,?> p, int b,
6554 <             MapReduceMappingsToIntTask<K,V> rights,
6553 >            (ConcurrentHashMap<K,V> m, Traverser<K,V,?> p, int b,
6554 >             MapReduceMappingsToIntTask<K,V> nextRight,
6555               ObjectByObjectToInt<? super K, ? super V> transformer,
6556               int basis,
6557               IntByIntToInt reducer) {
# Line 7133 | Line 6559 | public class ConcurrentHashMap<K, V>
6559              this.transformer = transformer;
6560              this.basis = basis; this.reducer = reducer;
6561          }
6562 <        @SuppressWarnings("unchecked") public final boolean exec() {
6562 >        public final Integer getRawResult() { return result; }
6563 >        @SuppressWarnings("unchecked") public final void compute() {
6564              final ObjectByObjectToInt<? super K, ? super V> transformer =
6565                  this.transformer;
6566              final IntByIntToInt reducer = this.reducer;
6567              if (transformer == null || reducer == null)
6568 <                return abortOnNullFunction();
6569 <            try {
6570 <                final int id = this.basis;
6571 <                for (int c, b = batch(); b > 1 && baseIndex != baseLimit;) {
6572 <                    do {} while (!casPending(c = pending, c+1));
6573 <                    (rights = new MapReduceMappingsToIntTask<K,V>
6574 <                     (map, this, b >>>= 1, rights, transformer, id, reducer)).fork();
6575 <                }
6576 <                int r = id;
6577 <                Object v;
6578 <                while ((v = advance()) != null)
6579 <                    r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6580 <                result = r;
6581 <                for (MapReduceMappingsToIntTask<K,V> t = this, s;;) {
6582 <                    int c; BulkTask<K,V,?> par;
6583 <                    if ((c = t.pending) == 0) {
6584 <                        for (s = t.rights; s != null; s = t.rights = s.nextRight) {
7158 <                            t.result = reducer.apply(t.result, s.result);
7159 <                        }
7160 <                        if ((par = t.parent) == null ||
7161 <                            !(par instanceof MapReduceMappingsToIntTask)) {
7162 <                            t.quietlyComplete();
7163 <                            break;
7164 <                        }
7165 <                        t = (MapReduceMappingsToIntTask<K,V>)par;
7166 <                    }
7167 <                    else if (t.casPending(c, c - 1))
7168 <                        break;
6568 >                throw new NullPointerException();
6569 >            int r = this.basis;
6570 >            for (int b; (b = preSplit()) > 0;)
6571 >                (rights = new MapReduceMappingsToIntTask<K,V>
6572 >                 (map, this, b, rights, transformer, r, reducer)).fork();
6573 >            Object v;
6574 >            while ((v = advance()) != null)
6575 >                r = reducer.apply(r, transformer.apply((K)nextKey, (V)v));
6576 >            result = r;
6577 >            CountedCompleter<?> c;
6578 >            for (c = firstComplete(); c != null; c = c.nextComplete()) {
6579 >                MapReduceMappingsToIntTask<K,V>
6580 >                    t = (MapReduceMappingsToIntTask<K,V>)c,
6581 >                    s = t.rights;
6582 >                while (s != null) {
6583 >                    t.result = reducer.apply(t.result, s.result);
6584 >                    s = t.rights = s.nextRight;
6585                  }
7170            } catch (Throwable ex) {
7171                return tryCompleteComputation(ex);
7172            }
7173            MapReduceMappingsToIntTask<K,V> s = rights;
7174            if (s != null && !inForkJoinPool()) {
7175                do  {
7176                    if (s.tryUnfork())
7177                        s.exec();
7178                } while ((s = s.nextRight) != null);
6586              }
7180            return false;
6587          }
7182        public final Integer getRawResult() { return result; }
6588      }
6589  
6590 +
6591      // Unsafe mechanics
6592      private static final sun.misc.Unsafe UNSAFE;
6593      private static final long counterOffset;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines