杰瑞科技汇

Java Concurrency: How to Manage Threads and Data Synchronization Efficiently

Java concurrency is a broad and essential topic for building high-performance, responsive applications. This guide covers the core concepts, how the API has evolved, and best practices.

What is Concurrency and Why is it Important?

Concurrency is the ability of a program to make progress on multiple tasks over overlapping periods of time. It is not the same as parallelism.

  • Concurrency: Dealing with lots of things at once. It's about structure: managing multiple tasks that may or may not execute simultaneously (e.g., a single-core CPU switching between tasks to give the illusion of multitasking).
  • Parallelism: Doing lots of things at once. It's about execution, running multiple tasks simultaneously on multiple cores.

Why use it?

  1. Performance: To make your program run faster by utilizing multiple CPU cores.
  2. Responsiveness: In user interfaces (UI), concurrency prevents the application from freezing while a long-running task (like a network request) is in progress.
  3. Scalability: To handle a large number of operations efficiently, such as in a web server serving thousands of clients.

The Building Blocks of Java Concurrency

Java's concurrency API has evolved significantly over the years. We'll start with the classic synchronized keyword and then look at the java.util.concurrent utilities introduced in Java 5.

A. The synchronized Keyword (The Classic Approach)

This is the simplest way to achieve thread safety. It can be used in two ways:

  1. Synchronized Method: The entire method becomes a critical section. Only one thread can execute a synchronized instance method on a given object at a time.

    public class Counter {
        private int count = 0;
        public synchronized void increment() {
            count++;
        }
        public synchronized int getCount() {
            return count;
        }
    }
    • Problem: It's very coarse-grained. While one thread is inside increment(), no other thread can enter any other synchronized method on the same Counter object, even if it's just reading.
  2. Synchronized Block: A more flexible approach. You can lock on any object, making the scope of the lock smaller and more granular.

    public class Counter {
        private int count = 0;
        private final Object lock = new Object(); // A dedicated lock object
        public void increment() {
            synchronized (lock) { // Only one thread can execute this block at a time
                count++;
            }
        }
    }
    • Best Practice: Use a private final Object for locking, not this or a publicly accessible object, to prevent external code from interfering with your lock.
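
To see the private-lock idiom in action, here is a minimal sketch that runs several threads against a counter guarded by a dedicated lock object. The class name SynchronizedCounterDemo and the helper run(...) are hypothetical names chosen for illustration. Note that the read in getCount() takes the same lock, since an unsynchronized read is not guaranteed to see the latest value.

```java
// Hypothetical demo class, not part of the original article.
class SynchronizedCounterDemo {
    private int count = 0;
    private final Object lock = new Object(); // dedicated, private lock object

    void increment() {
        synchronized (lock) {
            count++;
        }
    }

    int getCount() {
        synchronized (lock) { // reads use the same lock so they see the latest write
            return count;
        }
    }

    // Starts `threads` workers that each call increment() `perThread` times,
    // waits for all of them, and returns the final count.
    static int run(int threads, int perThread) {
        SynchronizedCounterDemo counter = new SynchronizedCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return counter.getCount();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```

Without the synchronized blocks, count++ is a read-modify-write sequence that threads can interleave, so the final total would typically fall short of threads * perThread.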

B. The java.util.concurrent Package (The Modern Approach)

Java 5 introduced the java.util.concurrent package, which provides a rich set of high-performance, thread-safe utilities.

Atomic Variables

For simple operations like incrementing a counter, using synchronized can be overkill. Atomic variables use special CPU instructions (like Compare-And-Swap) to provide lock-free thread safety.

  • AtomicInteger, AtomicLong, AtomicBoolean, AtomicReference

    import java.util.concurrent.atomic.AtomicInteger;
    public class AtomicCounter {
        private final AtomicInteger count = new AtomicInteger(0);
        public void increment() {
            count.incrementAndGet(); // An atomic operation
        }
        public int getCount() {
            return count.get();
        }
    }
    • When to use: Ideal for single, atomic operations on variables where synchronized would be too heavy.
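
For updates that depend on the current value (for example, "increment only if below a limit"), a single incrementAndGet() is not enough. The following sketch shows the usual compare-and-set retry loop. BoundedCounter and tryIncrement() are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: a counter that never exceeds `max`.
class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger(0);
    private final int max;

    BoundedCounter(int max) {
        this.max = max;
    }

    // Returns true if the counter was incremented, false if it is already at max.
    boolean tryIncrement() {
        while (true) {
            int current = count.get();
            if (current >= max) {
                return false;
            }
            // Succeeds only if no other thread changed the value since we read it.
            if (count.compareAndSet(current, current + 1)) {
                return true;
            }
            // Another thread won the race; loop and retry with the fresh value.
        }
    }

    int get() {
        return count.get();
    }
}
```

The check and the update happen as one atomic step from the point of view of other threads, without taking a lock.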

Locks (java.util.concurrent.locks)

The Lock interface provides more flexibility than synchronized.

  • ReentrantLock: A reentrant mutual exclusion lock. It's similar to synchronized but offers more features.

    • Fairness: Can be configured to be "fair" (gives locks to the longest-waiting thread), but this comes with a performance cost.
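    • tryLock(): Attempts to acquire the lock but doesn't block if it can't be acquired immediately (a timed variant waits up to a given duration). Because a thread can back off instead of waiting indefinitely, this can help avoid deadlocks, although it does not rule them out by itself.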
    • lockInterruptibly(): Allows a thread to be interrupted while waiting for the lock.
    import java.util.concurrent.locks.ReentrantLock;
    public class LockCounter {
        private int count = 0;
        private final ReentrantLock lock = new ReentrantLock();
        public void increment() {
            lock.lock(); // Acquire the lock
            try {
                count++;
            } finally {
                lock.unlock(); // Always release in a finally block!
            }
        }
    }
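
The timed form of tryLock() can be sketched as follows. TimedLockCounter and its tryIncrement(timeoutMillis) method are hypothetical names for this illustration; how to react to a timeout (retry, skip, report an error) is an application decision that this sketch reduces to returning false.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class, not from the original article.
class TimedLockCounter {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();

    // Returns true if the increment happened, false if the lock could not be
    // acquired within the timeout or the thread was interrupted while waiting.
    boolean tryIncrement(long timeoutMillis) {
        try {
            if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // gave up instead of blocking indefinitely
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
        try {
            count++;
            return true;
        } finally {
            lock.unlock(); // only reached when the lock was actually acquired
        }
    }

    int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}
```

The unlock() call sits in a finally block that is entered only after a successful acquisition, so the lock is never released by a thread that does not hold it.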

Concurrent Collections

These are thread-safe versions of standard collections. They are highly optimized for concurrent access.

  • ConcurrentHashMap: The thread-safe alternative to HashMap. Reads generally do not block, and updates lock only a small part of the table (lock striping over segments before Java 8; CAS plus per-bin locking since Java 8). Multiple threads can therefore read and write different parts of the map simultaneously, which usually performs much better under contention than Collections.synchronizedMap(new HashMap<>()).
  • CopyOnWriteArrayList: Good for scenarios where you have far more reads than writes. Every write operation creates a new copy of the underlying array. Iterators work on a snapshot of the array taken when the iterator was created, so they never throw ConcurrentModificationException, but they also do not reflect later modifications.
  • BlockingQueue: A queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available when storing an element. Crucial for producer-consumer scenarios.
    • Implementations: ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue.
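
The producer-consumer pattern mentioned above can be sketched with an ArrayBlockingQueue. ProducerConsumerDemo, run(...), and the POISON_PILL sentinel are hypothetical names for this example; using a sentinel value to signal the end of the stream is one common convention, not the only option.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original article.
class ProducerConsumerDemo {
    private static final int POISON_PILL = -1; // assumed sentinel meaning "no more items"

    // Produces 0..items-1 on one thread, consumes them on another,
    // and returns the values in the order they were consumed.
    static List<Integer> run(int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // small bounded buffer
        List<Integer> consumed = new ArrayList<>(); // touched only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int value = queue.take(); // blocks while the queue is empty
                    if (value == POISON_PILL) {
                        break;
                    }
                    consumed.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return consumed;
    }
}
```

The bounded capacity gives natural back-pressure: a fast producer is slowed down to the pace of the consumer instead of filling memory.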

Executors (java.util.concurrent.Executor)

Manages a pool of threads, so you don't have to create and destroy threads manually. This is the recommended way to handle asynchronous tasks.

  • ExecutorService: The core interface.

  • Executors: A factory class for creating common types of executors.

    • FixedThreadPool: A pool with a fixed number of threads. Good for CPU-bound tasks.
      ExecutorService executor = Executors.newFixedThreadPool(10);
    • CachedThreadPool: A pool that creates new threads as needed and reuses idle threads when they are available. Good for large numbers of short-lived tasks. The pool is unbounded, so a burst of long-running tasks can create a very large number of threads.
      ExecutorService executor = Executors.newCachedThreadPool();
    • ScheduledThreadPool: A pool for scheduling tasks to run after a given delay or periodically.
      ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
      scheduler.schedule(() -> System.out.println("Hello"), 1, TimeUnit.SECONDS);

    How to use:

    ExecutorService executor = Executors.newFixedThreadPool(4);
    // Submit a task that returns a result
    Future<String> future = executor.submit(() -> {
        Thread.sleep(1000);
        return "Task Complete";
    });
    // Submit a task that does not return a result (Runnable)
    executor.submit(() -> {
        System.out.println("Running a runnable task");
    });
    // Shutdown the executor gracefully
    executor.shutdown();
    try {
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
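
To collect results from several tasks at once, invokeAll() submits a batch of Callables and returns one Future per task. The sketch below is a hedged illustration; InvokeAllDemo and sumOfSquares(...) are hypothetical names, and the pool size of 4 is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original article.
class InvokeAllDemo {
    // Computes 1^2 + 2^2 + ... + n^2, one task per term.
    static int sumOfSquares(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int x = i; // lambdas may only capture effectively final variables
                tasks.add(() -> x * x);
            }
            int sum = 0;
            // invokeAll blocks until every task has completed.
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            executor.shutdown(); // always release the pool's threads
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(4)); // prints 30
    }
}
```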

Core Concepts & Challenges

A. The Memory Model & volatile

The Java Memory Model (JMM) defines how threads interact through memory. It's complex, but two key concepts are essential:

  1. Visibility: Changes made by one thread to a shared variable may not be visible to other threads immediately.
  2. Reordering: The compiler and CPU can reorder instructions for performance, as long as the result on a single thread is the same.

The volatile keyword solves the visibility problem.

  • What it does:

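    • Guarantees that a write to a volatile variable becomes visible to every thread that subsequently reads that variable. This is often described as "flushing to main memory", although the JMM specifies it in terms of happens-before rather than hardware behavior.
    • Guarantees that a read of a volatile variable sees the most recent write to it, together with everything the writing thread did before that write.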
    • Prevents certain types of instruction reordering around the volatile variable.
  • When to use:

    • For a status flag that is checked by multiple threads (e.g., private volatile boolean shutdownRequested;).
    • It is NOT a replacement for synchronization. It does not provide atomicity for compound operations (like count++).
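
The status-flag use case can be sketched as follows. VolatileFlagDemo and its methods are hypothetical names; the 50 ms sleep and 5 second join timeout are arbitrary values chosen only to keep the demo short.

```java
// Hypothetical demo class, not part of the original article.
class VolatileFlagDemo {
    private volatile boolean shutdownRequested = false;

    void requestShutdown() {
        shutdownRequested = true; // volatile write: visible to the worker's next read
    }

    void runUntilShutdown() {
        while (!shutdownRequested) { // volatile read on every iteration
            Thread.yield();          // stand-in for real work
        }
    }

    // Starts a worker, requests shutdown, and reports whether the worker stopped.
    static boolean run() {
        VolatileFlagDemo demo = new VolatileFlagDemo();
        Thread worker = new Thread(demo::runUntilShutdown);
        worker.setDaemon(true); // do not keep the JVM alive if something goes wrong
        worker.start();
        try {
            Thread.sleep(50);   // let the worker spin for a moment
            demo.requestShutdown();
            worker.join(5_000); // wait up to 5 seconds for it to notice the flag
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }
}
```

If the field were not volatile, the JMM would allow the worker to keep using a stale value of the flag, so the loop might never terminate.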

B. The final Keyword and Thread Safety

Since Java 5 (JSR-133), the JMM gives final fields special initialization guarantees: if an object's this reference does not escape during construction, any thread that obtains a reference to the object is guaranteed to see its final fields correctly initialized, without needing synchronization.

Non-final fields get no such guarantee. To publish an object with mutable state safely, store its reference in a volatile field, in a final field of a properly constructed object, in a field guarded by a lock, or in a concurrent collection.

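public class SafePublisher {
    private final SafeObject safeObject; // final: correctly visible to any thread that sees this SafePublisher
    public SafePublisher() {
        // Assigned before the constructor returns; 'this' must not escape during construction
        this.safeObject = new SafeObject(42); // 42 is an arbitrary illustrative value
    }
}
class SafeObject {
    final int data; // Guaranteed to be visible once the object reference is visible
    SafeObject(int data) {
        this.data = data; // A final field must be assigned before the constructor finishes
    }
}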

C. Thread Safety

A class or method is thread-safe if it behaves correctly when accessed from multiple threads, regardless of the scheduling or interleaving of the execution of those threads by the runtime system.

Strategies for achieving thread safety:

  1. Immutability: Make objects immutable. Since their state can't change, they are inherently thread-safe.
  2. Thread Confinement: Keep data within a single thread. Don't share it across threads (e.g., using ThreadLocal).
  3. Synchronization: Use synchronized, locks, or concurrent collections to control access to shared state.
  4. Atomic Variables: Use java.util.concurrent.atomic for lock-free thread safety on single variables.
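
The first strategy, immutability, can be illustrated with a small value class. ImmutablePoint and its withX(...) method are hypothetical names for this sketch: all fields are final, there are no setters, and "modification" returns a new instance.

```java
// Hypothetical example class, not part of the original article.
final class ImmutablePoint {           // final: subclasses cannot add mutable state
    private final int x;
    private final int y;

    ImmutablePoint(int x, int y) {
        this.x = x;
        this.y = y;
    }

    int x() {
        return x;
    }

    int y() {
        return y;
    }

    // Returns a modified copy; the original instance is never changed.
    ImmutablePoint withX(int newX) {
        return new ImmutablePoint(newX, y);
    }
}
```

Because no thread can ever observe an ImmutablePoint changing, instances can be shared freely without locks.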

D. The "Happens-Before" Guarantee

The "happens-before" relationship is the foundation of the JMM. It's a set of rules that define when one memory action is guaranteed to be visible to another.

Key Rules:

  • Program Order: Each action in a thread happens-before every action that comes later in that thread's program order.
  • Monitor Lock Rule: An unlock on a monitor lock m happens-before a subsequent lock on m.
  • Volatile Variable Rule: A write to a volatile field v happens-before every subsequent read of v.
  • Thread Start Rule: A call to thread.start() happens-before any action in the started thread.
  • Thread Termination Rule: Any action in a thread threadA happens-before any other thread detects that threadA has terminated (e.g., by returning from threadA.join()).
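
The start and termination rules can be seen in a small sketch that shares plain, non-volatile data between two threads. HappensBeforeDemo and run() are hypothetical names; the point is that start() and join() alone provide the needed visibility here.

```java
// Hypothetical demo class, not part of the original article.
class HappensBeforeDemo {
    static int run() {
        int[] data = new int[1]; // plain shared state: no volatile, no locks
        data[0] = 1;             // written before start()

        Thread worker = new Thread(() -> {
            // Thread Start Rule: the write of 1 above is visible here.
            data[0] = data[0] + 41;
        });
        worker.start();

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        // Thread Termination Rule: the worker's write is visible after join() returns.
        return data[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 42
    }
}
```

Reading data[0] before join() returns would not be covered by either rule and would be a data race.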

Advanced Topics

A. The Fork/Join Framework (Java 7)

A framework for "work-stealing" algorithms, designed to efficiently process large amounts of data by breaking it down into smaller tasks.

  • ForkJoinPool: A special ExecutorService for running ForkJoinTasks.
  • RecursiveTask<V>: A task that returns a result.
  • RecursiveAction: A task that does not return a result.
// Example: Summing an array
import java.util.concurrent.RecursiveTask;
class SumTask extends RecursiveTask<Long> {
    private final long[] array;
    private final int start;
    private final int end;
    static final int THRESHOLD = 1000; // Task size to split at
    SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) sum += array[i];
            return sum;
        }
        int mid = (start + end) >>> 1; // Midpoint; unsigned shift avoids int overflow for very large indices
        SumTask leftTask = new SumTask(array, start, mid);
        SumTask rightTask = new SumTask(array, mid, end);
        leftTask.fork(); // Asynchronously execute the left task
        long rightResult = rightTask.compute(); // Execute the right task
        long leftResult = leftTask.join();       // Wait for the left task and get its result
        return leftResult + rightResult;
    }
}
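
A task like this is normally started by handing it to a ForkJoinPool. The following self-contained sketch repeats a compact version of the task under the hypothetical name ArraySum and adds a hypothetical helper ForkJoinUsage.parallelSum(...) that runs it on the common pool (available since Java 8).

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Compact variant of the summing task, repeated so this example compiles on its own.
class ArraySum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // below this size, sum sequentially
    private final long[] array;
    private final int start;
    private final int end;

    ArraySum(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        }
        int mid = (start + end) >>> 1;
        ArraySum left = new ArraySum(array, start, mid);
        ArraySum right = new ArraySum(array, mid, end);
        left.fork();                          // schedule the left half asynchronously
        return right.compute() + left.join(); // compute the right half here, then wait for the left
    }
}

// Hypothetical helper showing how the task is submitted.
class ForkJoinUsage {
    static long parallelSum(long[] array) {
        // invoke() runs the task and blocks until the result is available.
        return ForkJoinPool.commonPool().invoke(new ArraySum(array, 0, array.length));
    }
}
```

Calling fork() on one half and compute() on the other keeps the current worker thread busy instead of leaving it idle while both halves run elsewhere.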

B. CompletableFuture (Java 8)

CompletableFuture is the modern way to write asynchronous, non-blocking code. It is a Future that can be completed manually and offers a rich set of composition methods (thenApply, thenCompose, thenAccept, thenCombine, exceptionally, etc.).

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Result of the asynchronous computation";
        });
        // Attach a callback to run when the future completes
        future.thenAccept(result -> System.out.println("Got result: " + result));
        // Or chain more computations
        CompletableFuture<Integer> lengthFuture = future.thenApply(String::length);
        System.out.println("Length of the result: " + lengthFuture.get());
    }
}
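
Two of the composition methods listed above, thenCombine and exceptionally, can be sketched as follows. CombineDemo and its two methods are hypothetical names, and the "simulated failure" exception exists only to trigger the fallback path.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original article.
class CombineDemo {
    // Runs two independent computations and merges their results.
    static int combinedLength() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> "World");
        return first
                .thenCombine(second, (a, b) -> a + " " + b) // runs once both are complete
                .thenApply(String::length)
                .join(); // like get(), but throws an unchecked CompletionException
    }

    // Recovers from a failed stage by substituting a default value.
    static String withFallback() {
        return CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("simulated failure");
                })
                .exceptionally(ex -> "fallback") // invoked only if the stage failed
                .join();
    }

    public static void main(String[] args) {
        System.out.println(combinedLength()); // prints 11
        System.out.println(withFallback());   // prints fallback
    }
}
```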

Best Practices

  1. Prefer java.util.concurrent over low-level primitives. Use ConcurrentHashMap, ExecutorService, AtomicInteger, etc., before reaching for synchronized or Lock.
  2. Minimize shared, mutable state. The easiest way to manage concurrency is to avoid it. Design your objects to be immutable or to not share state.
  3. Keep critical sections small. The less code you protect with a lock, the less contention there will be, and the better your performance will be.
  4. Use final for thread-safe publication. It's a simple and effective tool.
  5. Prefer ExecutorService over manual thread management. It handles thread lifecycle, resource management, and provides a clean API for task submission.
  6. Be careful with synchronized on non-private objects. Locking on this or a publicly accessible object can lead to deadlocks if other code also tries to lock on the same object.
  7. Always release locks in a finally block. Otherwise an exception thrown inside the critical section leaves the lock held forever, and every thread that later needs it will block.
  8. Avoid calling unknown code from within a synchronized block or while holding a lock. The unknown code could do anything, including blocking for a long time or acquiring other locks in a conflicting order, which can lead to a deadlock.
  9. Document your thread-safety guarantees. Make it clear for other developers (and your future self) how a class is intended to be used in a concurrent environment.