我们将采用 单例模式 来管理 MongoDB 连接,确保在整个应用中只创建一个连接池,提高资源利用率和性能。

项目结构
为了保持代码的整洁和可维护性,我们建议采用以下结构:
mongo_project/
├── config.py # 配置文件,存储数据库连接信息
├── mongo_client.py # 核心 MongoDB 客户端封装类
├── models.py # 示例数据模型(可选,推荐使用 Pydantic)
└── main.py # 示例使用脚本
配置文件 (config.py)
将数据库连接信息与业务代码分离,是一个好习惯。
# config.py
MONGO_CONFIG = {
"host": "localhost",
"port": 27017,
"username": "", # 如果没有用户名,留空
"password": "", # 如果没有密码,留空
"auth_source": "admin", # 认证数据库,通常是 "admin"
"database": "test_db", # 默认数据库名
"max_pool_size": 100, # 连接池最大连接数
"min_pool_size": 0, # 连接池最小连接数
"max_idle_time_ms": 30000 # 连接在池中的最大空闲时间 (ms)
}
核心封装类 (mongo_client.py)
这是封装的核心,我们将在这里实现所有功能。
# mongo_client.py
import pymongo
from pymongo import MongoClient, UpdateOne
from pymongo.errors import PyMongoError
from typing import Optional, Dict, List, Any, Union
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MongoDBClient:
"""
MongoDB 客户端封装类,使用单例模式管理连接。
提供基本的 CRUD 操作、批量操作、聚合和事务功能。
"""
_instance = None
_client = None
_db = None
def __new__(cls, *args, **kwargs):
"""单例模式实现"""
if not cls._instance:
cls._instance = super(MongoDBClient, cls).__new__(cls)
return cls._instance
def __init__(self, config: Dict[str, Any]):
"""
初始化 MongoDB 客户端。
:param config: MongoDB 连接配置字典
"""
if not self._client:
try:
# 构建连接字符串
connection_string = self._build_connection_string(config)
# 创建 MongoClient
self._client = MongoClient(
connection_string,
maxPoolSize=config.get('max_pool_size', 100),
minPoolSize=config.get('min_pool_size', 0),
maxIdleTimeMS=config.get('max_idle_time_ms', 30000)
)
# 选择默认数据库
self._db = self._client[config.get('database')]
# 测试连接是否成功
self._client.admin.command('ping')
logger.info("MongoDB 连接成功!")
except PyMongoError as e:
logger.error(f"MongoDB 连接失败: {e}")
raise
def _build_connection_string(self, config: Dict[str, Any]) -> str:
"""根据配置构建 MongoDB 连接字符串"""
auth_part = ""
if config.get('username') and config.get('password'):
auth_part = f"{config['username']}:{config['password']}@"
host = config.get('host', 'localhost')
port = config.get('port', 27017)
return f"mongodb://{auth_part}{host}:{port}/"
def get_collection(self, collection_name: str):
"""
获取指定集合的 Collection 对象。
:param collection_name: 集合名称
:return: pymongo.Collection 对象
"""
if not self._db:
raise RuntimeError("数据库未初始化")
return self._db[collection_name]
# --- CRUD 操作 ---
def insert_one(self, collection_name: str, document: Dict[str, Any]) -> str:
"""
向集合中插入一个文档。
:param collection_name: 集合名称
:param document: 要插入的文档 (字典)
:return: 插入文档的 _id (str)
"""
collection = self.get_collection(collection_name)
try:
result = collection.insert_one(document)
logger.info(f"成功插入文档,ID: {result.inserted_id}")
return str(result.inserted_id)
except PyMongoError as e:
logger.error(f"插入文档失败: {e}")
raise
def insert_many(self, collection_name: str, documents: List[Dict[str, Any]]) -> List[str]:
"""
向集合中插入多个文档。
:param collection_name: 集合名称
:param documents: 要插入的文档列表
:return: 插入文档的 _id 列表 (List[str])
"""
collection = self.get_collection(collection_name)
try:
result = collection.insert_many(documents)
logger.info(f"成功插入 {len(result.inserted_ids)} 个文档")
return [str(_id) for _id in result.inserted_ids]
except PyMongoError as e:
logger.error(f"批量插入文档失败: {e}")
raise
def find_one(self, collection_name: str, query: Dict[str, Any],
projection: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
"""
查找并返回一个文档。
:param collection_name: 集合名称
:param query: 查询条件
:param projection: 投影,指定返回哪些字段
:return: 匹配的文档,未找到则返回 None
"""
collection = self.get_collection(collection_name)
try:
return collection.find_one(query, projection)
except PyMongoError as e:
logger.error(f"查找文档失败: {e}")
raise
def find(self, collection_name: str, query: Dict[str, Any],
projection: Optional[Dict[str, Any]] = None,
sort: Optional[List[tuple]] = None,
limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""
查找并返回多个文档。
:param collection_name: 集合名称
:param query: 查询条件
:param projection: 投影
:param sort: 排序,[('name', 1)] 表示按 name 升序
:param limit: 限制返回结果数量
:return: 匹配的文档列表
"""
collection = self.get_collection(collection_name)
try:
cursor = collection.find(query, projection)
if sort:
cursor = cursor.sort(sort)
if limit:
cursor = cursor.limit(limit)
return list(cursor)
except PyMongoError as e:
logger.error(f"查找文档列表失败: {e}")
raise
def update_one(self, collection_name: str, query: Dict[str, Any],
update: Dict[str, Any], upsert: bool = False) -> Dict[str, Any]:
"""
更新一个文档。
:param collection_name: 集合名称
:param query: 查询条件,定位要更新的文档
:param update: 更新操作,{'$set': {'name': 'New Name'}}
:param upsert: 如果不存在是否插入新文档
:return: 更新结果,包含 matched_count, modified_count 等
"""
collection = self.get_collection(collection_name)
try:
result = collection.update_one(query, update, upsert=upsert)
logger.info(f"更新操作: 匹配 {result.matched_count} 个, 修改 {result.modified_count} 个")
return {
"matched_count": result.matched_count,
"modified_count": result.modified_count,
"upserted_id": str(result.upserted_id) if result.upserted_id else None
}
except PyMongoError as e:
logger.error(f"更新文档失败: {e}")
raise
def update_many(self, collection_name: str, query: Dict[str, Any],
update: Dict[str, Any], upsert: bool = False) -> Dict[str, Any]:
"""
更新多个文档。
:param collection_name: 集合名称
:param query: 查询条件
:param update: 更新操作
:param upsert: 如果不存在是否插入新文档
:return: 更新结果
"""
collection = self.get_collection(collection_name)
try:
result = collection.update_many(query, update, upsert=upsert)
logger.info(f"批量更新操作: 匹配 {result.matched_count} 个, 修改 {result.modified_count} 个")
return {
"matched_count": result.matched_count,
"modified_count": result.modified_count,
"upserted_id": str(result.upserted_id) if result.upserted_id else None
}
except PyMongoError as e:
logger.error(f"批量更新文档失败: {e}")
raise
def delete_one(self, collection_name: str, query: Dict[str, Any]) -> Dict[str, Any]:
"""
删除一个文档。
:param collection_name: 集合名称
:param query: 查询条件
:return: 删除结果,包含 deleted_count
"""
collection = self.get_collection(collection_name)
try:
result = collection.delete_one(query)
logger.info(f"删除操作: 删除了 {result.deleted_count} 个文档")
return {"deleted_count": result.deleted_count}
except PyMongoError as e:
logger.error(f"删除文档失败: {e}")
raise
def delete_many(self, collection_name: str, query: Dict[str, Any]) -> Dict[str, Any]:
"""
删除多个文档。
:param collection_name: 集合名称
:param query: 查询条件
:return: 删除结果
"""
collection = self.get_collection(collection_name)
try:
result = collection.delete_many(query)
logger.info(f"批量删除操作: 删除了 {result.deleted_count} 个文档")
return {"deleted_count": result.deleted_count}
except PyMongoError as e:
logger.error(f"批量删除文档失败: {e}")
raise
# --- 高级操作 ---
def bulk_write(self, collection_name: str, operations: List[Union[UpdateOne, Dict]]):
"""
执行批量写入操作(如插入、更新、删除)。
:param collection_name: 集合名称
:param operations: 操作列表,可以是 UpdateOne 对象或字典形式的操作
"""
collection = self.get_collection(collection_name)
try:
# 如果传入的是字典列表,需要转换成 pymongo 的操作对象
if isinstance(operations[0], dict):
# 示例: [{'update_one': {'filter': {'_id': 1}, 'update': {'$set': {'a': 1}}}}]
# 这是一个简化的示例,实际应用中需要更复杂的解析
# 这里为了演示,我们假设传入的是 UpdateOne 对象
pass
# 更常见的用法是直接传入 pymongo 的操作对象
# [UpdateOne({'_id': 1}, {'$set': {'a': 1}}), ...]
# 注意:这里我们只演示 UpdateOne,其他操作类似
# 需要根据传入操作类型创建对应的 BulkWriteOperation 对象
# 这是一个简化的实现,实际中可能需要更复杂的逻辑来处理不同类型的操作
# 为了简化,我们假设 operations 已经是 pymongo 的 BulkWriteOperation 对象列表
# 使用 UpdateOne
if operations and isinstance(operations[0], UpdateOne):
result = collection.bulk_write(operations, ordered=False)
logger.info(f"批量写入完成: 插入 {result.inserted_count}, 更新 {result.modified_count}, 删除 {result.deleted_count}")
return result
# 如果是自定义的字典操作,需要转换
# 这里提供一个转换示例,实际应用中需要完善
bulk_operations = []
for op_dict in operations:
if op_dict.get('type') == 'update_one':
bulk_operations.append(UpdateOne(op_dict['filter'], op_dict['update']))
if bulk_operations:
result = collection.bulk_write(bulk_operations, ordered=False)
logger.info(f"批量写入完成: 插入 {result.inserted_count}, 更新 {result.modified_count}, 删除 {result.deleted_count}")
return result
except PyMongoError as e:
logger.error(f"批量写入失败: {e}")
raise
def aggregate(self, collection_name: str, pipeline: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
执行聚合操作。
:param collection_name: 集合名称
:param pipeline: 聚合管道,[{'$match': {'age': {'$gt': 20}}}, {'$group': {'_id': '$city', 'count': {'$sum': 1}}}]
:return: 聚合结果列表
"""
collection = self.get_collection(collection_name)
try:
return list(collection.aggregate(pipeline))
except PyMongoError as e:
logger.error(f"聚合操作失败: {e}")
raise
# --- 事务处理 ---
def start_session(self):
"""启动一个客户端会话,用于事务"""
return self._client.start_session()
def with_transaction(self, callback, session=None):
"""
执行一个带事务的回调函数。
:param callback: 一个接受 session 作为参数的函数,包含事务内的操作
:param session: 可选,传入已存在的会话
:return: 回调函数的返回值
"""
if session is None:
session = self.start_session()
try:
with session.start_transaction():
logger.info("事务已开始。")
# 执行回调函数,并将 session 传入
result = callback(session)
logger.info("事务提交成功。")
return result
except Exception as e:
logger.error(f"事务失败,已回滚: {e}")
raise
finally:
session.end_session()
def close(self):
"""关闭 MongoDB 连接"""
if self._client:
self._client.close()
logger.info("MongoDB 连接已关闭。")
示例数据模型 (models.py) - 可选但推荐
使用 Pydantic 可以定义数据结构,提供数据验证和类型提示。

pip install pydantic
# models.py
from pydantic import BaseModel, EmailStr
from typing import Optional, List
from datetime import datetime
class User(BaseModel):
name: str
email: EmailStr
age: int
is_active: bool = True
tags: List[str] = []
created_at: datetime = datetime.utcnow()
class Product(BaseModel):
name: str
price: float
stock: int
description: Optional[str] = None
使用示例 (main.py)
现在我们来演示如何使用这个封装好的 MongoDBClient。
# main.py
import sys
import os
from datetime import datetime
# 将项目根目录添加到 Python 路径,以便导入模块
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from mongo_client import MongoDBClient
from config import MONGO_CONFIG
from models import User, Product
def main():
# 1. 初始化客户端 (单例模式,多次初始化也只会创建一次连接)
print("--- 1. 初始化 MongoDB 客户端 ---")
mongo_client = MongoDBClient(MONGO_CONFIG)
db = mongo_client._db # 获取数据库对象 (内部使用,推荐通过 client.get_collection 访问)
# 2. 准备测试数据
print("\n--- 2. 准备测试数据 ---")
users_collection_name = "users"
products_collection_name = "products"
# 清空集合,确保每次运行都是干净的
db[users_collection_name].delete_many({})
db[products_collection_name].delete_many({})
user_data = [
{"name": "Alice", "email": "alice@example.com", "age": 30, "tags": ["python", "mongodb"]},
{"name": "Bob", "email": "bob@example.com", "age": 25, "tags": ["java", "spring"]},
{"name": "Charlie", "email": "charlie@example.com", "age": 35, "tags": ["javascript", "react"]},
]
product_data = [
{"name": "Laptop", "price": 1200.50, "stock": 50},
{"name": "Mouse", "price": 25.00, "stock": 200},
{"name": "Keyboard", "price": 75.00, "stock": 150},
]
# 3. 插入操作
print("\n--- 3. 插入操作 ---")
mongo_client.insert_many(users_collection_name, user_data)
mongo_client.insert_many(products_collection_name, product_data)
print("用户和产品数据插入成功。")
# 4. 查询操作
print("\n--- 4. 查询操作 ---")
# 查找所有用户
all_users = mongo_client.find(users_collection_name, {})
print(f"所有用户数量: {len(all_users)}")
# 查找年龄大于 28 的用户
older_users = mongo_client.find(users_collection_name, {"age": {"$gt": 28}})
print(f"年龄大于 28 的用户: {[u['name'] for u in older_users]}")
# 查找一个特定用户
alice = mongo_client.find_one(users_collection_name, {"name": "Alice"})
print(f"Alice 的信息: {alice}")
# 5. 更新操作
print("\n--- 5. 更新操作 ---")
# 更新 Alice 的年龄
update_result = mongo_client.update_one(
users_collection_name,
{"name": "Alice"},
{"$set": {"age": 31}}
)
print(f"更新 Alice 的年龄结果: {update_result}")
# 再次查找 Alice 验证更新
updated_alice = mongo_client.find_one(users_collection_name, {"name": "Alice"})
print(f"更新后的 Alice 信息: {updated_alice}")
# 6. 删除操作
print("\n--- 6. 删除操作 ---")
# 删除 Charlie
delete_result = mongo_client.delete_one(users_collection_name, {"name": "Charlie"})
print(f"删除 Charlie 的结果: {delete_result}")
# 验证 Charlie 是否被删除
charlie = mongo_client.find_one(users_collection_name, {"name": "Charlie"})
print(f"查找 Charlie (应不存在): {charlie}")
# 7. 聚合操作
print("\n--- 7. 聚合操作 ---")
# 计算每个年龄段的人数
age_pipeline = [
{"$group": {"_id": "$age", "count": {"$sum": 1}}},
{"$sort": {"_id": 1}}
]
age_counts = mongo_client.aggregate(users_collection_name, age_pipeline)
print(f"各年龄段人数统计: {age_counts}")
# 8. 批量操作
print("\n--- 8. 批量操作 ---")
from pymongo import UpdateOne
# 批量更新产品库存
bulk_operations = [
UpdateOne({"name": "Laptop"}, {"$inc": {"stock": -10}}),
UpdateOne({"name": "Mouse"}, {"$inc": {"stock": 50}}),
UpdateOne({"name": "Keyboard"}, {"$set": {"stock": 140}}),
]
bulk_result = mongo_client.bulk_write(products_collection_name, bulk_operations)
print(f"批量更新库存结果: {bulk_result}")
# 9. 事务处理
print("\n--- 9. 事务处理 ---")
def transfer_funds(session):
# 在一个事务中,从一个用户账户扣款,向另一个用户账户加款
# 这里我们用一个简单的集合来模拟账户
accounts_collection = db["accounts"]
# Alice 转账 100 给 Bob
from_user = "Alice"
to_user = "Bob"
amount = 100
# 扣款
accounts_collection.update_one(
{"name": from_user},
{"$inc": {"balance": -amount}},
session=session
)
# 加款
accounts_collection.update_one(
{"name": to_user},
{"$inc": {"balance": amount}},
session=session
)
print(f"事务内: {from_user} 向 {to_user} 转账 {amount}")
# 初始化账户
db["accounts"].delete_many({})
db["accounts"].insert_many([
{"name": "Alice", "balance": 500},
{"name": "Bob", "balance": 300}
])
# 执行事务
try:
mongo_client.with_transaction(transfer_funds)
print("转账事务成功执行。")
except Exception as e:
print(f"转账事务失败: {e}")
# 验证账户余额
alice_balance = db["accounts"].find_one({"name": "Alice"})
bob_balance = db["accounts"].find_one({"name": "Bob"})
print(f"事务后, Alice 余额: {alice_balance['balance']}, Bob 余额: {bob_balance['balance']}")
# 10. 关闭连接
print("\n--- 10. 关闭连接 ---")
mongo_client.close()
if __name__ == "__main__":
main()
如何运行
- 确保你已经安装了
pymongo和pydantic:pip install pymongo pydantic
- 确保 MongoDB 服务正在运行(通过 Docker
docker run -d -p 27017:27017 mongo)。 - 将上述所有代码文件保存到
mongo_project目录下。 - 在终端中进入
mongo_project目录,然后运行:python main.py
这个封装方案提供了以下优点:
- 易于使用:将复杂的 PyMongo 操作封装成简单的函数调用,如
insert_one,find。 - 连接管理:使用单例模式,自动管理连接池,避免重复创建和销毁连接。
- 配置分离:数据库配置与业务代码分离,便于在不同环境(开发、测试、生产)间切换。
- 功能全面:覆盖了 CRUD、批量操作、聚合和事务等核心功能。
- 健壮性:内置了错误处理和日志记录,方便调试和监控。
- 可扩展性:你可以基于这个基础框架,轻松地添加更多高级功能,如索引管理、分片操作等。
你可以根据自己项目的具体需求,对这个封装进行进一步的定制和优化。

