杰瑞科技汇

Python ftplib 多线程如何高效实现?

核心思想

ftplib 本身是同步的,这意味着当一个文件正在上传或下载时,当前线程会一直阻塞,直到传输完成,如果需要同时处理多个文件或多个 FTP 服务器的任务,单线程的效率会非常低。

Python ftplib 多线程如何高效实现?-图1
(图片来源网络,侵删)

多线程的核心思想是:将一个大的、耗时的任务(如下载一个大文件)分解成多个小的、可以并行执行的任务(如同时下载多个文件),从而充分利用 CPU 和网络资源,缩短总执行时间。


一个完整的示例:多线程下载文件

下面是一个完整的、可运行的示例,它演示了如何使用一个线程池来从 FTP 服务器上下载多个文件。

准备工作

确保你的 Python 环境已经安装。ftplib 是 Python 标准库的一部分,无需额外安装。

为了演示,你需要一个可访问的 FTP 服务器,这里我们使用一个公共的测试 FTP 服务器:

Python ftplib 多线程如何高效实现?-图2
(图片来源网络,侵删)
  • 主机: testftp.com
  • 用户名: test
  • 密码: test
  • 路径:

这个服务器上有一个 files 目录,里面有一些可以下载的测试文件。

代码实现

我们将创建一个 FTPDownloader 类来封装 FTP 操作和多线程逻辑。

import os
import ftplib
import threading
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
class FTPDownloader:
    def __init__(self, host, username, password, local_dir='downloads', max_workers=5):
        """
        初始化FTP下载器
        :param host: FTP服务器地址
        :param username: 用户名
        :param password: 密码
        :param local_dir: 本地保存目录
        :param max_workers: 最大线程数
        """
        self.host = host
        self.username = username
        self.password = password
        self.local_dir = local_dir
        self.max_workers = max_workers
        self.ftp_lock = threading.Lock()  # 用于同步FTP连接的锁
        self.ftp_client = None
        # 确保本地目录存在
        if not os.path.exists(self.local_dir):
            os.makedirs(self.local_dir)
            print(f"创建本地目录: {self.local_dir}")
    def _get_ftp_connection(self):
        """
        获取一个FTP连接(线程安全)
        """
        if self.ftp_client is None or not self.ftp_client.sock:
            with self.ftp_lock:
                # 双重检查,防止多个线程同时通过第一个检查
                if self.ftp_client is None or not self.ftp_client.sock:
                    print(f"正在连接到 FTP 服务器: {self.host}")
                    self.ftp_client = ftplib.FTP(self.host, self.username, self.password)
                    self.ftp_client.encoding = "utf-8" # 设置编码以支持中文文件名
        return self.ftp_client
    def _download_file(self, remote_path, local_path):
        """
        下载单个文件的核心函数
        :param remote_path: 远程文件路径
        :param local_path: 本地文件路径
        """
        try:
            ftp = self._get_ftp_connection()
            print(f"[{threading.current_thread().name}] 开始下载: {remote_path} -> {local_path}")
            with open(local_path, 'wb') as f:
                ftp.retrbinary(f'RETR {remote_path}', f.write)
            print(f"[{threading.current_thread().name}] 下载完成: {local_path}")
            return True
        except Exception as e:
            print(f"[{threading.current_thread().name}] 下载失败 {remote_path}: {e}")
            return False
    def download_files(self, remote_files):
        """
        使用线程池下载多个文件
        :param remote_files: 远程文件路径列表
        """
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for remote_file in remote_files:
                # 构造本地文件路径
                local_filename = os.path.basename(remote_file)
                local_file_path = os.path.join(self.local_dir, local_filename)
                # 提交下载任务到线程池
                future = executor.submit(self._download_file, remote_file, local_file_path)
                futures.append(future)
            # (可选) 等待所有任务完成并检查结果
            for i, future in enumerate(futures):
                try:
                    future.result()  # 获取结果,如果任务有异常,这里会抛出
                except Exception as e:
                    print(f"任务 {i} 发生异常: {e}")
        print("\n所有文件下载任务已提交完成。")
    def close(self):
        """
        关闭FTP连接
        """
        if self.ftp_client:
            try:
                self.ftp_client.quit()
                print("FTP 连接已关闭。")
            except Exception as e:
                print(f"关闭FTP连接时出错: {e}")
# --- 主程序入口 ---
if __name__ == "__main__":
    # FTP 服务器信息
    FTP_HOST = 'testftp.com'
    FTP_USER = 'test'
    FTP_PASS = 'test'
    # 要下载的文件列表
    # 这些文件都位于 testftp.com 的 /files 目录下
    files_to_download = [
        '/files/file1.txt',
        '/files/file2.txt',
        '/files/file3.txt',
        '/files/file4.txt',
        '/files/file5.txt',
        '/files/file6.txt',
        '/files/file7.txt',
        '/files/file8.txt',
    ]
    # 创建下载器实例
    downloader = FTPDownloader(
        host=FTP_HOST,
        username=FTP_USER,
        password=FTP_PASS,
        local_dir='my_ftp_downloads',
        max_workers=4  # 同时进行4个下载任务
    )
    try:
        # 开始多线程下载
        downloader.download_files(files_to_download)
    finally:
        # 确保程序结束时关闭FTP连接
        downloader.close()

代码解析

