Java Threads

In Java, we can create a thread by extending the Thread class. This approach involves creating a new class that inherits from Thread and overriding its run() method, which contains the code that will be executed when the thread is started.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FirstThread {

    final static List<String> items = List.of(
            "Book", "Pen", "Notebook", "Laptop", "Backpack",
            "Eraser", "Pencil", "Folder", "Calculator", "Highlighter",
            "Stapler", "Tape", "Glue", "Ruler", "Marker",
            "Sketchbook", "Whiteboard", "Chalk", "Scissors", "Paper Clips", "Storage Box");

    public static void main(String[] args) throws InterruptedException {

        List<Thread> threads = new ArrayList<>();

        threads.add(new AscendingItemThread());
        threads.add(new DescendingItemThread());

        for (Thread thread : threads) {
            // thread.setDaemon(true);
            thread.start();
        }
    }

    private static class AscendingItemThread extends Thread {

        @Override
        public void run() {

            try {

                for (String item : items) {
                    char[] characters = item.toCharArray();
                    Arrays.sort(characters);
                    System.out.println(new String(characters));
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static class DescendingItemThread extends Thread {

        @Override
        public void run() {

            try {
                for (String item : items) {
                    System.out.println(item);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
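
A common pitfall with this approach is calling run() instead of start(). The sketch below illustrates the difference under simple assumptions: NameRecordingThread and the two helper methods are hypothetical names introduced only for this illustration. Calling run() directly executes the body on the caller's thread, while start() asks the JVM to create a new thread that then invokes run().

```java
public class StartVersusRun {

    // Hypothetical helper thread that records the name of the thread its run() executed on.
    static class NameRecordingThread extends Thread {
        private volatile String executedOn;

        NameRecordingThread(String name) {
            super(name);
        }

        @Override
        public void run() {
            executedOn = Thread.currentThread().getName();
        }

        String executedOn() {
            return executedOn;
        }
    }

    // Calls run() directly: no new thread is created, the body runs on the caller's thread.
    static String nameWhenRunCalledDirectly() {
        NameRecordingThread thread = new NameRecordingThread("worker-direct");
        thread.run();
        return thread.executedOn();
    }

    // Calls start(): the JVM creates a new thread, which then invokes run().
    static String nameWhenStarted() {
        NameRecordingThread thread = new NameRecordingThread("worker-started");
        thread.start();
        try {
            thread.join(); // wait for the worker to finish before reading the recorded name
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thread.executedOn();
    }

    public static void main(String[] args) {
        System.out.println("run() executed on:   " + nameWhenRunCalledDirectly());
        System.out.println("start() executed on: " + nameWhenStarted());
    }
}
```

When launched from main, the first line reports the caller's thread name and the second reports worker-started.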

Volatile

The volatile keyword marks a field that several threads read and write. It guarantees visibility: once one thread writes a volatile field, every later read of that field by any thread sees that write (or a newer one). A useful mental model is that reads and writes go straight to main memory, although the Java Memory Model formally expresses the guarantee as a happens-before relationship between the write and the subsequent reads.

Without volatile, the JIT compiler and the CPU are free to keep a value in a register or cache, which can lead to inconsistent views of a variable between threads. If one thread modifies a variable, another thread might see that change late or, in a tight loop, never.

volatile also makes single reads and writes of long and double fields atomic, which plain long and double fields do not guarantee. It does not make compound operations such as count++ or "read, compute, write back" atomic; those still need synchronized, a lock, or an atomic class.

In a nutshell, volatile is useful when a field must always be read at its latest value and each update is a single write. In the Metrics class of the example, average is volatile so getAverage() can read it without locking, while addSample() is synchronized because it reads, computes, and writes back.


import java.util.Random;

public class Volatile {

    public static void main(String[] args) {

        Metrics metrics = new Metrics();

        BusinessLogic businessLogicThread1 = new BusinessLogic(metrics);

        BusinessLogic businessLogicThread2 = new BusinessLogic(metrics);

        MetricsPrinter metricsPrinter = new MetricsPrinter(metrics);

        businessLogicThread1.start();
        businessLogicThread2.start();
        metricsPrinter.start();
    }

    public static class MetricsPrinter extends Thread {
        private Metrics metrics;

        public MetricsPrinter(Metrics metrics) {
            this.metrics = metrics;
        }

        @Override
        public void run() {
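            while (true) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of silently swallowing it
                    Thread.currentThread().interrupt();
                    return;
                }

                // Lock-free read: average is volatile
                double currentAverage = metrics.getAverage();

                System.out.println("Current Average is " + currentAverage);
            }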
        }
    }

    public static class BusinessLogic extends Thread {
        private Metrics metrics;
        private Random random = new Random();

        public BusinessLogic(Metrics metrics) {
            this.metrics = metrics;
        }

        @Override
        public void run() {
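            while (true) {
                long start = System.currentTimeMillis();

                try {
                    Thread.sleep(random.nextInt(2));
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of silently swallowing it
                    Thread.currentThread().interrupt();
                    return;
                }

                long end = System.currentTimeMillis();

                // Record how long the simulated work took
                metrics.addSample(end - start);
            }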
        }
    }

    public static class Metrics {
        private long count = 0;
        private volatile double average = 0.0;

        public synchronized void addSample(long sample) {
            double currentSum = average * count;
            count++;
            average = (currentSum + sample) / count;
        }

        public double getAverage() {
            return average;
        }
    }
}
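
The visibility guarantee described above is often illustrated with a stop flag. The following is a minimal sketch, not part of the original example: Worker, requestStop, and workerStopsAfterRequest are hypothetical names. Because stopRequested is volatile, the write made by the requesting thread is guaranteed to become visible to the spinning worker, so the loop terminates.

```java
public class VolatileFlagDemo {

    // Hypothetical worker that spins until another thread asks it to stop.
    static class Worker extends Thread {
        // volatile: the write in requestStop() is guaranteed to become visible to run()
        private volatile boolean stopRequested = false;
        private long iterations = 0; // written only by the worker thread itself

        @Override
        public void run() {
            while (!stopRequested) {
                iterations++;
            }
        }

        void requestStop() {
            stopRequested = true;
        }
    }

    // Starts a worker, requests a stop, and reports whether it terminated within the timeout.
    static boolean workerStopsAfterRequest(long timeoutMillis) {
        Worker worker = new Worker();
        worker.setDaemon(true); // a daemon thread cannot keep the JVM alive if something goes wrong
        worker.start();
        worker.requestStop();
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + workerStopsAfterRequest(5_000));
    }
}
```

Without volatile on stopRequested, the Java Memory Model would allow the worker to keep reading a stale value, so the loop would not be guaranteed to end.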

Deadlocks

A deadlock is a situation in which two or more threads are blocked forever, waiting for each other to release resources or locks that they need to continue executing. Deadlocks typically occur in multithreaded environments when threads hold one resource and wait for another, creating a circular dependency. Since no thread can proceed, the program comes to a halt.

The example below demonstrates a potential deadlock, where both trains can end up waiting indefinitely for each other to release the locks on the roads.

If TrainA locks roadA and then tries to lock roadB while, at the same time, TrainB locks roadB and tries to lock roadA, each train waits for a lock that the other one holds. Neither can proceed, so the program hangs.

To avoid deadlock in this scenario, acquire the locks in one strict order everywhere. Specifically, both takeRoadA() and takeRoadB() should lock roadA before roadB. A thread can then hold roadB only while it already holds roadA, so no thread ever holds roadB while waiting for roadA, and the circular wait cannot form.

import java.util.Random;

public class DeadLock {

    public static void main(String[] args) {

        Intersection intersection = new Intersection();
        Thread trainAThread = new Thread(new TrainA(intersection));
        Thread trainBThread = new Thread(new TrainB(intersection));

        trainAThread.start();
        trainBThread.start();
    }

    public static class TrainB implements Runnable {

        private Intersection intersection;

        private Random random = new Random();

        public TrainB(Intersection intersection) {
            this.intersection = intersection;
        }

        @Override
        public void run() {

            while (true) {
                long sleepingTime = random.nextInt(5);
                try {
                    Thread.sleep(sleepingTime);
                } catch (InterruptedException e) {
                }

                intersection.takeRoadB();
            }
        }
    }

    public static class TrainA implements Runnable {
        private Intersection intersection;
        private Random random = new Random();

        public TrainA(Intersection intersection) {
            this.intersection = intersection;
        }

        @Override
        public void run() {
            while (true) {
                long sleepingTime = random.nextInt(5);
                try {
                    Thread.sleep(sleepingTime);
                } catch (InterruptedException e) {
                }

                intersection.takeRoadA();
            }
        }
    }

    public static class Intersection {
        private Object roadA = new Object();
        private Object roadB = new Object();

        public void takeRoadA() {
            synchronized (roadA) {
                System.out.println("Road A is locked by thread " + Thread.currentThread().getName());

                synchronized (roadB) {
                    System.out.println("Train is passing through road A");
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }

        public void takeRoadB() {

            // Fix: acquire the locks in the same order as takeRoadA(),
            // i.e. synchronized (roadA) first, then synchronized (roadB).
            // The order used below (roadB, then roadA) is what makes the deadlock possible.
            synchronized (roadB) {
                System.out.println("Road B is locked by thread " + Thread.currentThread().getName());

                synchronized (roadA) {
                    System.out.println("Train is passing through road B");

                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    }
}
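
The lock-ordering fix described above can be sketched as follows. This is an illustrative variant rather than the original program: OrderedIntersection, runTrains, and the crossings counter are assumptions added so the result can be checked, and the trains make a fixed number of trips instead of looping forever.

```java
public class OrderedLockingDemo {

    // Hypothetical variant of Intersection in which every method locks roadA first, then roadB.
    static class OrderedIntersection {
        private final Object roadA = new Object();
        private final Object roadB = new Object();
        private int crossings = 0; // only touched while both locks are held

        void takeRoadA() {
            synchronized (roadA) {
                synchronized (roadB) {
                    crossings++;
                }
            }
        }

        void takeRoadB() {
            synchronized (roadA) { // same order as takeRoadA(): roadA first
                synchronized (roadB) {
                    crossings++;
                }
            }
        }

        int crossings() {
            synchronized (roadA) {
                synchronized (roadB) {
                    return crossings;
                }
            }
        }
    }

    // Runs two trains for a fixed number of trips each. Returns the total number of
    // crossings, or -1 if the trains did not finish within the timeout.
    static int runTrains(int tripsPerTrain, long timeoutMillis) {
        OrderedIntersection intersection = new OrderedIntersection();
        Thread trainA = new Thread(() -> {
            for (int i = 0; i < tripsPerTrain; i++) {
                intersection.takeRoadA();
            }
        });
        Thread trainB = new Thread(() -> {
            for (int i = 0; i < tripsPerTrain; i++) {
                intersection.takeRoadB();
            }
        });
        trainA.setDaemon(true); // daemon threads cannot keep the JVM alive
        trainB.setDaemon(true);
        trainA.start();
        trainB.start();
        try {
            trainA.join(timeoutMillis);
            trainB.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (trainA.isAlive() || trainB.isAlive()) {
            return -1;
        }
        return intersection.crossings();
    }

    public static void main(String[] args) {
        System.out.println("Crossings completed: " + runTrains(10_000, 10_000));
    }
}
```

Since both methods take the locks in the same order, the two threads always finish, and the total equals twice the number of trips per train.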

Synchronized

The synchronized keyword is used on the increment and decrement methods of the InventoryCounter class. This ensures that these methods can only be accessed by one thread at a time, which is crucial in a multi-threaded environment where multiple threads may attempt to modify the same shared resource (in this case, the items variable).

Key Points:

  1. Thread Safety: By marking the increment and decrement methods as synchronized, the code prevents race conditions. Both methods lock the same monitor (the InventoryCounter instance), so if one thread is executing increment on a counter, another thread cannot execute increment or decrement on that same counter until the first thread has left the synchronized method.

  2. Mutual Exclusion: The synchronized keyword provides mutual exclusion, ensuring that only one thread can modify the items variable at any given time. This is essential for maintaining the integrity of the shared resource.

  3. Performance Consideration: While synchronization helps in avoiding inconsistencies, it can also lead to performance bottlenecks if many threads are trying to access the synchronized methods simultaneously. This is because threads may be forced to wait for their turn to access the synchronized methods.

  4. Usage in Threads: In the main method, two threads (IncrementingThread and DecrementingThread) are created and started. Each thread performs a loop of 10,000 increments or decrements on the InventoryCounter, demonstrating how synchronization manages concurrent access to the items variable.

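Overall, the use of synchronized in this code is a fundamental approach to ensure that the inventory count remains accurate despite concurrent modifications from multiple threads. Note that getItems() is not synchronized; reading it in main is safe here only because the read happens after join() has returned for both threads.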


public class Main {

    public static void main(String[] args) throws InterruptedException {
        InventoryCounter inventoryCounter = new InventoryCounter();
        IncrementingThread incrementingThread = new IncrementingThread(inventoryCounter);
        DecrementingThread decrementingThread = new DecrementingThread(inventoryCounter);

        incrementingThread.start();
        decrementingThread.start();

        incrementingThread.join();
        decrementingThread.join();

        System.out.println("We currently have " + inventoryCounter.getItems() + " items");
    }

    public static class DecrementingThread extends Thread {

        private final InventoryCounter inventoryCounter;

        public DecrementingThread(InventoryCounter inventoryCounter) {
            this.inventoryCounter = inventoryCounter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                inventoryCounter.decrement();
            }
        }
    }

    public static class IncrementingThread extends Thread {

        private final InventoryCounter inventoryCounter;

        public IncrementingThread(InventoryCounter inventoryCounter) {
            this.inventoryCounter = inventoryCounter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                inventoryCounter.increment();
            }
        }
    }

    private static class InventoryCounter {
        private int items = 0;

        public synchronized void increment() {
            items++;
        }

        public synchronized void decrement() {
            items--;
        }

        public int getItems() {
            return items;
        }
    }
}

In this version of the Java code, synchronization is achieved using a custom lock object (myLock) instead of the synchronized keyword directly on the methods. This approach provides more flexibility and control over synchronization.

Key Points:

  1. Custom Lock Object: The myLock object is used as a monitor for synchronization. The synchronized block within the increment and decrement methods ensures that only one thread can execute the code inside the block at a time, effectively controlling access to the items variable.

  2. Thread Safety: Similar to the previous example, this implementation ensures thread safety by preventing race conditions. When one thread is executing the increment or decrement method, other threads attempting to enter the synchronized block will be blocked until the first thread exits the block.

  3. Granular Control: Using a custom lock object allows for more granular control over synchronization. For instance, you could synchronize on different objects for different parts of your code if needed, which can help in reducing contention and improving performance in more complex scenarios.

  4. Performance Consideration: While this approach can be more flexible, it still carries the same potential performance implications as using the synchronized keyword. If many threads are trying to access the synchronized blocks simultaneously, they will still be forced to wait, which can lead to bottlenecks.

  5. Usage in Threads: The main method creates and starts two threads (IncrementingThread and DecrementingThread), each performing a loop of 10,000 increments or decrements on the InventoryCounter. The synchronization ensures that the final count of items is accurate despite concurrent modifications.

Overall, this implementation demonstrates an alternative way to achieve thread safety in Java using synchronized blocks with a custom lock object, providing flexibility while maintaining the integrity of shared resources.

public class Main {

    private final Object myLock = new Object();

    public static void main(String[] args) throws InterruptedException {
        new Main().run();
    }

    public void run() throws InterruptedException {
        InventoryCounter inventoryCounter = new InventoryCounter();
        IncrementingThread incrementingThread = new IncrementingThread(inventoryCounter);
        DecrementingThread decrementingThread = new DecrementingThread(inventoryCounter);

        incrementingThread.start();
        decrementingThread.start();

        incrementingThread.join();
        decrementingThread.join();

        System.out.println("We currently have " + inventoryCounter.getItems() + " items");
    }

    public class DecrementingThread extends Thread {

        private final InventoryCounter inventoryCounter;

        public DecrementingThread(InventoryCounter inventoryCounter) {
            this.inventoryCounter = inventoryCounter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                inventoryCounter.decrement();
            }
        }
    }

    public class IncrementingThread extends Thread {

        private final InventoryCounter inventoryCounter;

        public IncrementingThread(InventoryCounter inventoryCounter) {
            this.inventoryCounter = inventoryCounter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                inventoryCounter.increment();
            }
        }
    }

    private class InventoryCounter {
        private int items = 0;

        public void increment() {
            synchronized (myLock) {
                items++;
            }
        }

        public void decrement() {
            synchronized (myLock) {
                items--;
            }
        }

        public int getItems() {
            return items;
        }
    }
}
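
The "granular control" point above can be made concrete with a small sketch. It assumes a hypothetical Warehouse class with two independent counters; booksLock, pensLock, and fill are names invented for this illustration. Each counter has its own lock object, so threads updating books never wait for threads updating pens.

```java
public class SplitLocksDemo {

    // Hypothetical class with two independent counters, each guarded by its own lock object.
    static class Warehouse {
        private final Object booksLock = new Object();
        private final Object pensLock = new Object();
        private int books = 0; // guarded by booksLock
        private int pens = 0;  // guarded by pensLock

        void addBook() {
            synchronized (booksLock) {
                books++;
            }
        }

        void addPen() {
            synchronized (pensLock) {
                pens++;
            }
        }

        int books() {
            synchronized (booksLock) {
                return books;
            }
        }

        int pens() {
            synchronized (pensLock) {
                return pens;
            }
        }
    }

    // Two threads add books and two add pens; returns { books, pens } after all have finished.
    static int[] fill(int perThread) {
        Warehouse warehouse = new Warehouse();
        Thread[] threads = {
            new Thread(() -> repeat(perThread, warehouse::addBook)),
            new Thread(() -> repeat(perThread, warehouse::addBook)),
            new Thread(() -> repeat(perThread, warehouse::addPen)),
            new Thread(() -> repeat(perThread, warehouse::addPen))
        };
        for (Thread thread : threads) {
            thread.start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { warehouse.books(), warehouse.pens() };
    }

    private static void repeat(int times, Runnable action) {
        for (int i = 0; i < times; i++) {
            action.run();
        }
    }

    public static void main(String[] args) {
        int[] totals = fill(5_000);
        System.out.println("Books: " + totals[0] + ", pens: " + totals[1]);
    }
}
```

Splitting locks like this is only safe when the guarded fields are truly independent; an invariant that spans both counters would need a single lock.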

ReentrantLock & ReentrantReadWriteLock

ReentrantLock and ReentrantReadWriteLock are both synchronization mechanisms in Java that are part of the java.util.concurrent.locks package. However, they serve different purposes and have different characteristics. Here are the key differences between the two:

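1. Locking Mechanism

ReentrantLock is a single exclusive lock: at most one thread holds it at a time, whether that thread reads or writes. ReentrantReadWriteLock manages a pair of locks: the read lock can be held by many threads at once, while the write lock is exclusive and blocks both readers and other writers.

2. Performance

ReentrantLock has less bookkeeping and is usually the cheaper choice when critical sections are short or writes are frequent. ReentrantReadWriteLock tends to pay off only when reads clearly outnumber writes and each read holds the lock long enough for concurrent readers to matter.

3. Use Cases

ReentrantLock suits counters, queues, and other state that every caller modifies. ReentrantReadWriteLock suits read-mostly data such as caches, configuration, or lookup tables.

4. API Differences

ReentrantLock implements Lock directly, so you call lock(), tryLock(), and unlock() on it. ReentrantReadWriteLock implements ReadWriteLock; you obtain the two Lock views with readLock() and writeLock() and lock each one separately.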

Summary

In summary, use ReentrantLock for simple mutual exclusion scenarios, and use ReentrantReadWriteLock when you need to optimize for scenarios with many reads and fewer writes. The choice between the two depends on the specific requirements of your application and the expected read/write patterns.

ReentrantLock Example

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class Counter {

    private int count = 0;

    private final ReentrantLock lock = new ReentrantLock();

    public void increment() {
        lock.lock();
        try {
            count++;
        }
        finally {
            lock.unlock();
        }
    }

    public int getCount() {
        return count;
    }
}

public class Main {

    public static void main(String[] args) {

        Counter counter = new Counter();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            }));
        }

        threads.forEach(Thread::start);

        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        System.out.println("Final count: " + counter.getCount());
    }
}
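
Two properties of ReentrantLock that the counter example does not exercise are reentrancy and the non-blocking tryLock(). The sketch below is a hedged illustration of both; the class and method names are hypothetical. It uses getHoldCount() to show that the owning thread can re-acquire the lock, and tryLock() to show that another thread is refused immediately instead of blocking.

```java
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockFeatures {

    // Acquires the same lock twice on one thread and returns the hold count seen inside.
    static int nestedHoldCount() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            lock.lock(); // does not block: the current thread already owns the lock
            try {
                return lock.getHoldCount();
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
    }

    // Holds the lock on the calling thread and reports whether a second thread's
    // tryLock() succeeded in the meantime.
    static boolean otherThreadCanAcquireWhileHeld() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                acquired[0] = lock.tryLock(); // returns immediately instead of blocking
                if (acquired[0]) {
                    lock.unlock();
                }
            });
            other.start();
            other.join(); // join() makes the other thread's write to acquired[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println("Hold count when nested: " + nestedHoldCount());
        System.out.println("Other thread acquired:  " + otherThreadCanAcquireWhileHeld());
    }
}
```

Every lock() must be matched by an unlock(), which is why each acquisition sits in its own try/finally.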

ReentrantReadWriteLock Example

import java.util.concurrent.locks.ReentrantReadWriteLock;

class SharedResource {

    private int data;

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Method to read the data
    public int read() {
        lock.readLock().lock(); // Acquire the read lock
        try {
            return data;
        } finally {
            lock.readLock().unlock(); // Ensure the read lock is released
        }
    }

    // Method to write data
    public void write(int value) {
        lock.writeLock().lock(); // Acquire the write lock
        try {
            data = value;
        } finally {
            lock.writeLock().unlock(); // Ensure the write lock is released
        }
    }
}

public class Main {

    public static void main(String[] args) {

        SharedResource sharedResource = new SharedResource();

        // Create and start reader threads
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    int value = sharedResource.read();
                    System.out.println("Read value: " + value);
                    try {
                        Thread.sleep(100); // Simulate some delay
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).start();
        }

        // Create and start writer threads
        for (int i = 0; i < 2; i++) {
            final int writerId = i;
            new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    sharedResource.write(writerId * 10 + j);
                    System.out.println("Written value: " + (writerId * 10 + j));
                    try {
                        Thread.sleep(200); // Simulate some delay
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).start();
        }
    }
}
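
The difference between the shared read lock and the exclusive write lock can be observed directly. In this sketch (names are hypothetical), the calling thread holds the read lock while a second thread probes both locks with tryLock(), which never blocks. Under these assumptions the probe should obtain the read lock and be refused the write lock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockSharing {

    // While the calling thread holds the read lock, a second thread tries both locks
    // without blocking. Returns { readAcquired, writeAcquired } as seen by that thread.
    static boolean[] probeWhileReadLockHeld() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];
        lock.readLock().lock();
        try {
            Thread probe = new Thread(() -> {
                result[0] = lock.readLock().tryLock(); // shared: readers do not exclude each other
                if (result[0]) {
                    lock.readLock().unlock();
                }
                result[1] = lock.writeLock().tryLock(); // exclusive: refused while a reader holds the lock
                if (result[1]) {
                    lock.writeLock().unlock();
                }
            });
            probe.start();
            probe.join(); // join() makes the probe's writes to result visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.readLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] result = probeWhileReadLockHeld();
        System.out.println("Second reader admitted: " + result[0]);
        System.out.println("Writer admitted:        " + result[1]);
    }
}
```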

Atomics

AtomicInteger and AtomicReference are part of the java.util.concurrent.atomic package in Java, which provides classes that support lock-free thread-safe programming on single variables. These classes are useful in concurrent programming where multiple threads need to read and update shared variables without using traditional synchronization mechanisms like synchronized blocks or ReentrantLock.

Atomic Integer

AtomicInteger is a class that provides an integer value that may be updated atomically. It supports atomic operations such as get() and set(), incrementAndGet() and decrementAndGet(), addAndGet(delta), and compareAndSet(expected, newValue), which writes the new value only if the current value still equals the expected one.

Example of Atomic Integer

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerExample {

  public static void main(String[] args) {
    AtomicInteger atomicInt = new AtomicInteger(0);

    // Increment the value
    int incrementedValue = atomicInt.incrementAndGet();
    System.out.println("Incremented Value: " + incrementedValue); // Output: 1

    // Compare and set
    boolean wasUpdated = atomicInt.compareAndSet(1, 2);
    System.out.println("Was Updated: " + wasUpdated); // Output: true
    System.out.println("Current Value: " + atomicInt.get()); // Output: 2
  }
}
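
compareAndSet becomes more interesting when several threads compete. The following sketch shows the usual retry loop built on it; AtomicMaxTracker, recordMax, and highestSeen are hypothetical names, and tracking a maximum is just one possible use. Each thread re-reads the value and retries whenever another thread changed it in between.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicMaxTracker {

    // Raises 'max' to 'candidate' if candidate is larger, using a compare-and-set retry loop.
    static void recordMax(AtomicInteger max, int candidate) {
        while (true) {
            int current = max.get();
            if (candidate <= current) {
                return; // the stored maximum is already at least as large
            }
            if (max.compareAndSet(current, candidate)) {
                return; // our write won
            }
            // Another thread changed the value between get() and compareAndSet(): retry.
        }
    }

    // Each thread records its own range of values; returns the highest value recorded overall.
    static int highestSeen(int threadCount, int valuesPerThread) {
        AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            final int base = t * valuesPerThread;
            threads[t] = new Thread(() -> {
                for (int j = 0; j < valuesPerThread; j++) {
                    recordMax(max, base + j);
                }
            });
            threads[t].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("Highest value seen: " + highestSeen(4, 1_000));
    }
}
```

The same update can be written more compactly as max.accumulateAndGet(candidate, Math::max), which performs the retry loop internally.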

Atomic Reference

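AtomicReference is a class that provides an object reference that may be updated atomically. It is useful for managing references to objects in a thread-safe manner. Its main methods are get(), set(), getAndSet(newValue), and compareAndSet(expected, newValue); note that compareAndSet compares references with ==, not with equals(). The two examples below implement the same inventory counter, first with AtomicInteger and then with AtomicReference.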

Example of AtomicInteger

import java.util.concurrent.atomic.AtomicInteger;

public class Main {

  public static void main(String[] args) throws InterruptedException {

    InventoryCounter inventoryCounter = new InventoryCounter();

    IncrementingThread incrementingThread = new IncrementingThread(inventoryCounter);

    DecrementingThread decrementingThread = new DecrementingThread(inventoryCounter);

    incrementingThread.start();
    decrementingThread.start();

    incrementingThread.join();
    decrementingThread.join();

    System.out.println("We currently have " + inventoryCounter.getItems() + " items");
  }

  public static class DecrementingThread extends Thread {

    private final InventoryCounter inventoryCounter;

    public DecrementingThread(InventoryCounter inventoryCounter) {
      this.inventoryCounter = inventoryCounter;
    }

    @Override
    public void run() {
      for (int i = 0; i < 10000; i++) {
        inventoryCounter.decrement();
      }
    }
  }

  public static class IncrementingThread extends Thread {

    private final InventoryCounter inventoryCounter;

    public IncrementingThread(InventoryCounter inventoryCounter) {
      this.inventoryCounter = inventoryCounter;
    }

    @Override
    public void run() {
      for (int i = 0; i < 10000; i++) {
        inventoryCounter.increment();
      }
    }
  }

  private static class InventoryCounter {

    private final AtomicInteger items = new AtomicInteger(0);

    public void increment() {
      items.incrementAndGet();
    }

    public void decrement() {
      items.decrementAndGet();
    }

    public int getItems() {
      return items.get();
    }
  }
}

Example of AtomicReference

import java.util.concurrent.atomic.AtomicReference;

public class Main {

    public static void main(String[] args) throws InterruptedException {

        InventoryCounter inventoryCounter = new InventoryCounter();

        IncrementingThread incrementingThread = new IncrementingThread(inventoryCounter);
        DecrementingThread decrementingThread = new DecrementingThread(inventoryCounter);

        incrementingThread.start();
        decrementingThread.start();

        incrementingThread.join();
        decrementingThread.join();

        System.out.println("We currently have " + inventoryCounter.getItems() + " items");
    }

    public static class DecrementingThread extends Thread {

        private final InventoryCounter inventoryCounter;

        public DecrementingThread(InventoryCounter inventoryCounter) {
            this.inventoryCounter = inventoryCounter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                inventoryCounter.decrement();
            }
        }
    }

    public static class IncrementingThread extends Thread {

        private final InventoryCounter inventoryCounter;

        public IncrementingThread(InventoryCounter inventoryCounter) {
            this.inventoryCounter = inventoryCounter;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                inventoryCounter.increment();
            }
        }
    }

    private static class InventoryCounter {

        // Using AtomicReference to hold the current count as an Integer object
        private final AtomicReference<Integer> items = new AtomicReference<>(0);

        public void increment() {
            // Atomically update the reference to the new count
            while (true) {
                Integer current = items.get();
                Integer newValue = current + 1;
                if (items.compareAndSet(current, newValue)) {
                    break; // Successfully updated
                }
            }
        }

        public void decrement() {
            // Atomically update the reference to the new count
            while (true) {
                Integer current = items.get();
                Integer newValue = current - 1;
                if (items.compareAndSet(current, newValue)) {
                    break; // Successfully updated
                }
            }
        }

        public int getItems() {
            return items.get();
        }
    }
}
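
AtomicReference is arguably most useful when several values must change together. The sketch below assumes a hypothetical immutable Stock class holding an item count and an update count; neither it nor the run method appears in the original. Each update builds a new Stock and swaps the reference with updateAndGet, so readers always see a consistent pair.

```java
import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceSnapshot {

    // Hypothetical immutable value: both fields must always change together.
    static final class Stock {
        final int items;
        final int updates;

        Stock(int items, int updates) {
            this.items = items;
            this.updates = updates;
        }

        Stock plus(int delta) {
            return new Stock(items + delta, updates + 1);
        }
    }

    // One thread adds items, another removes them; returns the final snapshot.
    static Stock run(int perThread) {
        AtomicReference<Stock> stock = new AtomicReference<>(new Stock(0, 0));
        Thread incrementing = new Thread(() -> {
            for (int i = 0; i < perThread; i++) {
                // The function may be re-applied under contention, so it must be side-effect free
                stock.updateAndGet(current -> current.plus(1));
            }
        });
        Thread decrementing = new Thread(() -> {
            for (int i = 0; i < perThread; i++) {
                stock.updateAndGet(current -> current.plus(-1));
            }
        });
        incrementing.start();
        decrementing.start();
        try {
            incrementing.join();
            decrementing.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stock.get();
    }

    public static void main(String[] args) {
        Stock result = run(10_000);
        System.out.println("Items: " + result.items + ", updates: " + result.updates);
    }
}
```

For a plain counter, AtomicInteger remains the simpler choice; the reference-swapping pattern earns its keep once the state is a small immutable object.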

Inter Thread Communication

Inter-Thread Communication (ITC) refers to the mechanisms that allow threads within a process to communicate and synchronize their actions. Since threads share the same memory space, they can exchange data and signals directly, but this can lead to issues like race conditions if not managed properly. Common ITC methods include:

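  1. Mutexes: Used to ensure that only one thread can access a resource at a time (in Java, synchronized or a Lock such as ReentrantLock).
  2. Semaphores: Used to limit how many threads can access a common resource at the same time (java.util.concurrent.Semaphore).
  3. Condition Variables: Allow threads to wait for certain conditions to be met before proceeding (wait()/notify() on a monitor, or a Condition obtained from a Lock).
  4. Message Queues: Enable threads to send and receive messages in a structured way (for example, a BlockingQueue).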

Effective ITC is crucial for building efficient and safe multithreaded applications. The example below implements a single-slot producer/consumer handoff with wait() and notify().

class SharedResource {

  private int data;
  private boolean available = false;

  public synchronized int getData() throws InterruptedException {
    while (!available) {
      wait(); // Wait until data is available
    }
    available = false; // Reset availability
    notify(); // Notify producer that data has been consumed
    return data;
  }

  public synchronized void setData(int data) throws InterruptedException {
    while (available) {
      wait(); // Wait until data is consumed
    }
    this.data = data;
    available = true; // Mark data as available
    notify(); // Notify consumer that data is available
  }
}

class Producer implements Runnable {

  private final SharedResource sharedResource;

  public Producer(SharedResource sharedResource) {
    this.sharedResource = sharedResource;
  }

  @Override
  public void run() {
    for (int i = 0; i < 3; i++) {
      try {
        System.out.println("Producing: " + i);
        sharedResource.setData(i); // Produce data
        Thread.sleep(500); // Simulate time taken to produce
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

class Consumer implements Runnable {

  private final SharedResource sharedResource;

  public Consumer(SharedResource sharedResource) {
    this.sharedResource = sharedResource;
  }

  @Override
  public void run() {
    for (int i = 0; i < 3; i++) {
      try {
        int data = sharedResource.getData(); // Consume data
        System.out.println("Consuming: " + data);
        Thread.sleep(1000); // Simulate time taken to consume
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

public class InterThreadCom {

  public static void main(String[] args) {

    SharedResource sharedResource = new SharedResource();

    Thread producerThread = new Thread(new Producer(sharedResource));
    Thread consumerThread = new Thread(new Consumer(sharedResource));

    producerThread.start();
    consumerThread.start();

    try {
      producerThread.join(); // Wait for the producer to finish
      consumerThread.join(); // Wait for the consumer to finish
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
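
The list above also mentions message queues. As a rough comparison, the same handoff can be sketched with a BlockingQueue from java.util.concurrent, which performs the waiting and notifying internally. The class name BlockingQueueHandoff and the transfer method are hypothetical; a capacity of 1 is chosen only to mirror the single-slot SharedResource.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueHandoff {

    // Sends 0..count-1 from a producer thread to a consumer thread through a bounded queue
    // and returns the values in the order the consumer received them.
    static List<Integer> transfer(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        List<Integer> received = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join(), the consumer's writes to 'received' are visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("Received: " + transfer(3));
    }
}
```

With one producer and one consumer, the values arrive in the order they were put, so no available flag or explicit notify() call is needed.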

CountDownLatch

import java.util.concurrent.CountDownLatch;

class SharedResource {

  private int data;
  private boolean available = false;

  public synchronized int getData() throws InterruptedException {
    while (!available) {
      wait(); // Wait until data is available
    }
    available = false; // Reset availability
    notify(); // Notify producer that data has been consumed
    return data;
  }

  public synchronized void setData(int data) throws InterruptedException {
    while (available) {
      wait(); // Wait until data is consumed
    }
    this.data = data;
    available = true; // Mark data as available
    notify(); // Notify consumer that data is available
  }
}

class Producer implements Runnable {

  private final SharedResource sharedResource;
  private final CountDownLatch latch;

  public Producer(SharedResource sharedResource, CountDownLatch latch) {
    this.sharedResource = sharedResource;
    this.latch = latch;
  }

  @Override
  public void run() {
    for (int i = 0; i < 3; i++) {
      try {
        System.out.println("Producing: " + i);
        sharedResource.setData(i); // Produce data
        Thread.sleep(500); // Simulate time taken to produce
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    latch.countDown(); // Signal that the producer has finished
  }
}

class Consumer implements Runnable {

  private final SharedResource sharedResource;
  private final CountDownLatch latch;

  public Consumer(SharedResource sharedResource, CountDownLatch latch) {
    this.sharedResource = sharedResource;
    this.latch = latch;
  }

  @Override
  public void run() {
    for (int i = 0; i < 3; i++) {
      try {
        int data = sharedResource.getData(); // Consume data
        System.out.println("Consuming: " + data);
        Thread.sleep(1000); // Simulate time taken to consume
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    latch.countDown(); // Signal that the consumer has finished
  }
}

public class InterThreadCom {

  public static void main(String[] args) {
    SharedResource sharedResource = new SharedResource();
    CountDownLatch latch = new CountDownLatch(2); // Count down for producer and consumer

    Thread producerThread = new Thread(new Producer(sharedResource, latch));
    Thread consumerThread = new Thread(new Consumer(sharedResource, latch));

    producerThread.start();
    consumerThread.start();

    try {
      latch.await(); // Wait for both producer and consumer to finish
      System.out.println("Both producer and consumer have completed their tasks.");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

The first example uses wait() and notify() for the data handoff between the producer and the consumer, and the main thread waits for both with join(). The modified example keeps the same wait()/notify() handoff but replaces join() with a CountDownLatch: each thread calls countDown() when it finishes, and the main thread blocks in await() until the count reaches zero. Here are the benefits of using CountDownLatch for this kind of completion signalling over coordinating by hand:

Benefits of Using CountDownLatch

  1. Simplified Coordination:

    • CountDownLatch: It provides a straightforward way to wait for multiple threads to complete their tasks. You can simply call latch.await() in the main thread, and it will block until the count reaches zero.
    • Traditional Approach: Using wait() and notify() requires more boilerplate code to manage the state of the threads and ensure that they are properly synchronized.
  2. Decoupling of Threads:

    • CountDownLatch: The main thread does not need to know the details of how the producer and consumer operate. It only needs to wait for their completion, making the code cleaner and easier to understand.
    • Traditional Approach: The main thread may need to manage the state of the shared resource and ensure that the producer and consumer are properly synchronized, which can lead to more complex code.
  3. One-Time Use:

    • CountDownLatch: It is designed for one-time use, which is ideal for scenarios where you want to wait for a specific number of events to occur. Once the count reaches zero, it cannot be reset, which is often the desired behavior in many applications.
    • Traditional Approach: The wait() and notify() methods can be reused, but managing the state of the shared resource can become cumbersome, especially if you need to reset the state.
  4. Clarity of Intent:

    • CountDownLatch: The intent of waiting for a certain number of tasks to complete is clear and explicit. The use of CountDownLatch communicates the purpose of synchronization effectively.
    • Traditional Approach: The intent may be less clear, as it relies on the state of the shared resource and the use of wait() and notify(), which can be more difficult to follow.
  5. Error Handling:

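    • CountDownLatch: Each thread can handle its own errors without touching the main thread's waiting logic, provided countDown() is still called on every path (typically in a finally block). If a thread exits without counting down, await() never returns unless the timed overload await(timeout, unit) is used.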
    • Traditional Approach: Error handling can be more complex, as you need to ensure that the state of the shared resource is consistent and that the waiting threads are notified appropriately.
  6. Flexibility:

    • CountDownLatch: You can easily adjust the number of threads to wait for by changing the count in the CountDownLatch constructor. This makes it flexible for different scenarios.
    • Traditional Approach: Adjusting the number of threads and managing their states can require significant changes to the code.
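
The error-handling point has one caveat: a worker that fails before reaching countDown() leaves the waiting thread blocked. The sketch below is one way to guard against that, under stated assumptions: the class name LatchWithFinally and the helpers runWorkers and doWork are hypothetical and not part of the example above. It calls countDown() in a finally block and uses the timed await overload.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWithFinally {

  // Hypothetical unit of work: worker 0 fails on purpose.
  private static void doWork(int id) {
    if (id == 0) {
      throw new IllegalStateException("simulated failure in worker 0");
    }
  }

  // Starts the given number of workers and reports whether all of them
  // counted down before the timeout expired.
  public static boolean runWorkers(int workers, long timeoutMillis) {
    CountDownLatch latch = new CountDownLatch(workers);

    for (int i = 0; i < workers; i++) {
      final int id = i;
      new Thread(() -> {
        try {
          doWork(id);
        } catch (IllegalStateException e) {
          System.out.println("Worker " + id + " failed: " + e.getMessage());
        } finally {
          latch.countDown(); // runs on success and on failure
        }
      }).start();
    }

    try {
      // Timed overload: returns false instead of blocking forever
      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("All workers finished: " + runWorkers(3, 2000));
  }
}
```

Without the finally block, the failing worker would never count down and an untimed await() would wait indefinitely.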

Summary

While both approaches can achieve inter-thread communication, using CountDownLatch simplifies the code, improves clarity, and provides a more robust mechanism for coordinating the completion of multiple threads. It reduces the complexity associated with traditional synchronization methods and makes the intent of the code clearer. This can lead to fewer bugs and easier maintenance in concurrent programming scenarios.

Throughput

Throughput in the context of multithreading refers to the number of tasks or operations that a multithreaded application can complete in a given period of time. It is a critical performance metric that helps assess the efficiency and effectiveness of a multithreaded system.

Throughput is influenced by factors such as concurrency, resource utilization, context switching, and synchronization overhead. Optimizing it is essential for achieving high performance and responsiveness, especially in environments where multiple tasks need to be processed simultaneously.

Key Aspects of Throughput in Multithreading:

  1. Tasks per Second:

    • Throughput can be measured in terms of the number of tasks or operations completed per second. A higher number indicates better performance and resource utilization.
  2. Concurrency:

    • Multithreading allows multiple threads to run concurrently, which can significantly increase throughput by making better use of CPU resources. For CPU-bound work the gain comes from running computations in parallel on multiple cores; for I/O-bound work it comes from keeping the CPU busy while other threads wait.
  3. Resource Utilization:

    • Effective multithreading can lead to improved resource utilization, as threads can share resources (like memory and I/O) and reduce idle time. This can enhance overall throughput.
  4. Context Switching:

    • While multithreading can improve throughput, excessive context switching (the process of saving and loading the state of threads) can negatively impact performance. Each context switch incurs overhead, which can reduce the effective throughput.
  5. Synchronization Overhead:

    • In multithreaded applications, threads often need to synchronize access to shared resources. This synchronization can introduce delays and reduce throughput if not managed properly.
  6. Scalability:

    • Throughput in multithreaded applications can be affected by how well the application scales with the addition of more threads. Ideally, increasing the number of threads should lead to a proportional increase in throughput, but this is not always the case due to contention and resource limitations.
  7. Performance Bottlenecks:

    • Identifying and addressing bottlenecks (such as locks, contention for shared resources, or I/O limitations) is crucial for maximizing throughput in multithreaded applications.
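
To make the tasks-per-second metric concrete, here is a minimal sketch that divides the number of completed tasks by the elapsed time for different pool sizes. The class name ThroughputMeasure, the method measure, and its parameters are assumptions made for illustration, and Thread.sleep stands in for real blocking work. Absolute numbers depend on the machine.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThroughputMeasure {

  // Runs `tasks` simulated blocking tasks on `threads` threads and
  // returns the observed throughput in completed tasks per second.
  public static double measure(int threads, int tasks, long taskMillis) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    AtomicInteger completed = new AtomicInteger();
    long start = System.nanoTime();

    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> {
        try {
          Thread.sleep(taskMillis); // simulated blocking work
          completed.incrementAndGet();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }

    pool.shutdown();
    try {
      pool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    double seconds = (System.nanoTime() - start) / 1_000_000_000.0;
    return completed.get() / seconds;
  }

  public static void main(String[] args) {
    for (int threads : new int[] {1, 2, 4, 8}) {
      System.out.printf("%d thread(s): %.1f tasks/s%n", threads, measure(threads, 80, 10));
    }
  }
}
```

Because the simulated tasks block rather than compute, throughput should rise with the thread count here. With CPU-bound tasks it would level off around the number of available cores.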

Example

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;


public class ThroughputHttpServer {

  private static final int PORT = 8000;
  private static final int NUMBER_OF_THREADS = 6;

  public static void main(String[] args) throws IOException {
    startServer();
  }

  public static void startServer() throws IOException {
    HttpServer server = HttpServer.create(new InetSocketAddress(PORT), 0);
    server.createContext("/search", new WordCountHandler());
    Executor executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
    server.setExecutor(executor);
    System.out.println("Listening on port: " + PORT);
    server.start();
  }

  private static class WordCountHandler implements HttpHandler {

    @Override
    public void handle(HttpExchange httpExchange) throws IOException {
      // Expect a query of the form "word=<value>"; getQuery() returns null when there is none
      String query = httpExchange.getRequestURI().getQuery();
      String[] keyValue = query == null ? new String[0] : query.split("=", 2);
      if (keyValue.length != 2 || !keyValue[0].equals("word")) {
        httpExchange.sendResponseHeaders(400, -1); // -1: no response body
        httpExchange.close();
        return;
      }
      String word = keyValue[1];
      System.out.println("Look up word: " + word);

      // Simulate CPU-bound work: find the next probable prime after a large random number
      BigInteger veryBig = new BigInteger(new Random().nextInt(1000) * 4, new Random());
      BigInteger r = veryBig.nextProbablePrime();
      long count = r.longValue();

      byte[] response = Long.toString(count).getBytes();
      httpExchange.sendResponseHeaders(200, response.length);
      // try-with-resources closes the body stream even if the write fails
      try (OutputStream outputStream = httpExchange.getResponseBody()) {
        outputStream.write(response);
      }
    }
  }

}

Data Race and Race Condition

Data Race

A data race occurs when two or more threads (or processes) access the same memory location concurrently, at least one of those accesses is a write, and the accesses are not ordered by synchronization (in Java terms, there is no happens-before relationship between them). Without that ordering, the compiler and the CPU are free to reorder or cache the accesses, so a thread can observe stale or seemingly impossible values. The result is unpredictable, depends on timing, and can be inconsistent or incorrect.

Race Condition

A race condition is a broader concept that refers to a situation where the outcome of a program depends on the sequence or timing of uncontrollable events, such as the order in which threads are scheduled to run. This can happen even if there are no direct conflicts over shared data. Essentially, if the timing of events affects the program's behavior or results, a race condition exists.
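
A race condition without a data race can be sketched with a check-then-act sequence. In the hypothetical CheckThenAct class below (all names are assumptions for illustration), every individual access goes through an AtomicInteger, so there is no data race, yet buyRacy can still oversell because another thread may run between the check and the update. buySafe closes the gap with a compare-and-set loop.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CheckThenAct {

  private final AtomicInteger stock;

  public CheckThenAct(int initialStock) {
    this.stock = new AtomicInteger(initialStock);
  }

  // Race condition: get() and decrementAndGet() are each atomic,
  // but another thread can buy between the check and the decrement.
  public boolean buyRacy() {
    if (stock.get() > 0) {
      stock.decrementAndGet();
      return true;
    }
    return false;
  }

  // Check and update happen as one atomic compare-and-set step.
  public boolean buySafe() {
    while (true) {
      int current = stock.get();
      if (current <= 0) {
        return false;
      }
      if (stock.compareAndSet(current, current - 1)) {
        return true;
      }
    }
  }

  // Lets `buyers` threads compete for the stock and counts the successful purchases.
  public static int countSafePurchases(int initialStock, int buyers) {
    CheckThenAct shop = new CheckThenAct(initialStock);
    AtomicInteger successes = new AtomicInteger();
    Thread[] threads = new Thread[buyers];
    for (int i = 0; i < buyers; i++) {
      threads[i] = new Thread(() -> {
        if (shop.buySafe()) {
          successes.incrementAndGet();
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return successes.get();
  }

  public static void main(String[] args) {
    System.out.println("Successful purchases: " + countSafePurchases(5, 50));
  }
}
```

With buySafe the number of successful purchases never exceeds the initial stock, regardless of how the threads are scheduled.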

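Key Differences

    • Scope: A data race concerns unsynchronized access to a single shared memory location; a race condition concerns whether the program's logic stays correct under different thread interleavings.
    • Independence: Either can occur without the other. A check-then-act sequence built from individually atomic operations has a race condition but no data race.
    • Remedy: A data race is removed by synchronizing the access (volatile, synchronized, locks, or atomics); a race condition is removed by making the whole compound operation atomic.

The following example demonstrates a data race on the fields x and y: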

public class DataRace {

    public static void main(String[] args) {
        SharedClass sharedClass = new SharedClass();
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                sharedClass.increment();
            }
        });

        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                sharedClass.checkForDataRace();
            }

        });

        thread1.start();
        thread2.start();
    }

    public static class SharedClass {
        private int x = 0;
        private int y = 0;

        public void increment() {
            x++;
            y++;
        }

        public void checkForDataRace() {
            if (y > x) {
                System.out.println("y > x - Data Race is detected");
            }
        }
    }
}
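
In the example above, x and y are plain fields, so the writes and reads may be reordered and the reader can observe y > x. One possible fix, sketched here under the assumption that there is exactly one writer thread, is to declare both fields volatile. The class name DataRaceFixed and the helper countViolations are hypothetical, and the loop is shortened so the program finishes quickly.

```java
public class DataRaceFixed {

  static class SharedClass {
    // volatile forbids reordering of these accesses and makes writes visible to the reader
    private volatile int x = 0;
    private volatile int y = 0;

    // Safe only with a single writer: x++ itself is still a read-modify-write
    void increment() {
      x++;
      y++;
    }

    // y is read before x, and x is always written before y, so this stays false
    boolean invariantBroken() {
      return y > x;
    }
  }

  // Runs one writer and one reader and returns how often the reader saw y > x.
  public static int countViolations(int iterations) {
    SharedClass shared = new SharedClass();
    int[] violations = new int[1];

    Thread writer = new Thread(() -> {
      for (int i = 0; i < iterations; i++) {
        shared.increment();
      }
    });
    Thread reader = new Thread(() -> {
      for (int i = 0; i < iterations; i++) {
        if (shared.invariantBroken()) {
          violations[0]++;
        }
      }
    });

    writer.start();
    reader.start();
    try {
      writer.join();
      reader.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return violations[0];
  }

  public static void main(String[] args) {
    System.out.println("Violations observed: " + countViolations(1_000_000));
  }
}
```

If several threads called increment(), volatile alone would not be enough; the increments would need synchronized, a lock, or atomic classes.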

Thread Per Task - Thread Per Request

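In Java, "Thread Per Task" and "Thread Per Request" are threading models for handling concurrent work, particularly in server environments. Both are simple to reason about, but because every platform thread maps to an operating system thread, neither scales to very large numbers of concurrent tasks.

Thread Per Task

Every unit of work submitted to the application runs on its own thread, which is dedicated to that task until the task completes.

Thread Per Request

A server-side variant of the same idea: each incoming request is handled from start to finish by a dedicated thread, which stays blocked whenever the request waits on I/O such as a database or network call.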

Alternatives

Due to the limitations of both models, many modern applications use thread pools or asynchronous processing models:

Cached Thread Pool

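A cached thread pool reuses idle threads but creates a new thread whenever a task arrives and no thread is free, with no upper bound on the pool size. When many blocking tasks are submitted at once, as in the example below, this degenerates into one thread per task, which can lead to high resource consumption. It can exhaust system resources (like memory and CPU), leading to performance degradation or even application crashes.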


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class IoBoundApplication {

  private static final int NUMBER_OF_TASKS = 1000;

  public static void main(String[] args) {

    System.out.printf("Running %d tasks\n", NUMBER_OF_TASKS);

    long start = System.currentTimeMillis();
    performTasks();

    System.out.printf("Tasks took %dms to complete\n", System.currentTimeMillis() - start);
  }

  private static void performTasks() {

    // ExecutorService is AutoCloseable since Java 19; close() waits for all submitted tasks to finish
    try (ExecutorService executorService = Executors.newCachedThreadPool()) {

      for (int i = 0; i < NUMBER_OF_TASKS; i++) {
        executorService.submit(() -> blockingIoOperation());
      }
    }
  }

  private static void blockingIoOperation() {

    System.out.println("Executing a blocking task from thread: " + Thread.currentThread());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}
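
The unbounded growth can be observed directly. The following sketch, in which the class name CachedPoolGrowth and the helper peakThreads are assumptions for illustration, keeps every task blocked until all tasks are running and then counts the distinct worker threads the pool had to create.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CachedPoolGrowth {

  // Submits `tasks` tasks that all block at the same time and
  // returns the number of distinct threads that executed them.
  public static int peakThreads(int tasks) {
    ExecutorService pool = Executors.newCachedThreadPool();
    Set<Thread> workers = ConcurrentHashMap.newKeySet();
    CountDownLatch allRunning = new CountDownLatch(tasks);

    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> {
        workers.add(Thread.currentThread());
        allRunning.countDown();
        try {
          // Block until every task is running, so no thread can be reused
          allRunning.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }

    pool.shutdown();
    try {
      pool.awaitTermination(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return workers.size();
  }

  public static void main(String[] args) {
    System.out.println("Threads created for 100 blocking tasks: " + peakThreads(100));
  }
}
```

Since no task can finish before all of them have started, the pool cannot reuse a thread and ends up with one thread per task.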

Fixed Thread Pool

A fixed thread pool limits the number of concurrent threads, which helps manage system resources more effectively. Excess tasks are queued until a thread becomes available, preventing resource exhaustion.


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class Main {

  private static final int NUMBER_OF_TASKS = 10_000;

  public static void main(String[] args) {
    System.out.printf("Running %d tasks\n", NUMBER_OF_TASKS);

    long start = System.currentTimeMillis();
    performTasks();
    System.out.printf("Tasks took %dms to complete\n", System.currentTimeMillis() - start);
  }

  private static void performTasks() {
    try (ExecutorService executorService = Executors.newFixedThreadPool(1000)) {

      for (int i = 0; i < NUMBER_OF_TASKS; i++) {
        executorService.submit(() -> {
          for (int j = 0; j < 100; j++) {
            blockingIoOperation();
          }
        });
      }
    }
  }

  // Simulates a long blocking IO
  private static void blockingIoOperation() {
    System.out.println("Executing a blocking task from thread: " + Thread.currentThread());
    try {
      Thread.sleep(10);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}
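
The cap that a fixed pool places on concurrency can be checked with a small sketch. The class name FixedPoolLimit and the helper distinctWorkerThreads are assumptions for illustration. The sketch uses shutdown() and awaitTermination() instead of try-with-resources so that it also runs on Java versions before 19.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolLimit {

  // Runs `tasks` short blocking tasks on a fixed pool and returns
  // how many distinct worker threads executed them.
  public static int distinctWorkerThreads(int poolSize, int tasks) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    Set<Thread> workers = ConcurrentHashMap.newKeySet();

    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> {
        workers.add(Thread.currentThread());
        try {
          Thread.sleep(5); // simulated blocking work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }

    pool.shutdown(); // queued tasks still run; no new tasks are accepted
    try {
      pool.awaitTermination(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return workers.size();
  }

  public static void main(String[] args) {
    System.out.println("Distinct threads used: " + distinctWorkerThreads(4, 100));
  }
}
```

However many tasks are submitted, the count never exceeds the pool size; the remaining tasks wait in the pool's queue.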

Java Virtual Threads

Java Virtual Threads are a lightweight thread implementation developed under Project Loom, which aims to simplify concurrent programming in Java. They were previewed in Java 19 and finalized in Java 21 (JEP 444). Unlike traditional (platform) threads, each of which is mapped to an operating system thread, virtual threads are scheduled by the Java Virtual Machine (JVM) onto a small pool of platform threads, called carrier threads, and are much lighter in terms of resource consumption.

Key Features of Java Virtual Threads:

  1. Lightweight: Virtual threads are much lighter than traditional threads, allowing you to create thousands or even millions of them without significant overhead.

  2. Simplified Concurrency: They enable a more straightforward programming model for concurrent applications. You can write ordinary blocking, sequential code and still get scalability comparable to asynchronous code, making it easier to understand and maintain.

  3. Blocking Operations: Virtual threads can block without consuming a platform thread. When a virtual thread waits for I/O or another blocking operation, the JVM unmounts it from its carrier thread, which is then free to run other virtual threads. One caveat: on some JDK versions, including Java 21, a virtual thread that blocks inside a synchronized block stays pinned to its carrier.

  4. Structured Concurrency: Project Loom also introduces structured concurrency (StructuredTaskScope), a separate API that is still in preview as of Java 21. It treats a group of related tasks running in different threads as a single unit of work, so that their lifecycle, cancellation, and cleanup are managed together.

  5. Integration with Existing APIs: Virtual threads are designed to work seamlessly with existing Java APIs, including those that use traditional threads, making it easier to adopt them in existing applications.

Examples

import java.util.ArrayList;
import java.util.List;

public class Main {

  private static final int NUMBER_OF_VIRTUAL_THREADS = 1_000;

  public static void main(String[] args) throws InterruptedException {

    Runnable runnable = () -> System.out.println("Inside thread: " + Thread.currentThread());

    List<Thread> virtualThreads = new ArrayList<>();

    for (int i = 0; i < NUMBER_OF_VIRTUAL_THREADS; i++) {
      Thread virtualThread = Thread.ofVirtual().unstarted(runnable);
      virtualThreads.add(virtualThread);
    }

    long start = System.nanoTime();

    for (Thread virtualThread : virtualThreads) {
      virtualThread.start();
    }

    for (Thread virtualThread : virtualThreads) {
      virtualThread.join();
    }

    long end = System.nanoTime();
    long elapsedTime = end - start;
    double seconds = elapsedTime / 1_000_000_000.0;
    System.out.println("Elapsed time (s): " + seconds);
  }
}

import java.util.ArrayList;
import java.util.List;

public class Main {

  private static final int NUMBER_OF_VIRTUAL_THREADS = 1_000;

  public static void main(String[] args) throws InterruptedException {
    List<Thread> virtualThreads = new ArrayList<>();

    for (int i = 0; i < NUMBER_OF_VIRTUAL_THREADS; i++) {
      Thread virtualThread = Thread.ofVirtual().unstarted(new BlockingTask());
      virtualThreads.add(virtualThread);
    }

    long start = System.nanoTime();

    for (Thread virtualThread : virtualThreads) {
      virtualThread.start();
    }

    for (Thread virtualThread : virtualThreads) {
      virtualThread.join();
    }

    long end = System.nanoTime();
    long elapsedTime = end - start;
    double seconds = elapsedTime / 1_000_000_000.0;
    System.out.println("Elapsed time (s): " + seconds);
  }

  private static class BlockingTask implements Runnable {

    @Override
    public void run() {
      System.out.println("Inside thread: " + Thread.currentThread() + " before blocking call");
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
      System.out.println("Inside thread: " + Thread.currentThread() + " after blocking call");
    }
  }
}

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class Main {

  private static final int NUMBER_OF_TASKS = 10_000;

  public static void main(String[] args) {
    System.out.printf("Running %d tasks\n", NUMBER_OF_TASKS);

    long start = System.currentTimeMillis();
    performTasks();
    System.out.printf("Tasks took %dms to complete\n", System.currentTimeMillis() - start);
  }

  private static void performTasks() {
    try (ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()) {

      for (int i = 0; i < NUMBER_OF_TASKS; i++) {
        executorService.submit(() -> {
          for (int j = 0; j < 100; j++) {
            blockingIoOperation();
          }
        });
      }
    }
  }

  // Simulates a long blocking IO
  private static void blockingIoOperation() {
    System.out.println("Executing a blocking task from thread: " + Thread.currentThread());
    try {
      Thread.sleep(10);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}