Java Concurrency with Producer-Consumer Problem
Producer-Consumer problem is one of the classical synchronisation problems. Generally this problem is not applied to real world scenarios but it is a good beginner challenge for understanding concurrency and synchronisation patterns.
The name of the problem is clear enough to explain the goal. There is a data structure to be filled or emptied by different threads that are type of either producer or consumer. Producers simply adds data to the data structure and consumers removes the data in order to process it.
Event driven programs are a good example for this scenario. Events are the objects that are sent to the event handlers (consumers). When they occur publishers (producers) creates event object and push them to the queue to be processed by event handlers. We should take into consideration the following constraints:
- While items are being added, the state of the queue should be always consistent. So producer and consumer objects should not cause any race condition.
- When then queue is empty, consumer objects should wait until a new item is added.
- When the limit of the queue, say N, reached the producer should wait until the consumer removes some item from the queue.
As a queue data structure we use Deque
intentionally as it is not thread safe, so manual handling of concurrency is needed. So we need to wrap it with Mutex
to keep the consistency.
Deque: A linear collection that supports element insertion and removal at both ends. The name deque is short for "double ended queue" and is usually pronounced "deck".
We can choose ArrayDeque
or LinkedList
as the implementing class of the Deque. As it is more convenient to operate at the beginning and end of the queue LinkedList is a proper choice.
Overview
In order to handle the synchronisation between the threads and keep the queue consistent, we will have the following initiators:
Basic producing operation should happen as follows
Basic consuming operation should happen as follows:
Implementation
First of all we create a component to log the actions with tread name from the consumers and producers.
After that we can implement the the queue. Queue has 2 methods:
- void addItem(Integer i)
- Integer getNextItem()
Concurrency operations are done in the queue to not expose the details to consumers and producers.
As the Queue handles the synchronisation and consistency for adding and removing operations, the producer and consumer classes becomes simple. They only have a method to implement their job and method returns Callable
object.
Producer implementation:
Consumer implementation:
The main method initiates the queue, a consumer and a producer. It creates ForkJoinPool
to run multiple threads by Executors
, submits the producer's startProducing
and consumer's startConsuming
methods to be run parallel.
If we run the program, we can observe the logs as no threads updates the data in the queue at the same time. Consumers waits when there is no item in the queue until one gets produced, and producers waits if the queue is full until some get consumed
As an example, if we process 4 items we get the following output:
Thread: ForkJoinPool-1-worker-19. CONSUMER: Wait to get permit.
Thread: ForkJoinPool-1-worker-5. PRODUCER: Wait to get permit
Thread: ForkJoinPool-1-worker-5. PRODUCER: Lock. ITEM: 1
Thread: ForkJoinPool-1-worker-5. PRODUCER: Add to buffer. ITEM: 1
Thread: ForkJoinPool-1-worker-5. PRODUCER: Unlock. ITEM: 1
Thread: ForkJoinPool-1-worker-5. PRODUCER: Signal the consumer. ITEM: 1
Thread: ForkJoinPool-1-worker-5. PRODUCER: Wait to get permit
Thread: ForkJoinPool-1-worker-19. CONSUMER: Lock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Unlock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Wait to get permit.
Thread: ForkJoinPool-1-worker-5. PRODUCER: Lock. ITEM: 2
Thread: ForkJoinPool-1-worker-5. PRODUCER: Add to buffer. ITEM: 2
Thread: ForkJoinPool-1-worker-5. PRODUCER: Unlock. ITEM: 2
Thread: ForkJoinPool-1-worker-5. PRODUCER: Signal the consumer. ITEM: 2
Thread: ForkJoinPool-1-worker-5. PRODUCER: Wait to get permit
Thread: ForkJoinPool-1-worker-19. CONSUMER: Lock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Unlock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Wait to get permit.
Thread: ForkJoinPool-1-worker-5. PRODUCER: Lock. ITEM: 3
Thread: ForkJoinPool-1-worker-5. PRODUCER: Add to buffer. ITEM: 3
Thread: ForkJoinPool-1-worker-5. PRODUCER: Unlock. ITEM: 3
Thread: ForkJoinPool-1-worker-5. PRODUCER: Signal the consumer. ITEM: 3
Thread: ForkJoinPool-1-worker-5. PRODUCER: Wait to get permit
Thread: ForkJoinPool-1-worker-19. CONSUMER: Lock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Unlock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Wait to get permit.
Thread: ForkJoinPool-1-worker-5. PRODUCER: Lock. ITEM: 4
Thread: ForkJoinPool-1-worker-5. PRODUCER: Add to buffer. ITEM: 4
Thread: ForkJoinPool-1-worker-5. PRODUCER: Unlock. ITEM: 4
Thread: ForkJoinPool-1-worker-5. PRODUCER: Signal the consumer. ITEM: 4
Thread: ForkJoinPool-1-worker-5. PRODUCER: Producer exits
Thread: ForkJoinPool-1-worker-19. CONSUMER: Lock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Unlock.
Thread: ForkJoinPool-1-worker-19. CONSUMER: Consumer exits.
Main exits.
Alternatively we can introduce new consumers or producers easily by adding them to taskList
and we can observe the results. For example having 2 producers and 1 consumer gives us the result so that we can observe producers finish their job quickly and exit, but consumer tries to finish all the items from queue.
Source Code: Link