FTPDownloader 类设计

  • __init__: 初始化 FTP 服务器的连接信息和多线程的配置。max_workers 控制线程池的大小。local_dir 指定下载文件存放的本地目录。ftp_lock 是一个关键点,我们稍后解释。
  • _get_ftp_connection: 获取 FTP 连接。
    • 为什么需要锁 (ftp_lock)? ftplib.FTP 对象本身不是线程安全的,如果你在一个线程中调用 ftp.retrbinary(),同时在另一个线程中调用 ftp.cwd(),可能会导致数据混乱或连接断开。
    • 解决方案: 我们使用 threading.Lock 来确保在任何时刻,只有一个线程可以创建或使用 ftplib.FTP 实例,当一个线程正在使用 FTP 连接时,其他需要连接的线程必须等待,这是一种简单有效的同步方式。
    • 优化: _get_ftp_connection 实现了“双重检查锁定”模式,避免每次获取连接都去获取锁,提高了效率。
  • _download_file: 这是每个线程实际执行的任务。
    • 它接收远程和本地路径作为参数。
    • 调用 _get_ftp_connection() 安全地获取一个 FTP 连接。
    • 使用 ftplib.FTP.retrbinary() 方法下载文件,这是二进制模式下载,适用于所有类型的文件。
    • 使用 with open(...) 确保文件被正确关闭。
    • 包含了详细的日志和错误处理。
  • download_files: 这是任务的分发器。
    • 它使用 concurrent.futures.ThreadPoolExecutor 来管理一个线程池。
    • 遍历 remote_files 列表,为每个文件调用 executor.submit(),将 _download_file 任务提交到线程池中。
    • executor.submit() 会立即返回一个 Future 对象,代表这个异步任务,主线程不会等待任务完成,而是继续执行。
    • 循环结束后,所有任务都已排队等待执行。
  • close: 在程序结束时,显式地调用 ftp_client.quit() 关闭 FTP 连接,释放资源。

多线程库的选择

我们使用了 concurrent.futures.ThreadPoolExecutor,这是现代 Python 中管理线程池的推荐方式,比手动创建和管理 threading.Thread 对象更简洁、更强大。

  • max_workers: 你可以根据你的网络带宽和 FTP 服务器的承受能力来调整这个值,值太小,并发度不够;值太大,可能会给服务器带来过大压力,或者本地因创建过多线程而导致性能下降(线程切换开销)。

多线程上传

上传的逻辑与下载非常相似,只需要将 retrbinary 方法替换为 storbinary 方法即可。

下面是 _upload_file 方法的示例:

def _upload_file(self, local_path, remote_path):
    """
    上传单个文件的核心函数
    :param local_path: 本地文件路径
    :param remote_path: 远程文件路径
    """
    try:
        ftp = self._get_ftp_connection()
        print(f"[{threading.current_thread().name}] 开始上传: {local_path} -> {remote_path}")
        with open(local_path, 'rb') as f:
            # 使用 storbinary 进行二进制上传
            ftp.storbinary(f'STOR {remote_path}', f)
        print(f"[{threading.current_thread().name}] 上传完成: {remote_path}")
        return True
    except Exception as e:
        print(f"[{threading.current_thread().name}] 上传失败 {local_path}: {e}")
        return False

你只需要在 upload_files 方法中遍历本地文件列表,并为每个文件调用 executor.submit(self._upload_file, ...) 即可。


注意事项和最佳实践

  1. FTP 连接是瓶颈: 如前所述,ftplib 对象不是线程安全的,使用锁来同步连接是最简单的做法,但这也意味着所有文件传输任务实际上是串行地通过这一个连接执行的,对于 I/O 密集型任务(如网络传输),这通常是可以接受的,因为大部分时间线程都在等待网络 I/O,锁的争用不激烈,但如果你的应用需要极高的并发,可能需要为每个线程创建独立的 FTP 连接(但这会增加服务器的负担)。

  2. 错误处理: 务必在每个线程任务中加入 try...except 块,确保一个任务的失败不会影响其他任务的执行。

  3. 资源管理: 使用 try...finally 或上下文管理器(如 with 语句)来确保文件句柄和 FTP 连接被正确关闭和释放。

  4. 进度显示: retrbinarystorbinary 的回调函数可以用来显示传输进度,在多线程环境中,打印进度信息时最好也加上线程名,以便区分。

    # 下载进度回调示例
    def show_progress(block_num, block_size, total_size):
        downloaded = block_num * block_size
        if total_size > 0:
            percent = (downloaded / total_size) * 100
            print(f"\r[{threading.current_thread().name}] 进度: {percent:.2f}%", end="")
        else:
            print(f"\r[{threading.current_thread().name]} 已下载: {downloaded / (1024*1024):.2f} MB", end="")
    # 在 _download_file 中使用
    # ftp.retrbinary(f'RETR {remote_path}', lambda b: f.write(b), callback=show_progress)
  5. 被动模式 (Passive Mode): 很多 FTP 服务器(尤其是位于防火墙后的)需要使用被动模式才能成功连接和传输数据,你可以在连接后主动设置:

    ftp = self._get_ftp_connection()
    ftp.set_pasv(True) # 默认就是True,但显式设置更清晰

这个示例为你提供了一个坚实的基础,你可以根据实际需求进行修改和扩展,例如增加断点续传、更复杂的进度条或更精细的错误处理逻辑。

分享:
扫描分享到社交APP
上一篇
下一篇