ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java (file contents):
Revision 1.161 by jsr166, Mon Oct 1 00:10:53 2018 UTC vs.
Revision 1.162 by dl, Thu Jun 4 12:03:50 2020 UTC

# Line 280 | Line 280 | public class LinkedTransferQueue<E> exte
280       * 2. Await match or cancellation (method awaitMatch)
281       *
282       *    Wait for another thread to match node; instead cancelling if
283 <     *    the current thread was interrupted or the wait timed out. On
284 <     *    multiprocessors, we use front-of-queue spinning: If a node
285 <     *    appears to be the first unmatched node in the queue, it
286 <     *    spins a bit before blocking. In either case, before blocking
287 <     *    it tries to unsplice any nodes between the current "head"
288 <     *    and the first unmatched node.
289 <     *
290 <     *    Front-of-queue spinning vastly improves performance of
291 <     *    heavily contended queues. And so long as it is relatively
292 <     *    brief and "quiet", spinning does not much impact performance
293 <     *    of less-contended queues.  During spins threads check their
294 <     *    interrupt status and generate a thread-local random number
295 <     *    to decide to occasionally perform a Thread.yield. While
296 <     *    yield has underdefined specs, we assume that it might help,
297 <     *    and will not hurt, in limiting impact of spinning on busy
298 <     *    systems.  We also use smaller (1/2) spins for nodes that are
299 <     *    not known to be front but whose predecessors have not
300 <     *    blocked -- these "chained" spins avoid artifacts of
301 <     *    front-of-queue rules which otherwise lead to alternating
302 <     *    nodes spinning vs blocking. Further, front threads that
303 <     *    represent phase changes (from data to request node or vice
304 <     *    versa) compared to their predecessors receive additional
305 <     *    chained spins, reflecting longer paths typically required to
306 <     *    unblock threads during phase changes.
307 <     *
283 >     *    the current thread was interrupted or the wait timed out. To
284 >     *    improve performance in common single-source / single-sink
285 >     *    usages when there are more tasks that cores, an initial
286 >     *    Thread.yield is tried when there is apparently only one
287 >     *    waiter.  In other cases, waiters may help with some
288 >     *    bookkeeping, then park/unpark.
289       *
290       * ** Unlinking removed interior nodes **
291       *
# Line 340 | Line 321 | public class LinkedTransferQueue<E> exte
321       *
322       * When these cases arise, rather than always retraversing the
323       * entire list to find an actual predecessor to unlink (which
324 <     * won't help for case (1) anyway), we record a conservative
325 <     * estimate of possible unsplice failures (in "sweepVotes").
326 <     * We trigger a full sweep when the estimate exceeds a threshold
346 <     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
347 <     * removal failures to tolerate before sweeping through, unlinking
348 <     * cancelled nodes that were not unlinked upon initial removal.
349 <     * We perform sweeps by the thread hitting threshold (rather than
350 <     * background threads or by spreading work to other threads)
351 <     * because in the main contexts in which removal occurs, the
352 <     * caller is timed-out or cancelled, which are not time-critical
353 <     * enough to warrant the overhead that alternatives would impose
354 <     * on other threads.
355 <     *
356 <     * Because the sweepVotes estimate is conservative, and because
357 <     * nodes become unlinked "naturally" as they fall off the head of
358 <     * the queue, and because we allow votes to accumulate even while
359 <     * sweeps are in progress, there are typically significantly fewer
360 <     * such nodes than estimated.  Choice of a threshold value
361 <     * balances the likelihood of wasted effort and contention, versus
362 <     * providing a worst-case bound on retention of interior nodes in
363 <     * quiescent queues. The value defined below was chosen
364 <     * empirically to balance these under various timeout scenarios.
365 <     *
366 <     * Because traversal operations on the linked list of nodes are a
324 >     * won't help for case (1) anyway), we record the need to sweep the
325 >     * next time any thread would otherwise block in awaitMatch. Also,
326 >     * because traversal operations on the linked list of nodes are a
327       * natural opportunity to sweep dead nodes, we generally do so,
328       * including all the operations that might remove elements as they
329       * traverse, such as removeIf and Iterator.remove.  This largely
# Line 376 | Line 336 | public class LinkedTransferQueue<E> exte
336       * self-linked.
337       */
338  
379    /** True if on multiprocessor */
380    private static final boolean MP =
381        Runtime.getRuntime().availableProcessors() > 1;
382
339      /**
340 <     * The number of times to spin (with randomly interspersed calls
341 <     * to Thread.yield) on multiprocessor before blocking when a node
342 <     * is apparently the first waiter in the queue.  See above for
387 <     * explanation. Must be a power of two. The value is empirically
388 <     * derived -- it works pretty well across a variety of processors,
389 <     * numbers of CPUs, and OSes.
340 >     * The number of nanoseconds for which it is faster to spin
341 >     * rather than to use timed park. A rough estimate suffices.
342 >     * Using a power of two minus one simplifies some comparisons.
343       */
344 <    private static final int FRONT_SPINS   = 1 << 7;
392 <
393 <    /**
394 <     * The number of times to spin before blocking when a node is
395 <     * preceded by another node that is apparently spinning.  Also
396 <     * serves as an increment to FRONT_SPINS on phase changes, and as
397 <     * base average frequency for yielding during spins. Must be a
398 <     * power of two.
399 <     */
400 <    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
344 >    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;
345  
346      /**
347       * The maximum number of estimated removal failures (sweepVotes)
# Line 413 | Line 357 | public class LinkedTransferQueue<E> exte
357       * them after use.  Writes that are intrinsically ordered wrt
358       * other accesses or CASes use simple relaxed forms.
359       */
360 <    static final class Node {
360 >    static final class Node implements ForkJoinPool.ManagedBlocker {
361          final boolean isData;   // false if this is a request node
362          volatile Object item;   // initially non-null if isData; CASed to match
363          volatile Node next;
# Line 458 | Line 402 | public class LinkedTransferQueue<E> exte
402          final void appendRelaxed(Node next) {
403              // assert next != null;
404              // assert this.next == null;
405 <            NEXT.set(this, next);
462 <        }
463 <
464 <        /**
465 <         * Sets item (of a request node) to self and waiter to null,
466 <         * to avoid garbage retention after matching or cancelling.
467 <         * Uses relaxed writes because order is already constrained in
468 <         * the only calling contexts: item is forgotten only after
469 <         * volatile/atomic mechanics that extract items, and visitors
470 <         * of request nodes only ever check whether item is null.
471 <         * Similarly, clearing waiter follows either CAS or return
472 <         * from park (if ever parked; else we don't care).
473 <         */
474 <        final void forgetContents() {
475 <            // assert isMatched();
476 <            if (!isData)
477 <                ITEM.set(this, this);
478 <            WAITER.set(this, null);
405 >            NEXT.setOpaque(this, next);
406          }
407  
408          /**
# Line 505 | Line 432 | public class LinkedTransferQueue<E> exte
432              return d != haveData && d != (item == null);
433          }
434  
435 +        public final boolean isReleasable() {
436 +            return (isData == (item == null)) ||
437 +                Thread.currentThread().isInterrupted();
438 +        }
439 +
440 +        public final boolean block() {
441 +            while (!isReleasable()) LockSupport.park();
442 +            return true;
443 +        }
444 +
445          private static final long serialVersionUID = -3375979862319811754L;
446      }
447  
# Line 537 | Line 474 | public class LinkedTransferQueue<E> exte
474      private transient volatile Node tail;
475  
476      /** The number of apparent failures to unsplice cancelled nodes */
477 <    private transient volatile int sweepVotes;
477 >    private transient volatile boolean needSweep;
478  
479      private boolean casTail(Node cmp, Node val) {
480          // assert cmp != null;
# Line 549 | Line 486 | public class LinkedTransferQueue<E> exte
486          return HEAD.compareAndSet(this, cmp, val);
487      }
488  
552    /** Atomic version of ++sweepVotes. */
553    private int incSweepVotes() {
554        return (int) SWEEPVOTES.getAndAdd(this, 1) + 1;
555    }
556
489      /**
490       * Tries to CAS pred.next (or head, if pred is null) from c to p.
491       * Caller must ensure that we're not unlinking the trailing node.
# Line 660 | Line 592 | public class LinkedTransferQueue<E> exte
592      }
593  
594      /**
595 <     * Spins/yields/blocks until node s is matched or caller gives up.
595 >     * Possibly blocks until node s is matched or caller gives up.
596       *
597       * @param s the waiting node
598       * @param pred the predecessor of s, or null if unknown (the null
# Line 671 | Line 603 | public class LinkedTransferQueue<E> exte
603       * @param nanos timeout in nanosecs, used only if timed is true
604       * @return matched item, or e if unmatched on interrupt or timeout
605       */
606 +    @SuppressWarnings("unchecked")
607      private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
608 +        final boolean isData = s.isData;
609          final long deadline = timed ? System.nanoTime() + nanos : 0L;
610 <        Thread w = Thread.currentThread();
611 <        int spins = -1; // initialized after first item and cancel checks
612 <        ThreadLocalRandom randomYields = null; // bound if needed
613 <
614 <        for (;;) {
615 <            final Object item;
616 <            if ((item = s.item) != e) {       // matched
617 <                // assert item != s;
618 <                s.forgetContents();           // avoid garbage
685 <                @SuppressWarnings("unchecked") E itemE = (E) item;
686 <                return itemE;
687 <            }
688 <            else if (w.isInterrupted() || (timed && nanos <= 0L)) {
689 <                // try to cancel and unlink
690 <                if (s.casItem(e, s.isData ? null : s)) {
691 <                    unsplice(pred, s);
610 >        final Thread w = Thread.currentThread();
611 >        int stat = -1;                   // -1: may yield, +1: park, else 0
612 >        Object item;
613 >        while ((item = s.item) == e) {
614 >            if (needSweep)               // help clean
615 >                sweep();
616 >            else if ((timed && nanos <= 0L) || w.isInterrupted()) {
617 >                if (s.casItem(e, (e == null) ? s : null)) {
618 >                    unsplice(pred, s);   // cancelled
619                      return e;
620                  }
694                // return normally if lost CAS
695            }
696            else if (spins < 0) {            // establish spins at/near front
697                if ((spins = spinsFor(pred, s.isData)) > 0)
698                    randomYields = ThreadLocalRandom.current();
621              }
622 <            else if (spins > 0) {             // spin
623 <                --spins;
624 <                if (randomYields.nextInt(CHAINED_SPINS) == 0)
625 <                    Thread.yield();           // occasionally yield
622 >            else if (stat <= 0) {
623 >                if (pred != null && pred.next == s) {
624 >                    if (stat < 0 &&
625 >                        (pred.isData != isData || pred.isMatched())) {
626 >                        stat = 0;        // yield once if first
627 >                        Thread.yield();
628 >                    }
629 >                    else {
630 >                        stat = 1;
631 >                        s.waiter = w;    // enable unpark
632 >                    }
633 >                }                        // else signal in progress
634              }
635 <            else if (s.waiter == null) {
636 <                s.waiter = w;                 // request unpark then recheck
635 >            else if ((item = s.item) != e)
636 >                break;                   // recheck
637 >            else if (!timed) {
638 >                LockSupport.setCurrentBlocker(this);
639 >                try {
640 >                    ForkJoinPool.managedBlock(s);
641 >                } catch (InterruptedException cannotHappen) { }
642 >                LockSupport.setCurrentBlocker(null);
643              }
644 <            else if (timed) {
644 >            else {
645                  nanos = deadline - System.nanoTime();
646 <                if (nanos > 0L)
646 >                if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
647                      LockSupport.parkNanos(this, nanos);
648              }
713            else {
714                LockSupport.park(this);
715            }
716        }
717    }
718
719    /**
720     * Returns spin/yield value for a node with given predecessor and
721     * data mode. See above for explanation.
722     */
723    private static int spinsFor(Node pred, boolean haveData) {
724        if (MP && pred != null) {
725            if (pred.isData != haveData)      // phase change
726                return FRONT_SPINS + CHAINED_SPINS;
727            if (pred.isMatched())             // probably at front
728                return FRONT_SPINS;
729            if (pred.waiter == null)          // pred apparently spinning
730                return CHAINED_SPINS;
649          }
650 <        return 0;
650 >        if (stat == 1)
651 >            WAITER.set(s, null);
652 >        if (!isData)
653 >            ITEM.set(s, s);              // self-link to avoid garbage
654 >        return (E) item;
655      }
656  
657      /* -------------- Traversal methods -------------- */
# Line 1152 | Line 1074 | public class LinkedTransferQueue<E> exte
1074           * See above for rationale. Briefly: if pred still points to
1075           * s, try to unlink s.  If s cannot be unlinked, because it is
1076           * trailing node or pred might be unlinked, and neither pred
1077 <         * nor s are head or offlist, add to sweepVotes, and if enough
1156 <         * votes have accumulated, sweep.
1077 >         * nor s are head or offlist, set needSweep;
1078           */
1079          if (pred != null && pred.next == s) {
1080              Node n = s.next;
# Line 1171 | Line 1092 | public class LinkedTransferQueue<E> exte
1092                      if (hn != h && casHead(h, hn))
1093                          h.selfLink();  // advance head
1094                  }
1095 <                // sweep every SWEEP_THRESHOLD votes
1096 <                if (pred.next != pred && s.next != s // recheck if offlist
1176 <                    && (incSweepVotes() & (SWEEP_THRESHOLD - 1)) == 0)
1177 <                    sweep();
1095 >                if (pred.next != pred && s.next != s)
1096 >                    needSweep = true;
1097              }
1098          }
1099      }
# Line 1184 | Line 1103 | public class LinkedTransferQueue<E> exte
1103       * traversal from head.
1104       */
1105      private void sweep() {
1106 +        needSweep = false;
1107          for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
1108              if (!s.isMatched())
1109                  // Unmatched nodes are never self-linked
# Line 1236 | Line 1156 | public class LinkedTransferQueue<E> exte
1156       * @throws NullPointerException if the specified element is null
1157       */
1158      public void put(E e) {
1159 <        xfer(e, true, ASYNC, 0);
1159 >        xfer(e, true, ASYNC, 0L);
1160      }
1161  
1162      /**
# Line 1249 | Line 1169 | public class LinkedTransferQueue<E> exte
1169       * @throws NullPointerException if the specified element is null
1170       */
1171      public boolean offer(E e, long timeout, TimeUnit unit) {
1172 <        xfer(e, true, ASYNC, 0);
1172 >        xfer(e, true, ASYNC, 0L);
1173          return true;
1174      }
1175  
# Line 1261 | Line 1181 | public class LinkedTransferQueue<E> exte
1181       * @throws NullPointerException if the specified element is null
1182       */
1183      public boolean offer(E e) {
1184 <        xfer(e, true, ASYNC, 0);
1184 >        xfer(e, true, ASYNC, 0L);
1185          return true;
1186      }
1187  
# Line 1274 | Line 1194 | public class LinkedTransferQueue<E> exte
1194       * @throws NullPointerException if the specified element is null
1195       */
1196      public boolean add(E e) {
1197 <        xfer(e, true, ASYNC, 0);
1197 >        xfer(e, true, ASYNC, 0L);
1198          return true;
1199      }
1200  
# Line 1289 | Line 1209 | public class LinkedTransferQueue<E> exte
1209       * @throws NullPointerException if the specified element is null
1210       */
1211      public boolean tryTransfer(E e) {
1212 <        return xfer(e, true, NOW, 0) == null;
1212 >        return xfer(e, true, NOW, 0L) == null;
1213      }
1214  
1215      /**
# Line 1304 | Line 1224 | public class LinkedTransferQueue<E> exte
1224       * @throws NullPointerException if the specified element is null
1225       */
1226      public void transfer(E e) throws InterruptedException {
1227 <        if (xfer(e, true, SYNC, 0) != null) {
1227 >        if (xfer(e, true, SYNC, 0L) != null) {
1228              Thread.interrupted(); // failure possible only due to interrupt
1229              throw new InterruptedException();
1230          }
# Line 1334 | Line 1254 | public class LinkedTransferQueue<E> exte
1254      }
1255  
1256      public E take() throws InterruptedException {
1257 <        E e = xfer(null, false, SYNC, 0);
1257 >        E e = xfer(null, false, SYNC, 0L);
1258          if (e != null)
1259              return e;
1260          Thread.interrupted();
# Line 1349 | Line 1269 | public class LinkedTransferQueue<E> exte
1269      }
1270  
1271      public E poll() {
1272 <        return xfer(null, false, NOW, 0);
1272 >        return xfer(null, false, NOW, 0L);
1273      }
1274  
1275      /**
# Line 1693 | Line 1613 | public class LinkedTransferQueue<E> exte
1613      // VarHandle mechanics
1614      private static final VarHandle HEAD;
1615      private static final VarHandle TAIL;
1696    private static final VarHandle SWEEPVOTES;
1616      static final VarHandle ITEM;
1617      static final VarHandle NEXT;
1618      static final VarHandle WAITER;
# Line 1704 | Line 1623 | public class LinkedTransferQueue<E> exte
1623                                     Node.class);
1624              TAIL = l.findVarHandle(LinkedTransferQueue.class, "tail",
1625                                     Node.class);
1707            SWEEPVOTES = l.findVarHandle(LinkedTransferQueue.class, "sweepVotes",
1708                                         int.class);
1626              ITEM = l.findVarHandle(Node.class, "item", Object.class);
1627              NEXT = l.findVarHandle(Node.class, "next", Node.class);
1628              WAITER = l.findVarHandle(Node.class, "waiter", Thread.class);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines