目录
- 环境准备
- 安装 MongoDB
- 安装 Python 驱动
- 连接 MongoDB
- 基本连接
- 连接字符串
- 连接池
- 核心操作: CRUD
- Create (插入数据)
- Read (查询数据)
- Update (更新数据)
- Delete (删除数据)
- 高级操作
- 聚合管道
- 索引
- 事务
- 最佳实践与模式
- 数据建模
- 错误处理
- 使用 Pydantic 进行数据验证
- 异步驱动 (
motor)
- 完整示例代码
- 推荐资源
环境准备
a. 安装 MongoDB
你需要一个运行中的 MongoDB 实例,最简单的方式是使用 Docker:

# 拉取并运行 MongoDB 最新版本 docker run -d -p 27017:27017 --name my-mongo mongo
或者,你可以从 MongoDB 官网 下载并安装。
b. 安装 Python 驱动
Python 官方的 MongoDB 驱动是 pymongo,它是同步的,适用于大多数应用。
pip install pymongo
如果你需要异步支持(例如在 FastAPI, Starlette 等框架中使用),应该安装 motor,它是 pymongo 的异步版本。
pip install motor
连接 MongoDB
a. 基本连接
pymongo 的核心是 MongoClient。

from pymongo import MongoClient
# 默认连接到本地 27017 端口
client = MongoClient('mongodb://localhost:27017/')
# 指定数据库
db = client['my_database']
# 指定集合 (类似于关系型数据库中的表)
collection = db['users']
# 完成操作后关闭连接
client.close()
b. 连接字符串
在生产环境中,推荐使用连接字符串,它更灵活和安全。
# 包含用户名、密码、主机、端口和数据库名
uri = "mongodb://username:password@hostname:port/my_database"
client = MongoClient(uri)
# 也可以不指定数据库,在后续操作中指定
uri = "mongodb://localhost:27017/"
client = MongoClient(uri)
db = client.get_database('my_database')
c. 连接池
pymongo 默认使用连接池来管理数据库连接,这是高效且推荐的,你可以配置连接池的大小:
client = MongoClient(
'mongodb://localhost:27017/',
maxPoolSize=50, # 连接池最大连接数
minPoolSize=5, # 连接池最小连接数
serverSelectionTimeoutMS=5000 # 服务器选择超时时间
)
核心操作: CRUD
a. Create (插入数据)
使用 insert_one() 插入单个文档,insert_many() 插入多个文档。
# 插入单个文档
new_user = {
"name": "Alice",
"age": 30,
"city": "New York",
"hobbies": ["reading", "hiking"]
}
insert_result = collection.insert_one(new_user)
print(f"插入的文档ID: {insert_result.inserted_id}")
# 插入多个文档
new_users = [
{"name": "Bob", "age": 25, "city": "London"},
{"name": "Charlie", "age": 35, "city": "Paris"}
]
insert_many_result = collection.insert_many(new_users)
print(f"插入的文档IDs: {insert_many_result.inserted_ids}")
b. Read (查询数据)
查询是 MongoDB 最强大的功能之一。

