In Java, we can create a thread by extending the Thread class. This approach involves creating a new class that inherits from Thread and overriding its run() method, which contains the code that will be executed when the thread is started.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class FirstThread {
final static List<String> items = List.of(
"Book", "Pen", "Notebook", "Laptop", "Backpack",
"Eraser", "Pencil", "Folder", "Calculator", "Highlighter",
"Stapler", "Tape", "Glue", "Ruler", "Marker",
"Sketchbook", "Whiteboard", "Chalk", "Scissors", "Paper Clips", "Storage Box");
public static void main(String[] args) throws InterruptedException {
List<Thread> threads = new ArrayList<>();
threads.add(new AscendingItemThread());
threads.add(new DescendingItemThread());
for (Thread thread : threads) {
// thread.setDaemon(true);
thread.start();
}
}
private static class AscendingItemThread extends Thread {
@Override
public void run() {
try {
for (String item : items) {
char[] characters = item.toCharArray();
Arrays.sort(characters);
System.out.println(new String(characters));
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class DescendingItemThread extends Thread {
@Override
public void run() {
try {
for (String item : items) {
System.out.println(item);
Thread.sleep(1000);
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Volatile
The volatile keyword in Java is used to indicate that a variable's value may be changed by different threads, ensuring that updates to the variable are visible to all threads immediately. It helps prevent visibility problems in multithreaded applications by ensuring that reads and writes to the variable are done directly from and to the main memory.
Without volatile, threads may cache variables in local memory (like CPU registers or thread-local caches), which can lead to inconsistent views of a variable between threads. If one thread modifies a variable, another thread might not immediately see that change.
With volatile, the JVM ensures that every read of the variable is directly from the main memory and that every write to the variable is immediately visible to other threads. This guarantees that when one thread changes the value of a volatile variable, other threads will see the updated value without delay.
In a nutshell, volatile is useful in scenarios where you need to ensure that the latest value of a variable is immediately visible to other threads.
import java.util.Random;
public class Volatile {
public static void main(String[] args) {
Metrics metrics = new Metrics();
BusinessLogic businessLogicThread1 = new BusinessLogic(metrics);
BusinessLogic businessLogicThread2 = new BusinessLogic(metrics);
MetricsPrinter metricsPrinter = new MetricsPrinter(metrics);
businessLogicThread1.start();
businessLogicThread2.start();
metricsPrinter.start();
}
public static class MetricsPrinter extends Thread {
private Metrics metrics;
public MetricsPrinter(Metrics metrics) {
this.metrics = metrics;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
double currentAverage = metrics.getAverage();
System.out.println("Current Average is " + currentAverage);
}
}
}
public static class BusinessLogic extends Thread {
private Metrics metrics;
private Random random = new Random();
public BusinessLogic(Metrics metrics) {
this.metrics = metrics;
}
@Override
public void run() {
while (true) {
long start = System.currentTimeMillis();
try {
Thread.sleep(random.nextInt(2));
} catch (InterruptedException e) {
}
long end = System.currentTimeMillis();
metrics.addSample(end - start);
}
}
}
public static class Metrics {
private long count = 0;
private volatile double average = 0.0;
public synchronized void addSample(long sample) {
double currentSum = average * count;
count++;
average = (currentSum + sample) / count;
}
public double getAverage() {
return average;
}
}
}
DeadLocks
A deadlock is a situation in which two or more threads are blocked forever, waiting for each other to release resources or locks that they need to continue executing. Deadlocks typically occur in multithreaded environments when threads hold one resource and wait for another, creating a circular dependency. Since no thread can proceed, the program comes to a halt.
The example below is designed to demonstrate a potential deadlock situation, where both trains could end up waiting indefinitely for each other to release the locks on the roads.
The code has a potential deadlock situation:
If TrainA locks roadA and then tries to lock roadB while at the same time TrainB locks roadB and tries to lock roadA, both trains will be waiting for each other to release the locks, resulting in a deadlock.
To avoid deadlock in this scenario, you can implement a strict locking order for acquiring the locks on the roads. Specifically, both TrainA and TrainB should always attempt to lock roadA before roadB. This ensures that if one train holds the lock on roadA, the other train cannot lock roadB and vice versa, thus preventing circular waiting and eliminating the possibility of deadlock.
import java.util.Random;
public class DeadLock {
public static void main(String[] args) {
Intersection intersection = new Intersection();
Thread trainAThread = new Thread(new TrainA(intersection));
Thread trainBThread = new Thread(new TrainB(intersection));
trainAThread.start();
trainBThread.start();
}
public static class TrainB implements Runnable {
private Intersection intersection;
private Random random = new Random();
public TrainB(Intersection intersection) {
this.intersection = intersection;
}
@Override
public void run() {
while (true) {
long sleepingTime = random.nextInt(5);
try {
Thread.sleep(sleepingTime);
} catch (InterruptedException e) {
}
intersection.takeRoadB();
}
}
}
public static class TrainA implements Runnable {
private Intersection intersection;
private Random random = new Random();
public TrainA(Intersection intersection) {
this.intersection = intersection;
}
@Override
public void run() {
while (true) {
long sleepingTime = random.nextInt(5);
try {
Thread.sleep(sleepingTime);
} catch (InterruptedException e) {
}
intersection.takeRoadA();
}
}
}
public static class Intersection {
private Object roadA = new Object();
private Object roadB = new Object();
public void takeRoadA() {
synchronized (roadA) {
System.out.println("Road A is locked by thread " + Thread.currentThread().getName());
synchronized (roadB) {
System.out.println("Train is passing through road A");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
}
}
public void takeRoadB() {
// Solution: Avoid circular lock and keep a strict order for lock
// synchronized (roadA)
// synchronized (roadB)
synchronized (roadB) {
System.out.println("Road B is locked by thread " + Thread.currentThread().getName());
synchronized (roadA) {
System.out.println("Train is passing through road B");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
}
}
}
}
Synchronized
The synchronized keyword is used on the increment and decrement methods of the InventoryCounter class. This ensures that these methods can only be accessed by one thread at a time, which is crucial in a multi-threaded environment where multiple threads may attempt to modify the same shared resource (in this case, the items variable).
Key Points:
-
Thread Safety: By marking the increment and decrement methods as synchronized, the code prevents race conditions. This means that if one thread is executing increment, another thread cannot execute decrement (or vice versa) until the first thread has finished its execution of the synchronized method.
-
Mutual Exclusion: The synchronized keyword provides mutual exclusion, ensuring that only one thread can modify the items variable at any given time. This is essential for maintaining the integrity of the shared resource.
-
Performance Consideration: While synchronization helps in avoiding inconsistencies, it can also lead to performance bottlenecks if many threads are trying to access the synchronized methods simultaneously. This is because threads may be forced to wait for their turn to access the synchronized methods.
-
Usage in Threads: In the main method, two threads (IncrementingThread and DecrementingThread) are created and started. Each thread performs a loop of 10,000 increments or decrements on the InventoryCounter, demonstrating how synchronization manages concurrent access to the items variable.
Overall, the use of synchronized in this code is a fundamental approach to ensure that the inventory count remains accurate despite concurrent modifications from multiple threads.
public class Main {
public static void main(String[] args) throws InterruptedException {
InventoryCounter inventoryCounter = new InventoryCounter();
IncrementingThread incrementingThread = new IncrementingThread(inventoryCounter);
DecrementingThread decrementingThread = new DecrementingThread(inventoryCounter);
incrementingThread.start();
decrementingThread.start();
incrementingThread.join();
decrementingThread.join();
System.out.println("We currently have " + inventoryCounter.getItems() + " items");
}
public static class DecrementingThread extends Thread {
private final InventoryCounter inventoryCounter;
public DecrementingThread(InventoryCounter inventoryCounter) {
this.inventoryCounter = inventoryCounter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
inventoryCounter.decrement();
}
}
}
public static class IncrementingThread extends Thread {
private final InventoryCounter inventoryCounter;
public IncrementingThread(InventoryCounter inventoryCounter) {
this.inventoryCounter = inventoryCounter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
inventoryCounter.increment();
}
}
}
private static class InventoryCounter {
private int items = 0;
public synchronized void increment() {
items++;
}
public synchronized void decrement() {
items--;
}
public int getItems() {
return items;
}
}
}
In this version of the Java code, synchronization is achieved using a custom lock object (myLock) instead of the synchronized keyword directly on the methods. This approach provides more flexibility and control over synchronization.
Key Points:
-
Custom Lock Object: The myLock object is used as a monitor for synchronization. The synchronized block within the increment and decrement methods ensures that only one thread can execute the code inside the block at a time, effectively controlling access to the items variable.
-
Thread Safety: Similar to the previous example, this implementation ensures thread safety by preventing race conditions. When one thread is executing the increment or decrement method, other threads attempting to enter the synchronized block will be blocked until the first thread exits the block.
-
Granular Control: Using a custom lock object allows for more granular control over synchronization. For instance, you could synchronize on different objects for different parts of your code if needed, which can help in reducing contention and improving performance in more complex scenarios.
-
Performance Consideration: While this approach can be more flexible, it still carries the same potential performance implications as using the synchronized keyword. If many threads are trying to access the synchronized blocks simultaneously, they will still be forced to wait, which can lead to bottlenecks.
-
Usage in Threads: The main method creates and starts two threads (IncrementingThread and DecrementingThread), each performing a loop of 10,000 increments or decrements on the InventoryCounter. The synchronization ensures that the final count of items is accurate despite concurrent modifications.
Overall, this implementation demonstrates an alternative way to achieve thread safety in Java using synchronized blocks with a custom lock object, providing flexibility while maintaining the integrity of shared resources.
public class Main {
private final Object myLock = new Object();
public static void main(String[] args) throws InterruptedException {
new Main().run();
}
public void run() throws InterruptedException {
InventoryCounter inventoryCounter = new InventoryCounter();
IncrementingThread incrementingThread = new IncrementingThread(inventoryCounter);
DecrementingThread decrementingThread = new DecrementingThread(inventoryCounter);
incrementingThread.start();
decrementingThread.start();
incrementingThread.join();
decrementingThread.join();
System.out.println("We currently have " + inventoryCounter.getItems() + " items");
}
public class DecrementingThread extends Thread {
private final InventoryCounter inventoryCounter;
public DecrementingThread(InventoryCounter inventoryCounter) {
this.inventoryCounter = inventoryCounter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
inventoryCounter.decrement();
}
}
}
public class IncrementingThread extends Thread {
private final InventoryCounter inventoryCounter;
public IncrementingThread(InventoryCounter inventoryCounter) {
this.inventoryCounter = inventoryCounter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
inventoryCounter.increment();
}
}
}
private class InventoryCounter {
private int items = 0;
public void increment() {
synchronized (myLock) {
items++;
}
}
public void decrement() {
synchronized (myLock) {
items--;
}
}
public int getItems() {
return items;
}
}
}
ReentrantLock & ReentrantReadWriteLock
ReentrantLock and ReentrantReadWriteLock are both synchronization mechanisms in Java that are part of the java.util.concurrent.locks package. However, they serve different purposes and have different characteristics. Here are the key differences between the two:
1. Locking Mechanism
-
ReentrantLock:
- It is a mutual exclusion lock that allows only one thread to hold the lock at any given time.
- If a thread holds the lock, other threads trying to acquire the same lock will be blocked until the lock is released.
-
ReentrantReadWriteLock:
- It allows multiple threads to read the shared resource concurrently, but only one thread can write to the resource at a time.
- It has two types of locks: a read lock and a write lock.
- Read Lock: Multiple threads can acquire the read lock simultaneously as long as no thread holds the write lock.
- Write Lock: Only one thread can hold the write lock, and no other thread can hold either the read or write lock while the write lock is held.
2. Performance
-
ReentrantLock:
- It is suitable for scenarios where there are frequent write operations or when the critical section is short.
- It can lead to contention if many threads are trying to acquire the lock simultaneously.
-
ReentrantReadWriteLock:
- It is more efficient in scenarios where there are many read operations and fewer write operations.
- It allows for better concurrency by enabling multiple threads to read simultaneously, which can improve performance in read-heavy applications.
3. Use Cases
-
ReentrantLock:
- Use it when you need a simple mutual exclusion lock and when the critical section is not heavily read-oriented.
- Suitable for scenarios where you need to manage a single shared resource with exclusive access.
-
ReentrantReadWriteLock:
- Use it when you have a shared resource that is read frequently but written infrequently.
- Ideal for scenarios like caching, where multiple threads can read the cache simultaneously, but updates to the cache need to be exclusive.
4. API Differences
-
ReentrantLock:
- Provides methods like lock(), unlock(), tryLock(), and newCondition() for managing the lock and associated conditions.
-
ReentrantReadWriteLock:
- Provides separate methods for acquiring read and write locks: readLock() and writeLock(), each with their own lock(), unlock(), and tryLock() methods.
Summary
In summary, use ReentrantLock for simple mutual exclusion scenarios, and use ReentrantReadWriteLock when you need to optimize for scenarios with many reads and fewer writes. The choice between the two depends on the specific requirements of your application and the expected read/write patterns.
ReentrantLock Example
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
class Counter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
}
finally {
lock.unlock();
}
}
public int getCount() {
return count;
}
}
public class Main {
public static void main(String[] args) {
Counter counter = new Counter();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
}));
}
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println("Final count: " + counter.getCount());
}
}
ReentrantReadWriteLock Example
import java.util.concurrent.locks.ReentrantReadWriteLock;
class SharedResource {
private int data;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Method to read the data
public int read() {
lock.readLock().lock(); // Acquire the read lock
try {
return data;
} finally {
lock.readLock().unlock(); // Ensure the read lock is released
}
}
// Method to write data
public void write(int value) {
lock.writeLock().lock(); // Acquire the write lock
try {
data = value;
} finally {
lock.writeLock().unlock(); // Ensure the write lock is released
}
}
}
public class Main {
public static void main(String[] args) {
SharedResource sharedResource = new SharedResource();
// Create and start reader threads
for (int i = 0; i < 5; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
int value = sharedResource.read();
System.out.println("Read value: " + value);
try {
Thread.sleep(100); // Simulate some delay
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
// Create and start writer threads
for (int i = 0; i < 2; i++) {
final int writerId = i;
new Thread(() -> {
for (int j = 0; j < 5; j++) {
sharedResource.write(writerId * 10 + j);
System.out.println("Written value: " + (writerId * 10 + j));
try {
Thread.sleep(200); // Simulate some delay
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
}
Atomics
AtomicInteger and AtomicReference are part of the java.util.concurrent.atomic package in Java, which provides classes that support lock-free thread-safe programming on single variables. These classes are useful in concurrent programming where multiple threads need to read and update shared variables without using traditional synchronization mechanisms like synchronized blocks or ReentrantLock.
Atomic Integer
AtomicInteger is a class that provides an integer value that may be updated atomically. It supports various atomic operations, such as incrementing, decrementing, and setting the value. Here are some key features and methods of AtomicInteger:
-
Constructor: You can create an AtomicInteger with an initial value.
AtomicInteger atomicInt = new AtomicInteger(0);
-
Methods:
get()
: Returns the current value.set(int newValue)
: Sets to the given value.getAndSet(int newValue)
: Sets to the given value and returns the old value.incrementAndGet()
: Increments by one and returns the updated value.decrementAndGet()
: Decrements by one and returns the updated value.compareAndSet(int expect, int update)
: Atomically sets the value to the given updated value if the current value equals the expected value.
Example of Atomic Integer
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerExample {
public static void main(String[] args) {
AtomicInteger atomicInt = new AtomicInteger(0);
// Increment the value
int incrementedValue = atomicInt.incrementAndGet();
System.out.println("Incremented Value: " + incrementedValue); // Output: 1
// Compare and set
boolean wasUpdated = atomicInt.compareAndSet(1, 2);
System.out.println("Was Updated: " + wasUpdated); // Output: true
System.out.println("Current Value: " + atomicInt.get()); // Output: 2
}
}
Atomic Reference
AtomicReference is a class that provides an object reference that may be updated atomically. It is useful for managing references to objects in a thread-safe manner. Here are some key features and methods of AtomicReference:
-
Constructor: You can create an AtomicReference with an initial reference.
AtomicReference<MyObject> atomicRef = new AtomicReference<>(new MyObject());
-
Methods:
get()
: Returns the current reference.set(MyObject newValue)
: Sets to the given reference.getAndSet(MyObject newValue)
: Sets to the given reference and returns the old reference.compareAndSet(MyObject expect, MyObject update)
: Atomically sets the reference to the given updated reference if the current reference is equal to the expected reference.
Example of AtomicInteger
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
public static void main(String[] args) throws InterruptedException {
InventoryCounter inventoryCounter = new InventoryCounter();
IncrementingThread incrementingThread = new IncrementingThread(inventoryCounter);
DecrementingThread decrementingThread = new DecrementingThread(inventoryCounter);
incrementingThread.start();
decrementingThread.start();
incrementingThread.join();
decrementingThread.join();
System.out.println("We currently have " + inventoryCounter.getItems() + " items");
}
public static class DecrementingThread extends Thread {
private final InventoryCounter inventoryCounter;
public DecrementingThread(InventoryCounter inventoryCounter) {
this.inventoryCounter = inventoryCounter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
inventoryCounter.decrement();
}
}
}
public static class IncrementingThread extends Thread {
private final InventoryCounter inventoryCounter;
public IncrementingThread(InventoryCounter inventoryCounter) {
this.inventoryCounter = inventoryCounter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
inventoryCounter.increment();
}
}
}
private static class InventoryCounter {
private final AtomicInteger items = new AtomicInteger(0);
public void increment() {
items.incrementAndGet();
}
public void decrement() {
items.decrementAndGet();
}
public int getItems() {
return items.get();
}
}
}
Example of AtomicInteger
import java.util.concurrent.atomic.AtomicReference;
public class Main {
public static void main(String[] args) throws InterruptedException {
InventoryCounter inventoryCounter = new InventoryCounter();
IncrementingThread incrementingThread = new IncrementingThread(inventoryCounter);
DecrementingThread decrementingThread = new DecrementingThread(inventoryCounter);
incrementingThread.start();
decrementingThread.start();
incrementingThread.join();
decrementingThread.join();
System.out.println("We currently have " + inventoryCounter.getItems() + " items");
}
public static class DecrementingThread extends Thread {
private final InventoryCounter inventoryCounter;
public DecrementingThread(InventoryCounter inventoryCounter) {
this.inventoryCounter = inventoryCounter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
inventoryCounter.decrement();
}
}
}
public static class IncrementingThread extends Thread {
private final InventoryCounter inventoryCounter;
public IncrementingThread(InventoryCounter inventoryCounter) {
this.inventoryCounter = inventoryCounter;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
inventoryCounter.increment();
}
}
}
private static class InventoryCounter {
// Using AtomicReference to hold the current count as an Integer object
private final AtomicReference<Integer> items = new AtomicReference<>(0);
public void increment() {
// Atomically update the reference to the new count
while (true) {
Integer current = items.get();
Integer newValue = current + 1;
if (items.compareAndSet(current, newValue)) {
break; // Successfully updated
}
}
}
public void decrement() {
// Atomically update the reference to the new count
while (true) {
Integer current = items.get();
Integer newValue = current - 1;
if (items.compareAndSet(current, newValue)) {
break; // Successfully updated
}
}
}
public int getItems() {
return items.get();
}
}
}
Inter Thread Communication
Inter-Thread Communication (ITC) refers to the mechanisms that allow threads within a process to communicate and synchronize their actions. Since threads share the same memory space, they can exchange data and signals directly, but this can lead to issues like race conditions if not managed properly. Common ITC methods include:
- Mutexes: Used to ensure that only one thread can access a resource at a time.
- Semaphores: Used to control access to a common resource by multiple threads.
- Condition Variables: Allow threads to wait for certain conditions to be met before proceeding.
- Message Queues: Enable threads to send and receive messages in a structured way.
Effective ITC is crucial for building efficient and safe multithreaded applications.
class SharedResource {
private int data;
private boolean available = false;
public synchronized int getData() throws InterruptedException {
while (!available) {
wait(); // Wait until data is available
}
available = false; // Reset availability
notify(); // Notify producer that data has been consumed
return data;
}
public synchronized void setData(int data) throws InterruptedException {
while (available) {
wait(); // Wait until data is consumed
}
this.data = data;
available = true; // Mark data as available
notify(); // Notify consumer that data is available
}
}
class Producer implements Runnable {
private final SharedResource sharedResource;
public Producer(SharedResource sharedResource) {
this.sharedResource = sharedResource;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
System.out.println("Producing: " + i);
sharedResource.setData(i); // Produce data
Thread.sleep(500); // Simulate time taken to produce
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
class Consumer implements Runnable {
private final SharedResource sharedResource;
public Consumer(SharedResource sharedResource) {
this.sharedResource = sharedResource;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
int data = sharedResource.getData(); // Consume data
System.out.println("Consuming: " + data);
Thread.sleep(1000); // Simulate time taken to consume
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
public class InterThreadCom {
public static void main(String[] args) {
SharedResource sharedResource = new SharedResource();
Thread producerThread = new Thread(new Producer(sharedResource));
Thread consumerThread = new Thread(new Consumer(sharedResource));
producerThread.start();
consumerThread.start();
try {
producerThread.join(); // Wait for the producer to finish
consumerThread.join(); // Wait for the consumer to finish
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
CountDownLatch
import java.util.concurrent.CountDownLatch;
class SharedResource {
private int data;
private boolean available = false;
public synchronized int getData() throws InterruptedException {
while (!available) {
wait(); // Wait until data is available
}
available = false; // Reset availability
notify(); // Notify producer that data has been consumed
return data;
}
public synchronized void setData(int data) throws InterruptedException {
while (available) {
wait(); // Wait until data is consumed
}
this.data = data;
available = true; // Mark data as available
notify(); // Notify consumer that data is available
}
}
class Producer implements Runnable {
private final SharedResource sharedResource;
private final CountDownLatch latch;
public Producer(SharedResource sharedResource, CountDownLatch latch) {
this.sharedResource = sharedResource;
this.latch = latch;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
System.out.println("Producing: " + i);
sharedResource.setData(i); // Produce data
Thread.sleep(500); // Simulate time taken to produce
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
latch.countDown(); // Signal that the producer has finished
}
}
class Consumer implements Runnable {
private final SharedResource sharedResource;
private final CountDownLatch latch;
public Consumer(SharedResource sharedResource, CountDownLatch latch) {
this.sharedResource = sharedResource;
this.latch = latch;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
int data = sharedResource.getData(); // Consume data
System.out.println("Consuming: " + data);
Thread.sleep(1000); // Simulate time taken to consume
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
latch.countDown(); // Signal that the consumer has finished
}
}
public class InterThreadCom {
public static void main(String[] args) {
SharedResource sharedResource = new SharedResource();
CountDownLatch latch = new CountDownLatch(2); // Count down for producer and consumer
Thread producerThread = new Thread(new Producer(sharedResource, latch));
Thread consumerThread = new Thread(new Consumer(sharedResource, latch));
producerThread.start();
consumerThread.start();
try {
latch.await(); // Wait for both producer and consumer to finish
System.out.println("Both producer and consumer have completed their tasks.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
The first example you provided uses traditional synchronization mechanisms (i.e., wait()
and notify()
) for inter-thread communication between a producer and a consumer. The modified
example incorporates CountDownLatch
to manage synchronization and coordination between the
threads. Here are the benefits of using CountDownLatch
over the traditional approach:
Benefits of Using CountDownLatch
-
Simplified Coordination:
- CountDownLatch: It provides a straightforward way to wait for multiple threads to complete
their tasks. You can simply call
latch.await()
in the main thread, and it will block until the count reaches zero. - Traditional Approach: Using
wait()
andnotify()
requires more boilerplate code to manage the state of the threads and ensure that they are properly synchronized.
- CountDownLatch: It provides a straightforward way to wait for multiple threads to complete
their tasks. You can simply call
-
Decoupling of Threads:
- CountDownLatch: The main thread does not need to know the details of how the producer and consumer operate. It only needs to wait for their completion, making the code cleaner and easier to understand.
- Traditional Approach: The main thread may need to manage the state of the shared resource and ensure that the producer and consumer are properly synchronized, which can lead to more complex code.
-
One-Time Use:
- CountDownLatch: It is designed for one-time use, which is ideal for scenarios where you want to wait for a specific number of events to occur. Once the count reaches zero, it cannot be reset, which is often the desired behavior in many applications.
- Traditional Approach: The
wait()
andnotify()
methods can be reused, but managing the state of the shared resource can become cumbersome, especially if you need to reset the state.
-
Clarity of Intent:
- CountDownLatch: The intent of waiting for a certain number of tasks to complete is clear
and explicit. The use of
CountDownLatch
communicates the purpose of synchronization effectively. - Traditional Approach: The intent may be less clear, as it relies on the state of the
shared resource and the use of
wait()
andnotify()
, which can be more difficult to follow.
- CountDownLatch: The intent of waiting for a certain number of tasks to complete is clear
and explicit. The use of
-
Error Handling:
- CountDownLatch: It provides a cleaner way to handle thread completion. If a thread encounters an error, you can handle it in the thread itself without affecting the main thread's waiting logic.
- Traditional Approach: Error handling can be more complex, as you need to ensure that the state of the shared resource is consistent and that the waiting threads are notified appropriately.
-
Flexibility:
- CountDownLatch: You can easily adjust the number of threads to wait for by changing the
count in the
CountDownLatch
constructor. This makes it flexible for different scenarios. - Traditional Approach: Adjusting the number of threads and managing their states can require significant changes to the code.
- CountDownLatch: You can easily adjust the number of threads to wait for by changing the
count in the
Summary
While both approaches can achieve inter-thread communication, using CountDownLatch
simplifies the
code, improves clarity, and provides a more robust mechanism for coordinating the completion of
multiple threads. It reduces the complexity associated with traditional synchronization methods and
makes the intent of the code clearer. This can lead to fewer bugs and easier maintenance in
concurrent programming scenarios.
Throughput
Throughput in the context of multithreading refers to the number of tasks or operations that a multithreaded application can complete in a given period of time. It is a critical performance metric that helps assess the efficiency and effectiveness of a multithreaded system.
Throughput in multithreading is a measure of how many tasks or operations a multithreaded application can complete in a given time frame. It is influenced by factors such as concurrency, resource utilization, context switching, and synchronization overhead. Optimizing throughput in multithreaded applications is essential for achieving high performance and responsiveness, especially in environments where multiple tasks need to be processed simultaneously.
Key Aspects of Throughput in Multithreading:
-
Tasks per Second:
- Throughput can be measured in terms of the number of tasks or operations completed per second. A higher number indicates better performance and resource utilization.
-
Concurrency:
- Multithreading allows multiple threads to run concurrently, which can significantly increase throughput by making better use of CPU resources. This is especially beneficial in CPU-bound applications where multiple threads can perform computations simultaneously.
-
Resource Utilization:
- Effective multithreading can lead to improved resource utilization, as threads can share resources (like memory and I/O) and reduce idle time. This can enhance overall throughput.
-
Context Switching:
- While multithreading can improve throughput, excessive context switching (the process of saving and loading the state of threads) can negatively impact performance. Each context switch incurs overhead, which can reduce the effective throughput.
-
Synchronization Overhead:
- In multithreaded applications, threads often need to synchronize access to shared resources. This synchronization can introduce delays and reduce throughput if not managed properly.
-
Scalability:
- Throughput in multithreaded applications can be affected by how well the application scales with the addition of more threads. Ideally, increasing the number of threads should lead to a proportional increase in throughput, but this is not always the case due to contention and resource limitations.
-
Performance Bottlenecks:
- Identifying and addressing bottlenecks (such as locks, contention for shared resources, or I/O limitations) is crucial for maximizing throughput in multithreaded applications.
Example
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class ThroughputHttpServer {
private static final int PORT = 8000;
private static final int NUMBER_OF_THREADS = 6;
public static void main(String[] args) throws IOException {
startServer();
}
public static void startServer() throws IOException {
HttpServer server = HttpServer.create(new InetSocketAddress(PORT), 0);
server.createContext("/search", new WordCountHandler());
Executor executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
server.setExecutor(executor);
System.out.println("Listen on port: " + PORT);
server.start();
}
private static class WordCountHandler implements HttpHandler {
@Override
public void handle(HttpExchange httpExchange) throws IOException {
String query = httpExchange.getRequestURI().getQuery();
String[] keyValue = query.split("=");
String action = keyValue[0];
String word = keyValue[1];
System.out.println("Look up word: " + word);
if (!action.equals("word")) {
httpExchange.sendResponseHeaders(400, 0);
return;
}
BigInteger veryBig = new BigInteger(new Random().nextInt(1000) * 4, new Random());
BigInteger r = veryBig.nextProbablePrime();
long count = r.longValue();
byte[] response = Long.toString(count).getBytes();
httpExchange.sendResponseHeaders(200, response.length);
OutputStream outputStream = httpExchange.getResponseBody();
outputStream.write(response);
outputStream.close();
}
}
}
Data Race and Race Condition
Data Race
A data race occurs when two or more threads (or processes) access the same piece of data at the same time, and at least one of those accesses involves modifying the data. Because the threads are running concurrently, the final value of the data can be unpredictable and may depend on the timing of the threads. This can lead to inconsistent or incorrect results.
Race Condition
A race condition is a broader concept that refers to a situation where the outcome of a program depends on the sequence or timing of uncontrollable events, such as the order in which threads are scheduled to run. This can happen even if there are no direct conflicts over shared data. Essentially, if the timing of events affects the program's behavior or results, a race condition exists.
Key Differences
- Scope: A data race specifically involves concurrent access to shared data, while a race condition can involve any situation where the timing of events affects the program's correctness.
- Outcome: Data races lead to unpredictable results due to simultaneous access to data, whereas race conditions can lead to unexpected behavior based on the order of operations, which may or may not involve shared data.
public class DataRace {
public static void main(String[] args) {
SharedClass sharedClass = new SharedClass();
Thread thread1 = new Thread(() -> {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
sharedClass.increment();
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
sharedClass.checkForDataRace();
}
});
thread1.start();
thread2.start();
}
public static class SharedClass {
private int x = 0;
private int y = 0;
public void increment() {
x++;
y++;
}
public void checkForDataRace() {
if (y > x) {
System.out.println("y > x - Data Race is detected");
}
}
}
}
Thread Per Task - Thread Per Request
In Java, the concepts of "Thread Per Task" and "Thread Per Request" refer to different threading models used to handle concurrent tasks or requests in applications, particularly in server environments. Here's a breakdown of each model:
Thread Per Task
-
Definition: In the Thread Per Task model, a new thread is created for each task that needs to be executed. This means that whenever a task is initiated, a dedicated thread is spawned to handle that task.
-
Advantages:
- Simplicity: The model is straightforward to implement, as each task runs in its own thread.
- Isolation: Each task is isolated in its own thread, which can simplify debugging and error handling.
-
Disadvantages:
- Resource Intensive: Creating a new thread for every task can lead to high resource consumption, especially if tasks are short-lived or if there are many concurrent tasks.
- Overhead: The overhead of thread creation and destruction can degrade performance, particularly under high load.
- Scalability Issues: This model may not scale well with a large number of concurrent tasks, as the system can become overwhelmed with too many threads.
Thread Per Request
-
Definition: The Thread Per Request model is a specific application of the Thread Per Task model, commonly used in web servers. In this model, a new thread is created for each incoming request to the server.
-
Advantages:
- Simplicity: Like Thread Per Task, it is easy to understand and implement.
- Request Isolation: Each request is handled in its own thread, which can help with managing state and errors.
-
Disadvantages:
- Resource Consumption: Similar to Thread Per Task, creating a thread for each request can lead to high memory and CPU usage.
- Thread Management: The server may struggle to manage a large number of threads, leading to performance bottlenecks.
- Limited Scalability: Under heavy load, the server may not be able to handle all incoming requests efficiently, leading to delays or dropped requests.
Alternatives
Due to the limitations of both models, many modern applications use thread pools or asynchronous processing models:
-
Thread Pool: Instead of creating a new thread for each task/request, a fixed number of threads are maintained in a pool. Tasks are submitted to the pool, and available threads execute them. This reduces the overhead of thread creation and improves resource management.
-
Asynchronous Processing: Using frameworks like CompletableFuture or reactive programming ( e.g., using Project Reactor or RxJava) allows for non-blocking I/O operations, which can handle many requests concurrently without the need for a large number of threads.
Cached Thread Pool
Each task creates a new thread, which can lead to high resource consumption, especially if many tasks are initiated simultaneously. This can exhaust system resources (like memory and CPU), leading to performance degradation or even application crashes.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class IoBoundApplication {
private static final int NUMBER_OF_TASKS = 1000;
public static void main(String[] args) {
System.out.printf("Running %d tasks\n", NUMBER_OF_TASKS);
long start = System.currentTimeMillis();
performTasks();
System.out.printf("Tasks took %dms to complete\n", System.currentTimeMillis() - start);
}
private static void performTasks() {
try (ExecutorService executorService = Executors.newCachedThreadPool()) {
for (int i = 0; i < NUMBER_OF_TASKS; i++) {
executorService.submit(() -> blockingIoOperation());
}
}
}
private static void blockingIoOperation() {
System.out.println("Executing a blocking task from thread: " + Thread.currentThread());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Fixed Thread Pool
A fixed thread pool limits the number of concurrent threads, which helps manage system resources more effectively. Excess tasks are queued until a thread becomes available, preventing resource exhaustion.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
private static final int NUMBER_OF_TASKS = 10_000;
public static void main(String[] args) {
System.out.printf("Running %d tasks\n", NUMBER_OF_TASKS);
long start = System.currentTimeMillis();
performTasks();
System.out.printf("Tasks took %dms to complete\n", System.currentTimeMillis() - start);
}
private static void performTasks() {
try (ExecutorService executorService = Executors.newFixedThreadPool(1000)) {
for (int i = 0; i < NUMBER_OF_TASKS; i++) {
executorService.submit(() -> {
for (int j = 0; j < 100; j++) {
blockingIoOperation();
}
});
}
}
}
// Simulates a long blocking IO
private static void blockingIoOperation() {
System.out.println("Executing a blocking task from thread: " + Thread.currentThread());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Java Virtual Threads
Java Virtual Threads are a feature introduced in Project Loom, which aims to simplify concurrent programming in Java by providing a lightweight implementation of threads. Unlike traditional Java threads, which are mapped to operating system threads, virtual threads are managed by the Java Virtual Machine (JVM) and are much lighter in terms of resource consumption.
Key Features of Java Virtual Threads:
-
Lightweight: Virtual threads are much lighter than traditional threads, allowing you to create thousands or even millions of them without significant overhead.
-
Simplified Concurrency: They enable a more straightforward programming model for concurrent applications. You can write code that looks synchronous but runs asynchronously, making it easier to understand and maintain.
-
Blocking Operations: Virtual threads can block without consuming a platform thread. This means that when a virtual thread is waiting for I/O or other blocking operations, it does not tie up a system thread, allowing for better resource utilization.
-
Structured Concurrency: Project Loom introduces the concept of structured concurrency, which helps manage the lifecycle of threads and ensures that they are properly cleaned up when they are no longer needed.
-
Integration with Existing APIs: Virtual threads are designed to work seamlessly with existing Java APIs, including those that use traditional threads, making it easier to adopt them in existing applications.
Examples
import java.util.ArrayList;
import java.util.List;
public class Main {
private static final int NUMBER_OF_VIRTUAL_THREADS = 1_000;
public static void main(String[] args) throws InterruptedException {
Runnable runnable = () -> System.out.println("Inside thread: " + Thread.currentThread());
List<Thread> virtualThreads = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_VIRTUAL_THREADS; i++) {
Thread virtualThread = Thread.ofVirtual().unstarted(runnable);
virtualThreads.add(virtualThread);
}
long start = System.nanoTime();
for (Thread virtualThread : virtualThreads) {
virtualThread.start();
}
for (Thread virtualThread : virtualThreads) {
virtualThread.join();
}
long end = System.nanoTime();
long elapsedTime = end - start;
double seconds = elapsedTime / 1_000_000_000.0;
System.out.println("Time at: " + seconds);
}
}
import java.util.ArrayList;
import java.util.List;
public class Main {
private static final int NUMBER_OF_VIRTUAL_THREADS = 1_000;
public static void main(String[] args) throws InterruptedException {
List<Thread> virtualThreads = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_VIRTUAL_THREADS; i++) {
Thread virtualThread = Thread.ofVirtual().unstarted(new BlockingTask());
virtualThreads.add(virtualThread);
}
long start = System.nanoTime();
for (Thread virtualThread : virtualThreads) {
virtualThread.start();
}
for (Thread virtualThread : virtualThreads) {
virtualThread.join();
}
long end = System.nanoTime();
long elapsedTime = end - start;
double seconds = elapsedTime / 1_000_000_000.0;
System.out.println("Time at: " + seconds);
}
private static class BlockingTask implements Runnable {
@Override
public void run() {
System.out.println("Inside thread: " + Thread.currentThread() + " before blocking call");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Inside thread: " + Thread.currentThread() + " after blocking call");
}
}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
private static final int NUMBER_OF_TASKS = 10_000;
public static void main(String[] args) {
System.out.printf("Running %d tasks\n", NUMBER_OF_TASKS);
long start = System.currentTimeMillis();
performTasks();
System.out.printf("Tasks took %dms to complete\n", System.currentTimeMillis() - start);
}
private static void performTasks() {
try (ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < NUMBER_OF_TASKS; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 100; j++) {
blockingIoOperation();
}
}
});
}
}
}
// Simulates a long blocking IO
private static void blockingIoOperation() {
System.out.println("Executing a blocking task from thread: " + Thread.currentThread());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}