杰瑞科技汇

Python 分页 Tornado 如何实现高效分页?

分页的基本原理

分页的核心思想是:只从数据库中查询当前页面需要的数据,而不是一次性获取所有数据

Python 分页 Tornado 如何实现高效分页?-图1
(图片来源网络,侵删)

这通常需要两个关键参数:

  1. 页码: 当前是第几页,通常用 pagep 表示,从 1 开始。
  2. 每页大小: 每页显示多少条数据,通常用 page_sizeper_pagesize 表示。

有了这两个参数,我们就可以计算出数据库查询时需要的 LIMITOFFSET 值。

  • LIMIT: 每页的大小,即 page_size
  • OFFSET: 跳过的记录数,计算公式为 (page - 1) * page_size

示例: 假设我们有 100 条数据,每页显示 10 条 (page_size=10)。

  • 第 1 页: LIMIT 10 OFFSET 0 (跳过 0 条)
  • 第 2 页: LIMIT 10 OFFSET 10 (跳过 10 条)
  • 第 3 页: LIMIT 10 OFFSET 20 (跳过 20 条)

准备工作:数据模型和数据库

为了演示,我们先创建一个简单的数据模型,这里我们使用 SQLAlchemy 作为 ORM,因为它与 Tornado 集成得很好。

Python 分页 Tornado 如何实现高效分页?-图2
(图片来源网络,侵删)

安装必要的库:

pip install tornado sqlalchemy

创建一个数据模型文件 models.py,用于表示“文章”:

# models.py
import datetime
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 数据库配置 (这里使用 SQLite 内存数据库,方便演示)
DATABASE_URL = "sqlite:///:///pagination_example.db"
# 创建引擎和会话工厂
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
class Article(Base):
    __tablename__ = "articles"
    id = Column(Integer, primary_key=True, index=True)= Column(String, index=True)
    content = Column(String)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
# 创建数据库表
def create_tables():
    Base.metadata.create_all(bind=engine)
# 获取数据库会话的依赖函数
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Tornado Handler 实现

我们来实现处理分页请求的 Tornado Handler。

简单实现(只返回数据列表)

这是最基础的方式,Handler 只负责查询和返回当前页的数据。

Python 分页 Tornado 如何实现高效分页?-图3
(图片来源网络,侵删)

创建 handlers.py 文件:

# handlers.py
import tornado.web
from tornado.escape import json_encode
from models import Article, get_db
class ArticleListHandler(tornado.web.RequestHandler):
    async def get(self):
        # 1. 从请求参数中获取页码和每页大小
        page = self.get_argument("page", 1, type=int)
        page_size = self.get_argument("page_size", 10, type=int)
        # --- 数据验证 ---
        if page < 1:
            page = 1
        if page_size < 1 or page_size > 100: # 限制每页最大100条
            page_size = 10
        # 2. 计算偏移量
        offset = (page - 1) * page_size
        # 3. 从数据库查询数据
        # 注意:这里我们使用同步的 SQLAlchemy,Tornado 默认是同步模式。
        # 如果使用异步驱动 (如 aiopg), 则需要 async/await。
        db = next(get_db())
        articles = db.query(Article).offset(offset).limit(page_size).all()
        # 4. 将结果转换为 JSON 并返回
        # 在实际应用中,你可能需要对 article 对象进行序列化
        response_data = {
            "page": page,
            "page_size": page_size,
            "articles": [
                {"id": a.id, "title": a.title, "content": a.content}
                for a in articles
            ]
        }
        self.set_header("Content-Type", "application/json")
        self.write(json_encode(response_data))

如何测试: 启动 Tornado 服务器,并确保数据库中有一些数据。

# main.py
import tornado.ioloop
import tornado.web
from models import create_tables, Article, SessionLocal
from handlers import ArticleListHandler
# 初始化数据库并添加一些示例数据
def init_db():
    create_tables()
    db = SessionLocal()
    if db.query(Article).count() == 0:
        print("Creating sample data...")
        for i in range(55): # 创建55条数据,足够测试分页
            article = Article(title=f"Article Title {i+1}", content=f"This is the content for article {i+1}.")
            db.add(article)
        db.commit()
    db.close()
# 设置路由
def make_app():
    return tornado.web.Application([
        (r"/articles", ArticleListHandler),
    ])
if __name__ == "__main__":
    init_db()
    app = make_app()
    app.listen(8888)
    print("Server is running on http://localhost:8888")
    tornado.ioloop.IOLoop.current().start()

运行 python main.py,然后用浏览器或 API 工具访问:

  • http://localhost:8888/articles -> 第1页,默认10条
  • http://localhost:8888/articles?page=2 -> 第2页
  • http://localhost:8888/articles?page_size=5 -> 第1页,每页5条
  • http://localhost:8888/articles?page=6&page_size=10 -> 第6页,每页10条

进阶实现:返回分页元数据(推荐)

一个更友好的 API 不仅返回数据,还应返回分页的元信息,比如总记录数、总页数、是否有上一页/下一页等。

修改 handlers.py

