杰瑞科技汇

生产者消费者多线程如何实现线程安全?

核心问题与目标

问题场景: 想象一个场景,有一个共享的缓冲区(比如一个篮子、一个仓库),多个“生产者”线程负责往里面放产品,多个“消费者”线程负责从里面取产品。

生产者消费者多线程如何实现线程安全?-图1
(图片来源网络,侵删)

需要解决的问题

  1. 线程安全问题:生产者和消费者共同操作缓冲区,必须确保对缓冲区的修改是原子性的,不能出现“同时放”或“同时取”导致数据错乱的情况。
  2. 线程通信问题
    • 当缓冲区已满时,生产者线程不能再放产品,必须等待,直到消费者取走一个产品,通知它“有空间了”。
    • 当缓冲区为空时,消费者线程不能再取产品,必须等待,直到生产者放入一个产品,通知它“有货了”。
  3. 避免死锁和活锁:确保线程能够被正确唤醒,避免所有线程都永久等待下去。

目标: 实现一个高效、安全、正确的生产者-消费者模型。


解决方案演进

我们将从简单到复杂,介绍几种主流的解决方案,并分析它们的优缺点。

synchronized + wait() / notify() (基础方案)

这是最经典、最基础的实现方式,利用 synchronized 关键字保证互斥访问,利用 wait()notify()/notifyAll() 实现线程通信。

生产者消费者多线程如何实现线程安全?-图2
(图片来源网络,侵删)

核心思想

  1. synchronized:确保同一时间只有一个线程(无论是生产者还是消费者)能进入对缓冲区的操作代码块。
  2. wait():让当前线程释放锁,并进入等待状态,它必须在 synchronized 代码块内调用。
  3. notify() / notifyAll():唤醒一个或所有在同一个对象上等待的线程。notify() 是随机唤醒一个,notifyAll() 是唤醒所有,它也必须在 synchronized 代码块内调用。

代码实现

