ThreadSafeProgressMonitor.java

  1. /*
  2.  * Copyright (C) 2010, Google Inc. and others
  3.  *
  4.  * This program and the accompanying materials are made available under the
  5.  * terms of the Eclipse Distribution License v. 1.0 which is available at
  6.  * https://www.eclipse.org/org/documents/edl-v10.php.
  7.  *
  8.  * SPDX-License-Identifier: BSD-3-Clause
  9.  */

  10. package org.eclipse.jgit.lib;

  11. import java.util.concurrent.Semaphore;
  12. import java.util.concurrent.atomic.AtomicInteger;
  13. import java.util.concurrent.locks.ReentrantLock;

  14. /**
  15.  * Wrapper around the general {@link org.eclipse.jgit.lib.ProgressMonitor} to
  16.  * make it thread safe.
  17.  *
  18.  * Updates to the underlying ProgressMonitor are made only from the thread that
  19.  * allocated this wrapper. Callers are responsible for ensuring the allocating
  20.  * thread uses {@link #pollForUpdates()} or {@link #waitForCompletion()} to
  21.  * update the underlying ProgressMonitor.
  22.  *
  23.  * Only {@link #update(int)}, {@link #isCancelled()}, and {@link #endWorker()}
  24.  * may be invoked from a worker thread. All other methods of the ProgressMonitor
  25.  * interface can only be called from the thread that allocates this wrapper.
  26.  */
  27. public class ThreadSafeProgressMonitor implements ProgressMonitor {
  28.     private final ProgressMonitor pm;

  29.     private final ReentrantLock lock;

  30.     private final Thread mainThread;

  31.     private final AtomicInteger workers;

  32.     private final AtomicInteger pendingUpdates;

  33.     private final Semaphore process;

  34.     /**
  35.      * Wrap a ProgressMonitor to be thread safe.
  36.      *
  37.      * @param pm
  38.      *            the underlying monitor to receive events.
  39.      */
  40.     public ThreadSafeProgressMonitor(ProgressMonitor pm) {
  41.         this.pm = pm;
  42.         this.lock = new ReentrantLock();
  43.         this.mainThread = Thread.currentThread();
  44.         this.workers = new AtomicInteger(0);
  45.         this.pendingUpdates = new AtomicInteger(0);
  46.         this.process = new Semaphore(0);
  47.     }

  48.     /** {@inheritDoc} */
  49.     @Override
  50.     public void start(int totalTasks) {
  51.         if (!isMainThread())
  52.             throw new IllegalStateException();
  53.         pm.start(totalTasks);
  54.     }

  55.     /** {@inheritDoc} */
  56.     @Override
  57.     public void beginTask(String title, int totalWork) {
  58.         if (!isMainThread())
  59.             throw new IllegalStateException();
  60.         pm.beginTask(title, totalWork);
  61.     }

  62.     /**
  63.      * Notify the monitor a worker is starting.
  64.      */
  65.     public void startWorker() {
  66.         startWorkers(1);
  67.     }

  68.     /**
  69.      * Notify the monitor of workers starting.
  70.      *
  71.      * @param count
  72.      *            the number of worker threads that are starting.
  73.      */
  74.     public void startWorkers(int count) {
  75.         workers.addAndGet(count);
  76.     }

  77.     /**
  78.      * Notify the monitor a worker is finished.
  79.      */
  80.     public void endWorker() {
  81.         if (workers.decrementAndGet() == 0)
  82.             process.release();
  83.     }

  84.     /**
  85.      * Non-blocking poll for pending updates.
  86.      *
  87.      * This method can only be invoked by the same thread that allocated this
  88.      * ThreadSafeProgressMonior.
  89.      */
  90.     public void pollForUpdates() {
  91.         assert isMainThread();
  92.         doUpdates();
  93.     }

  94.     /**
  95.      * Process pending updates and wait for workers to finish.
  96.      *
  97.      * This method can only be invoked by the same thread that allocated this
  98.      * ThreadSafeProgressMonior.
  99.      *
  100.      * @throws java.lang.InterruptedException
  101.      *             if the main thread is interrupted while waiting for
  102.      *             completion of workers.
  103.      */
  104.     public void waitForCompletion() throws InterruptedException {
  105.         assert isMainThread();
  106.         while (0 < workers.get()) {
  107.             doUpdates();
  108.             process.acquire();
  109.         }
  110.         doUpdates();
  111.     }

  112.     private void doUpdates() {
  113.         int cnt = pendingUpdates.getAndSet(0);
  114.         if (0 < cnt)
  115.             pm.update(cnt);
  116.     }

  117.     /** {@inheritDoc} */
  118.     @Override
  119.     public void update(int completed) {
  120.         if (0 == pendingUpdates.getAndAdd(completed))
  121.             process.release();
  122.     }

  123.     /** {@inheritDoc} */
  124.     @Override
  125.     public boolean isCancelled() {
  126.         lock.lock();
  127.         try {
  128.             return pm.isCancelled();
  129.         } finally {
  130.             lock.unlock();
  131.         }
  132.     }

  133.     /** {@inheritDoc} */
  134.     @Override
  135.     public void endTask() {
  136.         if (!isMainThread())
  137.             throw new IllegalStateException();
  138.         pm.endTask();
  139.     }

  140.     private boolean isMainThread() {
  141.         return Thread.currentThread() == mainThread;
  142.     }
  143. }