PythonのCeleryを徹底解説! 時間のかかる処理を非同期・並列実行


 

Webアプリケーション開発やデータ処理において、「ユーザーからのリクエストがタイムアウトしてしまう」「大量のタスクを並行して処理したい」「スケジュールに基づいて自動で処理を実行したい」といった課題に直面したことはありませんか? ユーザーの操作をブロックせずに重い処理を実行したり、定期的なバッチ処理を行ったりするには、バックグラウンド処理の仕組みが不可欠です。

そんな時にPythonで大活躍するのが、**分散型タスクキューCelery(セレリー)**です。

この記事では、Celeryの基本的な概念から、なぜ非同期・並列処理に不可欠なのか、そして実際に簡単な非同期タスクを実装する手順まで、初心者の方にも分かりやすく徹底的に解説します。Celeryをマスターして、あなたのPythonアプリケーションをより高速で応答性の高いものへと進化させましょう!


 

Celeryとは? なぜ非同期・並列処理に使うのか?

 

Celeryは、Pythonで書かれた分散型タスクキューです。Webアプリケーションなどで発生する時間のかかる処理(タスク)を、ユーザーのリクエストとは別にバックグラウンドで実行したり、複数のワーカーマシンに分散して並列実行したりすることを可能にします。

なぜCeleryが非同期・並列処理によく使われるのでしょうか?

  • ユーザー体験の向上: 時間のかかる処理をバックグラウンドに回すことで、Webアプリケーションの応答性を高め、ユーザーは処理完了を待つことなく次の操作に移ることができます。

  • スケーラビリティ: タスクを複数のワーカー(処理を実行するプロセスやサーバー)に分散して実行できるため、処理能力を水平に拡張できます。

  • 信頼性: タスクが失敗した場合のリトライ(再試行)メカニズムや、タスクの結果を保存する機能など、堅牢なタスク実行をサポートします。

  • 定期的なタスク: cronジョブのように、特定の時間に自動的に実行されるタスク(定期タスク)を設定できます。

  • 言語 agnostic (Broker): Celery自体はPythonで書かれていますが、メッセージブローカー(RedisやRabbitMQなど)を介してタスクをやり取りするため、他のプログラミング言語で書かれたアプリケーションとも連携できます。

  • 豊富な機能: タスクのチェイニング(連鎖実行)、グループ化、キャンセリング、レートリミット(実行頻度制限)など、高度なタスク管理機能が提供されています。

Celeryは、単体で動作するものではなく、メッセージブローカー(タスクを一時的に保存し、ワーカーに分配する仲介役)と、タスクを実行するワーカープロセスを必要とします。


 

Celeryの構成要素

 

Celeryの基本的な構成要素は以下の3つです。

  1. Client (Producer): タスクを作成し、それをメッセージブローカーに送信するアプリケーション(例: Webアプリケーションのビュー関数)。

  2. Broker (Message Queue): Clientから送られたタスクを受け取り、一時的にキューに保持し、利用可能なWorkerにタスクを分配する役割を担います。

    • 代表例: RabbitMQ (本番環境で推奨), Redis (シンプルで手軽), Amazon SQSなど。

  3. Worker (Consumer): Brokerからタスクを受け取り、実際にタスクのコードを実行するプロセス。


 

Celeryのインストール方法

 

Celeryを使うには、まずPCにインストールする必要があります。また、メッセージブローカー(ここでは手軽なRedisを使用)も必要です。

  1. コマンドプロンプト(Windows) または ターミナル(macOS/Linux) を開きます。

  2. CeleryとRedisクライアントライブラリをインストールします。

    Bash
     
    pip install celery redis
    
  3. Redisサーバーの準備: ローカルマシンにRedisサーバーをインストールし、起動しておく必要があります。OSによって手順は異なりますが、例えばmacOSならbrew install redis、Ubuntuならsudo apt install redis-serverでインストールし、redis-serverで起動します。


 

Celeryの基本的な使い方:非同期タスクの実装

 

ここでは、Celeryを使って簡単な非同期タスク(時間のかかる計算)を実装し、Webアプリケーションを模倣したクライアントから呼び出す例を見てみましょう。

 

1. Celeryアプリケーションの作成 (tasks.py)

 

Celeryアプリケーションのインスタンスを作成し、実行したいタスクを定義します。

Python
 
# tasks.py
from celery import Celery
import time

# Celeryアプリケーションの初期化
# broker: メッセージブローカーのURL (Redisの場合)
# backend: タスク結果を保存する場所 (Redisの場合。省略可能だが結果取得に必要)
app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

# タスクの定義
@app.task
def long_running_task(x, y):
    """時間のかかる計算をシミュレートするタスク"""
    print(f"タスク開始: long_running_task({x}, {y})")
    time.sleep(5)  # 5秒間のスリープで時間のかかる処理を模倣
    result = x + y
    print(f"タスク終了: 結果 = {result}")
    return result

