python协程实战代码
时间: 2025-02-02 08:18:53 浏览: 36
### Python 协程实战示例代码
#### 使用 `asyncio` 库创建并运行多个协程任务
为了展示如何在实际项目中使用 Python 的协程功能,下面提供了一个简单的例子来模拟并发下载网页内容的任务。此案例展示了如何定义异步函数以及管理这些函数所返回的协程对象。
```python
import asyncio
import aiohttp
from typing import List
async def fetch_page(session: aiohttp.ClientSession, url: str) -> str:
try:
response = await session.get(url)
content = await response.text()
print(f"Fetched {url}")
return content[:100] # 只打印前100个字符作为示例
except Exception as e:
print(f"Failed to fetch {url}: {e}")
async def main(urls: List[str]):
async with aiohttp.ClientSession() as session:
tasks = [fetch_page(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if not isinstance(result, Exception):
print(result)
if __name__ == "__main__":
websites = [
"https://www.python.org",
"https://docs.python.org/3/library/asyncio.html",
"https://realpython.com/python-concurrency/"
]
loop = asyncio.get_event_loop()
loop.run_until_complete(main(websites))
```
这段代码首先导入必要的模块,并定义了两个主要部分:
- **`fetch_page` 函数**:这是一个异步函数,它接受会话实例和目标 URL 参数,尝试获取页面内容并通过字符串形式返回部分内容[^2]。
- **`main` 函数**:该函数接收一系列网址列表参数,在其中创建一个新的 HTTP 客户端会话,并针对每个给定的 URL 构建相应的 `fetch_page` 调用任务集合;最后利用 `gather()` 方法一次性启动所有任务等待它们完成[^3]。
上述程序可以有效地提高 IO 密集型操作(如网络请求)的速度,因为当一个连接正在等待响应时,其他连接能够继续工作而不必阻塞整个应用程序流程。
阅读全文
相关推荐


















