环境准备
你需要安装 paramiko 库,如果尚未安装,请打开你的终端或命令行工具,运行以下命令:

pip install paramiko
核心概念与步骤
使用 paramiko 下载文件主要通过 SFTP 客户端实现,基本步骤如下:
- 创建 SSH 客户端:
paramiko.SSHClient()。 - 加载主机密钥: 为了防止中间人攻击,需要验证服务器的主机密钥。
set_missing_host_key_policy是一个方便的做法,它会自动添加未知的主机密钥。 - 连接到服务器: 使用
connect()方法,提供主机名、用户名、密码或私钥。 - 打开 SFTP 会话: 在已建立的 SSH 连接上,调用
open_sftp()方法获取一个SFTPClient对象。 - 执行下载操作: 使用
SFTPClient.get()方法将远程文件下载到本地。 - 关闭连接: 操作完成后,依次关闭 SFTP 会话和 SSH 连接。
基础示例:使用密码下载文件
这是最简单、最常见的情况。
import paramiko
import os
# --- 配置信息 ---
# 远程服务器信息
hostname = 'your_server_ip'
port = 22
username = 'your_username'
password = 'your_password'
# 远程和本地路径
remote_path = '/path/to/remote/file.txt'
local_path = '/path/to/local/downloaded_file.txt'
# --- 创建SSH客户端 ---
try:
# 1. 创建SSHClient实例
ssh = paramiko.SSHClient()
# 2. 自动添加服务器的主机密钥(不安全,仅用于测试)
# 在生产环境中,应该使用load_host_keys()来加载已知的主机密钥
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 3. 连接服务器
print(f"正在连接到 {hostname}...")
ssh.connect(hostname, port=port, username=username, password=password)
print("连接成功!")
# 4. 打开SFTP会话
sftp = ssh.open_sftp()
print("SFTP会话已打开。")
# 5. 下载文件
# get(remote_path, local_path)
print(f"正在从 {remote_path} 下载文件到 {local_path}...")
sftp.get(remote_path, local_path)
print("文件下载完成!")
except paramiko.AuthenticationException:
print("认证失败,请检查用户名和密码。")
except paramiko.SSHException as e:
print(f"SSH连接或操作出错: {e}")
except Exception as e:
print(f"发生未知错误: {e}")
finally:
# 6. 关闭连接
if 'sftp' in locals() and sftp:
sftp.close()
print("SFTP会话已关闭。")
if 'ssh' in locals() and ssh:
ssh.close()
print("SSH连接已关闭。")
高级示例:使用 SSH 私钥下载文件
在生产环境中,使用 SSH 密钥对进行认证比密码更安全。
假设你的私钥文件是 ~/.ssh/id_rsa。

