
Saturday, September 6, 2014

HowTo:: Threading: Producer Consumer with thread-safe PriorityBlockingQueue as buffer with custom ordering


Img.1.:: Producer Consumer Problem examples (powered by IntelliJ Idea 13.x)

  In the previous blog post I wrote about a common multi-threading issue, the Producer-Consumer problem with a shared buffer that is not itself synchronised. That post also showed how to use conditions to control access to the buffer. (link)
  This Producer-Consumer example is more sophisticated. It uses MathPriorityTransferQueue, which extends PriorityBlockingQueue and implements TransferQueue to gain better control over the exchange process itself.

// methods to @Override from interface TransferQueue<E>
boolean tryTransfer(E e){...
boolean tryTransfer(E e, long timeout, TimeUnit unit){...
void transfer(E e) throws InterruptedException {...
boolean hasWaitingConsumer(){...
int getWaitingConsumerCount(){...

// methods to @Override from class PriorityBlockingQueue
public E take() throws InterruptedException{...
public E peek(){...
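
To make the TransferQueue contract more tangible before looking at the custom queue, here is a minimal sketch that uses the JDK's own LinkedTransferQueue instead of MathPriorityTransferQueue. The class and method names are made up for this illustration. It shows that tryTransfer fails when no consumer is waiting and succeeds once a consumer is blocked in take().

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Illustration only: uses the JDK's LinkedTransferQueue, not the post's MathPriorityTransferQueue.
final class TransferQueueSketch {

    // With no consumer blocked in take(), tryTransfer gives up immediately.
    static boolean tryTransferWithoutConsumer() {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        return queue.tryTransfer("event");
    }

    // With a consumer blocked in take(), tryTransfer hands the element over directly.
    static String tryTransferWithConsumer() {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            while (!queue.hasWaitingConsumer()) {
                Thread.sleep(1); // wait until the consumer is parked in take()
            }
            if (!queue.tryTransfer("event")) {
                throw new IllegalStateException("a consumer was waiting, the transfer should succeed");
            }
            consumer.join(); // join() makes the consumer's write to received[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("without consumer: " + tryTransferWithoutConsumer());
        System.out.println("with consumer:    " + tryTransferWithConsumer());
    }
}
```

The custom queue in this post has to reproduce the same behaviour on top of PriorityBlockingQueue, which is why it tracks waiting consumers itself.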
  Distributed systems, parallel computing, machine learning and artificial intelligence are always exciting topics. This post is only a small introduction to one of the building blocks that is worth knowing to make them possible, so let's start slowly with the example.

   The heart of the example is the implementation of MathPriorityTransferQueue, the buffer shared by the MathProducers and MathConsumers. The buffer allows them to work together. The basic idea is that each of the NUMBER_PRODUCERS producers generates NUMBER_PRODUCER_EVENTS MathEvents, which NUMBER_CONSUMER consumers then process. The total number of MathEvents the consumers need to consume is therefore the number of producers multiplied by the number of MathEvents each of them generates.
Each MathEvent is assigned a random priority. The priority is the key by which events are sorted inside MathPriorityTransferQueue.
The main class MathPrioProdConMain is responsible for creating the producers and the consumers. It is important to point out that a consumer works only while something is available to consume inside the queue (simply, while data are available).
public class MathPrioProdConMain {
    ...
    private static final int NUMBER_CONSUMER = 2;
    private static final int NUMBER_PRODUCERS= 10;
    private static final int NUMBER_PRODUCER_EVENTS= 1000;

    public static void main(String... args){
        MathPriorityTransferQueue< MathEvent > buffer = new MathPriorityTransferQueue<>();
        ...
        Thread[] producerThread = new Thread[ NUMBER_PRODUCERS ];
        for(int i=0; i < NUMBER_PRODUCERS; i++){
            producerThread[i] = new Thread(new MathProducer(NUMBER_PRODUCER_EVENTS, buffer));
            producerThread[i].start();
        }
        ...
        Thread[] consumerThread = new Thread[ NUMBER_CONSUMER ];
        for(int i=0; i < NUMBER_CONSUMER; i++){
            consumerThread[i] = new Thread(new MathConsumer(buffer));
            consumerThread[i].start();
        }
        ...
    }
}
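
As a quick sanity check of the arithmetic, the sketch below lets 10 producers put 1000 random priorities each into a plain PriorityBlockingQueue and counts what arrived: 10 × 1000 = 10000. It is a simplified stand-in with made-up names (Integer instead of MathEvent, no consumers), not the code of the example project.

```java
import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;

// Illustration only: Integer priorities stand in for MathEvent, and nothing consumes the buffer.
final class EventCountSketch {

    // Starts the producers, waits for all of them and returns how many events are buffered.
    static int produceAll(int producers, int eventsPerProducer) {
        PriorityBlockingQueue<Integer> buffer = new PriorityBlockingQueue<>();
        Thread[] threads = new Thread[producers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                Random random = new Random();
                for (int j = 0; j < eventsPerProducer; j++) {
                    buffer.put(random.nextInt(100)); // never blocks, the queue is unbounded
                }
            });
            threads[i].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("events in buffer: " + produceAll(10, 1000));
    }
}
```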
  To test the ordering abilities of the shared data structure, the main method also transfers an additional MathEvent with an explicitly defined priority into the buffer. The output shows that the ordering is implemented correctly.
...
MathEvent event = new MathEvent(" Transfer Math SuperEvent ", 22);
buffer.transfer( event );
...
  At the end we wait until all consumer threads are done (until they simply die).

...
for(int i=0; i < NUMBER_CONSUMER ; i++)
    consumerThread[ i ].join();
...
  Keep in mind that the whole multi-threading process works like a state machine, with its locking, unlocking, notifying and synchronisation of operations. This helps to understand why the rest of the code works.


  Before we move on to the code of MathPriorityTransferQueue, let's take a brief look at what an individual MathProducer is doing:
public class MathProducer implements Runnable{
    ...
    public MathProducer(int maxMathEvents, MathPriorityTransferQueue< MathEvent > buffer){
        this.maxMathEvents = maxMathEvents;
        this.buffer = buffer;
    }
    ...
    @Override
    public void run() {
        // one Random per producer run is enough, no need to create one per event
        Random random = new Random();
        for(int i=0; i< maxMathEvents; i++){
            int priority= random.nextInt(100);
            MathEvent event = new MathEvent("MathProducer-" + Thread.currentThread().getName(), priority);
            buffer.put(event);
        }
    }

}
 The MathProducer's responsibility is to generate MathEvents and give each of them a random priority.

   We do the same with the consumer. An individual MathConsumer consumes every MathEvent that is available to it in the queue (the queue is responsible for prioritising the MathEvents).
public class MathConsumer implements Runnable {
    ...
    public MathConsumer(MathPriorityTransferQueue< MathEvent > buffer) {
        this.buffer = buffer;
    }
    ...
    @Override
    public void run() {
        while(buffer.peek() != null){
            try{
                MathEvent event = buffer.take();
                logger.debug("MathConsumer-" + Thread.currentThread().getName() +
                        " : " + event.getThread() + " priority: " + event.getPriority());
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}
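
One thing to be aware of in the consumer loop above: peek() followed by take() is a check-then-act sequence. Another consumer may remove the last element between the two calls, and a consumer that starts before anything was produced leaves the loop immediately. A possible alternative, sketched here with made-up names and an arbitrary idle timeout (both are assumptions, not part of the example project), is to poll with a timeout and stop once nothing arrives in time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration only: Integer stands in for MathEvent, idleMillis is an arbitrary choice.
final class PollingConsumerSketch {

    // Consumes until no element arrives within idleMillis, then gives up.
    static List<Integer> drain(BlockingQueue<Integer> buffer, long idleMillis) {
        List<Integer> consumed = new ArrayList<>();
        try {
            Integer event;
            // poll() removes and returns in one step, so there is no gap between check and take
            while ((event = buffer.poll(idleMillis, TimeUnit.MILLISECONDS)) != null) {
                consumed.add(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
        }
        return consumed;
    }

    static List<Integer> demo() {
        BlockingQueue<Integer> buffer = new PriorityBlockingQueue<>();
        buffer.add(3);
        buffer.add(1);
        buffer.add(2);
        return drain(buffer, 50);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // natural Integer ordering: smallest first
    }
}
```

The price of this variant is that a slow producer can make a consumer stop too early, so the timeout has to be chosen with the workload in mind.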
  Now all parts participating in the example are defined and we can move on to the heart of the example, the implementation of MathPriorityTransferQueue. It is worth pointing out that the counter is implemented as an atomic variable to avoid a possible race condition, and that it should not be implemented with volatile. Volatile guarantees only visibility through happens-before (write-read) ordering, not atomicity of a compound operation such as an increment.
public class MathPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> {
    ...
    private AtomicInteger counter;
    private LinkedBlockingQueue<E> transferList;
    private ReentrantLock lock;
    ...
    public MathPriorityTransferQueue(){
        counter = new AtomicInteger(0);
        lock = new ReentrantLock();
        transferList = new LinkedBlockingQueue<>();
    }
...
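
The remark about volatile can be checked with a small experiment. The sketch below (made-up names, not part of the example project) increments an AtomicInteger from several threads. Because incrementAndGet() is one atomic read-modify-write step, no update is lost. A `volatile int` incremented with `++` would be a separate read and write and could lose updates under contention.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: shows that AtomicInteger increments are not lost under contention.
final class AtomicCounterSketch {

    static int countWithAtomic(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithAtomic(8, 10000));
    }
}
```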
  The counter is used to control the important operations provided by our data structure:
1. take() gets a MathEvent from the data structure. It increments the counter on entry and decrements it when the operation is finished, so the counter holds the number of consumers currently inside take().
...
@Override
public E take() throws InterruptedException {
   lock.lock();
   counter.incrementAndGet();
   E result = transferList.poll();

   if(result == null){
     lock.unlock();
     result = super.take();
     lock.lock();
   } else {
      synchronized (result){
        result.notifyAll();
      }
   }
   counter.decrementAndGet();
   lock.unlock();
   return result;
}
...
2. peek() retrieves, but does not remove, the head of the queue, and returns null if the queue is empty. Here we also need to take care of transferList, the blocking queue that stores MathEvents handed over through transfer which are still waiting for a consumer.
@Override
public E peek() {
   lock.lock();
   E eventMain = super.peek();
   E eventTrans = transferList.peek();
   E result = eventMain != null ? eventMain : eventTrans;
   lock.unlock();
   return result;
}
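3. tryTransfer(E e) tries to hand a MathEvent over to a consumer immediately. This succeeds only if a consumer is already waiting; otherwise the method returns false without enqueuing anything.
...
// Sketch following the description above and the counter check of the timed variant;
// see the example source code for the exact listing.
@Override
public boolean tryTransfer(E event) {
   lock.lock();
   boolean transferred;
   if(counter.get() != 0){
      put(event);
      transferred = true;
   } else {
      transferred = false;
   }
   lock.unlock();
   return transferred;
}
...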
4. tryTransfer(E event, long timeout, TimeUnit unit) tries to hand the MathEvent over to a waiting consumer. If no consumer is waiting, the event is stored in transferList and the calling thread waits for at most someTimeout milliseconds, or until it is interrupted. The method returns false if the event is still in transferList after the wait.
...
@Override
public boolean tryTransfer(E event, long timeout, TimeUnit unit) throws InterruptedException {
   lock.lock();
   if(counter.get() != 0){
      put(event);
      lock.unlock();
      return true;
   } else {
       transferList.add(event);
       long someTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
       lock.unlock();
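       synchronized (event){
           // wait() must be called while holding the event's monitor,
           // otherwise it throws IllegalMonitorStateException
           event.wait(someTimeout);
       }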
       lock.lock();
       if(transferList.contains(event)){
           transferList.remove(event);
           lock.unlock();
           return false;
       }else{
           lock.unlock();
           return true;
       }
    }
}
...
5. transfer(E e) hands the MathEvent over directly when a consumer is waiting for one. If no consumer is waiting, it puts the MathEvent into transferList and waits until the MathEvent is taken out.
...
@Override
public void transfer(E event) throws InterruptedException{
   lock.lock();
   if(counter.get() != 0){
       put(event);
       lock.unlock();
   } else {
       transferList.add(event);
       lock.unlock();
       synchronized (event){
          event.wait();
       }
   }
}
...
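
A note on the wait()/notifyAll() handoff used by transfer: the lock is released before the producer enters `event.wait()`, so a consumer could in principle take the event and call notifyAll() before the producer starts waiting, and wait() may also return spuriously. One way to make such a handoff robust is to pair every element with a CountDownLatch. The sketch below is a hypothetical alternative with made-up names, not the implementation from the example project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: a latch-based handoff as an alternative to wait()/notifyAll().
final class LatchHandoffSketch {

    // Hypothetical wrapper: an element plus a latch that is opened by whoever takes it.
    static final class Handoff<E> {
        final E element;
        final CountDownLatch taken = new CountDownLatch(1);
        Handoff(E element) { this.element = element; }
    }

    private final LinkedBlockingQueue<Handoff<String>> transferList = new LinkedBlockingQueue<>();

    // Producer side: enqueue, then block until a consumer has taken the element.
    void transfer(String element) throws InterruptedException {
        Handoff<String> handoff = new Handoff<>(element);
        transferList.add(handoff);
        handoff.taken.await(); // returns at once if the consumer was faster
    }

    // Consumer side: take the element and release the waiting producer.
    String take() throws InterruptedException {
        Handoff<String> handoff = transferList.take();
        handoff.taken.countDown();
        return handoff.element;
    }

    static String demo() {
        LatchHandoffSketch queue = new LatchHandoffSketch();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.transfer("event"); // blocks until the consumer took the element
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the latch remembers that it was opened, the order in which producer and consumer arrive no longer matters.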
  
and a few more, which you can find in the example source code, but they are not critical to make this example work (hasWaitingConsumer, getWaitingConsumerCount).
   With the data structure implemented, we need to implement the very important MathEvent method compareTo(MathEvent event), as MathEvent implements the Comparable<MathEvent> interface.
 In this example the data structure orders elements by their priority, highest priority first.
public class MathEvent implements Comparable<MathEvent>{
    ...
    @Override
    public int compareTo(MathEvent event) {
        if (this.priority > event.getPriority()) {
            return -1;
        } else if (this.priority < event.getPriority()) {
            return 1;
        } else {
            return 0;
        }
    }
    ...
}
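
The effect of this reversed comparison can be tried in isolation. The sketch below uses a hypothetical stand-in class Event (not the real MathEvent) and writes the same ordering with Integer.compare. Draining a PriorityBlockingQueue then yields the highest priority first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Illustration only: Event is a made-up stand-in for MathEvent.
final class PriorityOrderSketch {

    static final class Event implements Comparable<Event> {
        final int priority;
        Event(int priority) { this.priority = priority; }

        @Override
        public int compareTo(Event other) {
            // arguments swapped on purpose: a higher priority sorts earlier
            return Integer.compare(other.priority, this.priority);
        }
    }

    // Puts the given priorities into the queue and returns them in polling order.
    static List<Integer> drainOrder(int... priorities) {
        PriorityBlockingQueue<Event> queue = new PriorityBlockingQueue<>();
        for (int priority : priorities) {
            queue.put(new Event(priority));
        }
        List<Integer> order = new ArrayList<>();
        Event event;
        while ((event = queue.poll()) != null) {
            order.add(event.priority);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder(70, 1, 71, 22));
    }
}
```

Note that only polling is ordered. Iterating over a PriorityBlockingQueue does not guarantee any particular order.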
   With all those steps done we can successfully run the MathPrioProdConMain class and we get the following result (which of course depends on our setup of consumers, producers and MathEvents):
...
MathConsumer-Thread-6 : MathProducer-Thread-4 priority: 71
MathConsumer-Thread-6 : MathProducer-Thread-2 priority: 71
MathPrioProdConMain: Buffer waiting consumers = 0
MathConsumer-Thread-6 : MathProducer-Thread-0 priority: 71
MathConsumer-Thread-6 : Transfer Math Calculation Event  priority: 1
MathConsumer-Thread-6 : MathProducer-Thread-0 priority: 70
MathConsumer-Thread-6 : MathProducer-Thread-0 priority: 70
...

   More comments can be found inside the source code, which is available on my GitHub account (example source code)
Enjoy Multi-threading !

Thursday, September 4, 2014

Threading: Producer Consumer without Synchronisation with SharedBuffer

Java (especially Java 8 :) becomes ever more challenging the deeper one goes into multi-threading questions and tasks.
  The Producer-Consumer problem is one of the classical ones to solve. The Producer and the Consumer share a buffer: the Producer puts an entity into it, and the Consumer takes that product out and processes it.
  The following example shows a capacity-limited SharedBuffer with Conditions (interface java.util.concurrent.locks.Condition, available since Java 5) that determine when something can be inserted and read (the buffer has free space, the buffer has something to read).
public class SharedBuffer {
  ...
  private ReentrantLock lock;

  private Condition lines;
  private Condition space;
  ...
  public SharedBuffer(int maxSize){
...
  We do not use a synchronised LinkedList for the data exchanged between Producer and Consumer. We also use only one Producer to store products into the SharedBuffer (in the example: lines of characters) and several Consumers to process them (represented by CONSUMER_NUMBER in the code).
  The ProdConOneMain class is responsible for preparing the data for the Producer, which adds products into the SharedBuffer under a certain condition (there is free space).
...
public static void main(String... args){  
   logger.debug("Start ProdCon One Example");

   CharacterLines characterLines = new CharacterLines(MAX_LINES, MAX_LINE_CHARACTERS);
   SharedBuffer buffer = new SharedBuffer(BUFFER_SIZE);

   Producer producer = new Producer(characterLines, buffer);
   Thread producerThread = new Thread(producer, "Producer ");

   Thread[ ] consumerThreads = new Thread [ CONSUMER_NUMBER ];
   producerThread.start();
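   for(int i=0; i < CONSUMER_NUMBER; i++){
     // each consumer thread has to be created before it can be started
     consumerThreads[i] = new Thread(new Consumer(buffer), "Consumer " + i);
     consumerThreads[i].start();
   }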
...
  When the condition for insertLine is not satisfied, it calls await() on the Condition space to let the Producer wait until space is available. The Producer thread is woken up when a Consumer thread calls signalAll() on this condition. When the Producer is able to store a new line, it calls signalAll() on the lines condition, because Consumers may be waiting for new lines.
  The SharedBuffer implementation uses ReentrantLock, an implementation of the Lock interface that provides a re-entrant mutual exclusion lock with the same basic behaviour and semantics as the implicit monitor lock used by synchronised methods. ReentrantLock offers more control (the ability to lock interruptibly, a timeout while waiting for the lock, more flexibility...), so a thread does not need to block indefinitely, and it can be given a fairness property (not used in this example).
...
public void insertLine(String line){
  lock.lock();

  try{
    while(buffer.size() == maxSize){
      space.await();
    }
    buffer.offer(line);
    logger.debug(Thread.currentThread()
                    .getName() + ": BUFFER Inserted Line: " + buffer.size());
    lines.signalAll();

  } catch (InterruptedException e){
     logger.error(e.toString());
  } finally {
     lock.unlock();
  }
}
...
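
The extra controls of ReentrantLock mentioned above can be tried in a few lines. This sketch (made-up names, not part of the example project) creates a fair lock, shows that the owning thread can re-acquire it, and shows that a second thread using tryLock with a timeout gives up instead of blocking forever.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: re-entrancy, fairness flag and timed lock attempts.
final class TimedLockSketch {

    // The owner may lock again; the hold count tracks how often it did.
    static int holdCountAfterReentry() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }

    // While the calling thread holds the lock, another thread tries to get it with a timeout.
    static boolean secondThreadAcquires(long waitMillis) {
        ReentrantLock lock = new ReentrantLock(true); // true = fair ordering of waiting threads
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                try {
                    if (lock.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
                        try {
                            acquired[0] = true;
                        } finally {
                            lock.unlock();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            other.start();
            other.join(); // the lock stays held the whole time, so tryLock times out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println("hold count: " + holdCountAfterReentry());
        System.out.println("second thread acquired: " + secondThreadAcquires(50));
    }
}
```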
   As the Consumer accesses the SharedBuffer, it communicates through it with the Producer. The Consumer's task is to take a product (a line of characters) from the SharedBuffer and process it. It only makes sense to read when there is something to read. By taking a line the Consumer also makes space for the Producer, and notifies it by calling signalAll() on the space Condition.
public  String getLine(){
  String result = null;
  lock.lock();

  try{
    while((buffer.size() == 0) && (hasPendingLines())){
      lines.await();
    }

    if(hasPendingLines()){
       result = buffer.poll();
       logger.debug(Thread.currentThread().getName() + ": BUFFER Line to Read: " +  buffer.size());
       space.signalAll();
    }

  }catch (InterruptedException e){
    logger.error(e.toString());
  } finally {
    lock.unlock();
  }

  return result;
}
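
Since the listings above leave out the constructor and fields, here is a compact, self-contained sketch of the same two-condition idea that can be run on its own. It is a simplification under stated assumptions: the class name is made up, the pendingLines flag is left out (the demo reads a known number of lines instead), and InterruptedException is propagated instead of logged.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: a reduced version of the SharedBuffer idea with one lock and two conditions.
final class BoundedBufferSketch {
    private final Queue<String> buffer = new LinkedList<>();
    private final int maxSize;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition lines = lock.newCondition(); // signalled when a line was added
    private final Condition space = lock.newCondition(); // signalled when a line was removed

    BoundedBufferSketch(int maxSize) { this.maxSize = maxSize; }

    void insertLine(String line) throws InterruptedException {
        lock.lock();
        try {
            while (buffer.size() == maxSize) {
                space.await(); // releases the lock while waiting
            }
            buffer.offer(line);
            lines.signalAll();
        } finally {
            lock.unlock();
        }
    }

    String getLine() throws InterruptedException {
        lock.lock();
        try {
            while (buffer.isEmpty()) {
                lines.await();
            }
            String line = buffer.poll();
            space.signalAll();
            return line;
        } finally {
            lock.unlock();
        }
    }

    // One producer inserts count lines, the calling thread reads them all back.
    static int transferLines(int count, int capacity) {
        BoundedBufferSketch shared = new BoundedBufferSketch(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    shared.insertLine("line " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int received = 0;
        try {
            for (int i = 0; i < count; i++) {
                shared.getLine();
                received++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("lines received: " + transferLines(10, 2));
    }
}
```

Both waits sit inside a while loop, so a thread re-checks its condition after every wake-up, which also covers spurious wake-ups.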
   With SharedBuffer and its methods for the consumer and the producer defined, we can create both actors by implementing the Runnable interface:
public class Producer implements Runnable{

    private CharacterLines characterLines;

    private SharedBuffer  buffer;

    public Producer(CharacterLines characterLines, SharedBuffer buffer) {
        this.characterLines = characterLines;
        this.buffer = buffer;
    }

    @Override
    public void run() {
        buffer.setPendingLines(true);
        while(characterLines.hasMoreLines()){
            buffer.insertLine(characterLines.getNextLine());
        }
        buffer.setPendingLines(false);
    }
}
...
public class Consumer implements Runnable{

    private SharedBuffer buffer;
    private Random random;

    public Consumer(SharedBuffer buffer) {
        this.buffer = buffer;
        this.random = new Random();
    }
    ...
    private void processLine(String line){
        try{
          ...  
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}
Now the actors of our example are ready and we can run it (see ProdConOneMain). The example output should look like the following.
Start ProdCon One Example
Waiting Lines: 10
Producer : BUFFER Inserted Line: 1
Waiting Lines: 9
Producer : BUFFER Inserted Line: 2
Waiting Lines: 8
Consumer 0: BUFFER Line to Read: 1
Consumer 1: BUFFER Line to Read: 0
Producer : BUFFER Inserted Line: 1
...

You can check the code on my GitHub
This example should be the beginning of a bigger series.
Enjoy Multi-Threading

Thursday, August 14, 2014

Multithreading with Scala and Akka-Actor : PingPong game example

  This small blog post is not only for those who have read, or are reading, the cool Scala Cookbook, and more specifically its chapter about how to use the Akka toolkit. Akka with Actors helps to build highly concurrent, distributed and fault-tolerant event-driven applications ("we are reactive") on the JVM.

 I'll slightly modify an example from the Scala Cookbook and make a new SBT-based project out of it to show the multi-threading outcome. The build.sbt file:
name := "miko-scala-akka"
version := "1.0"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4-SNAPSHOT",
  "com.typesafe.akka" %% "akka-testkit" % "2.4-SNAPSHOT",
  "org.slf4j" % "jcl-over-slf4j" % "1.7.7" ,
  "org.slf4j" % "slf4j-api" % "1.7.7" ,
  "org.slf4j" % "slf4j-log4j12" % "1.7.7",
  "org.apache.logging.log4j" % "log4j" % "2.0" excludeAll(
      ExclusionRule(organization = "com.sun.jdmk"),
      ExclusionRule(organization = "com.sun.jmx"),
      ExclusionRule(organization = "javax.jms")
      ),
  "org.scalatest" % "scalatest_2.11" % "2.2.1" % "test",
  "junit" % "junit" % "4.11" % "test"
)
resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
  Every Actor in the Akka toolkit should send immutable messages (note: we will check it). Messages are sent with the binary operator "!", with the message passed as its argument.
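Ping Actor:
// Sketch along the lines of the Scala Cookbook example this post adapts;
// see the GitHub example for the exact listing.
class Ping(pong: ActorRef) extends Actor{
  var count = 0
  def incrementAndPrint(): Unit = { count += 1; println("ping") }
  def receive = {
    case StartMessage =>
      incrementAndPrint()
      pong ! PingMessage
    case PongMessage =>
      incrementAndPrint()
      if (count > 7) { // assumed threshold, chosen to match the eight "ping" lines in the output
        sender ! StopMessage
        println("ping stopped")
        context.stop(self)
      } else {
        sender ! PingMessage
      }
  }
}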
Pong Actor:
class Pong extends Actor{
  def receive = {
    case PingMessage =>
      println(" pong")
      sender ! PongMessage
    case StopMessage =>
      println(" pong stopped")
      context.stop(self)
  }
}
  Our Ping and Pong actors are defined, but the messages they use are not yet. We create 4 case objects (a case object can be serialized, supports pattern matching, has default implementations of equals and hashCode, ...):
case object PingMessage {
  println("This is PingMessage")
}
case object PongMessage {
  println("This is PongMessage")
}
case object StartMessage {
  println("This is StartMessage")
}
case object StopMessage {
  println("This is StopMessage")
}
  Now we have everything ready to create final object PingPongTest:
object PingPongTest{
  private val logger = LoggerFactory.getLogger(getClass)
  def main(args: Array[String]): Unit = {
    logger.debug("---PingPong Game Start---")
    val system = ActorSystem("PingPongSystem")
    val pong = system.actorOf(Props[Pong], name = "pong")
    val ping = system.actorOf(Props(new Ping(pong)), name = "ping")

    // Start the game
    ping ! StartMessage
    logger.debug("---PingPong Game Stop---")
    system.shutdown()
  }
}
  The main purpose of updating the PingPong game is to show how a reactive (multi-threaded) application works. The Simple Logging Facade for Java (SLF4J) logger is used here to highlight the flow of this simple reactive application.
DEBUG: miko.scala.akka.pingpong.PingPongTest$ - ---PingPong Game App Start---
This is StartMessage
DEBUG: miko.scala.akka.pingpong.PingPongTest$ - ---PingPong Game App End---
ping
This is PingMessage
 pong
This is PongMessage
ping
 pong
ping
 pong
ping
 pong
ping
 pong
ping
 pong
ping
 pong
ping
This is StopMessage
ping stopped
 pong stopped
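
In the output above the End log line appears before the first ping because sending with "!" only puts the message into the receiver's mailbox and returns immediately. The actors process their mailboxes later, on other threads. The idea can be imitated in plain Java, without Akka, by treating a single-threaded executor as a mailbox. The sketch below is only an analogy with made-up names; it is not how Akka is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: two single-threaded executors play the role of actor mailboxes.
final class MailboxSketch {
    private final List<String> log = Collections.synchronizedList(new ArrayList<>());
    private final ExecutorService pingMailbox = Executors.newSingleThreadExecutor();
    private final ExecutorService pongMailbox = Executors.newSingleThreadExecutor();
    private final CountDownLatch finished = new CountDownLatch(1);
    private final int rounds;
    private int count; // touched only by the ping mailbox thread

    MailboxSketch(int rounds) { this.rounds = rounds; }

    // Runs on the ping mailbox thread for the start message and for every pong.
    private void pingTurn() {
        count++;
        log.add("ping");
        if (count >= rounds) {
            log.add("ping stopped");
            finished.countDown();
        } else {
            pongMailbox.execute(this::pongTurn); // asynchronous send, returns immediately
        }
    }

    // Runs on the pong mailbox thread for every ping.
    private void pongTurn() {
        log.add("pong");
        pingMailbox.execute(this::pingTurn);
    }

    static List<String> play(int rounds) {
        MailboxSketch game = new MailboxSketch(rounds);
        game.pingMailbox.execute(game::pingTurn); // comparable to: ping ! StartMessage
        try {
            game.finished.await(); // unlike the Scala main above, wait for the game to end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            game.pingMailbox.shutdown();
            game.pongMailbox.shutdown();
        }
        return new ArrayList<>(game.log);
    }

    public static void main(String[] args) {
        System.out.println(play(3));
    }
}
```

Each side handles one message at a time on its own thread, so the per-actor state (here the count field) needs no locking.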
  The output shows how exciting a multi-threaded environment is. The event-driven, distributed system design may fit the nature of the real world around us better.
Enjoy with the GitHub example