
2026/01/29 1:32
エリクサーから派生したジョブ処理フレームワーク「Oban」が、Python 版として登場しました。
RSS: https://news.ycombinator.com/rss
要約▶
Japanese Translation:
## Summary Oban‑py は PostgreSQL のみで動作するジョブキューで、無料の OSS エディションと追加機能付き有料 Pro エディションがあります。 全ての調整は一つのデータベース内で完結します:LISTEN/NOTIFY が `oban_jobs` に新しい行が挿入されるとワーカーを起動し、`oban_leaders` の TTL カラム付き INSERT … ON CONFLICT で単一リーダーを選出し、SELECT … FOR UPDATE SKIP LOCKED により複数のワーカーが安全にジョブを取得できます。 OSS 版ではワーカーは単一の asyncio イベントループ内で動作し、個々のジョブを一度に認証します。Pro 版では真の並列処理を実現するプロセスプールディスパッチャ、複数ジョブを同時に挿入または認証するバルク SQL 文、高度なハートビートで停止したプロデューサーを検知、ワークフローオーケストレーション、一意ジョブ検出、および設定可能なタイムアウト後に `executing` 状態のジョブを `available` に戻す「Lifeline」救済プロセスが追加されます。 両エディションとも、最大試行回数まで指数関数的バックオフ再試行ロジック(≈15 + 2^attempt s + jitter)をサポートし、それ以降はジョブを破棄します。 リーダー専用 Pruner は設定された `max_age` より古い終了済みジョブを削除します。 OSS は趣味プロジェクトや初期評価に適しており、Pro はより高スループット・並列性と堅牢な障害処理が必要な場合に推奨されます。
本文
舞台設定
私はElixirでソフトウェアを書き始めてからというもの、ほぼ同じくらいObanを使ってきました。処理ジョブの必須ツールとして常に存在しており、いつもObanはクールだと感じつつ、深掘りしたことはありませんでした。本記事では、Python版Oban(obn‑py)がどのように動作し、コードベースを探索する中で得た知見をまとめます。Elixir版との比較や、一般的な並行処理についても触れつつ紹介します。
表層レベル
Obanはデータベースだけでジョブの投入と実行が可能です。ユーザー作成時に確認メール送信ジョブを同じトランザクションで挿入でき、いずれかが失敗すれば全てロールバックされます。
ほぼすべてのジョブ処理フレームワークと同様に、Obanはキューを持ち、ローカル・グローバルの制限があります。他のツールとは違い、完了したジョブを保持し必要なら結果も保存できます。ビルトインでcronスケジューリングが備わり、ジョブ処理方法を細かく制御できる機能が豊富です。
Obanはオープンソース版 Oban‑py と商用版 Oban‑py‑pro の二種類があります。OSS版には Pro 版で自動的に解消されるいくつかの制限があります:
- 単一スレッドでの asyncio 実行 – 並列はあるものの真の並列ではないため、CPU 集約ジョブがイベントループをブロックします。
- バルク挿入なし – ジョブごとに個別に挿入されます。
- バルク認証なし – 完了時に各ジョブを個別に永続化します。
- 不正確なリカバリー – 長時間実行中のジョブがプロデューサーがまだ生存している場合でも救済される可能性があります。Pro はより賢いハートビートでプロデューサーの健全性を追跡します。
Pro 版はワークフロー、リレー、ユニークジョブ、スマート並行処理など、別途設定する必要がある機能を追加します。OSS Oban‑py は趣味レベルや Oban の哲学を評価したい場合には十分ですが、より大規模な運用では Pro を選択する価値があります。価格はこれらの機能に対して非常に魅力的です。
ここでは Pro のすべての機能を紹介できませんので、まず基本から始めましょう:ジョブ挿入から実行まで Oban Py が内部でどう動くかを追います。引き続きお付き合いください。
深掘り – ジョブ処理パス
1. ジョブの投入
from oban import job @job(queue="default") async def send_email(to: str, subject: str, body: str): await smtp.send(to, subject, body) await send_email.enqueue("[email protected]", "Hello", "World")
ジョブは
oban_jobs テーブルに state = 'available' として挿入されます。Oban はその挿入チャネルで PostgreSQL の NOTIFY を発火します:
# oban.py:414‑419 result = await self._query.insert_jobs(jobs) queues = {job.queue for job in result if job.state == "available"} await self._notifier.notify("insert", [{"queue": queue} for queue in queues])
2. 通知の受信とプロデューサーの起動
各ノードはそのチャネルを LISTEN しているため、通知を受け取ると Stager が目覚めます。Stager は実際に走らせるキューだけを扱い、該当キューが自ノードで走っている場合のみプロデューサーへ通知します:
# _stager.py:95‑99 async def _on_notification(self, channel: str, payload: dict) -> None: queue = payload["queue"] if queue in self._producers: self._producers[queue].notify()
notify() は asyncio.Event をセットし、プロデューサーの待機ループを抜けてジョブをワーカーへ送る準備ができたことを知らせます。
# _producer.py:244‑262 async def _loop(self) -> None: while True: try: await asyncio.wait_for(self._notified.wait(), timeout=1.0) except asyncio.TimeoutError: continue except asyncio.CancelledError: break self._notified.clear() try: await self._debounce() await self._produce() except asyncio.CancelledError: break except Exception: logger.exception("Error in producer for queue %s", self._queue)
3. ジョブ取得とロック
プロデューサーは既存の完了(ack)を永続化してキュー制限に従い、新規ジョブを取得します。取得時に
executing に状態遷移させます:
-- fetch_jobs.sql (simplified) WITH locked_jobs AS ( SELECT priority, scheduled_at, id FROM oban_jobs WHERE state = 'available' AND queue = %(queue)s ORDER BY priority ASC, scheduled_at ASC, id ASC LIMIT %(demand)s FOR UPDATE SKIP LOCKED ) UPDATE oban_jobs oj SET attempt = oj.attempt + 1, attempted_at = timezone('UTC', now()), attempted_by = %(attempted_by)s, state = 'executing' FROM locked_jobs WHERE oj.id = locked_jobs.id;
FOR UPDATE SKIP LOCKED は選択した行をロックし、既にロック済みの行はスキップします。これが並行処理で不可欠です。
4. ジョブ実行
取得したジョブは即座に非同期タスクとしてディスパッチされます:
jobs = await self._get_jobs() for job in jobs: task = self._dispatcher.dispatch(self, job) task.add_done_callback(lambda _, job_id=job.id: self._on_job_complete(job_id)) self._running_jobs[job.id] = (job, task)
add_done_callback は成功・失敗を問わず完了処理を行うために使用します。
ディスパッチャはジョブの実行方法を制御します。Pro 版では真の並列化を実現するためプロセスプールがローカルディスパッチャを置き換えます。
5. エグゼキュータと結果処理
エグゼキュータはワーカークラスを解決し、
process() を呼び出します。戻り値に応じて結果をパターンマッチで判定します:
# _executor.py:73‑83 async def _process(self) -> None: self.worker = resolve_worker(self.job.worker)() self.result = await self.worker.process(self.job) # _executor.py:95‑133 match result: case Exception() as error: # Retry or discard based on attempt count case Cancel(reason=reason): # Mark cancelled case Snooze(seconds=seconds): # Reschedule with decremented attempt case _: # Completed successfully
実行が完了すると結果は ACK 用にキューへ入れられます:
# _producer.py:315 self._pending_acks.append(executor.action)
完成コールバックはプロデューサーを再び起動し、さらにジョブ取得とバッチACKを行います。
5 つの主要ステップ
Insert → Notify → Fetch (with locking) → Execute → Ack
これがコードから完了までのホットパスです。背景プロセス、エラー処理、リトライ、周期ジョブ(cron)などについては次回に続きます。
背景プロセス – 隠れた働き
リーダー選出
クラスターではすべてのノードがジョブをプルしたり救済したりしないよう、単一リーダーが決定されます:
# _leader.py:107‑113 async def _election(self) -> None: self._is_leader = await self._query.attempt_leadership( self._name, self._node, int(self._interval), self._is_leader)
データベースはまず期限切れのリーダーを削除します:
DELETE FROM oban_leaders WHERE expires_at < timezone('UTC', now());
現在ノードがリーダーなら自ら再選出し、そうでなければ新規リーダーとして挿入を試みます。リーダーはリース保持のため通常より二倍速く更新します:
# _leader.py:101‑105 sleep_duration = self._interval / 2 if self._is_leader else self._interval
ノードが正常に停止すると、辞任通知を送ります:
# _leader.py:83‑87 if self._is_leader: payload = {"action": "resign", "node": self._node, "name": self._name} await self._notifier.notify("leader", payload) await self._query.resign_leader(self._name, self._node)
リーダー選出は PostgreSQL の
INSERT … ON CONFLICT と TTL ベースのレースで完結し、Raft 等の外部協調プロトコルを必要としません。
ライフライン – 孤立ジョブの救済
ワーカーがクラッシュしたりコンテナが停止した場合、実行中ジョブが永遠に止まる恐れがあります。リーダーのみが
lifeline を起動して救済します:
# _lifeline.py:73‑77 async def _rescue(self) -> None: if not self._leader.is_leader: return await use_ext("lifeline.rescue", _rescue, self._query, self._rescue_after)
救済は単純に時間ベースで行われ、
executing 状態のジョブが rescue_after(デフォルト 5 分)を超えたら戻します。Pro バージョンではプロデューサーの健全性も確認するため、真に長時間実行中のジョブは救済されません。
-- rescue_jobs.sql (simplified) UPDATE oban_jobs SET state = CASE WHEN attempt >= max_attempts THEN 'discarded' ELSE 'available' END, meta = CASE WHEN attempt >= max_attempts THEN meta ELSE meta || jsonb_build_object('rescued', coalesce((meta->>'rescued')::int,0)+1) END WHERE state = 'executing' AND attempted_at < timezone('UTC', now()) - make_interval(secs => %(rescue_after)s);
meta.rescued カウンタで救済回数を追跡できます。
プルーナー – 古いジョブのクリーンアップ
oban_jobs テーブルはプラグイン無しでは無限に膨張します。リーダーのみが max_age(デフォルト 1 日)より古い終端状態のジョブを削除します:
-- prune_jobs.sql WITH jobs_to_delete AS ( SELECT id FROM oban_jobs WHERE (state = 'completed' AND completed_at <= timezone('UTC', now()) - make_interval(secs => %(max_age)s)) OR (state = 'cancelled' AND cancelled_at <= timezone('UTC', now()) - make_interval(secs => %(max_age)s)) OR (state = 'discarded' AND discarded_at <= timezone('UTC', now()) - make_interval(secs => %(max_age)s)) ORDER BY id ASC LIMIT %(limit)s ) DELETE FROM oban_jobs WHERE id IN (SELECT id FROM jobs_to_delete);
LIMIT は長時間実行される削除が他の操作をブロックしないようにします。
リトライ & バックオフ
ジョブで例外が発生した場合、エグゼキュータはその運命を決定します:
# _executor.py:96‑109 match result: case Exception() as error: if self.job.attempt >= self.job.max_attempts: self.action = AckAction(job=self.job, state="discarded", error=self._format_error(error)) else: self.action = AckAction(job=self.job, state="retryable", error=self._format_error(error), schedule_in=self._retry_backoff())
デフォルトのバックオフは「ジッター付き制限済み指数関数的増加」で、スレッド群が同時に再試行する現象(thundering herd)を防ぎます:
# _backoff.py:66‑87 def jittery_clamped(attempt: int, max_attempts: int, *, clamped_max: int = 20) -> int: if max_attempts <= clamped_max: clamped_attempt = attempt else: clamped_attempt = round(attempt / max_attempts * clamped_max) time = exponential(clamped_attempt, mult=1, max_pow=100, min_pad=15) return jitter(time, mode="inc")
式は 15 + 2^attempt 秒で、最大10 % のジッターが付与されます。例:試行 1 は約17秒、5 は約47秒、10 は約1039秒(≈17分)です。高
max_attempts を設定するとスケールダウンし、年単位の遅延を防ぎます。ワーカーはカスタムバックオフで上書き可能です。
まとめ – 学びとポイント
- PostgreSQL が重役:
(並行取得)、LISTEN/NOTIFY(リアルタイム通知)、FOR UPDATE SKIP LOCKED
(リーダー選出)。Redis や ZooKeeper は不要。ON CONFLICT - Oban‑py は並行はあるものの真の並列化はなし。asyncio で多数ジョブを同時実行できるがイベントループは単一スレッド。CPU 集約タスクには Pro のプロセスプールがおすすめ。
- リーダー選出はシンプルかつ効果的:TTL ベースの
。リーダーは通常より二倍速く更新し、死亡時に別ノードが引き継ぎます。プルーナー・救済には十分です。INSERT … ON CONFLICT - コードベースはクリーンで命名規則も統一されており、読む感覚は良い本を読むようなものです。
- OSS 版でもかなりの機能が備わっていますが、バルク操作や賢い救済、本格的並列化など Pro が補完します。Pro ライセンスはコストパフォーマンスが高いと感じます。
総評
Oban.py は Elixir 版を忠実に移植したクリーンで構造化されたプロダクトです。Elixir の Oban を懐かしむ方、あるいは外部インフラ(Redis 等)なしで PostgreSQL だけでジョブキューを運用したい Python 開発者には十分価値があります。ぜひ一度試してみてください。