# handlers.py (进阶版)
import tornado.web
from tornado.escape import json_encode
from models import Article, get_db
class ArticleListHandler(tornado.web.RequestHandler):
    async def get(self):
        page = self.get_argument("page", 1, type=int)
        page_size = self.get_argument("page_size", 10, type=int)
        if page < 1:
            page = 1
        if page_size < 1 or page_size > 100:
            page_size = 10
        offset = (page - 1) * page_size
        db = next(get_db())
        # 1. 查询总记录数
        total_items = db.query(Article).count()
        # 2. 计算总页数
        # 使用 -1 // 确保在整除时结果正确,20 // 10 = 2, 21 // 10 = 2, 但我们需要 3
        total_pages = (total_items + page_size - 1) // page_size if total_items > 0 else 0
        # 3. 查询当前页的数据
        articles = db.query(Article).offset(offset).limit(page_size).all()
        # 4. 构建包含元数据的响应
        response_data = {
            "pagination": {
                "page": page,
                "page_size": page_size,
                "total_items": total_items,
                "total_pages": total_pages,
                "has_prev": page > 1,
                "has_next": page < total_pages,
                "prev_page": page - 1 if page > 1 else None,
                "next_page": page + 1 if page < total_pages else None,
            },
            "results": [
                {"id": a.id, "title": a.title, "content": a.content}
                for a in articles
            ]
        }
        self.set_header("Content-Type", "application/json")
        self.write(json_encode(response_data))

API 的返回结果更加丰富和实用:

{
  "pagination": {
    "page": 2,
    "page_size": 10,
    "total_items": 55,
    "total_pages": 6,
    "has_prev": true,
    "has_next": true,
    "prev_page": 1,
    "next_page": 3
  },
  "results": [
    { "id": 11, "title": "Article Title 11", ... },
    ...
  ]
}

最佳实践与性能优化

使用异步数据库驱动(关键)

Tornado 的核心是异步 I/O,如果你的数据库查询是同步的(如上面的示例),它会阻塞整个 I/O 循环,降低并发性能,对于生产环境,强烈建议使用异步数据库驱动。

推荐方案:aiopg (异步 PostgreSQL) 或 aiomysql (异步 MySQL)

示例 (使用 aiopg):

首先安装 aiopgsqlalchemy[asyncio]:

pip install aiopg sqlalchemy[asyncio]

修改 models.pyhandlers.py:

models.py (异步版)

# models_async.py
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base, selectinload
from . import Article # 假设 Article 模型定义在别处
# 异步数据库配置
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@host/dbname"
engine = create_async_engine(ASYNC_DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()
async def init_db():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

handlers.py (异步版)

# handlers_async.py
import tornado.web
from tornado.escape import json_encode
from sqlalchemy import select
from models_async import Article, get_db
class AsyncArticleListHandler(tornado.web.RequestHandler):
    async def get(self):
        page = self.get_argument("page", 1, type=int)
        page_size = self.get_argument("page_size", 10, type=int)
        # ... (参数验证逻辑同上) ...
        offset = (page - 1) * page_size
        async with next(get_db()) as db:
            # 使用异步查询
            # 1. 查询总数
            total_items_result = await db.scalar(select(func.count()).select_from(Article))
            total_items = total_items_result or 0
            # 2. 查询当前页数据
            stmt = select(Article).offset(offset).limit(page_size)
            result = await db.execute(stmt)
            articles = result.scalars().all()
            # ... (计算总页数、构建响应的逻辑同上) ...
            response_data = {
                "pagination": { ... },
                "results": [{"id": a.id, "title": a.title} for a in articles]
            }
            self.set_header("Content-Type", "application/json")
            self.write(json_encode(response_data))

使用异步驱动后,Handler 的方法必须是 async def,数据库操作使用 await,这样在等待数据库返回结果时,Tornado 可以去处理其他请求,大大提高了吞吐量。

URL 设计

清晰的 URL 设计很重要。

  • /articles?page=2 (使用查询参数)
  • /articles/page/2 (在路径中,更 RESTful,但实现起来稍复杂)

对于大多数情况,查询参数 ?page=X 是最简单、最通用的选择。

缓存

对于不经常变动的数据列表,可以考虑使用缓存(如 Redis)来存储分页结果,特别是首页或热门页,可以缓存 page=1 的结果,设置一个较短的过期时间(如 5 分钟),可以显著减轻数据库压力。

使用 cursor (游标) 分页

对于数据量极大的表(如千万、亿级别),传统的 LIMIT/OFFSET 分页性能会急剧下降,因为数据库仍然需要扫描并跳过 OFFSET 之前的所有行。

这时,基于游标的分页 是更好的选择,它不使用页码,而是使用上一页最后一条记录的唯一 ID(通常是主键)来作为下一页的起点。

原理:

  • 下一页: WHERE id > last_seen_id ORDER BY id LIMIT page_size
  • 上一页: WHERE id < last_seen_id ORDER BY id DESC LIMIT page_size (然后需要反转结果)

这种方式查询效率极高,因为它可以利用主键索引直接定位,无需扫描大量数据。

特性 简单实现 进阶实现 最佳实践 (异步) 大数据优化
核心逻辑 LIMIT/OFFSET LIMIT/OFFSET + 元数据 async/await + LIMIT/OFFSET WHERE id > cursor
返回数据 仅当前页数据 当前页数据 + 分页信息 同左,但性能高 当前页数据 + next_cursor
适用场景 学习、简单后台 绝大多数 Web API 高并发 Web 应用 超大数据集、Feed 流
性能 低 (同步) 低 (同步) 高 (异步) 极高

对于绝大多数 Tornado 项目,“进阶实现” 结合 “异步数据库驱动” 是一个完美的组合,它既提供了良好的 API 体验,又保证了高性能。

分享:
扫描分享到社交APP
上一篇
下一篇