# 查询所有文档
for user in collection.find():
print(user)
# 查询满足条件的单个文档
# find_one() 返回第一个匹配的文档
user = collection.find_one({"name": "Alice"})
print(f"找到 Alice: {user}")
# 使用查询操作符
# 年龄大于 25 的用户
for user in collection.find({"age": {"$gt": 25}}):
print(user)
# 年龄小于等于 30 的用户
for user in collection.find({"age": {"$lte": 30}}):
print(user)
# 查询名字是 Alice 并且城市是 New York 的用户
for user in collection.find({"name": "Alice", "city": "New York"}):
print(user)
# 投影:只返回需要的字段
# 只返回 name 和 city,不返回 _id
for user in collection.find({"name": "Alice"}, {"name": 1, "city": 1, "_id": 0}):
print(user)
c. Update (更新数据)
使用 update_one() 或 update_many()。
# 更新单个文档
# 将 Alice 的年龄更新为 31
update_result = collection.update_one(
{"name": "Alice"}, # 查询条件
{"$set": {"age": 31}} # 更新操作
)
print(f"匹配的文档数: {update_result.matched_count}, 修改的文档数: {update_result.modified_count}")
# 更新多个文档
# 将所有伦敦居民的年龄加 1
update_many_result = collection.update_many(
{"city": "London"},
{"$inc": {"age": 1}} # 使用 $inc 操作符进行增量更新
)
print(f"匹配的文档数: {update_many_result.matched_count}, 修改的文档数: {update_many_result.modified_count}")
d. Delete (删除数据)
使用 delete_one() 或 delete_many()。
# 删除单个文档
delete_result = collection.delete_one({"name": "Charlie"})
print(f"删除的文档数: {delete_result.deleted_count}")
# 删除多个文档
# 删除所有年龄大于 40 的用户
delete_many_result = collection.delete_many({"age": {"$gt": 40}})
print(f"删除的文档数: {delete_many_result.deleted_count}")
高级操作
a. 聚合管道
聚合管道是 MongoDB 最强大的特性之一,它允许你对数据进行一系列的转换,以处理数据、返回计算结果。
# 假设我们有如下订单数据
orders_collection = db['orders']
orders_data = [
{"customer": "Alice", "item": "laptop", "price": 1200, "status": "completed"},
{"customer": "Bob", "item": "mouse", "price": 25, "status": "shipped"},
{"customer": "Alice", "item": "keyboard", "price": 75, "status": "completed"},
{"customer": "Charlie", "item": "monitor", "price": 300, "status": "pending"},
{"customer": "Bob", "item": "webcam", "price": 80, "status": "completed"},
]
orders_collection.insert_many(orders_data)
# 聚合管道示例:计算每个客户的总消费金额
pipeline = [
{"$match": {"status": "completed"}}, # 第一步:筛选出已完成的订单
{"$group": { # 第二步:按客户名分组
"_id": "$customer", # _id 字段是分组的依据
"total_spent": {"$sum": "$price"} # 计算每个组的 price 总和
}},
{"$sort": {"total_spent": -1}} # 第三步:按总消费降序排序
]
results = orders_collection.aggregate(pipeline)
for result in results:
print(result)
b. 索引
索引用于加速查询,没有索引,MongoDB 必须执行集合扫描(即检查集合中的每个文档)。
# 创建升序索引
collection.create_index([("name", 1)])
# 创建复合索引 (name 和 city)
collection.create_index([("name", 1), ("city", -1)])
# 查看集合的索引
print(collection.list_indexes())
c. 事务
MongoDB 支持多文档 ACID 事务,确保多个操作作为一个原子单元执行。
from pymongo.errors import OperationFailure
# 事务必须在支持副本集的 MongoDB 实例上运行
# client = MongoClient('mongodb://localhost:27017/?replicaSet=myReplicaSet')
with client.start_session() as session:
try:
with session.start_transaction():
# 在事务中执行操作
db.accounts.update_one(
{"_id": 1}, {"$inc": {"balance": -100}}, session=session
)
db.accounts.update_one(
{"_id": 2}, {"$inc": {"balance": 100}}, session=session
)
print("事务提交成功")
except OperationFailure as e:
print(f"事务失败,已回滚: {e}")
最佳实践与模式
a. 数据建模
MongoDB 是 NoSQL,数据建模与关系型数据库不同,核心思想是嵌入和引用。
-
嵌入:将相关数据放在一个文档中,适用于“包含”关系。
{ "name": "Alice", "address": { // 嵌入的地址信息 "street": "123 Main St", "city": "New York", "zip": "10001" } } -
引用:使用文档 ID 来关联数据,适用于“多对多”或大型“一对多”关系。
// posts 集合 { "_id": 1, "title": "Post 1" } { "_id": 2, "title": "Post 2" } // comments 集合 { "text": "Great post!", "post_id": 1 } // 引用 posts 集合的 ID { "text": "Interesting.", "post_id": 2 }
b. 错误处理
始终对数据库操作进行错误处理。
from pymongo.errors import PyMongoError
try:
result = collection.insert_one({"invalid": "data"})
except PyMongoError as e:
print(f"数据库操作出错: {e}")
c. 使用 Pydantic 进行数据验证
在 Python 应用中,使用 Pydantic 可以在数据存入数据库前进行严格的类型和结构验证,是现代 Python Web 开发的标配。
from pymongo import MongoClient
from pydantic import BaseModel, ValidationError
class User(BaseModel):
name: str
age: int
city: str
# ... 连接数据库的代码 ...
# 尝试插入数据
try:
user_data = {"name": "David", "age": "forty", "city": "Berlin"} # age 是字符串,会失败
user = User(**user_data) # 在这里会触发 ValidationError
collection.insert_one(user.model_dump())
except ValidationError as e:
print(f"数据验证失败: {e}")
except PyMongoError as e:
print(f"数据库操作失败: {e}")
d. 异步驱动 (motor)
对于 I/O 密集型应用(如 Web API),异步操作能极大提高性能。
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection
async def main():
# 连接是异步的
client = AsyncIOMotorClient('mongodb://localhost:27017/')
db: AsyncIOMotorDatabase = client['my_database']
collection: AsyncIOMotorCollection = db['users']
# 插入操作是异步的
await collection.insert_one({"name": "Eve", "age": 28})
# 查询操作也是异步的,需要使用 `to_list()` 或 `to_list(length=None)`
cursor = collection.find({"age": {"$gt": 25}})
users = await cursor.to_list(length=100)
print(users)
# 关闭连接
client.close()
# 运行异步主函数
asyncio.run(main())
完整示例代码 (同步 pymongo)
这是一个完整的 Python 脚本,展示了如何连接、创建、读取、更新和删除数据。
import pymongo
from pymongo import MongoClient
from pymongo.errors import PyMongoError
def main():
# 1. 连接
try:
client = MongoClient('mongodb://localhost:27017/', serverSelectionTimeoutMS=5000)
# 测试连接是否成功
client.admin.command('ping')
print("成功连接到 MongoDB!")
except pymongo.errors.ConnectionFailure as e:
print(f"无法连接到 MongoDB: {e}")
return
db = client['company_db']
employees_collection = db['employees']
# 2. Create (插入)
print("\n--- 插入数据 ---")
new_employee = {
"name": "张三",
"position": "软件工程师",
"department": "技术部",
"skills": ["Python", "MongoDB", "Docker"],
"join_date": "2025-01-15"
}
try:
result = employees_collection.insert_one(new_employee)
print(f"插入成功,ID: {result.inserted_id}")
except PyMongoError as e:
print(f"插入失败: {e}")
# 3. Read (查询)
print("\n--- 查询数据 ---")
try:
# 查询所有员工
print("所有员工:")
for emp in employees_collection.find():
print(emp)
# 查询技术部的员工
print("\n技术部员工:")
for emp in employees_collection.find({"department": "技术部"}):
print(emp)
except PyMongoError as e:
print(f"查询失败: {e}")
# 4. Update (更新)
print("\n--- 更新数据 ---")
try:
# 给张三增加一个技能
update_result = employees_collection.update_one(
{"name": "张三"},
{"$push": {"skills": "Kubernetes"}}
)
print(f"更新匹配: {update_result.matched_count}, 修改: {update_result.modified_count}")
# 打印更新后的张三信息
updated_zhangsan = employees_collection.find_one({"name": "张三"})
print(f"更新后的张三: {updated_zhangsan}")
except PyMongoError as e:
print(f"更新失败: {e}")
# 5. Delete (删除)
print("\n--- 删除数据 ---")
try:
# 删除张三
delete_result = employees_collection.delete_one({"name": "张三"})
print(f"删除了 {delete_result.deleted_count} 个文档")
except PyMongoError as e:
print(f"删除失败: {e}")
# 6. 关闭连接
client.close()
print("\n连接已关闭。")
if __name__ == "__main__":
main()
推荐资源
- MongoDB 官方文档: https://www.mongodb.com/docs/ (最权威、最全面)
- PyMongo 官方文档: https://pymongo.readthedocs.io/
- Motor 官方文档: https://motor.readthedocs.io/
- MongoDB University: https://university.mongodb.com/ (提供免费的在线课程,强烈推荐)
希望这份指南能帮助你顺利开始 Python MongoDB 的开发之旅!
