The Producer Consumer Problem is a well-known example of a multi-process synchronization problem. It involves two or more processes, referred to as producers and consumers, that share a fixed-size data buffer implemented as a queue. A single buffer can have any number of producers and consumers. A producer's task is to keep generating data and placing it into the buffer, while a consumer's task is to keep removing data from the buffer. The problem is to ensure that producers do not add data when the buffer is full and that consumers do not remove data when the buffer is empty.
To solve this problem, the Producer and Consumer should behave as follows.
Producer
1. Check whether the Buffer is full. If it is full, wait() until items have been consumed.
2. Generate data and put it into the Buffer.
3. Notify the Consumer that data has been placed into the Buffer.
4. Repeat steps 1-3.
Consumer
1. Check whether the Buffer has items. If it is empty, wait() until the Buffer has been filled.
2. Consume data from the Buffer.
3. Notify the Producer that data has been consumed from the Buffer.
4. Repeat steps 1-3.
We will implement the Producer Consumer Problem in Java using two approaches.
1. Synchronized Instance Methods (using wait and notify/notifyAll method in Java)
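A synchronized instance method in Java is synchronized on the instance (object) owning the method. Thus, each instance has its synchronized methods synchronized on a different object: the owning instance. Only one thread at a time can execute inside the synchronized instance methods of a given instance; threads calling synchronized methods on different instances do not block each other. A thread that calls wait() inside such a method releases the instance's lock while it waits, which is what lets another thread enter and change the buffer.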
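As a small illustration of the locking rule described above, the sketch below checks which monitor a thread holds while it is inside a synchronized instance method, and shows that two threads incrementing one shared instance do not lose updates. The class names Counter and SynchronizedInstanceDemo and the method lockState are hypothetical, chosen only for this example.

```java
// Hypothetical demo classes; these names do not appear in the article's code.
class Counter {
    private int value = 0;

    // Synchronized on "this": same effect as wrapping the body in synchronized (this) { ... }
    public synchronized void increment() {
        value++;
    }

    public synchronized int get() {
        return value;
    }

    // Reports whether the calling thread holds this instance's lock
    // and whether it holds the lock of a different instance.
    public synchronized boolean[] lockState(Counter other) {
        return new boolean[] { Thread.holdsLock(this), Thread.holdsLock(other) };
    }
}

class SynchronizedInstanceDemo {
    // Two threads each increment one shared Counter perThread times.
    static int runTwoThreads(int perThread) {
        Counter shared = new Counter();
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                shared.increment();
            }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shared.get();
    }

    public static void main(String[] args) {
        Counter a = new Counter();
        Counter b = new Counter();
        boolean[] state = a.lockState(b);
        System.out.println("holds lock on a: " + state[0]); // true
        System.out.println("holds lock on b: " + state[1]); // false
        System.out.println("total: " + runTwoThreads(10000)); // 20000
    }
}
```

Because the lock belongs to the instance, all producers and consumers must share the same buffer object for the synchronization to have any effect.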
Producer Consumer Problem Java Source Code Using Synchronized Instance Methods
import java.util.Random;

public class ProducerConsumerProblem {

    public static void main(String[] args) {
        ProducerConsumerProblem pcp = new ProducerConsumerProblem();
        // One shared buffer with room for two items
        DataBuffer buffer = pcp.new DataBuffer(2);
        Producer p1 = pcp.new Producer(buffer, "Producer 1");
        Producer p2 = pcp.new Producer(buffer, "Producer 2");
        Producer p3 = pcp.new Producer(buffer, "Producer 3");
        Producer p4 = pcp.new Producer(buffer, "Producer 4");
        Producer p5 = pcp.new Producer(buffer, "Producer 5");
        Consumer c1 = pcp.new Consumer(buffer, "Consumer 1");
        Consumer c2 = pcp.new Consumer(buffer, "Consumer 2");
        Consumer c3 = pcp.new Consumer(buffer, "Consumer 3");
        Consumer c4 = pcp.new Consumer(buffer, "Consumer 4");
        Consumer c5 = pcp.new Consumer(buffer, "Consumer 5");
        p1.start(); p2.start(); p3.start(); p4.start(); p5.start();
        c1.start(); c2.start(); c3.start(); c4.start(); c5.start();
    }

    class DataBuffer {
        // Circular queue implementation is used
        private int[] dataBuffer;
        private int capacity;
        private int front = 0;        // index of the oldest item
        private int currentSize = 0;  // number of items currently stored

        public DataBuffer(int capacity) {
            this.capacity = capacity;
            dataBuffer = new int[capacity];
        }

        // Called only from synchronized methods, so no extra locking is needed
        private boolean isEmpty() {
            return (currentSize == 0);
        }

        private boolean isFull() {
            return (currentSize == capacity);
        }

        public synchronized void produce(int data, String producerName) {
            // Re-check the condition in a loop after every wake-up
            while (isFull()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // Write at the tail of the circular queue
            dataBuffer[(front + currentSize) % capacity] = data;
            System.out.println("Data " + data + " produced by " + producerName);
            currentSize++;
            // Wake up all waiting threads, including waiting consumers
            notifyAll();
        }

        public synchronized int consume(String consumerName) {
            // Re-check the condition in a loop after every wake-up
            while (isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // Read from the head of the circular queue
            int data = dataBuffer[front];
            front = (front + 1) % capacity;
            currentSize--;
            // Wake up all waiting threads, including waiting producers
            notifyAll();
            System.out.println("Data " + data + " consumed by " + consumerName);
            return data;
        }
    }

    class Producer extends Thread {
        private DataBuffer buffer;
        private Random random;

        public Producer(DataBuffer buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
            random = new Random();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                int data = random.nextInt(100);
                buffer.produce(data, getName());
            }
        }
    }

    class Consumer extends Thread {
        private DataBuffer buffer;

        public Consumer(DataBuffer buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                buffer.consume(getName());
            }
        }
    }
}
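The program above runs forever, which makes its behavior hard to check. The following is a minimal terminating sketch of the same wait()/notifyAll() pattern, with the Producer and Consumer steps listed earlier marked in comments. The names BoundedBuffer, FiniteRunDemo and sumThroughBuffer are hypothetical, and the sketch assumes two deliberate differences from the listing above: it stores items in a java.util.ArrayDeque instead of a hand-written circular array, and it lets InterruptedException propagate instead of printing it.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical class, not part of the article's code.
class BoundedBuffer {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void produce(int data) throws InterruptedException {
        while (items.size() == capacity) { // Producer step 1: wait while full
            wait();
        }
        items.add(data);                   // Producer step 2: put data into the buffer
        notifyAll();                       // Producer step 3: notify waiting threads
    }

    public synchronized int consume() throws InterruptedException {
        while (items.isEmpty()) {          // Consumer step 1: wait while empty
            wait();
        }
        int data = items.remove();         // Consumer step 2: take the oldest item
        notifyAll();                       // Consumer step 3: notify waiting threads
        return data;
    }
}

class FiniteRunDemo {
    // One producer sends 0..count-1; one consumer receives count items and sums them.
    static long sumThroughBuffer(int capacity, int count) {
        BoundedBuffer buffer = new BoundedBuffer(capacity);
        long[] sum = new long[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.produce(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += buffer.consume();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        // 0 + 1 + ... + 99 = 4950, regardless of how the threads interleave
        System.out.println(sumThroughBuffer(2, 100));
    }
}
```

Since every produced item is consumed exactly once, the sum is the same on every run even though the interleaving of the two threads is not.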
2. BlockingQueue (java.util.concurrent.BlockingQueue)
BlockingQueue implementations are thread-safe. Their blocking operations wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element. For this purpose we will use put() to add data to the buffer and take() to remove data from the buffer.
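To make the blocking behavior concrete, here is a small single-threaded sketch on a queue of capacity 2. It uses offer() and poll(), the non-blocking counterparts of put() and take(), to show the points at which put() or take() would have to wait. The class name BlockingQueueBasics and the method trace are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the article's code.
class BlockingQueueBasics {
    // Records the result of each call as a space-separated string.
    static String trace() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        StringBuilder out = new StringBuilder();
        try {
            queue.put(1); // space available, returns immediately
            queue.put(2); // queue is now full
            // offer() returns false instead of waiting; put(3) would block here
            out.append(queue.offer(3)).append(' ');
            out.append(queue.take()).append(' '); // removes the head: 1
            queue.put(3);                         // space was freed by take()
            out.append(queue.take()).append(' '); // 2
            out.append(queue.take()).append(' '); // 3
            // poll() returns null instead of waiting; take() would block here
            out.append(queue.poll());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // false 1 2 3 null
    }
}
```

In a producer-consumer setup the blocking variants are usually what you want, because the waiting replaces the explicit wait()/notifyAll() code from the first approach.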
Producer Consumer Problem Java Source Code Using BlockingQueue
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerProblem {

    public static void main(String[] args) {
        ProducerConsumerProblem pcp = new ProducerConsumerProblem();
        // Bounded queue with room for two items
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2);
        Producer p1 = pcp.new Producer(buffer, "Producer 1");
        Producer p2 = pcp.new Producer(buffer, "Producer 2");
        Producer p3 = pcp.new Producer(buffer, "Producer 3");
        Producer p4 = pcp.new Producer(buffer, "Producer 4");
        Producer p5 = pcp.new Producer(buffer, "Producer 5");
        Consumer c1 = pcp.new Consumer(buffer, "Consumer 1");
        Consumer c2 = pcp.new Consumer(buffer, "Consumer 2");
        Consumer c3 = pcp.new Consumer(buffer, "Consumer 3");
        Consumer c4 = pcp.new Consumer(buffer, "Consumer 4");
        Consumer c5 = pcp.new Consumer(buffer, "Consumer 5");
        p1.start(); p2.start(); p3.start(); p4.start(); p5.start();
        c1.start(); c2.start(); c3.start(); c4.start(); c5.start();
    }

    class Producer extends Thread {
        private BlockingQueue<Integer> buffer;
        private Random random;

        public Producer(BlockingQueue<Integer> buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
            random = new Random();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                int data = random.nextInt(100);
                try {
                    // put() inserts the specified element into this queue,
                    // waiting if necessary for space to become available.
                    buffer.put(data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer extends Thread {
        private BlockingQueue<Integer> buffer;

        public Consumer(BlockingQueue<Integer> buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    // take() retrieves and removes the head of this queue,
                    // waiting if necessary until an element becomes available.
                    buffer.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}