Python非同期処理完全攻略【async/await・asyncio徹底マスター】

フリーランスボード

20万件以上の案件から、副業に最適なリモート・週3〜の案件を一括検索できるプラットフォーム。プロフィール登録でAIスカウトが自動的にマッチング案件を提案。市場統計や単価相場、エージェントの口コミも無料で閲覧可能なため、本業を続けながら効率的に高単価の副業案件を探せます。フリーランスボード

ITプロパートナーズ

週2〜3日から働ける柔軟な案件が業界トップクラスの豊富さを誇るフリーランスエージェント。エンド直契約のため高単価で、週3日稼働でも十分な報酬を得られます。リモートや時間フレキシブルな案件も多数。スタートアップ・ベンチャー中心で、トレンド技術を使った魅力的な案件が揃っています。専属エージェントが案件紹介から契約交渉までサポート。利用企業2,000社以上の実績。ITプロパートナーズ

Midworks 10,000件以上の案件を保有し、週3日〜・フルリモートなど柔軟な働き方に対応。高単価案件が豊富で、報酬保障制度(60%)や保険料負担(50%)など正社員並みの手厚い福利厚生が特徴。通勤交通費(月3万円)、スキルアップ費用(月1万円)の支給に加え、リロクラブ・freeeが無料利用可能。非公開案件80%以上、支払いサイト20日で安心して稼働できます。Midworks

非同期処理は、I/O待機時間を有効活用してプログラムの実行効率を大幅に向上させる技術です。Python 3.5で導入されたasync/await構文により、非同期プログラミングが格段に書きやすくなりました。本記事では、非同期処理の基本から実践的な応用まで、実用的なサンプルコードとともに徹底解説します。

目次

非同期処理とは

非同期処理とは、一つの処理の完了を待たずに次の処理を開始する実行方式です。特にI/O操作(ファイル読み書き、ネットワーク通信、データベースアクセス)で威力を発揮します。

同期処理 vs 非同期処理

同期処理の問題点:

  • I/O待機中にCPUが無駄になる
  • 複数のタスクを順次実行するため時間がかかる
  • ユーザーインターフェースがブロックされる

非同期処理の利点:

  • I/O待機中に他のタスクを実行
  • 複数のタスクを並行処理
  • レスポンシブなアプリケーション

基本概念と用語

重要な用語

  • コルーチン(Coroutine): async defで定義された関数
  • awaitable: awaitで待機可能なオブジェクト
  • イベントループ: 非同期タスクを管理・実行するメカニズム
  • Task: コルーチンをイベントループで実行するためのラッパー

async/awaitの基本

最初のコルーチン

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 実行
asyncio.run(hello())

複数のコルーチンを並行実行

import asyncio

async def task(name, duration):
    print(f"{name} 開始")
    await asyncio.sleep(duration)
    print(f"{name} 完了")

async def main():
    await asyncio.gather(
        task("タスク1", 2),
        task("タスク2", 1),
        task("タスク3", 3)
    )

asyncio.run(main())

戻り値のあるコルーチン

import asyncio

async def fetch_data(url):
    await asyncio.sleep(1)  # API呼び出しのシミュレーション
    return f"データ from {url}"

async def main():
    results = await asyncio.gather(
        fetch_data("url1"),
        fetch_data("url2"),
        fetch_data("url3")
    )
    print(results)

asyncio.run(main())

asyncioの基本機能

asyncio.create_task()

import asyncio

async def background_task():
    await asyncio.sleep(2)
    print("バックグラウンドタスク完了")

async def main():
    # タスクを作成して即座に開始
    task = asyncio.create_task(background_task())
    print("メイン処理中...")
    await asyncio.sleep(1)
    await task  # タスクの完了を待機

asyncio.run(main())

asyncio.wait_for()(タイムアウト)

import asyncio

async def slow_operation():
    await asyncio.sleep(5)
    return "完了"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=3)
        print(result)
    except asyncio.TimeoutError:
        print("タイムアウトしました")

asyncio.run(main())

asyncio.as_completed()

import asyncio

async def fetch(url, delay):
    await asyncio.sleep(delay)
    return f"結果: {url}"

async def main():
    tasks = [fetch(f"url{i}", i) for i in range(1, 4)]
    
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"完了: {result}")

asyncio.run(main())

HTTPリクエストの非同期処理

