分页的基本原理
分页的核心思想是:只从数据库中查询当前页面需要的数据,而不是一次性获取所有数据。

这通常需要两个关键参数:
- 页码: 当前是第几页,通常用
page或p表示,从 1 开始。 - 每页大小: 每页显示多少条数据,通常用
page_size或per_page或size表示。
有了这两个参数,我们就可以计算出数据库查询时需要的 LIMIT 和 OFFSET 值。
LIMIT: 每页的大小,即page_size。OFFSET: 跳过的记录数,计算公式为(page - 1) * page_size。
示例:
假设我们有 100 条数据,每页显示 10 条 (page_size=10)。
- 第 1 页:
LIMIT 10 OFFSET 0(跳过 0 条) - 第 2 页:
LIMIT 10 OFFSET 10(跳过 10 条) - 第 3 页:
LIMIT 10 OFFSET 20(跳过 20 条)
准备工作:数据模型和数据库
为了演示,我们先创建一个简单的数据模型,这里我们使用 SQLAlchemy 作为 ORM,因为它与 Tornado 集成得很好。

安装必要的库:
pip install tornado sqlalchemy
创建一个数据模型文件 models.py,用于表示“文章”:
# models.py
import datetime
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 数据库配置 (这里使用 SQLite 内存数据库,方便演示)
DATABASE_URL = "sqlite:///:///pagination_example.db"
# 创建引擎和会话工厂
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
class Article(Base):
__tablename__ = "articles"
id = Column(Integer, primary_key=True, index=True)= Column(String, index=True)
content = Column(String)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
# 创建数据库表
def create_tables():
Base.metadata.create_all(bind=engine)
# 获取数据库会话的依赖函数
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
Tornado Handler 实现
我们来实现处理分页请求的 Tornado Handler。
简单实现(只返回数据列表)
这是最基础的方式,Handler 只负责查询和返回当前页的数据。

