PythonのCeleryを徹底解説! 時間のかかる処理を非同期・並列実行
Webアプリケーション開発やデータ処理において、「ユーザーからのリクエストがタイムアウトしてしまう」「大量のタスクを並行して処理したい」「スケジュールに基づいて自動で処理を実行したい」といった課題に直面したことはありませんか? ユーザーの操作をブロックせずに重い処理を実行したり、定期的なバッチ処理を行ったりするには、バックグラウンド処理の仕組みが不可欠です。
そんな時にPythonで大活躍するのが、**分散型タスクキューCelery(セレリー)**です。
この記事では、Celeryの基本的な概念から、なぜ非同期・並列処理に不可欠なのか、そして実際に簡単な非同期タスクを実装する手順まで、初心者の方にも分かりやすく徹底的に解説します。Celeryをマスターして、あなたのPythonアプリケーションをより高速で応答性の高いものへと進化させましょう!
Celeryとは? なぜ非同期・並列処理に使うのか?
Celeryは、Pythonで書かれた分散型タスクキューです。Webアプリケーションなどで発生する時間のかかる処理(タスク)を、ユーザーのリクエストとは別にバックグラウンドで実行したり、複数のワーカーマシンに分散して並列実行したりすることを可能にします。
なぜCeleryが非同期・並列処理によく使われるのでしょうか?
ユーザー体験の向上: 時間のかかる処理をバックグラウンドに回すことで、Webアプリケーションの応答性を高め、ユーザーは処理完了を待つことなく次の操作に移ることができます。
スケーラビリティ: タスクを複数のワーカー(処理を実行するプロセスやサーバー)に分散して実行できるため、処理能力を水平に拡張できます。
信頼性: タスクが失敗した場合のリトライ(再試行)メカニズムや、タスクの結果を保存する機能など、堅牢なタスク実行をサポートします。
定期的なタスク: cronジョブのように、特定の時間に自動的に実行されるタスク(定期タスク)を設定できます。
言語 agnostic (Broker): Celery自体はPythonで書かれていますが、メッセージブローカー(RedisやRabbitMQなど)を介してタスクをやり取りするため、他のプログラミング言語で書かれたアプリケーションとも連携できます。
豊富な機能: タスクのチェイニング(連鎖実行)、グループ化、キャンセリング、レートリミット(実行頻度制限)など、高度なタスク管理機能が提供されています。
Celeryは、単体で動作するものではなく、メッセージブローカー(タスクを一時的に保存し、ワーカーに分配する仲介役)と、タスクを実行するワーカープロセスを必要とします。
Celeryの構成要素
Celeryの基本的な構成要素は以下の3つです。
Client (Producer): タスクを作成し、それをメッセージブローカーに送信するアプリケーション(例: Webアプリケーションのビュー関数)。
Broker (Message Queue): Clientから送られたタスクを受け取り、一時的にキューに保持し、利用可能なWorkerにタスクを分配する役割を担います。
代表例: RabbitMQ (本番環境で推奨), Redis (シンプルで手軽), Amazon SQSなど。
Worker (Consumer): Brokerからタスクを受け取り、実際にタスクのコードを実行するプロセス。
Celeryのインストール方法
Celeryを使うには、まずPCにインストールする必要があります。また、メッセージブローカー(ここでは手軽なRedisを使用)も必要です。
コマンドプロンプト(Windows) または ターミナル(macOS/Linux) を開きます。
CeleryとRedisクライアントライブラリをインストールします。
Bashpip install celery redisRedisサーバーの準備: ローカルマシンにRedisサーバーをインストールし、起動しておく必要があります。OSによって手順は異なりますが、例えばmacOSなら
brew install redis、Ubuntuならsudo apt install redis-serverでインストールし、redis-serverで起動します。
Celeryの基本的な使い方:非同期タスクの実装
ここでは、Celeryを使って簡単な非同期タスク(時間のかかる計算)を実装し、Webアプリケーションを模倣したクライアントから呼び出す例を見てみましょう。
1. Celeryアプリケーションの作成 (tasks.py)
Celeryアプリケーションのインスタンスを作成し、実行したいタスクを定義します。
# tasks.py
from celery import Celery
import time
# Celeryアプリケーションの初期化
# broker: メッセージブローカーのURL (Redisの場合)
# backend: タスク結果を保存する場所 (Redisの場合。省略可能だが結果取得に必要)
app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
# タスクの定義
@app.task
def long_running_task(x, y):
"""時間のかかる計算をシミュレートするタスク"""
print(f"タスク開始: long_running_task({x}, {y})")
time.sleep(5) # 5秒間のスリープで時間のかかる処理を模倣
result = x + y
print(f"タスク終了: 結果 = {result}")
return result
@app.task
def another_task(message):
print(f"別のタスク実行中: {message}")
return len(message)
2. クライアントからタスクを呼び出す (client.py)
Webアプリケーションのビュー関数などを模倣し、タスクを非同期で実行します。
# client.py
from tasks import long_running_task, another_task
import time
print("クライアント: タスクを呼び出します...")
# タスクを非同期で実行 (.delay() または .apply_async())
# .delay() は最もシンプルな呼び出し方法
result_async = long_running_task.delay(10, 20)
print(f"クライアント: タスクがキューに追加されました。タスクID: {result_async.id}")
# 別のタスクも呼び出してみる
result_another = another_task.delay("Hello Celery!")
print(f"クライアント: 別のタスクがキューに追加されました。タスクID: {result_another.id}")
print("クライアント: 他の処理を続行します(タスクの完了を待ちません)")
# ここでWebアプリケーションならユーザーに応答を返す
# 後でタスクの結果を取得することも可能 (時間がかかる場合がある)
# print("クライアント: long_running_taskの結果を待っています...")
# print(f"クライアント: long_running_taskの結果: {result_async.get(timeout=10)}") # 最大10秒待つ
print("クライアント: 全ての処理を終了。")
3. Celeryワーカーを起動する
タスクを実際に実行するワーカープロセスを起動します。
# tasks.py があるディレクトリで実行
celery -A tasks worker --loglevel=info
実行手順と結果
ターミナル1 (Redisサーバー):
redis-serverを実行してRedisを起動します。ターミナル2 (Celeryワーカー):
celery -A tasks worker --loglevel=infoを実行してワーカーを起動します。ワーカーが起動すると、ブローカーに接続し、タスクを待機状態になります。
ターミナル3 (クライアント):
python client.pyを実行してタスクを呼び出します。クライアントはすぐに実行を完了し、タスクIDを表示します。
ターミナル2のワーカーのログを見ると、
long_running_taskが実行され、5秒後に結果が出力されるのが確認できます。
これにより、クライアント(Webアプリケーション)は重い処理をバックグラウンドに渡し、すぐにユーザーにレスポンスを返すことができるようになります。
Celeryの主要な機能と応用
上記の基本的な使い方以外にも、Celeryは多岐にわたる機能を提供します。
タスク結果の取得:
AsyncResultオブジェクト(result_asyncなど)を使って、タスクのステータス(保留中、成功、失敗など)を確認したり、結果を取得したりできます。定期タスク (Celery Beat): 定期的に実行したいタスクを設定できます(例: 毎日午前3時にデータ集計タスクを実行)。
celery -A tasks beat --loglevel=infoで起動します。レートリミット: 特定のタスクの実行頻度を制限し、外部APIへの過剰なアクセスなどを防ぎます。
リトライ (Retry): タスクが一時的なエラーで失敗した場合に、自動的に再試行する設定が可能です。
タスクのチェイニング/グループ化: 複数のタスクを連結して順番に実行したり、複数のタスクを並行して実行し、全ての完了を待ってから次の処理に進んだりできます。
コンフィギュレーション:
Celeryアプリケーションの設定(並列度、タイムアウト、メッセージのシリアル化など)を柔軟にカスタマイズできます。
まとめ
Celeryは、Pythonアプリケーションに非同期・並列処理の能力をもたらし、パフォーマンスとスケーラビリティを大幅に向上させる強力な分散型タスクキューです。
時間のかかる処理をバックグラウンドで実行。
ユーザー体験の向上とアプリケーションのスケーラビリティに貢献。
Client (Producer)、Broker (Message Queue)、Worker (Consumer) の3要素で構成される。
RedisやRabbitMQなどのメッセージブローカーが必要。
@app.taskデコレータでタスクを定義し、.delay()で非同期呼び出し。celery -A your_module workerでワーカープロセスを起動。定期タスク、リトライ、レートリミットなど、豊富な機能を持つ。
Celeryを使いこなすことで、あなたのPythonアプリケーションはより複雑な処理にも対応できるようになり、ユーザーにより快適な体験を提供できるようになるでしょう。
■プロンプトだけでオリジナルアプリを開発・公開してみた!!
■AI時代の第一歩!「AI駆動開発コース」はじめました!
テックジム東京本校で先行開始。
■テックジム東京本校
「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。
<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。
<月1開催>放送作家による映像ディレクター養成講座
<オンライン無料>ゼロから始めるPython爆速講座