aiohttpを使った並行HTTP請求

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1"
    ]
    
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_url(session, url) for url in urls]
        )
        print(f"取得完了: {len(results)}件")

# asyncio.run(main())  # 実際の実行時のコメントアウトを外す

リトライ機能付きHTTPクライアント

import asyncio
import aiohttp

async def fetch_with_retry(session, url, max_retries=3):
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=5) as response:
                if response.status == 200:
                    return await response.json()
        except Exception as e:
            if attempt == max_retries - 1:
                raise e
            await asyncio.sleep(2 ** attempt)  # 指数バックオフ

async def main():
    async with aiohttp.ClientSession() as session:
        result = await fetch_with_retry(session, "https://api.example.com/data")
        print(result)

ファイルI/Oの非同期処理

aiofilesを使った非同期ファイル操作

import asyncio
import aiofiles

async def read_file(filename):
    async with aiofiles.open(filename, 'r') as file:
        content = await file.read()
        return len(content)

async def main():
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    sizes = await asyncio.gather(
        *[read_file(f) for f in files]
    )
    print(f"ファイルサイズ: {sizes}")

# asyncio.run(main())

大量ファイルの並行処理

import asyncio
import aiofiles
import os

async def process_file(filepath):
    async with aiofiles.open(filepath, 'r') as file:
        lines = await file.readlines()
        return len(lines)

async def process_directory(directory, max_concurrent=10):
    files = [os.path.join(directory, f) for f in os.listdir(directory)]
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def limited_process(filepath):
        async with semaphore:
            return await process_file(filepath)
    
    results = await asyncio.gather(
        *[limited_process(f) for f in files[:5]]  # 例として5ファイル
    )
    return sum(results)

データベースの非同期処理

asyncpgを使ったPostgreSQL接続

import asyncio
import asyncpg

async def fetch_users():
    conn = await asyncpg.connect('postgresql://user:pass@localhost/db')
    try:
        rows = await conn.fetch('SELECT id, name FROM users LIMIT 10')
        return [dict(row) for row in rows]
    finally:
        await conn.close()

async def main():
    users = await fetch_users()
    print(f"取得ユーザー数: {len(users)}")

# asyncio.run(main())

コネクションプールの活用

import asyncio
import asyncpg

async def create_pool():
    return await asyncpg.create_pool(
        'postgresql://user:pass@localhost/db',
        min_size=5,
        max_size=20
    )

async def fetch_data(pool, query):
    async with pool.acquire() as conn:
        return await conn.fetchval(query)

async def main():
    pool = await create_pool()
    try:
        results = await asyncio.gather(
            fetch_data(pool, 'SELECT COUNT(*) FROM users'),
            fetch_data(pool, 'SELECT COUNT(*) FROM orders'),
            fetch_data(pool, 'SELECT COUNT(*) FROM products')
        )
        print(f"結果: {results}")
    finally:
        await pool.close()

プロデューサー・コンシューマーパターン

asyncio.Queueを使った非同期キュー

import asyncio
import random

async def producer(queue, name):
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"生産: {item}")
        await asyncio.sleep(random.uniform(0.5, 1.5))

async def consumer(queue, name):
    while True:
        item = await queue.get()
        print(f"{name} が消費: {item}")
        await asyncio.sleep(random.uniform(1, 2))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    
    await asyncio.gather(
        producer(queue, "Producer1"),
        consumer(queue, "Consumer1"),
        consumer(queue, "Consumer2")
    )

# asyncio.run(main())

エラーハンドリング

例外処理のベストプラクティス

import asyncio

async def risky_operation(should_fail=False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError("操作が失敗しました")
    return "成功"

async def safe_execution():
    tasks = [
        risky_operation(False),
        risky_operation(True),
        risky_operation(False)
    ]
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"タスク{i}: エラー - {result}")
        else:
            print(f"タスク{i}: 成功 - {result}")

asyncio.run(safe_execution())

カスタム例外ハンドラー

import asyncio
import logging

def exception_handler(loop, context):
    exception = context.get('exception')
    if isinstance(exception, ValueError):
        logging.error(f"カスタムエラー処理: {exception}")
    else:
        logging.error(f"未処理の例外: {context}")

async def problematic_task():
    await asyncio.sleep(1)
    raise ValueError("テストエラー")

