Controlling Thread Execution using CyclicBarriers in Java

Barriers are a threading concept: they define checkpoints in thread execution at which a group of threads waits, and the group proceeds only after all of its threads have arrived at that checkpoint. Java’s java.util.concurrent package defines a CyclicBarrier class that is well suited to such a scenario. It is called cyclic because it can be reused any number of times for repeatable checkpoints.
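
To make the "cyclic" part concrete, here is a minimal sketch in which the same barrier is awaited several rounds in a row. The class name BarrierReuseSketch, the countTrips helper and the thread and round counts are assumptions made for illustration only.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original example.
class BarrierReuseSketch {

    // Runs `parties` threads through `rounds` checkpoints on ONE barrier
    // and returns how many times the barrier tripped.
    static int countTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int t = 0; t < parties; t++) {
            workers[t] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        // ... per-round work would go here ...
                        barrier.await(); // wait for the rest of the group
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party failed; skip the remaining rounds
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        // prints "Barrier tripped 4 times"
        System.out.println("Barrier tripped " + countTrips(3, 4) + " times");
    }
}
```

No reset is needed between rounds: once all parties have been released, the barrier is ready for the next checkpoint.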

We will see how this class can be used in two different scenarios.

1) Single post-barrier action thread.
In this scenario you create a CyclicBarrier with the number of threads and a reference to an action (a Runnable) that is invoked once all of them have reached the barrier. The action is run by the last thread to arrive, before any of the waiting threads are released. Let's look at an example that derives the average throughput time of compute-intensive modules whose start and end times are captured. The example fits the barrier scenario: each pre-barrier thread computes the difference between its start and end time, and the post-barrier action computes the average of the accumulated differences.

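import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleActionPostBarrier {
  private final int numberOfThreads;
  private final int[] responseTimes; // one slot per worker, in whole seconds

  public SingleActionPostBarrier(int threadCount) {
    numberOfThreads = threadCount;
    responseTimes = new int[threadCount];
  }

  public void doRun() {
    // ComputeAverage runs once, after every worker has reached the barrier.
    final CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, new ComputeAverage());
    ExecutorService execservice = Executors.newFixedThreadPool(numberOfThreads);
    for (int i = 0; i < numberOfThreads; i++) {
      final int index = i; // the anonymous class can only capture a final copy of i
      execservice.execute(new Runnable() {
        public void run() {
          long start = System.currentTimeMillis();
          // invoke the process centric action method..
          long end = System.currentTimeMillis();
          responseTimes[index] = (int) ((end - start) / 1000);

          try {
            barrier.await();
          } catch (BrokenBarrierException bbe) {
            // another worker was interrupted or timed out; nothing more to do here
          } catch (InterruptedException inex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
          }
        }
      });
    } // end of for-loop
    execservice.shutdown(); // let the pool exit once the submitted tasks finish
  }

  class ComputeAverage implements Runnable {
    public void run() {
      int sum = 0;
      for (int diff : responseTimes) {
        sum += diff;
      }
      if (sum != 0) System.out.println("Average Response Time :" + (sum / responseTimes.length));
    }
  }
}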
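
The example above catches BrokenBarrierException around await(). As a rough sketch of when a barrier becomes "broken", the snippet below waits with a timeout on a two-party barrier whose second party never arrives, then inspects isBroken() before and after reset(). The class name BrokenBarrierSketch, the demonstrate helper and the 100 ms timeout are assumptions for illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original example.
class BrokenBarrierSketch {

    // Returns { brokenAfterTimeout, brokenAfterReset }.
    static boolean[] demonstrate() {
        CyclicBarrier barrier = new CyclicBarrier(2); // second party never shows up
        try {
            barrier.await(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // expected here: timing out leaves the barrier in the broken state
        } catch (BrokenBarrierException e) {
            // would be thrown if some other party had already broken the barrier
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean brokenAfterTimeout = barrier.isBroken();

        barrier.reset(); // returns the barrier to its initial, usable state
        boolean brokenAfterReset = barrier.isBroken();

        return new boolean[] { brokenAfterTimeout, brokenAfterReset };
    }

    public static void main(String[] args) {
        boolean[] result = demonstrate();
        System.out.println(result[0]); // prints "true"
        System.out.println(result[1]); // prints "false"
    }
}
```

Once a barrier is broken, every thread currently waiting on it, and any thread that calls await() afterwards, gets a BrokenBarrierException until reset() is called, which is why the workers treat that exception as a signal to stop.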

2) Multiple post-barrier action thread.
Let's tweak the previous example: say we need to repeat the same profile computation for a different action workflow and capture the results in another responseTimes array. Each worker awaits the same barrier twice, once after each action, and the barrier action runs every time the barrier trips. The end result is to compute, per thread, the difference in profile times between the two actions and produce a sorted list. The following snippet illustrates it.

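import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiActionPostBarrier {
  private final int numberOfThreads;
  private final int[] responseTimes;
  private final int[] responseTimesForAnotherAction;

  public MultiActionPostBarrier(int threadCount) {
    numberOfThreads = threadCount;
    responseTimes = new int[threadCount];
    responseTimesForAnotherAction = new int[threadCount];
  }

  public void doRun() {
    // The same barrier is awaited twice, so SortDiffResponse runs after each checkpoint.
    final CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, new SortDiffResponse());
    ExecutorService execservice = Executors.newFixedThreadPool(numberOfThreads);
    for (int i = 0; i < numberOfThreads; i++) {
      final int index = i; // the anonymous class can only capture a final copy of i
      execservice.execute(new Runnable() {
        public void run() {
          long start = System.currentTimeMillis();
          // invoke the process centric action method..
          long end = System.currentTimeMillis();
          responseTimes[index] = (int) ((end - start) / 1000);

          try {
            barrier.await(); // first checkpoint: every worker has timed the first action
          } catch (BrokenBarrierException bbe) {
            return; // the group can no longer meet at the barrier
          } catch (InterruptedException inex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return;
          }

          start = System.currentTimeMillis();
          // invoke the process centric action method..
          end = System.currentTimeMillis();
          responseTimesForAnotherAction[index] = (int) ((end - start) / 1000);

          // compute the difference
          responseTimes[index] = responseTimes[index] - responseTimesForAnotherAction[index];
          try {
            barrier.await(); // second checkpoint: every difference has been stored
          } catch (BrokenBarrierException bbe) {
            // another worker was interrupted or timed out; nothing more to do here
          } catch (InterruptedException inex) {
            Thread.currentThread().interrupt();
          }
        }
      });
    } // end of for-loop
    execservice.shutdown(); // let the pool exit once the submitted tasks finish
  }

  class SortDiffResponse implements Runnable {
    private int checkpoint = 0; // how many times the barrier has tripped so far

    public void run() {
      // Sorting after the first checkpoint would reorder responseTimes while the
      // workers still index it by position, so sort only once the differences are in.
      if (++checkpoint < 2) return;
      Arrays.sort(responseTimes);
      for (int diff : responseTimes) {
        System.out.println(diff);
      }
    }
  }
}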
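
When the follow-up work does not need the other threads to stay suspended, the value returned by await() offers an alternative to a barrier action: it is the caller's arrival index, and zero marks the last thread to arrive. Here is a minimal sketch, assuming a hypothetical ArrivalIndexSketch class and countElected helper, in which exactly one thread per checkpoint picks up the follow-up work.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original example.
class ArrivalIndexSketch {

    // Returns how many of the `parties` threads saw arrival index 0.
    static int countElected(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties); // no barrier action
        AtomicInteger elected = new AtomicInteger();

        Thread[] workers = new Thread[parties];
        for (int t = 0; t < parties; t++) {
            workers[t] = new Thread(() -> {
                try {
                    int arrivalIndex = barrier.await();
                    if (arrivalIndex == 0) {
                        // Last thread to arrive: it alone does the follow-up work,
                        // concurrently with the other, already released, threads.
                        elected.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party failed; nothing to elect
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        return elected.get();
    }

    public static void main(String[] args) {
        System.out.println(countElected(4)); // prints "1"
    }
}
```

Unlike a barrier action, this work runs after the other threads have been released, so it suits logging or reporting rather than steps the next phase depends on.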

Using a CyclicBarrier, one can reuse the same barrier checkpoint for complex thread behaviors. Please refer to the DZone article for an interesting read on the same topic.