@app.task
def another_task(message):
    print(f"別のタスク実行中: {message}")
    return len(message)

 

2. クライアントからタスクを呼び出す (client.py)

 

Webアプリケーションのビュー関数などを模倣し、タスクを非同期で実行します。

Python
 
# client.py
from tasks import long_running_task, another_task
import time

print("クライアント: タスクを呼び出します...")

# タスクを非同期で実行 (.delay() または .apply_async())
# .delay() は最もシンプルな呼び出し方法
result_async = long_running_task.delay(10, 20)
print(f"クライアント: タスクがキューに追加されました。タスクID: {result_async.id}")

# 別のタスクも呼び出してみる
result_another = another_task.delay("Hello Celery!")
print(f"クライアント: 別のタスクがキューに追加されました。タスクID: {result_another.id}")

print("クライアント: 他の処理を続行します(タスクの完了を待ちません)")
# ここでWebアプリケーションならユーザーに応答を返す

# 後でタスクの結果を取得することも可能 (時間がかかる場合がある)
# print("クライアント: long_running_taskの結果を待っています...")
# print(f"クライアント: long_running_taskの結果: {result_async.get(timeout=10)}") # 最大10秒待つ

print("クライアント: 全ての処理を終了。")

 

3. Celeryワーカーを起動する

 

タスクを実際に実行するワーカープロセスを起動します。

Bash
 
# tasks.py があるディレクトリで実行
celery -A tasks worker --loglevel=info

 

実行手順と結果

 

  1. ターミナル1 (Redisサーバー): redis-server を実行してRedisを起動します。

  2. ターミナル2 (Celeryワーカー): celery -A tasks worker --loglevel=info を実行してワーカーを起動します。

    • ワーカーが起動すると、ブローカーに接続し、タスクを待機状態になります。

  3. ターミナル3 (クライアント): python client.py を実行してタスクを呼び出します。

    • クライアントはすぐに実行を完了し、タスクIDを表示します。

    • ターミナル2のワーカーのログを見ると、long_running_taskが実行され、5秒後に結果が出力されるのが確認できます。

これにより、クライアント(Webアプリケーション)は重い処理をバックグラウンドに渡し、すぐにユーザーにレスポンスを返すことができるようになります。


 

Celeryの主要な機能と応用

 

上記の基本的な使い方以外にも、Celeryは多岐にわたる機能を提供します。

  • タスク結果の取得: AsyncResultオブジェクト(result_asyncなど)を使って、タスクのステータス(保留中、成功、失敗など)を確認したり、結果を取得したりできます。

  • 定期タスク (Celery Beat): 定期的に実行したいタスクを設定できます(例: 毎日午前3時にデータ集計タスクを実行)。celery -A tasks beat --loglevel=infoで起動します。

  • レートリミット: 特定のタスクの実行頻度を制限し、外部APIへの過剰なアクセスなどを防ぎます。

  • リトライ (Retry): タスクが一時的なエラーで失敗した場合に、自動的に再試行する設定が可能です。

  • タスクのチェイニング/グループ化: 複数のタスクを連結して順番に実行したり、複数のタスクを並行して実行し、全ての完了を待ってから次の処理に進んだりできます。

  • コンフィギュレーション: Celeryアプリケーションの設定(並列度、タイムアウト、メッセージのシリアル化など)を柔軟にカスタマイズできます。


 

まとめ

 

Celeryは、Pythonアプリケーションに非同期・並列処理の能力をもたらし、パフォーマンスとスケーラビリティを大幅に向上させる強力な分散型タスクキューです。

  • 時間のかかる処理をバックグラウンドで実行。

  • ユーザー体験の向上アプリケーションのスケーラビリティに貢献。

  • Client (Producer)Broker (Message Queue)Worker (Consumer) の3要素で構成される。

  • RedisRabbitMQなどのメッセージブローカーが必要。

  • @app.taskデコレータでタスクを定義し、.delay()で非同期呼び出し。

  • celery -A your_module workerでワーカープロセスを起動。

  • 定期タスク、リトライ、レートリミットなど、豊富な機能を持つ。

Celeryを使いこなすことで、あなたのPythonアプリケーションはより複雑な処理にも対応できるようになり、ユーザーにより快適な体験を提供できるようになるでしょう。


 

■プロンプトだけでオリジナルアプリを開発・公開してみた!!

■AI時代の第一歩!「AI駆動開発コース」はじめました!

テックジム東京本校で先行開始。

■テックジム東京本校

「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。

<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。

<月1開催>放送作家による映像ディレクター養成講座

<オンライン無料>ゼロから始めるPython爆速講座