杰瑞科技汇

Python aiohttp 文档怎么用?

aiohttp 完整指南

什么是 aiohttp

aiohttp 是一个用于 Python 的异步 HTTP 客户端/服务器框架,它的核心优势在于与 Python 的 async/await 语法深度集成,使其在处理大量并发 I/O 操作(如网络请求)时性能极高。

Python aiohttp 文档怎么用?-图1
(图片来源网络,侵删)
  • 客户端:可以异步地发送 HTTP/HTTPS 请求到其他服务器。
  • 服务器:可以构建异步的 Web 服务器、API 和微服务。

为什么选择 aiohttp?(优势)

  • 高性能:基于 asyncio,使用事件循环处理成千上万的并发连接,而不会阻塞主线程,这对于 I/O 密集型应用(如爬虫、API 聚合)至关重要。
  • 简洁的 API:提供了类似 requests 库的简洁易用的 API,学习成本低。
  • 功能强大:支持 HTTP/1.1、WebSockets、Server-Sent Events (SSE)、Cookie、文件上传、流式请求和响应等。
  • 活跃的社区:拥有庞大的用户群体和持续的维护。

安装

你需要安装 aiohttp,为了处理 URL 编码、JSON 等常用功能,建议也安装 aiohttp 的配套工具 aiohttp-socks(用于代理)和 yarl(URL 处理库,aiohttp 的依赖)。

# 基本安装
pip install aiohttp
# 可选:安装 cchardet 用于更快的字符编码检测
pip install cchardet
# 可选:安装 aiodns 用于更快的 DNS 解析
pip install aiodns

核心概念

1 asyncioasync/await

aiohttp 完全建立在 Python 的 asyncio 库之上,你需要理解 async/await 语法:

  • async def: 用于定义一个异步函数(协程)。
  • await: 用于暂停一个异步函数的执行,等待另一个异步操作完成(如网络请求)。

重要规则:在异步函数中,调用另一个异步函数时,必须使用 await,否则,它会返回一个协程对象,而不会真正执行。

2 ClientSession

这是 aiohttp 客户端的核心。强烈建议在所有客户端操作中使用 ClientSession

Python aiohttp 文档怎么用?-图2
(图片来源网络,侵删)
  • 作用:管理连接池、Cookie、请求头等,它会复用 TCP 连接,极大地提高性能。
  • 最佳实践ClientSession 应该被当作一个上下文管理器 (async with ...) 来使用,这样可以确保会话在完成后被正确关闭,释放资源。

3 ClientResponse

当你发送一个请求后,aiohttp 会返回一个 ClientResponse 对象,这个对象代表了服务器的响应。

  • 注意:响应体(如 response.text()response.json())在读取后就会被消耗,一旦读取,就不能再次读取。
  • 状态码response.status
  • 响应头response.headers
  • URLresponse.url

aiohttp 客户端用法详解

1 基本请求

这是最常见的用法,使用 async with 确保 ClientSession 被正确管理。

import aiohttp
import asyncio
async def fetch_url(url):
    # 使用 async with 创建 ClientSession
    async with aiohttp.ClientSession() as session:
        # 使用 async with 发送请求,并获取响应
        async with session.get(url) as response:
            # response.status 是 HTTP 状态码
            print(f"Status: {response.status}")
            # response.text() 会读取并解码响应体
            # 必须使用 await 来等待其完成
            html = await response.text()
            print(f"Content length: {len(html)}")
            # print(f"Content: {html[:100]}...") # 打印前100个字符
# 要运行的异步函数
async def main():
    url = "https://www.example.com"
    await fetch_url(url)
# 运行主事件循环
if __name__ == "__main__":
    asyncio.run(main())

2 发送不同类型的请求

import asyncio
import aiohttp
async def main():
    async with aiohttp.ClientSession() as session:
        # GET 请求
        async with session.get("https://httpbin.org/get") as resp:
            print("GET Response:", await resp.json())
        # POST 请求 (发送 JSON 数据)
        payload = {"key": "value", "number": 123}
        async with session.post("https://httpbin.org/post", json=payload) as resp:
            print("POST JSON Response:", await resp.json())
        # POST 请求 (发送 Form 数据)
        form_data = {"name": "John Doe", "email": "john.doe@example.com"}
        async with session.post("https://httpbin.org/post", data=form_data) as resp:
            print("POST Form Response:", await resp.json())
        # PUT 请求
        async with session.put("https://httpbin.org/put", data="some data") as resp:
            print("PUT Response:", await resp.text())
asyncio.run(main())

3 传递请求头和查询参数

import asyncio
import aiohttp
async def main():
    headers = {"User-Agent": "MyCoolApp/1.0", "Accept": "application/json"}
    params = {"key1": "value1", "key2": "value2"}
    async with aiohttp.ClientSession(headers=headers) as session:
        # params 会被自动编码并附加到 URL 后面
        async with session.get("https://httpbin.org/get", params=params) as response:
            data = await response.json()
            print("URL with params:", data["url"])
            print("Headers received by server:", data["headers"])
asyncio.run(main())

4 处理响应体

aiohttp 提供了多种读取响应体的方式:

  • response.text(): 将响应体解码为字符串。
  • response.json(): 将响应体解码为 JSON 对象。
  • response.read(): 将响应体读取为 bytes 对象。
  • response.content: 一个异步迭代器,可以逐块读取响应体(用于大文件下载)。
