
Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.92 by dl, Tue Feb 22 10:50:51 2011 UTC vs.
Revision 1.103 by jsr166, Wed Jun 1 21:04:30 2011 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   package jsr166y;
# Line 151 | Line 151 | public class ForkJoinPool extends Abstra
151       * Updates tend not to contend with each other except during
152       * bursts while submitted tasks begin or end.  In some cases when
153       * they do contend, threads can instead do something else
154 <     * (usually, scan for tesks) until contention subsides.
154 >     * (usually, scan for tasks) until contention subsides.
155       *
156       * To enable packing, we restrict maximum parallelism to (1<<15)-1
157       * (which is far in excess of normal operating range) to allow
# Line 195 | Line 195 | public class ForkJoinPool extends Abstra
195       * shutdown schemes.
196       *
197       * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
198 <     * let workers spin indefinitely scanning for tasks when none are
199 <     * can be immediately found, and we cannot start/resume workers
200 <     * unless there appear to be tasks available.  On the other hand,
201 <     * we must quickly prod them into action when new tasks are
202 <     * submitted or generated.  We park/unpark workers after placing
203 <     * in an event wait queue when they cannot find work. This "queue"
204 <     * is actually a simple Treiber stack, headed by the "id" field of
205 <     * ctl, plus a 15bit counter value to both wake up waiters (by
206 <     * advancing their count) and avoid ABA effects. Successors are
207 <     * held in worker field "nextWait".  Queuing deals with several
208 <     * intrinsic races, mainly that a task-producing thread can miss
209 <     * seeing (and signalling) another thread that gave up looking for
210 <     * work but has not yet entered the wait queue. We solve this by
211 <     * requiring a full sweep of all workers both before (in scan())
212 <     * and after (in awaitWork()) a newly waiting worker is added to
213 <     * the wait queue. During a rescan, the worker might release some
214 <     * other queued worker rather than itself, which has the same net
215 <     * effect.
198 >     * let workers spin indefinitely scanning for tasks when none can
199 >     * be found immediately, and we cannot start/resume workers unless
200 >     * there appear to be tasks available.  On the other hand, we must
201 >     * quickly prod them into action when new tasks are submitted or
202 >     * generated.  We park/unpark workers after placing in an event
203 >     * wait queue when they cannot find work. This "queue" is actually
204 >     * a simple Treiber stack, headed by the "id" field of ctl, plus a
205 >     * 15-bit counter value to both wake up waiters (by advancing their
206 >     * count) and avoid ABA effects. Successors are held in worker
207 >     * field "nextWait".  Queuing deals with several intrinsic races,
208 >     * mainly that a task-producing thread can miss seeing (and
209 >     * signalling) another thread that gave up looking for work but
210 >     * has not yet entered the wait queue. We solve this by requiring
211 >     * a full sweep of all workers both before (in scan()) and after
212 >     * (in tryAwaitWork()) a newly waiting worker is added to the wait
213 >     * queue. During a rescan, the worker might release some other
214 >     * queued worker rather than itself, which has the same net
215 >     * effect. Because enqueued workers may actually be rescanning
216 >     * rather than waiting, we set and clear the "parked" field of
217 >     * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
218 >     * (Use of the parked field requires a secondary recheck to avoid
219 >     * missed signals.)
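
The Treiber-stack encoding described above can be pictured in isolation. Below is a minimal sketch under assumed simplifications (class and constants hypothetical; in the pool itself the stack head lives in the low bits of ctl and the counter doubles as the wakeup signal):

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical sketch: the stack head packs a 16-bit waiter index with
    // a counter that advances on every pop, so a reused index can never be
    // mistaken for an old head (ABA avoidance). Pushers record the whole
    // old head word as their successor, as w.nextWait does above.
    class WaitStackSketch {
        static final long IDX_MASK = 0xffffL;    // low 16 bits: waiter index
        static final long CTR_UNIT = 1L << 16;   // counter bump per pop
        static final int  EMPTY    = (int) IDX_MASK;

        final AtomicLong head = new AtomicLong(EMPTY);
        final long[] nextWait = new long[EMPTY]; // successor records

        void push(int i) {                       // requires i < EMPTY
            long h;
            do {
                h = head.get();
                nextWait[i] = h;                 // successor = old head
            } while (!head.compareAndSet(h, (h & ~IDX_MASK) | i));
        }

        int pop() {                              // index, or -1 if empty
            for (;;) {
                long h = head.get();
                int i = (int) (h & IDX_MASK);
                if (i == EMPTY)
                    return -1;
                long nh = (nextWait[i] & IDX_MASK)
                        | ((h + CTR_UNIT) & ~IDX_MASK);
                if (head.compareAndSet(h, nh))   // bump counter on pop
                    return i;
            }
        }
    }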
220       *
221       * Signalling.  We create or wake up workers only when there
222       * appears to be at least one task they might be able to find and
# Line 229 | Line 233 | public class ForkJoinPool extends Abstra
233       * Trimming workers. To release resources after periods of lack of
234       * use, a worker starting to wait when the pool is quiescent will
235       * time out and terminate if the pool has remained quiescent for
236 <     * SHRINK_RATE nanosecs.
236 >     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
237 >     * terminating all workers after long periods of non-use.
238       *
239       * Submissions. External submissions are maintained in an
240       * array-based queue that is structured identically to
241 <     * ForkJoinWorkerThread queues (which see) except for the use of
242 <     * submissionLock in method addSubmission. Unlike worker queues,
243 <     * multiple external threads can add new submissions.
241 >     * ForkJoinWorkerThread queues except for the use of
242 >     * submissionLock in method addSubmission. Unlike the case for
243 >     * worker queues, multiple external threads can add new
244 >     * submissions, so adding requires a lock.
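
A hedged sketch of that shape, with hypothetical names and growth elided (see growSubmissionQueue further down): producers serialize on the lock, while takers read queueBase without it.

    import java.util.concurrent.locks.ReentrantLock;

    // Simplified model of a locked-producer queue: addSubmission publishes
    // at queueTop under the lock; takers (not shown) CAS slots and advance
    // the volatile queueBase. Capacity is a power of two.
    class SubmissionQueueSketch {
        final ReentrantLock submissionLock = new ReentrantLock();
        volatile Object[] queue = new Object[64];
        volatile int queueBase;            // next slot to take
        int queueTop;                      // next slot to add; lock-guarded

        void addSubmission(Object task) {
            final ReentrantLock lock = this.submissionLock;
            lock.lock();
            try {
                int s = queueTop;
                Object[] q = queue;
                if (s - queueBase >= q.length)   // full: grow (elided here)
                    throw new IllegalStateException("growth elided");
                q[s & (q.length - 1)] = task;
                queueTop = s + 1;  // real code uses ordered writes here
            } finally {
                lock.unlock();
            }
        }
    }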
245       *
246       * Compensation. Beyond work-stealing support and lifecycle
247       * control, the main responsibility of this framework is to take
# Line 272 | Line 278 | public class ForkJoinPool extends Abstra
278       * if blocking would leave less than one active (non-waiting,
279       * non-blocked) worker. Additionally, to avoid some false alarms
280       * due to GC, lagging counters, system activity, etc, compensated
281 <     * blocking for joins is only attempted after a number of rechecks
282 <     * proportional to the current apparent deficit (where retries are
283 <     * interspersed with Thread.yield, for good citizenship).  The
284 <     * variable blockedCount, incremented before blocking and
285 <     * decremented after, is sometimes needed to distinguish cases of
286 <     * waiting for work vs blocking on joins or other managed sync,
287 <     * but both the cases are equivalent for most pool control, so we
288 <     * can update non-atomically. (Additionally, contention on
283 <     * blockedCount alleviates some contention on ctl).
281 >     * blocking for joins is only attempted after rechecks stabilize
282 >     * (retries are interspersed with Thread.yield, for good
283 >     * citizenship).  The variable blockedCount, incremented before
284 >     * blocking and decremented after, is sometimes needed to
285 >     * distinguish cases of waiting for work vs blocking on joins or
286 >     * other managed sync. Both cases are equivalent for most pool
287 >     * control, so we can update non-atomically. (Additionally,
288 >     * contention on blockedCount alleviates some contention on ctl).
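
The "rechecks stabilize" rule can be modeled with a small hypothetical helper (not the pool's actual code): re-sample the apparent active count with yields in between, and only pay for a compensating worker once the deficit stops moving.

    import java.util.function.IntSupplier;

    class CompensationSketch {
        // Treat any movement between samples as a false alarm (GC,
        // lagging counters, system activity); compensate only when the
        // apparent deficit is stable.
        static boolean deficitStable(IntSupplier apparentActive) {
            int prev = apparentActive.getAsInt();
            for (int i = 0; i < 3; ++i) {  // retries, with yields between
                Thread.yield();            // good citizenship
                if (apparentActive.getAsInt() != prev)
                    return false;          // still changing; recheck later
            }
            return prev <= 0;              // stable deficit: compensate
        }
    }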
289       *
290       * Shutdown and Termination. A call to shutdownNow atomically sets
291       * the ctl stop bit and then (non-atomically) sets each workers
# Line 478 | Line 483 | public class ForkJoinPool extends Abstra
483       * negative, there is at least one waiting worker, and when e is
484       * negative, the pool is terminating.  To deal with these possibly
485       * negative fields, we use casts in and out of "short" and/or
486 <     * signed shifts to maintain signedness.  Note: AC_SHIFT is
482 <     * redundantly declared in ForkJoinWorkerThread in order to
483 <     * integrate a surplus-threads check.
486 >     * signed shifts to maintain signedness.
487       */
488      volatile long ctl;
489  
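For reference, the signed subfields can be read back out with the idioms used throughout this file (e.g. parallelism + (int)(ctl >> AC_SHIFT) below). A sketch, assuming the shift constants declared elsewhere in this file:

    class CtlSketch {
        static final int AC_SHIFT = 48, TC_SHIFT = 32; // as in this file

        // AC and TC are stored as deltas from target parallelism, so
        // these may be negative; the signed shift sign-extends AC, and
        // the short cast preserves TC's sign.
        static int activeCountDelta(long c) { return (int)  (c >>  AC_SHIFT); }
        static int totalCountDelta(long c)  { return (short)(c >>> TC_SHIFT); }
        static int eventWord(long c)        { return (int) c; }  // e and id
    }
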
# Line 524 | Line 527 | public class ForkJoinPool extends Abstra
527  
528      /**
529       * Index (mod submission queue length) of next element to take
530 <     * from submission queue.
530 >     * from submission queue. Usage is identical to that for
531 >     * per-worker queues -- see ForkJoinWorkerThread internal
532 >     * documentation.
533       */
534      volatile int queueBase;
535  
536      /**
537       * Index (mod submission queue length) of next element to add
538 <     * in submission queue.
538 >     * in submission queue. Usage is identical to that for
539 >     * per-worker queues -- see ForkJoinWorkerThread internal
540 >     * documentation.
541       */
542      int queueTop;
543  
# Line 568 | Line 575 | public class ForkJoinPool extends Abstra
575      private int nextWorkerIndex;
576  
577      /**
578 <     * SeqLock and index masking for for updates to workers array.
579 <     * Locked when SG_UNIT is set. Unlocking clears bit by adding
578 >     * SeqLock and index masking for updates to workers array.  Locked
579 >     * when SG_UNIT is set. Unlocking clears bit by adding
580       * SG_UNIT. Staleness of read-only operations can be checked by
581       * comparing scanGuard to value before the reads. The low 16 bits
582       * (i.e., ANDing with SMASK) hold (the smallest power of two
# Line 707 | Line 714 | public class ForkJoinPool extends Abstra
714       */
715      private boolean scan(ForkJoinWorkerThread w, int a) {
716          int g = scanGuard; // mask 0 avoids useless scans if only one active
717 <        int m = parallelism == 1 - a? 0 : g & SMASK;
717 >        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
718          ForkJoinWorkerThread[] ws = workers;
719          if (ws == null || ws.length <= m)         // staleness check
720              return false;
# Line 754 | Line 761 | public class ForkJoinPool extends Abstra
761      }
762  
763      /**
764 <     * Tries to enqueue worker in wait queue and await change in
765 <     * worker's eventCount.  Before blocking, rescans queues to avoid
766 <     * missed signals.  If the pool is quiescent, possibly terminates
767 <     * worker upon exit.
764 >     * Tries to enqueue worker w in wait queue and await change in
765 >     * worker's eventCount.  If the pool is quiescent and there is
766 >     * more than one worker, possibly terminates worker upon exit.
767 >     * Otherwise, before blocking, rescans queues to avoid missed
768 >     * signals.  Upon finding work, releases at least one worker
769 >     * (which may be the current worker). Rescans restart upon
770 >     * detected staleness or failure to release due to
771 >     * contention. Note the unusual conventions about Thread.interrupt
772 >     * here and elsewhere: Because interrupts are used solely to alert
773 >     * threads to check termination, which is checked here anyway, we
774 >     * clear status (using Thread.interrupted) before any call to
775 >     * park, so that park does not immediately return due to status
776 >     * being set via some other unrelated call to interrupt in user
777 >     * code.
778       *
779       * @param w the calling worker
780       * @param c the ctl value on entry
# Line 765 | Line 782 | public class ForkJoinPool extends Abstra
782       */
783      private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
784          int v = w.eventCount;
785 <        w.nextWait = (int)c;                       // w's successor record
785 >        w.nextWait = (int)c;                      // w's successor record
786          long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
787          if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
788 <            long d = ctl; // return true if lost to a deq, to force rescan
788 >            long d = ctl; // return true if lost to a deq, to force scan
789              return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
790          }
791 <        if (parallelism + (int)(c >> AC_SHIFT) == 1 &&
791 >        for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount
792 >            long s = stealCount;
793 >            if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
794 >                sc = w.stealCount = 0;
795 >            else if (w.eventCount != v)
796 >                return true;                      // update next time
797 >        }
798 >        if ((!shutdown || !tryTerminate(false)) &&
799 >            (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
800              blockedCount == 0 && quiescerCount == 0)
801 <            idleAwaitWork(w, v);               // quiescent -- maybe shrink
802 <
778 <        boolean rescanned = false;
779 <        for (int sc;;) {
801 >            idleAwaitWork(w, nc, c, v);           // quiescent
802 >        for (boolean rescanned = false;;) {
803              if (w.eventCount != v)
804                  return true;
805 <            if ((sc = w.stealCount) != 0) {
783 <                long s = stealCount;               // accumulate stealCount
784 <                if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s+sc))
785 <                    w.stealCount = 0;
786 <            }
787 <            else if (!rescanned) {
805 >            if (!rescanned) {
806                  int g = scanGuard, m = g & SMASK;
807                  ForkJoinWorkerThread[] ws = workers;
808                  if (ws != null && m < ws.length) {
# Line 821 | Line 839 | public class ForkJoinPool extends Abstra
839      }
840  
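The interrupt and "parked" conventions spelled out in the javadoc above reduce to a small pattern. A self-contained sketch (fields hypothetical; the real code applies this to ForkJoinWorkerThread and ctl):

    import java.util.concurrent.locks.LockSupport;

    class ParkSketch {
        volatile int eventCount;
        volatile boolean parked;

        void awaitEvent(int v) {           // wait until count moves past v
            while (eventCount == v) {
                Thread.interrupted();      // clear status; don't propagate
                parked = true;
                if (eventCount == v)       // secondary recheck (see above)
                    LockSupport.park(this);
                parked = false;
            }
        }

        void signal(Thread waiter) {       // single signaller assumed
            eventCount++;                  // advancing the count is the wakeup
            if (parked)                    // cheap filter on unpark calls
                LockSupport.unpark(waiter);
        }
    }

Because all fields are volatile, the signaller's count write and the waiter's parked write cannot both be missed, so the filter never loses a wakeup.
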
841      /**
842 <     * If pool is quiescent, checks for termination, and waits for
843 <     * event signal for up to SHRINK_RATE nanosecs. On timeout, if ctl
844 <     * has not changed, terminates the worker. Upon its termination
845 <     * (see deregisterWorker), it may wake up another worker to
846 <     * possibly repeat this process.
842 >     * If inactivating worker w has caused pool to become
843 >     * quiescent, check for pool termination, and wait for event
844 >     * for up to SHRINK_RATE nanosecs (rescans are unnecessary in
845 >     * this case because quiescence reflects consensus about lack
846 >     * of work). On timeout, if ctl has not changed, terminate the
847 >     * worker. Upon its termination (see deregisterWorker), it may
848 >     * wake up another worker to possibly repeat this process.
849       *
850       * @param w the calling worker
851 <     * @param v the eventCount w must wait until changed
852 <     */
853 <    private void idleAwaitWork(ForkJoinWorkerThread w, int v) {
854 <        ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
855 <        if (shutdown)
856 <            tryTerminate(false);
857 <        long c = ctl;
858 <        long nc = (((c & (AC_MASK|TC_MASK)) + AC_UNIT) |
859 <                   (long)(w.nextWait & E_MASK)); // ctl value to release w
860 <        if (w.eventCount == v &&
861 <            parallelism + (int)(c >> AC_SHIFT) == 0 &&
862 <            blockedCount == 0 && quiescerCount == 0) {
843 <            long startTime = System.nanoTime();
844 <            Thread.interrupted();
845 <            if (w.eventCount == v) {
851 >     * @param currentCtl the ctl value after enqueuing w
852 >     * @param prevCtl the ctl value if w terminated
853 >     * @param v the eventCount w awaits change
854 >     */
855 >    private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
856 >                               long prevCtl, int v) {
857 >        if (w.eventCount == v) {
858 >            if (shutdown)
859 >                tryTerminate(false);
860 >            ForkJoinTask.helpExpungeStaleExceptions(); // help clean weak refs
861 >            while (ctl == currentCtl) {
862 >                long startTime = System.nanoTime();
863                  w.parked = true;
864 <                if (w.eventCount == v)
864 >                if (w.eventCount == v)             // must recheck
865                      LockSupport.parkNanos(this, SHRINK_RATE);
866                  w.parked = false;
867 <                if (w.eventCount == v && ctl == c &&
868 <                    System.nanoTime() - startTime >= SHRINK_RATE &&
869 <                    UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
870 <                    w.terminate = true;
871 <                    w.eventCount = ((int)c + EC_UNIT) & E_MASK;
867 >                if (w.eventCount != v)
868 >                    break;
869 >                else if (System.nanoTime() - startTime <
870 >                         SHRINK_RATE - (SHRINK_RATE / 10)) // timing slop
871 >                    Thread.interrupted();          // spurious wakeup
872 >                else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
873 >                                                   currentCtl, prevCtl)) {
874 >                    w.terminate = true;            // restore previous
875 >                    w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK;
876 >                    break;
877                  }
878              }
879          }
# Line 887 | Line 909 | public class ForkJoinPool extends Abstra
909  
910      /**
911       * Creates or doubles submissionQueue array.
912 <     * Basically identical to ForkJoinWorkerThread version
912 >     * Basically identical to ForkJoinWorkerThread version.
913       */
914      private void growSubmissionQueue() {
915          ForkJoinTask<?>[] oldQ = submissionQueue;
# Line 974 | Line 996 | public class ForkJoinPool extends Abstra
996          do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // no mask
997                                                  c = ctl, c + AC_UNIT));
998          int b;
999 <        do {} while(!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
1000 <                                              b = blockedCount, b - 1));
999 >        do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
1000 >                                               b = blockedCount, b - 1));
1001      }
1002  
1003      /**
# Line 992 | Line 1014 | public class ForkJoinPool extends Abstra
1014                  joinMe.tryAwaitDone(0L);
1015                  postBlock();
1016              }
1017 <            if ((ctl & STOP_BIT) != 0L)
1017 >            else if ((ctl & STOP_BIT) != 0L)
1018                  joinMe.cancelIgnoringExceptions();
1019          }
1020      }
# Line 1127 | Line 1149 | public class ForkJoinPool extends Abstra
1149                          ws[k] = w;
1150                          nextWorkerIndex = k + 1;
1151                          int m = g & SMASK;
1152 <                        g = k >= m? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1152 >                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
1153                      }
1154                  } finally {
1155                      scanGuard = g;
# Line 1207 | Line 1229 | public class ForkJoinPool extends Abstra
1229                  if ((int)(c >> AC_SHIFT) != -parallelism)
1230                      return false;
1231                  if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
1232 <                    queueTop - queueBase > 0) {
1232 >                    queueBase != queueTop) {
1233                      if (ctl == c) // staleness check
1234                          return false;
1235                      continue;
# Line 1216 | Line 1238 | public class ForkJoinPool extends Abstra
1238              if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
1239                  startTerminating();
1240          }
1241 <        if ((short)(c >>> TC_SHIFT) == -parallelism) {
1242 <            submissionLock.lock();
1243 <            termination.signalAll();
1244 <            submissionLock.unlock();
1241 >        if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
1242 >            final ReentrantLock lock = this.submissionLock;
1243 >            lock.lock();
1244 >            try {
1245 >                termination.signalAll();
1246 >            } finally {
1247 >                lock.unlock();
1248 >            }
1249          }
1250          return true;
1251      }
1252  
1253      /**
1254       * Runs up to three passes through workers: (0) Setting
1255 <     * termination status for each worker, followed by wakeups up
1256 <     * queued workers (1) helping cancel tasks (2) interrupting
1255 >     * termination status for each worker, followed by waking up
1256 >     * queued workers; (1) helping cancel tasks; (2) interrupting
1257       * lagging threads (likely in external tasks, but possibly also
1258       * blocked in joins).  Each pass repeats previous steps because of
1259       * potential lagging thread creation.
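
A rough sketch of that pass structure, with a hypothetical Worker stand-in (the actual method body may differ in detail):

    class TerminationSweepSketch {
        static class Worker extends Thread {
            volatile boolean terminate;
            void cancelTasks() { /* cancel queued and running tasks */ }
        }
        volatile Worker[] workers = new Worker[0];

        void startTerminating() {
            for (int pass = 0; pass < 3; ++pass) {
                for (Worker w : workers) {   // repeat earlier steps each pass
                    if (w == null) continue;
                    w.terminate = true;              // pass 0 (and every pass)
                    if (pass > 0) w.cancelTasks();   // pass 1+
                    if (pass > 1 && !w.isInterrupted())
                        w.interrupt();               // pass 2: prod laggards
                }
            }
        }
    }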
# Line 1273 | Line 1299 | public class ForkJoinPool extends Abstra
1299  
1300      /**
1301       * Tries to set the termination status of waiting workers, and
1302 <     * then wake them up (after which they will terminate).
1302 >     * then wakes them up (after which they will terminate).
1303       */
1304      private void terminateWaiters() {
1305          ForkJoinWorkerThread[] ws = workers;
# Line 1306 | Line 1332 | public class ForkJoinPool extends Abstra
1332       */
1333      final void addQuiescerCount(int delta) {
1334          int c;
1335 <        do {} while(!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
1336 <                                              c = quiescerCount, c + delta));
1335 >        do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
1336 >                                               c = quiescerCount, c + delta));
1337      }
1338  
1339      /**
# Line 1332 | Line 1358 | public class ForkJoinPool extends Abstra
1358      final int idlePerActive() {
1359          // Approximate at powers of two for small values, saturate past 4
1360          int p = parallelism;
1361 <        int a = p + (int)(ctl >> AC_SHIFT);
1362 <        return (a > (p >>>= 1) ? 0 :
1363 <                a > (p >>>= 1) ? 1 :
1364 <                a > (p >>>= 1) ? 2 :
1365 <                a > (p >>>= 1) ? 4 :
1366 <                8);
1361 >        int a = p + (int)(ctl >> AC_SHIFT);
1362 >        return (a > (p >>>= 1) ? 0 :
1363 >                a > (p >>>= 1) ? 1 :
1364 >                a > (p >>>= 1) ? 2 :
1365 >                a > (p >>>= 1) ? 4 :
1366 >                8);
1367      }
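
For concreteness, a standalone copy of the expression with its value table (a hypothetical helper, not part of the class):

    class IdlePerActiveSketch {
        // Same saturating map as above, taking the counts as parameters.
        static int idlePerActive(int parallelism, int active) {
            int p = parallelism, a = active;
            return (a > (p >>>= 1) ? 0 :
                    a > (p >>>= 1) ? 1 :
                    a > (p >>>= 1) ? 2 :
                    a > (p >>>= 1) ? 4 :
                    8);
        }
        // With parallelism 8: active 5..8 -> 0, 3..4 -> 1, 2 -> 2,
        // 1 -> 4, and 0 or fewer -> 8.
    }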
1368  
1369      // Exported methods
# Line 1660 | Line 1686 | public class ForkJoinPool extends Abstra
1686       */
1687      public int getRunningThreadCount() {
1688          int r = parallelism + (int)(ctl >> AC_SHIFT);
1689 <        return r <= 0? 0 : r; // suppress momentarily negative values
1689 >        return (r <= 0) ? 0 : r; // suppress momentarily negative values
1690      }
1691  
1692      /**
# Line 1672 | Line 1698 | public class ForkJoinPool extends Abstra
1698       */
1699      public int getActiveThreadCount() {
1700          int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount;
1701 <        return r <= 0? 0 : r; // suppress momentarily negative values
1701 >        return (r <= 0) ? 0 : r; // suppress momentarily negative values
1702      }
1703  
1704      /**
# Line 1729 | Line 1755 | public class ForkJoinPool extends Abstra
1755  
1756      /**
1757       * Returns an estimate of the number of tasks submitted to this
1758 <     * pool that have not yet begun executing.  This meThod may take
1758 >     * pool that have not yet begun executing.  This method may take
1759       * time proportional to the number of submissions.
1760       *
1761       * @return the number of queued submissions
# Line 1827 | Line 1853 | public class ForkJoinPool extends Abstra
1853          int ac = rc + blockedCount;
1854          String level;
1855          if ((c & STOP_BIT) != 0)
1856 <            level = (tc == 0)? "Terminated" : "Terminating";
1856 >            level = (tc == 0) ? "Terminated" : "Terminating";
1857          else
1858 <            level = shutdown? "Shutting down" : "Running";
1858 >            level = shutdown ? "Shutting down" : "Running";
1859          return super.toString() +
1860              "[" + level +
1861              ", parallelism = " + pc +
# Line 1966 | Line 1992 | public class ForkJoinPool extends Abstra
1992       * {@code isReleasable} must return {@code true} if blocking is
1993       * not necessary. Method {@code block} blocks the current thread
1994       * if necessary (perhaps internally invoking {@code isReleasable}
1995 <     * before actually blocking). The unusual methods in this API
1996 <     * accommodate synchronizers that may, but don't usually, block
1997 <     * for long periods. Similarly, they allow more efficient internal
1998 <     * handling of cases in which additional workers may be, but
1999 <     * usually are not, needed to ensure sufficient parallelism.
2000 <     * Toward this end, implementations of method {@code isReleasable}
2001 <     * must be amenable to repeated invocation.
1995 >     * before actually blocking). These actions are performed by any
1996 >     * thread invoking {@link ForkJoinPool#managedBlock}.  The
1997 >     * unusual methods in this API accommodate synchronizers that may,
1998 >     * but don't usually, block for long periods. Similarly, they
1999 >     * allow more efficient internal handling of cases in which
2000 >     * additional workers may be, but usually are not, needed to
2001 >     * ensure sufficient parallelism.  Toward this end,
2002 >     * implementations of method {@code isReleasable} must be amenable
2003 >     * to repeated invocation.
2004       *
2005       * <p>For example, here is a ManagedBlocker based on a
2006       * ReentrantLock:
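
A sketch of such a lock-based blocker (shape consistent with the contract described above; isReleasable tries a cheap non-blocking acquire and must tolerate repeated invocation):

    import java.util.concurrent.locks.ReentrantLock;
    import jsr166y.ForkJoinPool.ManagedBlocker;

    class ManagedLocker implements ManagedBlocker {
        final ReentrantLock lock;
        boolean hasLock = false;
        ManagedLocker(ReentrantLock lock) { this.lock = lock; }
        public boolean block() {
            if (!hasLock)
                lock.lock();      // may park; pool can add a spare worker
            return true;          // done; no need to call block again
        }
        public boolean isReleasable() {
            return hasLock || (hasLock = lock.tryLock());
        }
    }

A caller wraps the acquisition as ForkJoinPool.managedBlock(new ManagedLocker(lock)), letting the pool compensate if the acquisition actually blocks.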
# Line 2093 | Line 2121 | public class ForkJoinPool extends Abstra
2121          int s;
2122          try {
2123              UNSAFE = getUnsafe();
2124 <            Class k = ForkJoinPool.class;
2124 >            Class<?> k = ForkJoinPool.class;
2125              ctlOffset = UNSAFE.objectFieldOffset
2126                  (k.getDeclaredField("ctl"));
2127              stealCountOffset = UNSAFE.objectFieldOffset
# Line 2106 | Line 2134 | public class ForkJoinPool extends Abstra
2134                  (k.getDeclaredField("scanGuard"));
2135              nextWorkerNumberOffset = UNSAFE.objectFieldOffset
2136                  (k.getDeclaredField("nextWorkerNumber"));
2137 <            Class a = ForkJoinTask[].class;
2137 >            Class<?> a = ForkJoinTask[].class;
2138              ABASE = UNSAFE.arrayBaseOffset(a);
2139              s = UNSAFE.arrayIndexScale(a);
2140          } catch (Exception e) {

Diff Legend

- Removed lines
+ Added lines
< Changed lines (old revision)
> Changed lines (new revision)