Pages

Showing posts with label thread-safe. Show all posts
Showing posts with label thread-safe. Show all posts

Sunday, September 14, 2014

HowTo:: Design patterns with Scala or Java: GoF behavioural pattern Chain of Responsibilities

   
Img.1.: Chain Responsibilities pattern concept

   This pattern is really useful it allows the sender (sends the request) to be decoupled from the receivers. Those receivers are organised into a sequence. 
Inside such sequence every member has a opportunity to process the request or not and forward it to the next one.
  In this blog post I want to compare Java vs. Scala implementations of "Chain of Responsibilities pattern" and its possible implementation (Img.1.).

1. Java 
In following Java implementation is important point  abstract class AbstractHandler1 which has access to the successor property. The successor property provides concrete AbstractHandler1 handler implementation (creates the chain of different handlers).
a) AbstractHandler1:
public abstract  class AbstractHandler1 {
    AbstractHandler1 successor;

    public void setSuccessor(AbstractHandler1 successor){
        this.successor = successor;
    }

    abstract public  void handleRequest(ReqTypeEnum request);
}
Now we create ReqTypeEnum object which is thread-safe and its implementation guarantee that initialisation happens only ones and is immutable. In other hand implementation makes the Java code more readable. 
b) ReqTypeEnum
public enum ReqTypeEnum {
    typeOne(0),
    typeTwo(1),
    typeThree(2);

    private int code;

    private static final Map<Integer, ReqTypeEnum> codeToRequestTypeMapping = new HashMap<Integer, ReqTypeEnum>();
    static{
        for(ReqTypeEnum rt: ReqTypeEnum.values()){
            codeToRequestTypeMapping.put(rt.getCode(), rt);
        }
    }

    private ReqTypeEnum(int c){
        code = c;
    }

    public int getCode(){
        return code;
    }

