Process Synchronization
Objective
- Introduce the concepts of critical sections and race conditions.
- Understand the need for synchronization to ensure shared data consistency.
- Understand the concept of a semaphore as a synchronization and signaling tool.
- Use Java synchronization techniques: monitors, locks, and semaphores.
- Solve the Reader-Writer problem using monitors and locks.
- Solve the Producer-Consumer problem using monitors and semaphores.
Summary
notify()
synchronized
wait()
Condition
Lock
ReentrantLock
Semaphore
Condition.await()
Condition.newCondition()
Condition.signal()
Condition.signalAll()
ReentrantLock.lock()
ReentrantLock.unlock()
Semaphore.acquire()
Semaphore.release()
Thread Synchronization
It is the operation of coordinating the access to shared data by multiple concurrent threads. This is done through the concept of mutual exclusion where each thread accessing the shared data excludes all other threads from doing so simultaneously.
Critical Section
Considering a system with n processes, each of which has a segment of code, called a critical section, to execute on shared data. Then, the important feature of this system is that no two processes are executing in their critical sections at the same time.
The critical-section problem is to design a protocol that processes can use to request permission for entering its critical section.
Race Condition
It is the situation where several processes access and manipulates the same shared data concurrently, and the outcome of the execution depends on the particular order in which the access takes place.
To guard against the race condition, we need to synchronize processes to ensure that only one process at a time can access and manipulate the shared data.
Thread Synchronization
It is the operation of coordinating the access to shared data by multiple concurrent threads. This is done through the concept of mutual exclusion where each thread accessing the shared data excludes all other threads from doing so simultaneously.
“There are only two hard problems in distributed systems: 2. Exactly-once delivery 1. Guaranteed order of messages 2. Exactly-once delivery’ —Mathias Verraes
Exercise 1
Examine and run SyncProblem
to explore a simple synchronization problem:
public class SyncProblem {
static double a = 10;
static double b;
public static void main(String[] args) {
Runnable r1 = () -> {
if (a == 10) {
try {
Thread.sleep(0);
b = a / 2.0;
System.out.println(Thread.currentThread().getName() + ": " + b);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable r2 = () -> { a = 12; };
Thread threadA = new Thread(r1, "Thread A");
Thread threadB = new Thread(r2, "Thread B");
threadA.start();
threadB.start();
}
}
Threads can be synchronized using different mechanisms such as:
- Monitors, a
synchronized
method/block:wait()
/notify()
. - Reentrant lock:
ReentrantLock.lock()
/ReentrantLock.unlock()
. - Semaphore:
Semaphore.acquire()
/Semaphore.release()
.
Monitors
The monitor allows one thread at a time to execute inside a synchronized
statement on the shared object. This is accomplished by acquiring a lock on the shared object whenever the thread enters the synchronized
statement.
When a method is declared to be synchronized
, calling the method requires owing the lock for the object. If the lock is owned by another thread, the calling thread will be blocked and placed in the wait set which represents the set of threads waiting for the lock to become available.
The general usage of the keyword synchronized
is as follows:
synchronized (object) {
while (condition) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// critical section
notify();
}
When a thread calls the wait()
method, the following happens:
- The thread releases the lock for the object.
- The state of the thread is set to
blocked
. - The thread is placed in the
wait
set for the object.
When a thread calls the notify()
method, the following happens:
- Picks an arbitrary thread
T
from the list of threads in thewait
set. - Moves
T
from thewait
set to theentry
set. - Sets the state of
T
fromblocked
torunnable
.
Exercise 2
Examine and run Sync
to explore synchronized methods:
import java.util.concurrent.ThreadLocalRandom;
public class Sync {
static int counter = 1;
public static void main(String[] args) {
Runnable r = () -> {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("ID value: %d (%s)%n", getID(),
Thread.currentThread().getName());
};
for (Integer k = 0; k < 10000; k += 1) {
new Thread(r, k.toString()).start();
}
}
public static synchronized int getID() { return counter++; }
}
Exercise 3
Examine Countdown
and run CountdownDriver
to explore block level synchronization:
public class Countdown {
public void printCount() {
try {
for (int i = 10; i > 0; i--) {
System.out.println(" --- " + i);
}
System.out.println("Blast off!");
} catch (Exception e) {
System.out.println("Thread interrupted.");
}
}
}
public class CountdownDriver {
public static void main(String[] args) {
Countdown CD = new Countdown();
Runnable r = () -> {
synchronized (CD) {
CD.printCount();
}
};
Thread one = new Thread(r, "one");
Thread two = new Thread(r, "two");
one.start();
two.start();
}
}
Exercise 4
Examine CustomThread
and run WaitNotify
to explore waiting and notifying:
class CustomThread extends Thread {
int total;
@Override
public void run() {
synchronized (this) {
for (int i = 0; i < 10; i++) {
total += i;
}
this.notify();
}
}
}
public class WaitNotify {
public static void main(String[] args) {
CustomThread b = new CustomThread();
b.start();
synchronized (b) {
try {
System.out.println("Waiting for the second thread to complete...");
b.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Total is: " + b.total);
}
}
}
Reentrant Locks
A reentrant lock, in Java, ReentractLock
, implements the Lock
interface and its general usage is as follows:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Lock lock = new ReentrantLock();
try {
lock.lock();
// critical section
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
Calling the method lock()
should end with unlock()
to ensure that the lock is released and avoid all kinds of deadlocks. By enclosing unlock()
in a finally
clause, we ensure that the lock is released once the critical section completes, even if an exception occurs within the try
block.
Exercise 5
Examine ReentrantLockWrapper
and run ThreadSystem
to explore locking and unlocking:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockWrapper {
Lock lock = new ReentrantLock();
public void print(int count) {
lock.lock();
try {
for (int i = 1; i <= 10; i++) {
System.out.println(count * i);
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ThreadSystem {
public static void main(String[] args) {
ReentrantLockWrapper r = new ReentrantLockWrapper();
A a = new A(r);
B b = new B(r);
Thread t1 = new Thread(a);
Thread t2 = new Thread(b);
t1.start();
t2.start();
}
}
You are required to add two additional classes, A
and B
, to demonstrate locking and unlocking.
Threads can also wait on a condition to become true, after having acquired a lock, before entering the critical section:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
while (!pass) { // pass is a boolean expression
condition.await();
}
// pass is true
// critical section
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
Note that await()
puts the thread to sleep and releases the lock and signal()
wakes up a thread to acquire the lock and check the condition.
When multiple threads manipulate a shared object using locks, ensure that if one thread calls the condition method await()
, another thread will call the condition method signal()
, that is, they must be called in pairs, every await()
has a matching signal()
, otherwise a deadlock could occur. It is a good practice to always invoke the method await()
in a loop that tests for an appropriate condition so that the thread checks for the condition when it wakes up.
If multiple threads are waiting on the condition, after having called await()
, then the one with the longest waiting-time will acquire the lock when it is available to ensure fairness.
Exercise 6
Examine Customer
and run BarberShop
to explore locking, waiting, signaling, and unlocking:
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
class Customer extends Thread {
@Override
public void run() {
BarberShop.lock.lock();
try {
while (BarberShop.customers >=
BarberShop.MAX_CHAIRS) { // wait for a free chair
BarberShop.condition.await();
}
BarberShop.customers += 1; // acquire a chair
BarberShop.condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
BarberShop.lock.unlock();
}
System.out.printf("%s Haircut started...%n",
Thread.currentThread().getName());
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(5000, 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
BarberShop.lock.lock();
BarberShop.customers -= 1; // release a chair
System.out.printf("%s Haircut complete!%n",
Thread.currentThread().getName());
BarberShop.condition.signal();
BarberShop.lock.unlock();
}
}
public class BarberShop {
static final int MAX_CHAIRS = 4;
static int customers = 0;
static Lock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) {
IntStream.range(0, 10).forEach(k -> { new Customer().start(); });
}
}
Exercise 7
Examine Worker
and run SharedStack
to explore locking, waiting, signaling, and unlocking:
import java.util.Stack;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
class Worker extends Thread {
private Lock lock;
private Condition condition;
private Stack<Integer> stack;
public Worker(Lock lock, Condition condition, Stack<Integer> stack) {
this.lock = lock;
this.condition = condition;
this.stack = stack;
}
@Override
public void run() {
while (true) {
lock.lock();
try {
while (stack.size() == 0) {
condition.await();
}
Thread.sleep(ThreadLocalRandom.current().nextInt(0, 100));
System.out.println(this.getName() + ": " + stack.pop());
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public class SharedStack {
public static void main(String[] args) {
Stack<Integer> stack = new Stack<>();
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
IntStream.range(0, 10).forEach(
k -> { new Worker(lock, condition, stack).start(); });
do {
lock.lock();
try {
while (stack.size() != 0) {
condition.await();
}
IntStream.range(0, 5).forEach(k -> stack.push(k));
condition.signalAll(); // signal to all threads waiting
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} while (true);
}
}
Semaphores
A semaphore is a protected variable that is non-negative and shared between threads. It is initialized to a value representing the number of resources available and constitutes a classic method of controlling access by multiple threads to a common resource. A trivial semaphore is a plain variable that is changed, for example, incremented or decremented, or toggled, depending on programmer-defined conditions.
A semaphore S
, apart from initialization, is accessed only through two standard operations: acquire()
and release()
. Each thread that wishes to use a resource performs an acquire()
operation on the semaphore, thereby decrementing the count. On the other hand, when a thread releases a resource, it performs a release()
operation on the semaphore, thereby incrementing the count.
Operating systems often distinguish between counting and binary semaphores (mutexes which are mutually exclusion locks). The initialized value of a counting semaphore can range over unrestricted domain, while the initialized value of a binary semaphore can range only between 0
and 1
.
Binary semaphores are usually used to control access to the critical section for a process or thread, while counting semaphores are usually used to control access to a given resource consisting of finite number of instances or for signaling.
Note: When the count for the semaphore goes to 0
, all the resources are being used. Thus, a thread that wishes to use a resource will block until the count becomes positive, that is, a resource becomes available.
Semaphores are used to grant access to shared resources to desired threads concurrently. They ensure that the shared resources or code is accessed by only the desired threads at the same time.
A semaphore maintains a count, also called permit, which will be the maximum number of threads the semaphore will allow access to a shared resource. Java provides a counting semaphore and its general usage is as follows:
import java.util.concurrent.Semaphore;
Semaphore sem = new Semaphore(1);
try {
sem.acquire();
// critical section
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
sem.release();
}
Exercise 8
A charger hub with 4 ports can be used by four people to charge their devices simultaneously. 30 people are waiting to get their devices charged. Use semaphores to enforce synchronization. This is the same as the BarberShop
problem from Exercise 6.
Reader–Writer
It is a situation where the writer’s thread of an application updates a shared data, while the reader’s thread reads from that shared data. The problem may occur if a reader and a writer access the shared data simultaneously leading to invalid results as the reads and writes are not in order (synchronized).
Exercise 9
Examine Buffer
, UnsynchronizedBuffer
, Reader
, Writer
and run SharedBufferDriver
to observe the reader/writer problem:
public interface Buffer {
public void set(int value); // place int value into Buffer
public int get(); // return int value from Buffer
}
public class UnsynchronizedBuffer implements Buffer {
private int buffer = -1; // shared by reader and writer threads
// place value into buffer
@Override
public void set(int value) {
System.out.printf("%-15s%10s", "Writer writes", value);
buffer = value;
}
// return value from buffer
@Override
public int get() {
System.out.printf("%-15s%10s", "Reader reads", buffer);
return buffer;
}
}
import java.util.concurrent.ThreadLocalRandom;
public class Reader extends Thread {
private Buffer sharedLocation; // reference to the shared object
public Reader(Buffer shared) { this.sharedLocation = shared; }
// read sharedLocation's value multiple times and add the values
@Override
public void run() {
int total = 0;
for (int count = 1; count <= 10; count++) {
// sleep 0 to 3 seconds, then read value from buffer and add to total
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(3000)); // sleep thread
total += this.sharedLocation.get(); // add value to total
System.out.printf("%10s%10s%n", "", total);
}
// if the sleeping thread is interrupted, print the stack trace
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("%n%s: %d. %s%n%n", "Reader is done", total,
"Terminating...");
}
}
import java.util.concurrent.ThreadLocalRandom;
public class Writer extends Thread {
private Buffer sharedLocation; // reference to the shared object
public Writer(Buffer shared) { this.sharedLocation = shared; }
// store values from 1 to 10 in sharedLocation
@Override
public void run() {
int total = 0;
for (int count = 1; count <= 10; count++) {
// sleep 0 to 3 seconds, then place value in buffer
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(3000)); // sleep thread
this.sharedLocation.set(count); // set value in buffer
total += count; // increment total of values
System.out.printf("%10s%n", total);
}
// if the sleeping thread is interrupted, print the stack trace
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("%n%s%n%n", "Writer is done. Terminating...");
}
}
public class SharedBufferDriver {
public static void main(String[] args) {
// create an UnsynchronizedBuffer to store integer values
Buffer sharedLocation = new UnsynchronizedBuffer();
// create a new thread pool with two threads
Writer writer = new Writer(sharedLocation);
Reader reader = new Reader(sharedLocation);
System.out.printf("%-15s%10s%10s%10s%n", "Action", "Value", "Written",
"Read");
System.out.printf("---------------------------------------------%n");
// start writer and reader, giving each of them access
writer.start();
reader.start();
}
}
Note: To validate your solution to this problem, always use the same Buffer
interface, and the Writer
, Reader
, and SharedBufferTest
classes.
Exercise 10
Create a SynchronizedBuffer
class that uses monitors to enforce synchronization and fix the problem observed in Exercise 9.
Exercise 11
Create a ReentrantLockBuffer
class that uses a lock to enforce synchronization and fix the problem observed in Exercise 9.
Producer–Consumer
It is a situation where the writer’s thread of an application updates a shared bounded buffer in a circular fashion, while the reader’s thread reads from that shared buffer also in a circular fashion. The problem may occur if either a writer overwrites data in the buffer before it is consumed by the reader, or a reader consumes the duplicate data.
Exercise 12
Modify Exercise 10 to use a circular bounded buffer of size 3 instead of using single shared variable, then create a SynchronizedBoundedBuffer
class that uses monitors to enforce synchronization and fix the problem observed.
Exercise 13
Created a SemaphoreBoundedBuffer
that uses semaphores to enforce synchronization and solve the problem presented in Exercise 12.
Hint: Use three semaphores: producer
, consumer
, and mutex
, that is, two counting semaphores and one binary semaphore, and do not use any while
loops.