创建 handlers.py 文件:
# handlers.py
import tornado.web
from tornado.escape import json_encode
from models import Article, get_db
class ArticleListHandler(tornado.web.RequestHandler):
async def get(self):
# 1. 从请求参数中获取页码和每页大小
page = self.get_argument("page", 1, type=int)
page_size = self.get_argument("page_size", 10, type=int)
# --- 数据验证 ---
if page < 1:
page = 1
if page_size < 1 or page_size > 100: # 限制每页最大100条
page_size = 10
# 2. 计算偏移量
offset = (page - 1) * page_size
# 3. 从数据库查询数据
# 注意:这里我们使用同步的 SQLAlchemy,Tornado 默认是同步模式。
# 如果使用异步驱动 (如 aiopg), 则需要 async/await。
db = next(get_db())
articles = db.query(Article).offset(offset).limit(page_size).all()
# 4. 将结果转换为 JSON 并返回
# 在实际应用中,你可能需要对 article 对象进行序列化
response_data = {
"page": page,
"page_size": page_size,
"articles": [
{"id": a.id, "title": a.title, "content": a.content}
for a in articles
]
}
self.set_header("Content-Type", "application/json")
self.write(json_encode(response_data))
如何测试: 启动 Tornado 服务器,并确保数据库中有一些数据。
# main.py
import tornado.ioloop
import tornado.web
from models import create_tables, Article, SessionLocal
from handlers import ArticleListHandler
# 初始化数据库并添加一些示例数据
def init_db():
create_tables()
db = SessionLocal()
if db.query(Article).count() == 0:
print("Creating sample data...")
for i in range(55): # 创建55条数据,足够测试分页
article = Article(title=f"Article Title {i+1}", content=f"This is the content for article {i+1}.")
db.add(article)
db.commit()
db.close()
# 设置路由
def make_app():
return tornado.web.Application([
(r"/articles", ArticleListHandler),
])
if __name__ == "__main__":
init_db()
app = make_app()
app.listen(8888)
print("Server is running on http://localhost:8888")
tornado.ioloop.IOLoop.current().start()
运行 python main.py,然后用浏览器或 API 工具访问:
http://localhost:8888/articles-> 第1页,默认10条http://localhost:8888/articles?page=2-> 第2页http://localhost:8888/articles?page_size=5-> 第1页,每页5条http://localhost:8888/articles?page=6&page_size=10-> 第6页,每页10条
进阶实现:返回分页元数据(推荐)
一个更友好的 API 不仅返回数据,还应返回分页的元信息,比如总记录数、总页数、是否有上一页/下一页等。
修改 handlers.py:
# handlers.py (进阶版)
import tornado.web
from tornado.escape import json_encode
from models import Article, get_db
class ArticleListHandler(tornado.web.RequestHandler):
async def get(self):
page = self.get_argument("page", 1, type=int)
page_size = self.get_argument("page_size", 10, type=int)
if page < 1:
page = 1
if page_size < 1 or page_size > 100:
page_size = 10
offset = (page - 1) * page_size
db = next(get_db())
# 1. 查询总记录数
total_items = db.query(Article).count()
# 2. 计算总页数
# 使用 -1 // 确保在整除时结果正确,20 // 10 = 2, 21 // 10 = 2, 但我们需要 3
total_pages = (total_items + page_size - 1) // page_size if total_items > 0 else 0
# 3. 查询当前页的数据
articles = db.query(Article).offset(offset).limit(page_size).all()
# 4. 构建包含元数据的响应
response_data = {
"pagination": {
"page": page,
"page_size": page_size,
"total_items": total_items,
"total_pages": total_pages,
"has_prev": page > 1,
"has_next": page < total_pages,
"prev_page": page - 1 if page > 1 else None,
"next_page": page + 1 if page < total_pages else None,
},
"results": [
{"id": a.id, "title": a.title, "content": a.content}
for a in articles
]
}
self.set_header("Content-Type", "application/json")
self.write(json_encode(response_data))
API 的返回结果更加丰富和实用:
{
"pagination": {
"page": 2,
"page_size": 10,
"total_items": 55,
"total_pages": 6,
"has_prev": true,
"has_next": true,
"prev_page": 1,
"next_page": 3
},
"results": [
{ "id": 11, "title": "Article Title 11", ... },
...
]
}
最佳实践与性能优化
使用异步数据库驱动(关键)
Tornado 的核心是异步 I/O,如果你的数据库查询是同步的(如上面的示例),它会阻塞整个 I/O 循环,降低并发性能,对于生产环境,强烈建议使用异步数据库驱动。
推荐方案:aiopg (异步 PostgreSQL) 或 aiomysql (异步 MySQL)
示例 (使用 aiopg):
首先安装 aiopg 和 sqlalchemy[asyncio]:
pip install aiopg sqlalchemy[asyncio]
修改 models.py 和 handlers.py:
models.py (异步版)
# models_async.py
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base, selectinload
from . import Article # 假设 Article 模型定义在别处
# 异步数据库配置
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@host/dbname"
engine = create_async_engine(ASYNC_DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def get_db():
async with AsyncSessionLocal() as session:
yield session
handlers.py (异步版)
# handlers_async.py
import tornado.web
from tornado.escape import json_encode
from sqlalchemy import select
from models_async import Article, get_db
class AsyncArticleListHandler(tornado.web.RequestHandler):
async def get(self):
page = self.get_argument("page", 1, type=int)
page_size = self.get_argument("page_size", 10, type=int)
# ... (参数验证逻辑同上) ...
offset = (page - 1) * page_size
async with next(get_db()) as db:
# 使用异步查询
# 1. 查询总数
total_items_result = await db.scalar(select(func.count()).select_from(Article))
total_items = total_items_result or 0
# 2. 查询当前页数据
stmt = select(Article).offset(offset).limit(page_size)
result = await db.execute(stmt)
articles = result.scalars().all()
# ... (计算总页数、构建响应的逻辑同上) ...
response_data = {
"pagination": { ... },
"results": [{"id": a.id, "title": a.title} for a in articles]
}
self.set_header("Content-Type", "application/json")
self.write(json_encode(response_data))
使用异步驱动后,Handler 的方法必须是 async def,数据库操作使用 await,这样在等待数据库返回结果时,Tornado 可以去处理其他请求,大大提高了吞吐量。
URL 设计
清晰的 URL 设计很重要。
/articles?page=2(使用查询参数)/articles/page/2(在路径中,更 RESTful,但实现起来稍复杂)
对于大多数情况,查询参数 ?page=X 是最简单、最通用的选择。
缓存
对于不经常变动的数据列表,可以考虑使用缓存(如 Redis)来存储分页结果,特别是首页或热门页,可以缓存 page=1 的结果,设置一个较短的过期时间(如 5 分钟),可以显著减轻数据库压力。
使用 cursor (游标) 分页
对于数据量极大的表(如千万、亿级别),传统的 LIMIT/OFFSET 分页性能会急剧下降,因为数据库仍然需要扫描并跳过 OFFSET 之前的所有行。
这时,基于游标的分页 是更好的选择,它不使用页码,而是使用上一页最后一条记录的唯一 ID(通常是主键)来作为下一页的起点。
原理:
- 下一页:
WHERE id > last_seen_id ORDER BY id LIMIT page_size - 上一页:
WHERE id < last_seen_id ORDER BY id DESC LIMIT page_size(然后需要反转结果)
这种方式查询效率极高,因为它可以利用主键索引直接定位,无需扫描大量数据。
| 特性 | 简单实现 | 进阶实现 | 最佳实践 (异步) | 大数据优化 |
|---|---|---|---|---|
| 核心逻辑 | LIMIT/OFFSET |
LIMIT/OFFSET + 元数据 |
async/await + LIMIT/OFFSET |
WHERE id > cursor |
| 返回数据 | 仅当前页数据 | 当前页数据 + 分页信息 | 同左,但性能高 | 当前页数据 + next_cursor |
| 适用场景 | 学习、简单后台 | 绝大多数 Web API | 高并发 Web 应用 | 超大数据集、Feed 流 |
| 性能 | 低 (同步) | 低 (同步) | 高 (异步) | 极高 |
对于绝大多数 Tornado 项目,“进阶实现” 结合 “异步数据库驱动” 是一个完美的组合,它既提供了良好的 API 体验,又保证了高性能。