async def main():
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(exception_handler)
    
    asyncio.create_task(problematic_task())
    await asyncio.sleep(2)

パフォーマンス最適化

セマフォによる同時実行制御

import asyncio
import aiohttp

async def fetch_with_semaphore(session, url, semaphore):
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = [f"https://httpbin.org/delay/{i%3+1}" for i in range(20)]
    semaphore = asyncio.Semaphore(5)  # 最大5並行
    
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_semaphore(session, url, semaphore) 
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
        print(f"完了: {len(results)}件")

バッチ処理による効率化

import asyncio

async def process_batch(items):
    print(f"バッチ処理: {len(items)}件")
    await asyncio.sleep(0.1)  # 処理のシミュレーション
    return len(items)

async def batch_processor(items, batch_size=10):
    batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]
    results = await asyncio.gather(*[process_batch(batch) for batch in batches])
    return sum(results)

async def main():
    items = list(range(100))
    total = await batch_processor(items, batch_size=15)
    print(f"処理総数: {total}")

asyncio.run(main())

実用的なWebアプリケーション例

FastAPIでの非同期エンドポイント

from fastapi import FastAPI
import asyncio
import httpx

app = FastAPI()

@app.get("/fetch-multiple")
async def fetch_multiple_apis():
    async with httpx.AsyncClient() as client:
        tasks = [
            client.get("https://api.github.com/user"),
            client.get("https://api.github.com/repos"),
            client.get("https://api.github.com/notifications")
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        return {"results": len([r for r in responses if not isinstance(r, Exception)])}

WebSocketの非同期処理

import asyncio
import websockets

async def echo_handler(websocket, path):
    async for message in websocket:
        response = f"Echo: {message}"
        await websocket.send(response)

async def start_server():
    server = await websockets.serve(echo_handler, "localhost", 8765)
    print("WebSocketサーバー開始")
    await server.wait_closed()

# asyncio.run(start_server())

非同期処理のデバッグとテスト

ログ出力による追跡

import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def traced_operation(name, duration):
    logger.info(f"{name} 開始")
    try:
        await asyncio.sleep(duration)
        logger.info(f"{name} 完了")
        return f"{name} の結果"
    except Exception as e:
        logger.error(f"{name} エラー: {e}")
        raise

async def main():
    tasks = [
        traced_operation("タスクA", 1),
        traced_operation("タスクB", 2),
        traced_operation("タスクC", 0.5)
    ]
    results = await asyncio.gather(*tasks)
    logger.info(f"全タスク完了: {len(results)}件")

asyncio.run(main())

テスト用の非同期関数

import asyncio
import pytest

async def async_add(a, b):
    await asyncio.sleep(0.1)  # 非同期処理のシミュレーション
    return a + b

@pytest.mark.asyncio
async def test_async_add():
    result = await async_add(2, 3)
    assert result == 5

# テスト用のメイン関数
async def run_tests():
    result = await async_add(5, 7)
    assert result == 12
    print("テスト成功")

asyncio.run(run_tests())

同期・非同期の使い分け

いつ非同期処理を使うべきか

処理の種類非同期処理の効果推奨度
HTTP API呼び出し高い
ファイル読み書き中程度
データベースアクセス高い
CPU集約的処理低い
リアルタイム通信非常に高い

同期コードと非同期コードの共存

import asyncio
import concurrent.futures

def sync_heavy_task(n):
    # CPU集約的な処理(同期)
    return sum(i*i for i in range(n))

async def async_wrapper():
    loop = asyncio.get_event_loop()
    
    # 同期関数を非同期的に実行
    with concurrent.futures.ThreadPoolExecutor() as executor:
        tasks = [
            loop.run_in_executor(executor, sync_heavy_task, 10000),
            loop.run_in_executor(executor, sync_heavy_task, 20000),
            loop.run_in_executor(executor, sync_heavy_task, 15000)
        ]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    results = await async_wrapper()
    print(f"計算結果: {results}")

asyncio.run(main())

よくある間違いと解決策

間違い1: awaitを忘れる

import asyncio

async def wrong_way():
    # 間違い: awaitを忘れている
    result = asyncio.sleep(1)  # これはコルーチンオブジェクト
    print(result)  # <coroutine object sleep at 0x...>

async def correct_way():
    # 正解: awaitを使う
    await asyncio.sleep(1)
    print("1秒経過")

asyncio.run(correct_way())

間違い2: ブロッキング処理を非同期で使う

import asyncio
import time

async def wrong_blocking():
    # 間違い: time.sleepは同期的にブロックする
    time.sleep(1)

async def correct_async():
    # 正解: asyncio.sleepを使う
    await asyncio.sleep(1)

async def main():
    start = asyncio.get_event_loop().time()
    await asyncio.gather(correct_async(), correct_async())
    elapsed = asyncio.get_event_loop().time() - start
    print(f"実行時間: {elapsed:.1f}秒")  # 約1秒

asyncio.run(main())

パフォーマンス比較

同期 vs 非同期のベンチマーク

import asyncio
import time
import aiohttp
import requests

# 同期版
def sync_fetch(url):
    response = requests.get(url)
    return len(response.text)

def sync_main():
    urls = ["https://httpbin.org/delay/1"] * 5
    start = time.time()
    results = [sync_fetch(url) for url in urls]
    end = time.time()
    return end - start, len(results)

# 非同期版
async def async_fetch(session, url):
    async with session.get(url) as response:
        text = await response.text()
        return len(text)

async def async_main():
    urls = ["https://httpbin.org/delay/1"] * 5
    start = time.time()
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[async_fetch(session, url) for url in urls]
        )
    end = time.time()
    return end - start, len(results)

