Java Concurrency with Producer-Consumer Problem

Java May 24, 2020

The producer-consumer problem is one of the classical synchronisation problems. In its textbook form it is rarely applied directly to real-world scenarios, but it is a good beginner challenge for understanding concurrency and synchronisation patterns.

The name of the problem is clear enough to explain the goal. There is a data structure that is filled and emptied by different threads, each of which is either a producer or a consumer. Producers add data to the data structure, and consumers remove the data in order to process it.

Event-driven programs are a good example of this scenario. Events are objects that are sent to event handlers (consumers). When an event occurs, a publisher (producer) creates an event object and pushes it to a queue to be processed by the event handlers. We should take the following constraints into consideration:

  • While items are being added or removed, the state of the queue should always be consistent, so producers and consumers must not cause a race condition.
  • When the queue is empty, consumers should wait until a new item is added.
  • When the queue reaches its limit, say N, producers should wait until a consumer removes an item from the queue.

For the queue we intentionally use a Deque, which is not thread safe, so concurrency has to be handled manually. We therefore guard it with a mutex to keep it consistent.

Deque: A linear collection that supports element insertion and removal at both ends. The name deque is short for "double ended queue" and is usually pronounced "deck".  

We can choose ArrayDeque or LinkedList as the implementing class of the Deque. Both support insertion and removal at either end of the queue; we use LinkedList here.
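To make the access pattern concrete, here is a small sketch of how the rest of the article uses the Deque: items are added at the head with addFirst and removed from the tail with removeLast, which gives first-in, first-out order with either implementation. The class and method names (DequeFifoSketch, drainInFifoOrder) are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

class DequeFifoSketch {

    // Adds 1, 2, 3 at the head, then drains from the tail and returns the removal order.
    static List<Integer> drainInFifoOrder(Deque<Integer> deque) {
        for (int i = 1; i <= 3; i++) {
            deque.addFirst(i);
        }
        List<Integer> removed = new ArrayList<>();
        while (!deque.isEmpty()) {
            removed.add(deque.removeLast());
        }
        return removed;
    }

    public static void main(String[] args) {
        System.out.println(drainInFifoOrder(new LinkedList<>())); // [1, 2, 3]
        System.out.println(drainInFifoOrder(new ArrayDeque<>())); // [1, 2, 3]
    }
}
```

This is single-threaded on purpose; it only shows the ordering, not the synchronisation.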

Overview

In order to handle the synchronisation between the threads and keep the queue consistent, we start with the following initial state:

buffer = new Deque()                            // data structure
mutex = new ReentrantLock()                     // mutex to protect the queue
consumerSemaphore = new Semaphore(0)            // makes the consumer wait when the queue is empty
producerSemaphore = new Semaphore(QUEUE_SIZE)   // makes the producer wait when the queue is full
Init
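The two semaphores are plain counters of available permits. As a rough illustration, the sketch below uses the non-blocking tryAcquire to show in which states a blocking acquire would have to wait. The class name, the trace method and the capacity of 2 are assumptions chosen for the example.

```java
import java.util.concurrent.Semaphore;

class SemaphorePermitsSketch {

    static final int QUEUE_SIZE = 2; // illustrative capacity, not the article's value

    // Returns a trace of which non-blocking acquisitions succeed.
    static String trace() {
        Semaphore consumerSemaphore = new Semaphore(0);
        Semaphore producerSemaphore = new Semaphore(QUEUE_SIZE);
        StringBuilder sb = new StringBuilder();

        // Queue is empty: there is no permit for the consumer yet.
        sb.append(consumerSemaphore.tryAcquire()).append(' ');  // false

        // The producer may take QUEUE_SIZE permits...
        sb.append(producerSemaphore.tryAcquire()).append(' ');  // true
        sb.append(producerSemaphore.tryAcquire()).append(' ');  // true
        // ...and after that the queue counts as full.
        sb.append(producerSemaphore.tryAcquire()).append(' ');  // false

        // The producer signals one new item, so the consumer can proceed once.
        consumerSemaphore.release();
        sb.append(consumerSemaphore.tryAcquire());              // true
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // false true true false true
    }
}
```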

The basic producing operation should happen as follows:

item = waitForNewItem()
producerSemaphore.acquire()     // wait if the queue has reached its capacity
mutex.lock()                    // enter the critical section
buffer.addFirst(item)           // add item to buffer
mutex.unlock()                  // release the lock of the critical section
consumerSemaphore.release()     // signal the consumer that a new item has been added
Basic producer

The basic consuming operation should happen as follows:

consumerSemaphore.acquire()     // wait if no item has been added
mutex.lock()                    // enter the critical section
item = buffer.removeLast()      // get the next item and remove it from the queue
mutex.unlock()                  // release the lock of the critical section
producerSemaphore.release()     // notify the producer that one item has been removed
process(item)                   // process the item
Basic Consumer

Implementation

First of all, we create a helper that logs the actions of the consumers and producers together with the thread name.

public class Action {
    static void producerLog(String message) {
        System.out.printf("Thread: %s. PRODUCER: %s \n", Thread.currentThread().getName(), message);
    }

    static void consumerLog(String message) {
        System.out.printf("Thread: %s. CONSUMER: %s \n", Thread.currentThread().getName(), message);
    }
}
Action.java

After that we can implement the queue. The queue has 2 methods:

  • void addItem(Integer i)
  • Integer getNextItem()

The synchronisation is handled inside the queue, so that its details are not exposed to consumers and producers.

import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

public class Queue {

    private final Deque<Integer> buffer;
    private final ReentrantLock mutex;
    private final Semaphore consumerSemaphore;
    private final Semaphore producerSemaphore;

    public Queue(int size) {
        this.buffer = new LinkedList<>();
        this.mutex = new ReentrantLock();
        this.consumerSemaphore = new Semaphore(0);
        this.producerSemaphore = new Semaphore(size);
    }

    void addItem(Integer i) throws InterruptedException {

        // wait to get permit to add new item. If queue reaches the limit it will be blocked
        Action.producerLog("Wait to get permit");
        producerSemaphore.acquire();

        // exclusive lock for queue to add item
        mutex.lock();
        try {
            Action.producerLog("Lock. ITEM: " + i);
            Action.producerLog("Add to buffer. ITEM: " + i);

            // add item
            buffer.addFirst(i);

            Action.producerLog("Unlock. ITEM: " + i);
        } finally {
            // release the lock even if adding the item fails
            mutex.unlock();
        }

        Action.producerLog("Signal the consumer. ITEM: " + i);
        //  signal the consumer that new item added and can be retrieved
        consumerSemaphore.release();
    }

    Integer getNextItem() throws InterruptedException {
        // wait to get permit. If queue is empty it will be blocked
        Action.consumerLog("Wait to get permit.");
        consumerSemaphore.acquire();

        // exclusive lock for queue to retrieve item
        Integer lastItem;
        mutex.lock();
        try {
            Action.consumerLog("Lock.");

            // get the item and remove it from queue
            lastItem = buffer.removeLast();

            Action.consumerLog("Unlock.");
        } finally {
            // release the lock even if removing the item fails
            mutex.unlock();
        }

        // notify the producer that one slot in the queue is free again
        producerSemaphore.release();

        return lastItem;
    }

}
Queue.java
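For comparison only: the JDK already ships a bounded, thread-safe queue, java.util.concurrent.ArrayBlockingQueue, whose put and take methods block when the queue is full or empty. The sketch below is not part of this article's implementation; it just shows the same behaviour without hand-written semaphores. The class and method names (BlockingQueueSketch, transfer) are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueSketch {

    // Moves the numbers 1..items through a bounded queue and returns their sum.
    static int transfer(int capacity, int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < items; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(4, 10)); // 55
    }
}
```

Writing the semaphore version by hand, as we do here, is still the better exercise for seeing what such a class has to take care of.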

As the Queue handles the synchronisation and consistency of the add and remove operations, the producer and consumer classes become simple. Each has a single method that implements its job and returns it as a Callable object.

Producer implementation:

import java.util.concurrent.Callable;

class Producer {

    private final Queue buffer;

    Producer(Queue buffer) {
        this.buffer = buffer;
    }


    Callable<Boolean> startProducing(int numberOfTasks) {
        return () -> {
            // note: i runs from 1 to numberOfTasks - 1, so numberOfTasks - 1 items are produced
            for (int i = 1; i < numberOfTasks; i++) {
                buffer.addItem(i);
            }
            Action.producerLog("Producer exits");
            return true;
        };

    }

}
Producer.java

Consumer implementation:

import java.util.concurrent.Callable;

class Consumer {

    private final Queue buffer;

    Consumer(Queue buffer) {
        this.buffer = buffer;
    }

    Callable<Boolean> startConsuming(int numberOfTasks) {
        return () -> {
            // note: consumes numberOfTasks - 1 items, matching the producer's loop
            for (int i = 1; i < numberOfTasks; i++) {
                Integer nextItem = buffer.getNextItem();
                processItem(nextItem);
            }
            Action.consumerLog("Consumer exits.");
            return true;
        };

    }

    private void processItem(Integer item) {
        // process the item
    }

}
Consumer.java

The main method creates the queue, a consumer and a producer. It obtains a ForkJoinPool-backed ExecutorService from Executors.newWorkStealingPool() and submits the tasks returned by the producer's startProducing and the consumer's startConsuming methods so that they run in parallel.

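import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {

    private static final int SIZE = 4;
    private static final int NUMBER_OF_TASKS = 10;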

    public static void main(String[] args) {

        Queue buffer = new Queue(SIZE);
        Consumer consumer = new Consumer(buffer);
        Producer producer = new Producer(buffer);

        ExecutorService executorService = Executors.newWorkStealingPool();

        List<Callable<Boolean>> taskList = new ArrayList<>();
        taskList.add(consumer.startConsuming(NUMBER_OF_TASKS));
        taskList.add(producer.startProducing(NUMBER_OF_TASKS));


        try {
            // invokeAll blocks until every task has completed
            List<Future<Boolean>> futures = executorService.invokeAll(taskList);

            // get() rethrows any exception that was raised inside a task
            for (Future<Boolean> future : futures) {
                future.get();
            }

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdownNow();
            System.out.println("Main exits.");
        }
    }
}
Main.java
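A detail worth knowing about invokeAll: according to the ExecutorService documentation it returns only once all tasks have completed, so every Future in the returned list is already done. The following sketch checks exactly that with two trivial tasks; the class and method names (InvokeAllSketch, allDoneAfterInvokeAll) are assumptions for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {

    // Returns true if every Future is already done when invokeAll returns.
    static boolean allDoneAfterInvokeAll() {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            List<Callable<Boolean>> tasks = new ArrayList<>();
            tasks.add(() -> true);
            tasks.add(() -> {
                Thread.sleep(50); // simulate a slightly slower task
                return true;
            });

            List<Future<Boolean>> futures = pool.invokeAll(tasks);

            boolean allDone = true;
            for (Future<Boolean> future : futures) {
                allDone &= future.isDone();
            }
            return allDone;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(allDoneAfterInvokeAll()); // true
    }
}
```

The loop over future.get() in Main is therefore mainly useful for surfacing exceptions thrown inside the tasks.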

If we run the program, the logs show that no two threads update the queue at the same time. Consumers wait when the queue is empty until an item is produced, and producers wait when the queue is full until an item is consumed.

As an example, if we process 4 items (NUMBER_OF_TASKS set to 5, because both loops run from 1 to NUMBER_OF_TASKS - 1), we get the following output:

Thread: ForkJoinPool-1-worker-19. CONSUMER: Wait to get permit.
Thread: ForkJoinPool-1-worker-5. PRODUCER: Wait to get permit
Thread: ForkJoinPool-1-worker-5. PRODUCER: Lock. ITEM: 1
Thread: ForkJoinPool-1-worker-5. PRODUCER: Add to buffer. ITEM: 1
Thread: ForkJoinPool-1-worker-5. PRODUCER: Unlock. ITEM: 1
Thread: ForkJoinPool-1-worker-5. PRODUCER: Signal the consumer. ITEM: 1
Thread: ForkJoinPool-1-worker-5. PRODUCER: Wait to get permit
Thread: ForkJoinPool-1-worker-19. CONSUMER: Lock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Unlock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Wait to get permit.
Thread: ForkJoinPool-1-worker-5. PRODUCER: Lock. ITEM: 2
Thread: ForkJoinPool-1-worker-5. PRODUCER: Add to buffer. ITEM: 2
Thread: ForkJoinPool-1-worker-5. PRODUCER: Unlock. ITEM: 2
Thread: ForkJoinPool-1-worker-5. PRODUCER: Signal the consumer. ITEM: 2
Thread: ForkJoinPool-1-worker-5. PRODUCER: Wait to get permit
Thread: ForkJoinPool-1-worker-19. CONSUMER: Lock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Unlock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Wait to get permit.
Thread: ForkJoinPool-1-worker-5. PRODUCER: Lock. ITEM: 3
Thread: ForkJoinPool-1-worker-5. PRODUCER: Add to buffer. ITEM: 3
Thread: ForkJoinPool-1-worker-5. PRODUCER: Unlock. ITEM: 3
Thread: ForkJoinPool-1-worker-5. PRODUCER: Signal the consumer. ITEM: 3
Thread: ForkJoinPool-1-worker-5. PRODUCER: Wait to get permit
Thread: ForkJoinPool-1-worker-19. CONSUMER: Lock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Unlock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Wait to get permit.
Thread: ForkJoinPool-1-worker-5. PRODUCER: Lock. ITEM: 4
Thread: ForkJoinPool-1-worker-5. PRODUCER: Add to buffer. ITEM: 4
Thread: ForkJoinPool-1-worker-5. PRODUCER: Unlock. ITEM: 4
Thread: ForkJoinPool-1-worker-5. PRODUCER: Signal the consumer. ITEM: 4
Thread: ForkJoinPool-1-worker-5. PRODUCER: Producer exits
Thread: ForkJoinPool-1-worker-19. CONSUMER: Lock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Unlock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Consumer exits.
Main exits.

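Alternatively, we can easily introduce new consumers or producers by adding them to taskList and observe the results. For example, with 2 producers and 1 consumer we can observe that the producers finish their work quickly and exit, while the consumer keeps working through the items left in the queue. Keep in mind that every task adds or removes a fixed number of items, so the totals on both sides have to match; otherwise a thread can block forever on its semaphore.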
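A minimal sketch of such a run, under a few assumptions: BoundedBuffer is a compact stand-in for the Queue class above (logging omitted), run is a hypothetical helper, and a fixed thread pool with one thread per task replaces the work-stealing pool so that every blocking task has its own thread. Two producers each add 9 items and the single consumer removes all 18.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

class MultiProducerSketch {

    // Compact stand-in for the article's Queue class (no logging).
    static class BoundedBuffer {
        private final Deque<Integer> buffer = new LinkedList<>();
        private final ReentrantLock mutex = new ReentrantLock();
        private final Semaphore consumerSemaphore = new Semaphore(0);
        private final Semaphore producerSemaphore;

        BoundedBuffer(int size) {
            this.producerSemaphore = new Semaphore(size);
        }

        void addItem(Integer i) throws InterruptedException {
            producerSemaphore.acquire();      // wait while the buffer is full
            mutex.lock();
            try {
                buffer.addFirst(i);
            } finally {
                mutex.unlock();
            }
            consumerSemaphore.release();      // one more item is available
        }

        Integer getNextItem() throws InterruptedException {
            consumerSemaphore.acquire();      // wait while the buffer is empty
            Integer item;
            mutex.lock();
            try {
                item = buffer.removeLast();
            } finally {
                mutex.unlock();
            }
            producerSemaphore.release();      // one more slot is free
            return item;
        }
    }

    // Starts `producers` producers adding `perProducer` items each and one consumer
    // that removes exactly producers * perProducer items; returns the consumed count.
    static int run(int producers, int perProducer, int size) {
        BoundedBuffer buffer = new BoundedBuffer(size);
        int total = producers * perProducer;

        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            tasks.add(() -> {
                for (int i = 1; i <= perProducer; i++) {
                    buffer.addItem(i);
                }
                return 0; // producers consume nothing
            });
        }
        tasks.add(() -> {
            int consumed = 0;
            for (int i = 0; i < total; i++) {
                buffer.getNextItem();
                consumed++;
            }
            return consumed;
        });

        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            int consumed = 0;
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                consumed += future.get();
            }
            return consumed;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(2, 9, 4)); // 18
    }
}
```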

Source Code: Link
