核心问题与目标
问题场景: 想象一个场景,有一个共享的缓冲区(比如一个篮子、一个仓库),多个“生产者”线程负责往里面放产品,多个“消费者”线程负责从里面取产品。

需要解决的问题:
- 线程安全问题:生产者和消费者共同操作缓冲区,必须确保对缓冲区的修改是原子性的,不能出现“同时放”或“同时取”导致数据错乱的情况。
- 线程通信问题:
- 当缓冲区已满时,生产者线程不能再放产品,必须等待,直到消费者取走一个产品,通知它“有空间了”。
- 当缓冲区为空时,消费者线程不能再取产品,必须等待,直到生产者放入一个产品,通知它“有货了”。
- 避免死锁和活锁:确保线程能够被正确唤醒,避免所有线程都永久等待下去。
目标: 实现一个高效、安全、正确的生产者-消费者模型。
解决方案演进
我们将从简单到复杂,介绍几种主流的解决方案,并分析它们的优缺点。
synchronized + wait() / notify() (基础方案)
这是最经典、最基础的实现方式,利用 synchronized 关键字保证互斥访问,利用 wait() 和 notify()/notifyAll() 实现线程通信。

核心思想:
synchronized:确保同一时间只有一个线程(无论是生产者还是消费者)能进入对缓冲区的操作代码块。wait():让当前线程释放锁,并进入等待状态,它必须在synchronized代码块内调用。notify()/notifyAll():唤醒一个或所有在同一个对象上等待的线程。notify()是随机唤醒一个,notifyAll()是唤醒所有,它也必须在synchronized代码块内调用。
代码实现:
import java.util.LinkedList;
import java.util.Queue;
class Buffer {
private final int capacity;
private final Queue<Integer> queue;
public Buffer(int capacity) {
this.capacity = capacity;
this.queue = new LinkedList<>();
}
// 生产者调用
public void produce(int item) throws InterruptedException {
synchronized (this) {
// 缓冲区已满,生产者等待
while (queue.size() == capacity) {
System.out.println("缓冲区已满,生产者等待...");
wait(); // 释放锁,进入等待状态
}
// 生产产品
queue.add(item);
System.out.println("生产: " + item + " | 当前缓冲区大小: " + queue.size());
// 通知消费者(唤醒一个等待的消费者)
notify(); // 或者用 notifyAll() 更安全
}
}
// 消费者调用
public int consume() throws InterruptedException {
synchronized (this) {
// 缓冲区为空,消费者等待
while (queue.isEmpty()) {
System.out.println("缓冲区为空,消费者等待...");
wait(); // 释放锁,进入等待状态
}
// 消费产品
int item = queue.poll();
System.out.println("消费: " + item + " | 当前缓冲区大小: " + queue.size());
// 通知生产者(唤醒一个等待的生产者)
notify(); // 或者用 notifyAll() 更安全
return item;
}
}
}
class Producer implements Runnable {
private final Buffer buffer;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
buffer.produce(i);
Thread.sleep((long) (Math.random() * 100)); // 模拟生产耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
private final Buffer buffer;
public Consumer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
buffer.consume();
Thread.sleep((long) (Math.random() * 200)); // 模拟消费耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class ProducerConsumerDemo {
public static void main(String[] args) {
Buffer buffer = new Buffer(2); // 缓冲区容量为2
Thread producerThread = new Thread(new Producer(buffer));
Thread consumerThread = new Thread(new Consumer(buffer));
producerThread.start();
consumerThread.start();
try {
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
为什么用 while 而不是 if?
这是面试中的一个高频问题。if 只会检查一次等待条件,如果被 notify() 唤醒后,条件可能依然不满足(生产者被唤醒,但缓冲区还是满的),它会继续执行后面的代码,导致错误,而 while 循环会反复检查,只有当条件真正满足时(缓冲区不满/不空),线程才会继续执行,保证了逻辑的正确性。
缺点:

notify()可能会唤醒错误的线程(比如唤醒了另一个生产者,但缓冲区已满),效率不高。notifyAll()虽然能避免这个问题,但会唤醒所有线程,导致不必要的上下文切换,开销较大。- 代码需要手动管理锁和等待/通知,容易出错。
java.util.concurrent 包 (现代方案)
Java 并发包提供了更高级、更健壮的工具来解决这类问题,我们推荐在实际开发中使用它们。
A. 使用 BlockingQueue (最推荐)
BlockingQueue 是一个接口,它专门为在生产者-消费者场景中设计的队列,它在内部已经实现了所有必要的同步和线程通信逻辑,使用起来非常简单、安全。
核心方法:
put(E e): 如果队列满了,阻塞直到有空间可用。take(): 如果队列空了,阻塞直到有元素可用。offer(E e): 尝试添加,成功返回true,队列满则立即返回false。poll(): 尝试取出,成功返回元素,队列空则立即返回null。
代码实现:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// Buffer 类被 BlockingQueue 替代
class ProducerWithBlockingQueue implements Runnable {
private final BlockingQueue<Integer> queue;
public ProducerWithBlockingQueue(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("生产: " + i);
queue.put(i); // 如果队列满,这里会自动阻塞
Thread.sleep((long) (Math.random() * 100));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class ConsumerWithBlockingQueue implements Runnable {
private final BlockingQueue<Integer> queue;
public ConsumerWithBlockingQueue(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("消费: " + queue.take()); // 如果队列空,这里会自动阻塞
Thread.sleep((long) (Math.random() * 200));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class ProducerConsumerWithBlockingQueue {
public static void main(String[] args) {
// 创建一个容量为2的阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
Thread producerThread = new Thread(new ProducerWithBlockingQueue(queue));
Thread consumerThread = new Thread(new ConsumerWithBlockingQueue(queue));
producerThread.start();
consumerThread.start();
try {
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
优点:
- 代码简洁:无需手动编写
synchronized、wait()、notify()。 - 线程安全:
BlockingQueue内部已经处理了所有并发问题。 - 高效:
ArrayBlockingQueue和LinkedBlockingQueue等实现都针对高并发场景做了优化。 - 功能强大:提供了多种操作模式(阻塞、超时、非阻塞等)。
B. 使用 Semaphore (信号量)
Semaphore 可以用来控制同时访问某个资源的线程数量,在这个场景中,我们可以用它来模拟缓冲区的“空位”和“商品”。
核心思想:
Semaphore emptySlots: 代表空的缓冲区位置,初始值为缓冲区容量,生产者必须先获取一个emptySlots(即找到一个空位),才能生产。Semaphore fullSlots: 代表已填充的缓冲区位置,初始值为 0,消费者必须先获取一个fullSlots(即找到一个有产品的位置),才能消费。
代码实现:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;
class BufferWithSemaphore {
private final Queue<Integer> queue;
private final int capacity;
private final Semaphore emptySlots;
private final Semaphore fullSlots;
private final Semaphore mutex; // 用于保护对队列本身的互斥访问
public BufferWithSemaphore(int capacity) {
this.capacity = capacity;
this.queue = new LinkedList<>();
this.emptySlots = new Semaphore(capacity);
this.fullSlots = new Semaphore(0);
this.mutex = new Semaphore(1); // 初始为1,表示只有一个线程能访问
}
public void produce(int item) throws InterruptedException {
emptySlots.acquire(); // 获取一个空位,如果没空位则阻塞
mutex.acquire(); // 获取锁,保护队列
// 生产产品
queue.add(item);
System.out.println("生产: " + item + " | 当前缓冲区大小: " + queue.size());
mutex.release(); // 释放锁
fullSlots.release(); // 增加一个满位,通知消费者
}
public int consume() throws InterruptedException {
fullSlots.acquire(); // 获取一个满位,如果没满位则阻塞
mutex.acquire(); // 获取锁,保护队列
// 消费产品
int item = queue.poll();
System.out.println("消费: " + item + " | 当前缓冲区大小: " + queue.size());
mutex.release(); // 释放锁
emptySlots.release(); // 增加一个空位,通知生产者
return item;
}
}
// Producer 和 Consumer 类与方案一类似,只需将 Buffer 替换为 BufferWithSemaphore
// ...
优点:
- 灵活性高,可以精确控制资源的数量。
- 是学习并发控制底层原理的好例子。
缺点:
- 实现比
BlockingQueue复杂,需要管理多个信号量和一个互斥锁,容易出错。
方案对比与总结
| 特性 | synchronized + wait/notify |
BlockingQueue |
Semaphore |
|---|---|---|---|
| 代码简洁性 | 较差,需要手动管理同步和通信 | 极佳,封装了所有复杂性 | 较差,需要管理多个信号量 |
| 线程安全性 | 需要正确实现,容易出错 | 高,内部已保证 | 需要正确实现,容易出错 |
| 易用性 | 低 | 极高 | 中 |
| 性能 | 一般,notifyAll开销大 |
高,针对并发优化 | 一般 |
| 适用场景 | 学习原理、面试、简单场景 | 生产环境首选 | 需要精细控制资源数量的复杂场景 |
- 对于学习和面试:必须掌握
synchronized+wait()/notify()的实现方式,并理解while循环的必要性。 - 对于实际项目开发:强烈推荐使用
BlockingQueue,它更安全、更简洁、性能更好,是 Java 并发包为生产者-消费者问题提供的标准解决方案。java.util.concurrent.ArrayBlockingQueue和java.util.concurrent.LinkedBlockingQueue是最常用的两个实现。
