The Producer-Consumer problem is one of the classic concurrency problems. The Producer and the Consumer share a buffer: the Producer puts items into it, and the Consumer takes those items out and processes them.
The following example shows a capacity-limited SharedBuffer with two Conditions (interface java.util.concurrent.locks.Condition, available since Java 5) that control when something can be inserted and when something can be read: the buffer has free space, and the buffer has something to read.
    public class SharedBuffer {
        ...
        private ReentrantLock lock;
        private Condition lines;
        private Condition space;
        ...
        public SharedBuffer(int maxSize){
        ...

The data exchanged between Producer and Consumer is kept in a LinkedList that is not synchronized itself; access to it is guarded by the lock. We also use only one Producer to store products in the SharedBuffer (in the example: lines of characters) and several Consumers to process them (their count is CONSUMER_NUMBER in the code).
The ProdConOneMain class prepares the data for the Producer, which adds products to the SharedBuffer when a certain condition holds (there is free space).
    ...
    public static void main(String... args){
        logger.debug("Start ProdCon One Example");
        CharacterLines characterLines = new CharacterLines(MAX_LINES, MAX_LINE_CHARACTERS);
        SharedBuffer buffer = new SharedBuffer(BUFFER_SIZE);
        Producer producer = new Producer(characterLines, buffer);
        Thread producerThread = new Thread(producer, "Producer ");
        Thread[] consumerThreads = new Thread[CONSUMER_NUMBER];
        producerThread.start();
        for(int i = 0; i < CONSUMER_NUMBER; i++){
            // every slot needs a Thread before start() can be called on it
            consumerThreads[i] = new Thread(new Consumer(buffer), "Consumer " + i);
            consumerThreads[i].start();
        }
        ...

When the condition for insertLine is not satisfied, the method calls await() on the space Condition so that the Producer waits until there is free space. The Producer thread is woken up when Consumer threads call signalAll() on this Condition. When the Producer has stored a new line, it calls signalAll() on the lines Condition, because Consumers may be waiting for new lines.

The SharedBuffer implementation uses ReentrantLock, an implementation of the Lock interface that provides a re-entrant mutual exclusion lock with the same basic behaviour and semantics as the implicit monitor lock used by synchronized methods. ReentrantLock offers more control (interruptible locking, a timeout while waiting for the lock, more flexibility...), so a thread doesn't need to block indefinitely, and it can be given a fairness policy (not used in this example).
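To make the extra controls of ReentrantLock a bit more concrete, here is a small sketch. It is not part of the example project: the class LockOptionsSketch and its methods are hypothetical names chosen for illustration, and only the ReentrantLock calls themselves (the fairness constructor, tryLock with a timeout, lockInterruptibly, isFair) come from the JDK.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper class, used only to illustrate the ReentrantLock options.
public class LockOptionsSketch {

    // Passing true asks for a fair lock: the longest-waiting thread is favoured.
    private final ReentrantLock lock = new ReentrantLock(true);

    // Runs the task under the lock, but gives up if the lock is not free in time.
    public boolean runWithTimeout(Runnable task, long timeoutMillis) {
        try {
            if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // lock was not obtained within the timeout
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Runs the task under the lock; waiting for the lock can be interrupted.
    public boolean runInterruptibly(Runnable task) {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public boolean isFair() {
        return lock.isFair();
    }

    public static void main(String[] args) {
        LockOptionsSketch sketch = new LockOptionsSketch();
        boolean ran = sketch.runWithTimeout(() -> System.out.println("inside the lock"), 100);
        System.out.println("ran=" + ran + ", fair=" + sketch.isFair());
    }
}
```

With a synchronized method none of these options are available: a thread that cannot enter the monitor simply blocks until it can.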
The insertLine method of SharedBuffer:

    ...
    public void insertLine(String line){
        lock.lock();
        try{
            // wait while the buffer is full; await() releases the lock meanwhile
            while(buffer.size() == maxSize){
                space.await();
            }
            buffer.offer(line);
            logger.debug(Thread.currentThread().getName() + ": BUFFER Inserted Line: " + buffer.size());
            lines.signalAll();
        } catch (InterruptedException e){
            logger.error(e.toString());
            Thread.currentThread().interrupt(); // do not swallow the interrupt
        } finally {
            lock.unlock();
        }
    }
    ...

Because the Consumer accesses the SharedBuffer, it communicates with the Producer through it. The Consumer's task is to take a product (a line of characters) from the SharedBuffer and process it. Access only makes sense when there is something to read. By taking a line the Consumer also frees space for the Producer, and it notifies the Producer by calling signalAll() on the space Condition.
    public String getLine(){
        String result = null;
        lock.lock();
        try{
            // wait while the buffer is empty and more lines are still expected
            while((buffer.size() == 0) && (hasPendingLines())){
                lines.await();
            }
            // hasPendingLines() is not shown here; a line is only taken while it returns true
            if(hasPendingLines()){
                result = buffer.poll();
                logger.debug(Thread.currentThread().getName() + ": BUFFER Line to Read: " + buffer.size());
                space.signalAll();
            }
        } catch (InterruptedException e){
            logger.error(e.toString());
            Thread.currentThread().interrupt(); // do not swallow the interrupt
        } finally {
            lock.unlock();
        }
        return result;
    }

With SharedBuffer providing the methods the Consumer and the Producer need, we can create both of them by implementing the Runnable interface:
    public class Producer implements Runnable{
        private CharacterLines characterLines;
        private SharedBuffer buffer;

        public Producer(CharacterLines characterLines, SharedBuffer buffer) {
            this.characterLines = characterLines;
            this.buffer = buffer;
        }

        @Override
        public void run() {
            buffer.setPendingLines(true);
            while(characterLines.hasMoreLines()){
                buffer.insertLine(characterLines.getNextLine());
            }
            buffer.setPendingLines(false);
        }
    }
    ... ... ...
    public class Consumer implements Runnable{
        private SharedBuffer buffer;
        private Random random;

        public Consumer(SharedBuffer buffer) {
            this.buffer = buffer;
            this.random = new Random();
        }
        ...
        private void processLine(String line){
            try{
                ...
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }

Now all the actors of our example are ready and we can run it (see ProdConOneMain). The output should look like the following.
    Start ProdCon One Example
    Waiting Lines: 10
    Producer : BUFFER Inserted Line: 1
    Waiting Lines: 9
    Producer : BUFFER Inserted Line: 2
    Waiting Lines: 8
    Consumer 0: BUFFER Line to Read: 1
    Consumer 1: BUFFER Line to Read: 0
    Producer : BUFFER Inserted Line: 1
    ...
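If you want something to run without the rest of the project, the whole pattern can be squeezed into one file. The sketch below is only an approximation of the SharedBuffer described above, not the code from the repository. The class name BoundedBufferSketch, the closed flag with its close() method (a stand-in for the pending-lines flag) and the runDemo helper are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBufferSketch { // hypothetical stand-in, not the repository's SharedBuffer
    private final Queue<String> buffer = new LinkedList<>(); // guarded by lock
    private final int maxSize;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition lines = lock.newCondition(); // buffer has something to read
    private final Condition space = lock.newCondition(); // buffer has free space
    private boolean closed = false; // assumption: replaces the pending-lines flag
    public BoundedBufferSketch(int maxSize) { this.maxSize = maxSize; }

    public void insertLine(String line) throws InterruptedException {
        lock.lock();
        try {
            while (buffer.size() == maxSize) {
                space.await(); // releases the lock while waiting
            }
            buffer.offer(line);
            lines.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns the next line, or null once the buffer is closed and drained.
    public String getLine() throws InterruptedException {
        lock.lock();
        try {
            while (buffer.isEmpty() && !closed) {
                lines.await();
            }
            String result = buffer.poll(); // null only when closed and drained
            space.signalAll();             // a poll may have freed a slot for the producer
            return result;
        } finally {
            lock.unlock();
        }
    }

    public void close() { // called by the producer when no more lines will arrive
        lock.lock();
        try {
            closed = true;
            lines.signalAll(); // wake waiting consumers so they can finish
        } finally {
            lock.unlock();
        }
    }

    // Runs one producer and several consumers; returns the number of lines consumed.
    public static int runDemo(int lineCount, int bufferSize, int consumerCount) {
        BoundedBufferSketch shared = new BoundedBufferSketch(bufferSize);
        AtomicInteger consumed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        threads.add(new Thread(() -> {
            try {
                for (int i = 0; i < lineCount; i++) {
                    shared.insertLine("line " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shared.close();
            }
        }, "Producer"));
        for (int i = 0; i < consumerCount; i++) {
            threads.add(new Thread(() -> {
                try {
                    while (shared.getLine() != null) {
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer " + i));
        }
        threads.forEach(Thread::start);
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed " + runDemo(10, 3, 2) + " lines");
    }
}
```

One design choice differs from the methods shown earlier: getLine() here keeps handing out lines until the buffer is both closed and empty, so lines inserted just before the Producer finishes are still read.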
You can find the code on my GitHub.
This example should be the beginning of a bigger series.
Enjoy Multi-Threading