import asyncio
import aiohttp
async def main():
    async with aiohttp.ClientSession() as session:
        # 1. 作为文本
        async with session.get("https://httpbin.org/html") as resp:
            html = await resp.text()
            print(f"Text content length: {len(html)}")
        # 2. 作为 JSON
        async with session.get("https://httpbin.org/json") as resp:
            json_data = await resp.json()
            print(f"JSON title: {json_data['slideshow']['title']}")
        # 3. 作为 bytes
        async with session.get("https://httpbin.org/bytes/1024") as resp:
            byte_data = await resp.read()
            print(f"Bytes length: {len(byte_data)}")
        # 4. 流式读取 (下载大文件)
        async with session.get("https://speed.hetzner.de/100MB.bin") as resp:
            print("Starting download...")
            total_size = 0
            async for chunk in resp.content.iter_chunked(1024): # 每次读取 1KB
                total_size += len(chunk)
                # print(f"Downloaded chunk of size {len(chunk)}")
            print(f"Download finished. Total size: {total_size / (1024*1024):.2f} MB")
asyncio.run(main())

5 错误处理

aiohttp 定义了一些异常类来处理不同类型的错误。

Python aiohttp 文档怎么用?-图3
(图片来源网络,侵删)
  • aiohttp.ClientError: 所有客户端错误的基类。
  • aiohttp.ClientResponseError: 服务器返回了 4xx 或 5xx 错误。
  • aiohttp.ClientConnectorError: 连接服务器时出错(DNS 解析失败、连接被拒绝等)。
  • aiohttp.ServerTimeoutError: 服务器超时。
  • aiohttp.ClientPayloadError: 读取响应体时出错。
import asyncio
import aiohttp
async def fetch_with_error_handling(url):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=10) as response:
                # 如果状态码是 4xx 或 5xx,会抛出 ClientResponseError
                response.raise_for_status() 
                print(f"Success! Status: {response.status}")
                return await response.text()
    except aiohttp.ClientResponseError as e:
        print(f"HTTP Error: {e.status} - {e.message}")
    except aiohttp.ClientConnectorError as e:
        print(f"Connection Error: Could not connect to {url}. {e}")
    except asyncio.TimeoutError:
        print(f"Timeout Error: Request to {url} timed out.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
async def main():
    await fetch_with_error_handling("https://httpbin.org/status/404")
    await fetch_with_error_handling("https://non-existent-domain.xyz")
    await fetch_with_error_handling("https://httpbin.org/delay/5") # 默认超时是 300s,这里手动设为 10s
asyncio.run(main())

aiohttp 服务器用法

aiohttp 也是一个功能强大的 Web 服务器框架。

1 一个简单的 "Hello, World" 服务器

from aiohttp import web
async def handle(request):
    # request 对象包含了请求的所有信息
    name = request.match_info.get('name', 'Anonymous')
    text = f"Hello, {name}"
    return web.Response(text=text)
async def main():
    app = web.Application()
    # 添加路由
    app.add_routes([
        web.get('/', handle),           # GET / -> handle
        web.get('/{name}', handle),    # GET /John -> handle
    ])
    # 运行服务器
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    print("Server started at http://localhost:8080")
    # 保持服务器运行
    try:
        # 在真实应用中,这里可能是一个无限循环
        await asyncio.sleep(3600) 
    finally:
        await runner.cleanup()
if __name__ == '__main__':
    # 注意:服务器代码也需要在事件循环中运行
    asyncio.run(main())

2 处理 POST 请求和 JSON

from aiohttp import web
async def handle_post(request):
    try:
        # 从请求体中读取 JSON 数据
        data = await request.json()
        print(f"Received JSON data: {data}")
        # 返回一个 JSON 响应
        response_data = {"status": "success", "received": data}
        return web.json_response(response_data)
    except Exception as e:
        return web.json_response({"status": "error", "message": str(e)}, status=400)
async def main():
    app = web.Application()
    app.add_routes([web.post('/api/data', handle_post)])
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    print("Server started at http://localhost:8080")
    await asyncio.sleep(3600)
    await runner.cleanup()
if __name__ == '__main__':
    asyncio.run(main())

最佳实践

  1. 始终使用 async with for ClientSession: 这是保证资源被正确释放的最简单、最可靠的方式。

  2. 限制并发量: 在进行大量并发请求时(如爬虫),一次性发起成千上万个请求可能会耗尽系统资源或被目标服务器封禁,使用 asyncio.Semaphore 来限制并发数。

    SEMAPHORE = asyncio.Semaphore(100) # 限制最多100个并发
    async def fetch_with_semaphore(session, url):
        async with SEMAPHORE:
            async with session.get(url) as response:
                return await response.text()
  3. 超时设置: 为你的请求设置合理的超时时间,防止长时间挂起,可以在 ClientSession 级别或单个请求级别设置。

    # 在会话级别设置总超时
    timeout = aiohttp.ClientTimeout(total=60)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # ...
  4. 复用 ClientSession: 在一个应用程序的生命周期内(一个爬虫任务或一个 Web 请求的处理过程),尽量只创建一个 ClientSession 并复用它,而不是为每个请求都创建一个新的。

  5. 使用会话钩子 (hooks): 可以使用 ClientSessionhooks 参数在请求前后执行自定义逻辑,例如记录日志、统一添加认证信息等。


官方文档和资源

这份指南涵盖了 aiohttp 的核心用法,要深入探索,建议结合官方文档进行学习。

分享:
扫描分享到社交APP
上一篇
下一篇