    public static ReqTypeEnum getRequestType(int code) {
        return codeToRequestTypeMapping.get(code);
    }

}
   After having request handler skeleton (AbstractHandler1defined  and request type (ReqTypeEnum) we can make step forward and create couple of concrete hanlder implementations (example 3):
***************************
public class ConHandler1 extends AbstractHandler1 {
    private static final Logger logger = LoggerFactory.getLogger(ConHandler1.class);
    @Override
    public void handleRequest(ReqTypeEnum request) {
        if(request == ReqTypeEnum.typeOne){
            logger.debug("Concrete Handler 1 = " + request);
        } else {
            logger.debug("Concrete Handler 1 -> Doesn't handle request= " + request);
            if(successor != null){
                successor.handleRequest(request);
            }
        }
    }
}
***************************
public class ConHandler2 extends AbstractHandler1 {

    private static final Logger logger = LoggerFactory.getLogger(ConHandler2.class);

    @Override
    public void handleRequest(ReqTypeEnum request) {
        if(request == ReqTypeEnum.typeTwo){
            logger.debug("Concrete Handler 2 = " + request);
        } else {
            logger.debug("Concrete Handler 2 -> Doesn't handle request= " + request);
            if(successor != null){
                successor.handleRequest(request);
            }
        }
    }
}
***************************
public class ConHandler3 extends AbstractHandler1 {

    private static final Logger logger = LoggerFactory.getLogger(ConHandler3.class);

    @Override
    public void handleRequest(ReqTypeEnum request) {
        if(request == ReqTypeEnum.typeThree){
            logger.debug("Concrete Handler 3 = " + request);
        } else {
            logger.debug("Concrete Handler 3 -> Doesn't handle request= " + request);
            if(successor != null){
                successor.handleRequest(request);
            }
        }
    }
}
and prepare demo setup:
private static AbstractHandler1 setDemoUp(){
   ConHandler1 conHandler1 = new ConHandler1();
   ConHandler2 conHandler2 = new ConHandler2();
   ConHandler3 conHandler3 = new ConHandler3();

   conHandler1.setSuccessor(conHandler2);
   conHandler2.setSuccessor(conHandler3);

   return conHandler1;
}
  Java implementation is ready to launch  
public static void main(String... args){
   ...
   AbstractHandler1 responsibilityChain = setDemoUp();

   responsibilityChain.handleRequest(ReqTypeEnum.typeOne);
   responsibilityChain.handleRequest(ReqTypeEnum.typeTwo);
   responsibilityChain.handleRequest(ReqTypeEnum.typeThree);
   ...
}
  The demo output shows the chain of request handling:
ChainDemo1 - ***Start Chain of Responsibility Demo***
ConHandler1 - Concrete Handler 1 = typeOne
ConHandler1 - Concrete Handler 1 -> Doesn't handle request= typeTwo
ConHandler2 - Concrete Handler 2 = typeTwo
ConHandler1 - Concrete Handler 1 -> Doesn't handle request= typeThree
ConHandler2 - Concrete Handler 2 -> Doesn't handle request= typeThree
ConHandler3 - Concrete Handler 3 = typeThree


2.Scala
  As Java is up, let's move to the Scala possible implementation. This implementation should allow runtime configuration of registered handlers in the chain. Each handler is implemented as Scala object:
object ConHandler1 extends ChainHandler[String]{
  private val name: String = "Concrete Handler One"
  private val logger = LoggerFactory.getLogger(getClass)
  def accept(obj: String) = obj equals name
  def handle(obj: String) = logger.debug("SERVUS " + name)
}
***************************
object ConHandler2 extends ChainHandler[String]{
  private val name: String = "Concrete Handler Two"
  private val logger = LoggerFactory.getLogger(getClass)
  def accept(obj: String) = obj equals name
  def handle(obj: String) = logger.debug("HALLO " + name)
} 
  The ChainHandler is implemented as the generic trait and indicates that every concrete handler implementation, inside the chain, is able to decide whether is able to handle the request.
trait ChainHandler[T] {
  def accept(obj: T): Boolean
  def handle(obj: T): Unit
}
Each request is passed as the parameter which concrete handler uses for processing.
  The process of the requests chain is controlled by ChainManager Scala class. ChainManager groups all assigned concrete handlers and put them into the ListBuffer (chain). 
case class ChainManager[T](n: String) {
  private val logger = LoggerFactory.getLogger(getClass)
  val name: String = n
  private val chain = new ListBuffer[ChainHandler[T]]
  def add(h: ChainHandler[T]) = {
    chain += h
    ChainManager.this
  }
  def apply(obj: T) = {
    val handler = Option(chain.filter(n => n accept obj) head)
    handler match {
      case Some(chainElement) => chainElement handle obj
      case None => logger.error("no handler")
    }
  }
}
As the add() handler methods returns references to the ChainManager itself. It allows linking add() method to simplify the construction.  
controller.add(ConHandler1).add(ConHandler2)
Now we have all elements to run the sample program: 
object ChainOfRespManagerTest {
  private val logger = LoggerFactory.getLogger(getClass)
  def main(args: Array[String]): Unit = {
    val controller = ChainManager[String]("Manager Approach")
    logger.debug("---Chain of Responsibility: "+ controller.name +"---")
    controller.add(ConHandler1).add(ConHandler2)
    controller apply "Concrete Handler One"
    controller apply "Concrete Handler Two"
  }
}
and observe the out put of it:
ChainOfRespManagerTest$ - ---Chain of Responsibility: Manager Approach---
ConHandler1$ - SERVUS Concrete Handler One
ConHandler2$ - HALLO Concrete Handler Two
  

  Conclusion: Although java is cool language such implementation shows necessity to write not only little more code to achieve the similar result but also the solution is not, in some parts, generics enough. 
   The Scala solution illustrates how the invocation of the chain behaviour can be made to look like the part of the language product.   
Enjoy ! 

Saturday, September 6, 2014

HowTo:: Threading: Producer Consumer with thread-safe PriorityBlockingQueue as buffer with custom ordering


Img.1.:: Producer Consumer Problem examples (powered by IntelliJ Idea 13.x)

  In the previous blog post I've blogged about commonly issue in multi-threading Producer-Consumer problem with shared not synchronised buffer. Buffer also showed how to employ some conditions to control the access to it. (link)
  This ConsumerProducer example is more sophisticated. In this example is used MathPriorityTransferQueue which extends PriorityBlockingQueue and implements TransferQueue to employ better control over the exchange process itself. 

// methods to @Override from interface TransferQueue< e >
boolean tryTransfer(E e){...
boolean tryTransfer(E e, long timeout, TimeUnit unit){...
void transfer(E e) throws InterruptedException {...
boolean hasWaitingConsumer(){...
int getWaitingConsumerCount(){..

// methods to @Override from class PriorityBlockingQueue
public E take() throws InterruptedException{...
public E peek(){...
  The Distribution Systems, Parallel computing, machine learning or artificial intelligence are always excited, this is only small introduction to some small part that is worthy to know to make them possible,  so let's  start slowly with the example. 

   The heart of the example is the implementation of MathPriorityTransferQueue which is the buffer used by MathProducers and MathConsumers. The buffer allows them to work together. The basic idea of the example is that each PRODUCER generates NUMBER_PRODUCER_EVENTs that number  of consumers (NUMBER_CONSUMER) process. The total number of MathEvents that CONSUMERs needs to consume is then number of producers multiplied by the number of MathEvents they generates. 
Each MathEvent has assigned the random priority. The Priority is the key how the event are sorted inside MathPriorityTransferQueue.
The main class MathPrioProdConMain takes responsibility about the process of Producers creation and number of consumers. Important is to point out that Consumers will work only if something is available to consume inside the queue (simply when data are available). 
public class MathPrioProdConMain {
    ...
    private static final int NUMBER_CONSUMER = 2;
    private static final int NUMBER_PRODUCERS= 10;
    private static final int NUMBER_PRODUCER_EVENTS= 1000;

    public static void main(String... args){
        MathPriorityTransferQueue< MathEvent > buffer = new MathPriorityTransferQueue<>();
        ...
        Thread[] producerThread = new Thread[ NUMBER_PRODUCERS ];
        for(int i=0; i < NUMBER_PRODUCERS; i++){
            producerThread[i] = new Thread(new MathProducer(NUMBER_PRODUCER_EVENTS, buffer));
            producerThread[i].start();
        }
        ...
        Thread[] consumerThread = new Thread[ NUMBER_CONSUMER ];
        for(int i=0; i < NUMBER_CONSUMER; i++){
            consumerThread[i] = new Thread(new MathConsumer(buffer));
            consumerThread[i].start();
        }
        ...
    }
}
  To test the ordering abilities of the shared data-structure the main method sends unexpected MathEvents into the buffer with defined priority. On the output shows that the ordering is implemented correctly.
...
MathEvent event = new MathEvent(" Transfer Math SuperEvent ", 22);
buffer.transfer( event );
...
  At the end we wait until all consumer processes are done (all threads simply die). 

...
for(int i=0; i < NUMBER_CONSUMER ; i++)
                consumerThread[ i ].join();
...>
  We may keep in mind that the whole multi-threading process  works like state machine with its locking, un-locking, notifying and operations synchronisation which helps to understand why the rest of the code works.  


  Before we move into the code of MathPriorityTransferQueue we take a brief look what individual MathProducer is doing: 
public class MathProducer implements Runnable{
    ...
    public MathProducer(int maxMathEvents, MathPriorityTransferQueue< MathEvent > buffer){
        this.maxMathEvents = maxMathEvents;
        this.buffer = buffer;
    }
    ...
    @Override
    public void run() {
        for(int i=0; i< maxMathEvents; i++){
            Random random=new Random();
            int priority= random.nextInt(100);
            MathEvent event = new MathEvent("MathProducer-" + Thread.currentThread().getName(), priority);
            buffer.put(event);
        }
    }

}
 MathProducer responsibility is to generate some MathEvents and give them random priority. 

   The same action we do with consumer. The individual MathConsumer  consumes all new MathEvent that are available to him in the queue (queue is responsible for MathEvents prioritising). 
public class MathConsumer implements Runnable {
    ...
    public MathConsumer(MathPriorityTransferQueue< MathEvent > buffer) {
        this.buffer = buffer;
    }
    ...
    @Override
    public void run() {
        while(buffer.peek() != null){
            try{
                MathEvent event = buffer.take();
                logger.debug("MathConsumer-" + Thread.currentThread().getName() +
                        " : " + event.getThread() + " priority: " + event.getPriority());
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}
  Now all parts participating on the example are defined and we can move to the implementation of the example heart MathPriorityTransferQueue . Here is worthy to point out atomic counter implementation to avoid possible race-condition and that the counter shouldn't be implemented by using volatile. Volatile quarantines only happens-before (write-read) but not atomicity at all. 
public class MathPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> {
    ...
    private AtomicInteger counter;
    private LinkedBlockingQueue<E> transferList;
    private ReentrantLock lock;
    ...
    public MathPriorityTransferQueue(){
        counter = new AtomicInteger(0);
        lock = new ReentrantLock();
        transferList = new LinkedBlockingQueue<>();
    }
...
  The counter is used to control important operations that are provided by our DataStructure
1. take() operation to get an MathEvent from the data structure (increment counter) and when operation is finished than decrement the counter.
...
@Override
public E take() throws InterruptedException {
   lock.lock();
   counter.incrementAndGet();
   E result = transferList.poll();

   if(result == null){
     lock.unlock();
     result = super.take();
     lock.lock();
   } else {
      synchronized (result){
        result.notifyAll();
      }
   }
   counter.decrementAndGet();
   lock.unlock();
   return result;
}
...
2. peek() operation to retrieve but don't remove head of the queue if the queue is empty then returns null. Here we need to take care also about the transferList that represents Blocking queue to store accidentally transfered MathEvents
@Override
public E peek() {
   lock.lock();
   E eventMain = super.peek();
   E eventTrans = transferList.peek();
   E result = eventMain != null ? eventMain : eventTrans;
   lock.unlock();
   return result;
}
3. tryTransfer(E e) which tries to transfer MathEvent to the consumer if any is waiting for.
...
@Override
public void transfer(E event) throws InterruptedException{
   lock.lock();
   if(counter.get() != 0){
      put(event);
      lock.unlock();
   } else {
      transferList.add(event);
      lock.unlock();
      synchronized (event){
         event.wait();
      }
   }
}
...
4. tryTransfer(E event, long timeout, TimeUnit unit) throws InterruptedException method tries to provide MathEvent to the Consumer who is waiting maximum time. If not successful then transferList for someTimeout or thread is interrupted. 
...
@Override
public boolean tryTransfer(E event, long timeout, TimeUnit unit) throws InterruptedException {
   lock.lock();
   if(counter.get() != 0){
      put(event);
      lock.unlock();
      return true;
   } else {
       transferList.add(event);
       long someTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
       lock.unlock();
       event.wait(someTimeout);
       lock.lock();
       if(transferList.contains(event)){
           transferList.remove(event);
           lock.unlock();
           return false;
       }else{
           lock.unlock();
           return true;
       }
    }
}
...
5. transfer(E e) throws InterruptedException method transfer successfully when Consumer is waiting for MathEvent if not it it put MathEvent into the transferList and waits until MathEvent is taken out. 
...
@Override
public void transfer(E event) throws InterruptedException{
   lock.lock();
   if(counter.get() != 0){
       put(event);
       lock.unlock();
   } else {
       transferList.add(event);
       lock.unlock();
       synchronized (event){
          event.wait();
       }
   }
}
...
  
and some more as you can find in the example source code but they are not critically important to make this example work (hasWaitingConsumergetWaitingConsumerCount). 
   As DataStructure is implemented we need to implement very important MathEvent method compareTo(MathEvent event) as MathEvent implements Comparable<MathEvent> interface.  
 In the example definition data structure orders elements based on their priority.  
public class MathEvent implements Comparable<MathEvent>{
    ...
    @Override
    public int compareTo(MathEvent event) {
        if (this.priority > event.getPriority()) {
            return -1;
        } else if (this.priority < event.getPriority()) {
            return 1;
        } else {
            return 0;
        }
    }
    ...
}
   Having all those steps done we can run successfully MathPrioProdConMain class. and we get following result (of course based on our setup of CONSUMERS and PRODUCERS with MathEvents )
...
MathConsumer-Thread-6 : MathProducer-Thread-4 priority: 71
MathConsumer-Thread-6 : MathProducer-Thread-2 priority: 71
MathPrioProdConMain: Buffer waiting consumers = 0
MathConsumer-Thread-6 : MathProducer-Thread-0 priority: 71
MathConsumer-Thread-6 : Transfer Math Calculation Event  priority: 1
MathConsumer-Thread-6 : MathProducer-Thread-0 priority: 70
MathConsumer-Thread-6 : MathProducer-Thread-0 priority: 70
...

   More comments can be found inside the source code that is available over my github account (example source code)
Enjoy Multi-threading !