import paramiko
import os
# --- 配置信息 ---
hostname = 'your_server_ip'
port = 22
username = 'your_username'
private_key_path = os.path.expanduser('~/.ssh/id_rsa') # 使用os.path处理路径更健壮
# 远程和本地路径
remote_path = '/path/to/remote/file.txt'
local_path = '/path/to/local/downloaded_file.txt'
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 使用私钥进行连接
# 如果你的私钥有密码,需要提供 passphrase
# ssh.connect(hostname, port=port, username=username, key_filename=private_key_path, passphrase='your_key_passphrase')
print(f"正在使用私钥连接到 {hostname}...")
ssh.connect(hostname, port=port, username=username, key_filename=private_key_path)
print("连接成功!")
sftp = ssh.open_sftp()
print(f"正在从 {remote_path} 下载文件到 {local_path}...")
sftp.get(remote_path, local_path)
print("文件下载完成!")
except paramiko.AuthenticationException:
print("认证失败,请检查私钥或用户名。")
except paramiko.SSHException as e:
print(f"SSH连接或操作出错: {e}")
except FileNotFoundError:
print(f"错误:私钥文件未找到于 {private_key_path}")
except Exception as e:
print(f"发生未知错误: {e}")
finally:
if 'sftp' in locals() and sftp:
sftp.close()
if 'ssh' in locals() and ssh:
ssh.close()
处理不同场景
1 下载整个目录
paramiko 的 SFTPClient 没有直接提供 get_r (recursive get) 这样的方法,但我们可以通过结合 os 和 sftp.listdir_attr 来实现递归下载。
import paramiko
import os
def download_directory(sftp, remote_dir, local_dir):
"""
递归下载整个目录
"""
# 确保本地目录存在
if not os.path.exists(local_dir):
os.makedirs(local_dir)
# 遍历远程目录
for item in sftp.listdir_attr(remote_dir):
remote_item_path = os.path.join(remote_dir, item.filename)
local_item_path = os.path.join(local_dir, item.filename)
if item.st_mode & 0o170000 == 0o040000: # 是目录 (S_IFDIR)
print(f"正在进入目录: {remote_item_path}")
download_directory(sftp, remote_item_path, local_item_path)
else: # 是文件
print(f"正在下载文件: {remote_item_path} -> {local_item_path}")
sftp.get(remote_item_path, local_item_path)
# --- 使用示例 ---
hostname = 'your_server_ip'
username = 'your_username'
password = 'your_password'
remote_dir = '/path/to/remote/directory'
local_dir = '/path/to/local/downloaded_directory'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname, username=username, password=password)
sftp = ssh.open_sftp()
try:
download_directory(sftp, remote_dir, local_dir)
print("整个目录下载完成!")
finally:
sftp.close()
ssh.close()
2 显示下载进度
sftp.get() 方法本身不支持回调函数来显示进度,我们可以通过先获取文件大小,然后分块下载并手动计算进度来实现。
import paramiko
import os
def download_with_progress(sftp, remote_path, local_path):
"""
带进度显示的文件下载
"""
# 获取远程文件大小
remote_size = sftp.stat(remote_path).st_size
print(f"文件总大小: {remote_size / (1024*1024):.2f} MB")
# 断点续传模式
# 检查本地文件是否存在,并获取其大小
local_size = 0
if os.path.exists(local_path):
local_size = os.path.getsize(local_path)
print(f"发现本地已下载文件: {local_size / (1024*1024):.2f} MB")
# 定义回调函数
def progress_callback(transferred, total):
# transferred 是本次传输的字节数
# total 是总字节数
# 我们需要计算累计传输的字节
transferred += local_size
percent = transferred / total * 100
print(f"\r下载进度: {percent:.2f}% ({transferred}/{total})", end='')
# 使用 get 方法进行下载
# 如果是续传,需要指定续传的起始位置
if local_size > 0:
# sftp.get 不直接支持续传,需要更复杂的处理,这里简化为重新下载
# 实际续传需要使用 sftp.open 并配合 seek
print("\n检测到本地已有文件,将重新下载以简化示例。")
print(f"开始下载 {remote_path}...")
sftp.get(remote_path, local_path, callback=progress_callback)
print("\n下载完成!")
# --- 使用示例 ---
hostname = 'your_server_ip'
username = 'your_username'
password = 'your_password'
remote_path = '/path/to/large/file.zip'
local_path = '/path/to/local/large_file.zip'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname, username=username, password=password)
sftp = ssh.open_sftp()
try:
download_with_progress(sftp, remote_path, local_path)
finally:
sftp.close()
ssh.close()
完整可复用的示例类
为了方便在项目中复用,你可以将上述功能封装成一个类。
import paramiko
import os
class RemoteFileDownloader:
def __init__(self, hostname, username, password=None, port=22, private_key_path=None):
self.hostname = hostname
self.username = username
self.password = password
self.port = port
self.private_key_path = private_key_path
self.ssh = None
self.sftp = None
def connect(self):
"""建立SSH和SFTP连接"""
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.private_key_path:
self.ssh.connect(
self.hostname,
port=self.port,
username=self.username,
key_filename=self.private_key_path,
passphrase=None # 如果私钥有密码,在这里传入
)
else:
self.ssh.connect(
self.hostname,
port=self.port,
username=self.username,
password=self.password
)
self.sftp = self.ssh.open_sftp()
print("连接已建立。")
def download_file(self, remote_path, local_path):
"""下载单个文件"""
if not self.sftp:
raise ConnectionError("请先调用 connect() 方法建立连接。")
print(f"正在下载 {remote_path} 到 {local_path}...")
self.sftp.get(remote_path, local_path)
print("文件下载完成。")
def download_dir(self, remote_dir, local_dir):
"""递归下载整个目录"""
if not self.sftp:
raise ConnectionError("请先调用 connect() 方法建立连接。")
def _download_dir(remote_path, local_path):
if not os.path.exists(local_path):
os.makedirs(local_path)
for item in self.sftp.listdir_attr(remote_path):
r_item = os.path.join(remote_path, item.filename)
l_item = os.path.join(local_path, item.filename)
if item.st_mode & 0o170000 == 0o040000: # 目录
_download_dir(r_item, l_item)
else: # 文件
print(f"正在下载文件: {r_item} -> {l_item}")
self.sftp.get(r_item, l_item)
print(f"正在下载目录 {remote_dir} 到 {local_dir}...")
_download_dir(remote_dir, local_dir)
print("目录下载完成。")
def close(self):
"""关闭连接"""
if self.sftp:
self.sftp.close()
if self.ssh:
self.ssh.close()
print("连接已关闭。")
# --- 如何使用这个类 ---
if __name__ == '__main__':
# 配置你的信息
HOSTNAME = 'your_server_ip'
USERNAME = 'your_username'
PASSWORD = 'your_password' # 或者使用 PRIVATE_KEY_PATH
# 创建下载器实例
downloader = RemoteFileDownloader(
hostname=HOSTNAME,
username=USERNAME,
password=PASSWORD
# private_key_path=os.path.expanduser('~/.ssh/id_rsa')
)
try:
# 连接
downloader.connect()
# 下载单个文件
downloader.download_file(
remote_path='/etc/hosts',
local_path='./downloaded_hosts'
)
# 下载整个目录
downloader.download_dir(
remote_dir='/tmp/remote_test_dir',
local_dir='./downloaded_test_dir'
)
except Exception as e:
print(f"发生错误: {e}")
finally:
# 确保连接被关闭
downloader.close()
- 基础操作:核心是
SSHClient.connect()和SFTPClient.get()。 - 认证方式:支持密码和私钥,私钥更安全。
- 目录下载:需要手动实现递归逻辑。
- 进度显示:通过
sftp.get()的callback参数实现,但需要自己计算累计进度。 - 最佳实践:将功能封装成类,可以极大地提高代码的可读性和复用性。
希望这个详细的指南能帮助你顺利地在 Python 中使用 paramiko 下载文件!

