杰瑞科技汇

Python paramiko如何实现文件下载?

环境准备

你需要安装 paramiko 库,如果尚未安装,请打开你的终端或命令行工具,运行以下命令:

Python paramiko如何实现文件下载?-图1
(图片来源网络,侵删)
pip install paramiko

核心概念与步骤

使用 paramiko 下载文件主要通过 SFTP 客户端实现,基本步骤如下:

  1. 创建 SSH 客户端: paramiko.SSHClient()
  2. 加载主机密钥: 为了防止中间人攻击,需要验证服务器的主机密钥。set_missing_host_key_policy 是一个方便的做法,它会自动添加未知的主机密钥。
  3. 连接到服务器: 使用 connect() 方法,提供主机名、用户名、密码或私钥。
  4. 打开 SFTP 会话: 在已建立的 SSH 连接上,调用 open_sftp() 方法获取一个 SFTPClient 对象。
  5. 执行下载操作: 使用 SFTPClient.get() 方法将远程文件下载到本地。
  6. 关闭连接: 操作完成后,依次关闭 SFTP 会话和 SSH 连接。

基础示例:使用密码下载文件

这是最简单、最常见的情况。

import paramiko
import os
# --- 配置信息 ---
# 远程服务器信息
hostname = 'your_server_ip'
port = 22
username = 'your_username'
password = 'your_password'
# 远程和本地路径
remote_path = '/path/to/remote/file.txt'
local_path = '/path/to/local/downloaded_file.txt'
# --- 创建SSH客户端 ---
try:
    # 1. 创建SSHClient实例
    ssh = paramiko.SSHClient()
    # 2. 自动添加服务器的主机密钥(不安全,仅用于测试)
    # 在生产环境中,应该使用load_host_keys()来加载已知的主机密钥
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 3. 连接服务器
    print(f"正在连接到 {hostname}...")
    ssh.connect(hostname, port=port, username=username, password=password)
    print("连接成功!")
    # 4. 打开SFTP会话
    sftp = ssh.open_sftp()
    print("SFTP会话已打开。")
    # 5. 下载文件
    # get(remote_path, local_path)
    print(f"正在从 {remote_path} 下载文件到 {local_path}...")
    sftp.get(remote_path, local_path)
    print("文件下载完成!")
except paramiko.AuthenticationException:
    print("认证失败,请检查用户名和密码。")
except paramiko.SSHException as e:
    print(f"SSH连接或操作出错: {e}")
except Exception as e:
    print(f"发生未知错误: {e}")
finally:
    # 6. 关闭连接
    if 'sftp' in locals() and sftp:
        sftp.close()
        print("SFTP会话已关闭。")
    if 'ssh' in locals() and ssh:
        ssh.close()
        print("SSH连接已关闭。")

高级示例:使用 SSH 私钥下载文件

在生产环境中,使用 SSH 密钥对进行认证比密码更安全。

假设你的私钥文件是 ~/.ssh/id_rsa

Python paramiko如何实现文件下载?-图2
(图片来源网络,侵删)
import paramiko
import os
# --- 配置信息 ---
hostname = 'your_server_ip'
port = 22
username = 'your_username'
private_key_path = os.path.expanduser('~/.ssh/id_rsa') # 使用os.path处理路径更健壮
# 远程和本地路径
remote_path = '/path/to/remote/file.txt'
local_path = '/path/to/local/downloaded_file.txt'
try:
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 使用私钥进行连接
    # 如果你的私钥有密码,需要提供 passphrase
    # ssh.connect(hostname, port=port, username=username, key_filename=private_key_path, passphrase='your_key_passphrase')
    print(f"正在使用私钥连接到 {hostname}...")
    ssh.connect(hostname, port=port, username=username, key_filename=private_key_path)
    print("连接成功!")
    sftp = ssh.open_sftp()
    print(f"正在从 {remote_path} 下载文件到 {local_path}...")
    sftp.get(remote_path, local_path)
    print("文件下载完成!")
except paramiko.AuthenticationException:
    print("认证失败,请检查私钥或用户名。")
except paramiko.SSHException as e:
    print(f"SSH连接或操作出错: {e}")
except FileNotFoundError:
    print(f"错误:私钥文件未找到于 {private_key_path}")
except Exception as e:
    print(f"发生未知错误: {e}")
finally:
    if 'sftp' in locals() and sftp:
        sftp.close()
    if 'ssh' in locals() and ssh:
        ssh.close()

处理不同场景

1 下载整个目录

paramikoSFTPClient 没有直接提供 get_r (recursive get) 这样的方法,但我们可以通过结合 ossftp.listdir_attr 来实现递归下载。

import paramiko
import os
def download_directory(sftp, remote_dir, local_dir):
    """
    递归下载整个目录
    """
    # 确保本地目录存在
    if not os.path.exists(local_dir):
        os.makedirs(local_dir)
    # 遍历远程目录
    for item in sftp.listdir_attr(remote_dir):
        remote_item_path = os.path.join(remote_dir, item.filename)
        local_item_path = os.path.join(local_dir, item.filename)
        if item.st_mode & 0o170000 == 0o040000:  # 是目录 (S_IFDIR)
            print(f"正在进入目录: {remote_item_path}")
            download_directory(sftp, remote_item_path, local_item_path)
        else:  # 是文件
            print(f"正在下载文件: {remote_item_path} -> {local_item_path}")
            sftp.get(remote_item_path, local_item_path)
# --- 使用示例 ---
hostname = 'your_server_ip'
username = 'your_username'
password = 'your_password'
remote_dir = '/path/to/remote/directory'
local_dir = '/path/to/local/downloaded_directory'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname, username=username, password=password)
sftp = ssh.open_sftp()
try:
    download_directory(sftp, remote_dir, local_dir)
    print("整个目录下载完成!")