# ベンチマーク実行例(実際の実行時はコメントアウトを外す)
# sync_time, sync_count = sync_main()
# async_time, async_count = asyncio.run(async_main())
# print(f"同期処理: {sync_time:.1f}秒")
# print(f"非同期処理: {async_time:.1f}秒")
# print(f"速度向上: {sync_time/async_time:.1f}倍")

まとめ

Pythonの非同期処理は、I/O集約的なアプリケーションのパフォーマンスを劇的に向上させる強力な技術です。

重要なポイント:

  • async/awaitで読みやすい非同期コードを書く
  • **asyncio.gather()**で複数タスクを並行実行
  • 適切なライブラリ(aiohttp、aiofiles、asyncpgなど)を選択
  • エラーハンドリングを忘れずに実装
  • CPU集約的処理では効果が薄いことを理解

非同期処理を適切に活用することで、レスポンシブで高性能なPythonアプリケーションを開発できます。本記事のサンプルコードを参考に、あなたのプロジェクトに最適な非同期処理を実装してください。

参考文献

  • Python公式asyncioドキュメント
  • aiohttp公式ドキュメント
  • FastAPI公式ドキュメント
  • Real Python – Async IO in Python

「らくらくPython塾」が切り開く「呪文コーディング」とは?

■プロンプトだけでオリジナルアプリを開発・公開してみた!!

■AI時代の第一歩!「AI駆動開発コース」はじめました!

テックジム東京本校で先行開始。

■テックジム東京本校

「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。

<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。

<月1開催>放送作家による映像ディレクター養成講座

<オンライン無料>ゼロから始めるPython爆速講座

フリーランスボード

20万件以上の案件から、副業に最適なリモート・週3〜の案件を一括検索できるプラットフォーム。プロフィール登録でAIスカウトが自動的にマッチング案件を提案。市場統計や単価相場、エージェントの口コミも無料で閲覧可能なため、本業を続けながら効率的に高単価の副業案件を探せます。フリーランスボード

ITプロパートナーズ

週2〜3日から働ける柔軟な案件が業界トップクラスの豊富さを誇るフリーランスエージェント。エンド直契約のため高単価で、週3日稼働でも十分な報酬を得られます。リモートや時間フレキシブルな案件も多数。スタートアップ・ベンチャー中心で、トレンド技術を使った魅力的な案件が揃っています。専属エージェントが案件紹介から契約交渉までサポート。利用企業2,000社以上の実績。ITプロパートナーズ

Midworks 10,000件以上の案件を保有し、週3日〜・フルリモートなど柔軟な働き方に対応。高単価案件が豊富で、報酬保障制度(60%)や保険料負担(50%)など正社員並みの手厚い福利厚生が特徴。通勤交通費(月3万円)、スキルアップ費用(月1万円)の支給に加え、リロクラブ・freeeが無料利用可能。非公開案件80%以上、支払いサイト20日で安心して稼働できます。Midworks