import java.util.LinkedList;
import java.util.Queue;
class Buffer {
    private final int capacity;
    private final Queue<Integer> queue;
    public Buffer(int capacity) {
        this.capacity = capacity;
        this.queue = new LinkedList<>();
    }
    // 生产者调用
    public void produce(int item) throws InterruptedException {
        synchronized (this) {
            // 缓冲区已满,生产者等待
            while (queue.size() == capacity) {
                System.out.println("缓冲区已满,生产者等待...");
                wait(); // 释放锁,进入等待状态
            }
            // 生产产品
            queue.add(item);
            System.out.println("生产: " + item + " | 当前缓冲区大小: " + queue.size());
            // 通知消费者(唤醒一个等待的消费者)
            notify(); // 或者用 notifyAll() 更安全
        }
    }
    // 消费者调用
    public int consume() throws InterruptedException {
        synchronized (this) {
            // 缓冲区为空,消费者等待
            while (queue.isEmpty()) {
                System.out.println("缓冲区为空,消费者等待...");
                wait(); // 释放锁,进入等待状态
            }
            // 消费产品
            int item = queue.poll();
            System.out.println("消费: " + item + " | 当前缓冲区大小: " + queue.size());
            // 通知生产者(唤醒一个等待的生产者)
            notify(); // 或者用 notifyAll() 更安全
            return item;
        }
    }
}
class Producer implements Runnable {
    private final Buffer buffer;
    public Producer(Buffer buffer) {
        this.buffer = buffer;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                buffer.produce(i);
                Thread.sleep((long) (Math.random() * 100)); // 模拟生产耗时
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
class Consumer implements Runnable {
    private final Buffer buffer;
    public Consumer(Buffer buffer) {
        this.buffer = buffer;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                buffer.consume();
                Thread.sleep((long) (Math.random() * 200)); // 模拟消费耗时
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
public class ProducerConsumerDemo {
    public static void main(String[] args) {
        Buffer buffer = new Buffer(2); // 缓冲区容量为2
        Thread producerThread = new Thread(new Producer(buffer));
        Thread consumerThread = new Thread(new Consumer(buffer));
        producerThread.start();
        consumerThread.start();
        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

为什么用 while 而不是 if 这是面试中的一个高频问题。if 只会检查一次等待条件,如果被 notify() 唤醒后,条件可能依然不满足(生产者被唤醒,但缓冲区还是满的),它会继续执行后面的代码,导致错误,而 while 循环会反复检查,只有当条件真正满足时(缓冲区不满/不空),线程才会继续执行,保证了逻辑的正确性。

缺点

生产者消费者多线程如何实现线程安全?-图3
(图片来源网络,侵删)
  • notify() 可能会唤醒错误的线程(比如唤醒了另一个生产者,但缓冲区已满),效率不高。notifyAll() 虽然能避免这个问题,但会唤醒所有线程,导致不必要的上下文切换,开销较大。
  • 代码需要手动管理锁和等待/通知,容易出错。

java.util.concurrent 包 (现代方案)

Java 并发包提供了更高级、更健壮的工具来解决这类问题,我们推荐在实际开发中使用它们。

A. 使用 BlockingQueue (最推荐)

BlockingQueue 是一个接口,它专门为在生产者-消费者场景中设计的队列,它在内部已经实现了所有必要的同步和线程通信逻辑,使用起来非常简单、安全。

核心方法

  • put(E e): 如果队列满了,阻塞直到有空间可用。
  • take(): 如果队列空了,阻塞直到有元素可用。
  • offer(E e): 尝试添加,成功返回 true,队列满则立即返回 false
  • poll(): 尝试取出,成功返回元素,队列空则立即返回 null

代码实现

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// Buffer 类被 BlockingQueue 替代
class ProducerWithBlockingQueue implements Runnable {
    private final BlockingQueue<Integer> queue;
    public ProducerWithBlockingQueue(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                System.out.println("生产: " + i);
                queue.put(i); // 如果队列满,这里会自动阻塞
                Thread.sleep((long) (Math.random() * 100));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
class ConsumerWithBlockingQueue implements Runnable {
    private final BlockingQueue<Integer> queue;
    public ConsumerWithBlockingQueue(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                System.out.println("消费: " + queue.take()); // 如果队列空,这里会自动阻塞
                Thread.sleep((long) (Math.random() * 200));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
public class ProducerConsumerWithBlockingQueue {
    public static void main(String[] args) {
        // 创建一个容量为2的阻塞队列
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        Thread producerThread = new Thread(new ProducerWithBlockingQueue(queue));
        Thread consumerThread = new Thread(new ConsumerWithBlockingQueue(queue));
        producerThread.start();
        consumerThread.start();
        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

优点

  • 代码简洁:无需手动编写 synchronizedwait()notify()
  • 线程安全BlockingQueue 内部已经处理了所有并发问题。
  • 高效ArrayBlockingQueueLinkedBlockingQueue 等实现都针对高并发场景做了优化。
  • 功能强大:提供了多种操作模式(阻塞、超时、非阻塞等)。

B. 使用 Semaphore (信号量)

Semaphore 可以用来控制同时访问某个资源的线程数量,在这个场景中,我们可以用它来模拟缓冲区的“空位”和“商品”。

核心思想

  • Semaphore emptySlots: 代表空的缓冲区位置,初始值为缓冲区容量,生产者必须先获取一个 emptySlots(即找到一个空位),才能生产。
  • Semaphore fullSlots: 代表已填充的缓冲区位置,初始值为 0,消费者必须先获取一个 fullSlots(即找到一个有产品的位置),才能消费。

代码实现

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;
class BufferWithSemaphore {
    private final Queue<Integer> queue;
    private final int capacity;
    private final Semaphore emptySlots;
    private final Semaphore fullSlots;
    private final Semaphore mutex; // 用于保护对队列本身的互斥访问
    public BufferWithSemaphore(int capacity) {
        this.capacity = capacity;
        this.queue = new LinkedList<>();
        this.emptySlots = new Semaphore(capacity);
        this.fullSlots = new Semaphore(0);
        this.mutex = new Semaphore(1); // 初始为1,表示只有一个线程能访问
    }
    public void produce(int item) throws InterruptedException {
        emptySlots.acquire(); // 获取一个空位,如果没空位则阻塞
        mutex.acquire();     // 获取锁,保护队列
        // 生产产品
        queue.add(item);
        System.out.println("生产: " + item + " | 当前缓冲区大小: " + queue.size());
        mutex.release();     // 释放锁
        fullSlots.release(); // 增加一个满位,通知消费者
    }
    public int consume() throws InterruptedException {
        fullSlots.acquire(); // 获取一个满位,如果没满位则阻塞
        mutex.acquire();     // 获取锁,保护队列
        // 消费产品
        int item = queue.poll();
        System.out.println("消费: " + item + " | 当前缓冲区大小: " + queue.size());
        mutex.release();     // 释放锁
        emptySlots.release(); // 增加一个空位,通知生产者
        return item;
    }
}
// Producer 和 Consumer 类与方案一类似,只需将 Buffer 替换为 BufferWithSemaphore
// ...

优点

  • 灵活性高,可以精确控制资源的数量。
  • 是学习并发控制底层原理的好例子。

缺点

  • 实现比 BlockingQueue 复杂,需要管理多个信号量和一个互斥锁,容易出错。

方案对比与总结

特性 synchronized + wait/notify BlockingQueue Semaphore
代码简洁性 较差,需要手动管理同步和通信 极佳,封装了所有复杂性 较差,需要管理多个信号量
线程安全性 需要正确实现,容易出错 ,内部已保证 需要正确实现,容易出错
易用性 极高
性能 一般,notifyAll开销大 ,针对并发优化 一般
适用场景 学习原理、面试、简单场景 生产环境首选 需要精细控制资源数量的复杂场景
  • 对于学习和面试:必须掌握 synchronized + wait() / notify() 的实现方式,并理解 while 循环的必要性。
  • 对于实际项目开发强烈推荐使用 BlockingQueue,它更安全、更简洁、性能更好,是 Java 并发包为生产者-消费者问题提供的标准解决方案。java.util.concurrent.ArrayBlockingQueuejava.util.concurrent.LinkedBlockingQueue 是最常用的两个实现。
分享:
扫描分享到社交APP
上一篇
下一篇