finally:
    sftp.close()
    ssh.close()

2 显示下载进度

sftp.get() 方法本身不支持回调函数来显示进度,我们可以通过先获取文件大小,然后分块下载并手动计算进度来实现。

import paramiko
import os
def download_with_progress(sftp, remote_path, local_path):
    """
    带进度显示的文件下载
    """
    # 获取远程文件大小
    remote_size = sftp.stat(remote_path).st_size
    print(f"文件总大小: {remote_size / (1024*1024):.2f} MB")
    # 断点续传模式
    # 检查本地文件是否存在,并获取其大小
    local_size = 0
    if os.path.exists(local_path):
        local_size = os.path.getsize(local_path)
        print(f"发现本地已下载文件: {local_size / (1024*1024):.2f} MB")
    # 定义回调函数
    def progress_callback(transferred, total):
        # transferred 是本次传输的字节数
        # total 是总字节数
        # 我们需要计算累计传输的字节
        transferred += local_size
        percent = transferred / total * 100
        print(f"\r下载进度: {percent:.2f}% ({transferred}/{total})", end='')
    # 使用 get 方法进行下载
    # 如果是续传,需要指定续传的起始位置
    if local_size > 0:
        # sftp.get 不直接支持续传,需要更复杂的处理,这里简化为重新下载
        # 实际续传需要使用 sftp.open 并配合 seek
        print("\n检测到本地已有文件,将重新下载以简化示例。")
    print(f"开始下载 {remote_path}...")
    sftp.get(remote_path, local_path, callback=progress_callback)
    print("\n下载完成!")
# --- 使用示例 ---
hostname = 'your_server_ip'
username = 'your_username'
password = 'your_password'
remote_path = '/path/to/large/file.zip'
local_path = '/path/to/local/large_file.zip'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname, username=username, password=password)
sftp = ssh.open_sftp()
try:
    download_with_progress(sftp, remote_path, local_path)
finally:
    sftp.close()
    ssh.close()

完整可复用的示例类

为了方便在项目中复用,你可以将上述功能封装成一个类。

import paramiko
import os
class RemoteFileDownloader:
    def __init__(self, hostname, username, password=None, port=22, private_key_path=None):
        self.hostname = hostname
        self.username = username
        self.password = password
        self.port = port
        self.private_key_path = private_key_path
        self.ssh = None
        self.sftp = None
    def connect(self):
        """建立SSH和SFTP连接"""
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        if self.private_key_path:
            self.ssh.connect(
                self.hostname, 
                port=self.port, 
                username=self.username, 
                key_filename=self.private_key_path,
                passphrase=None # 如果私钥有密码,在这里传入
            )
        else:
            self.ssh.connect(
                self.hostname, 
                port=self.port, 
                username=self.username, 
                password=self.password
            )
        self.sftp = self.ssh.open_sftp()
        print("连接已建立。")
    def download_file(self, remote_path, local_path):
        """下载单个文件"""
        if not self.sftp:
            raise ConnectionError("请先调用 connect() 方法建立连接。")
        print(f"正在下载 {remote_path} 到 {local_path}...")
        self.sftp.get(remote_path, local_path)
        print("文件下载完成。")
    def download_dir(self, remote_dir, local_dir):
        """递归下载整个目录"""
        if not self.sftp:
            raise ConnectionError("请先调用 connect() 方法建立连接。")
        def _download_dir(remote_path, local_path):
            if not os.path.exists(local_path):
                os.makedirs(local_path)
            for item in self.sftp.listdir_attr(remote_path):
                r_item = os.path.join(remote_path, item.filename)
                l_item = os.path.join(local_path, item.filename)
                if item.st_mode & 0o170000 == 0o040000: # 目录
                    _download_dir(r_item, l_item)
                else: # 文件
                    print(f"正在下载文件: {r_item} -> {l_item}")
                    self.sftp.get(r_item, l_item)
        print(f"正在下载目录 {remote_dir} 到 {local_dir}...")
        _download_dir(remote_dir, local_dir)
        print("目录下载完成。")
    def close(self):
        """关闭连接"""
        if self.sftp:
            self.sftp.close()
        if self.ssh:
            self.ssh.close()
        print("连接已关闭。")
# --- 如何使用这个类 ---
if __name__ == '__main__':
    # 配置你的信息
    HOSTNAME = 'your_server_ip'
    USERNAME = 'your_username'
    PASSWORD = 'your_password' # 或者使用 PRIVATE_KEY_PATH
    # 创建下载器实例
    downloader = RemoteFileDownloader(
        hostname=HOSTNAME,
        username=USERNAME,
        password=PASSWORD
        # private_key_path=os.path.expanduser('~/.ssh/id_rsa')
    )
    try:
        # 连接
        downloader.connect()
        # 下载单个文件
        downloader.download_file(
            remote_path='/etc/hosts',
            local_path='./downloaded_hosts'
        )
        # 下载整个目录
        downloader.download_dir(
            remote_dir='/tmp/remote_test_dir',
            local_dir='./downloaded_test_dir'
        )
    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        # 确保连接被关闭
        downloader.close()
  • 基础操作:核心是 SSHClient.connect()SFTPClient.get()
  • 认证方式:支持密码和私钥,私钥更安全。
  • 目录下载:需要手动实现递归逻辑。
  • 进度显示:通过 sftp.get()callback 参数实现,但需要自己计算累计进度。
  • 最佳实践:将功能封装成类,可以极大地提高代码的可读性和复用性。

希望这个详细的指南能帮助你顺利地在 Python 中使用 paramiko 下载文件!

Python paramiko如何实现文件下载?-图3
(图片来源网络,侵删)
分享:
扫描分享到社交APP
上一篇
下一篇