エリクサーから派生したジョブ処理フレームワーク「Oban」が、Python 版として登場しました。

2026/01/29 1:32

エリクサーから派生したジョブ処理フレームワーク「Oban」が、Python 版として登場しました。

RSS: https://news.ycombinator.com/rss

要約

Japanese Translation:

## Summary
Oban‑py は PostgreSQL のみで動作するジョブキューで、無料の OSS エディションと追加機能付き有料 Pro エディションがあります。  
全ての調整は一つのデータベース内で完結します:LISTEN/NOTIFY が `oban_jobs` に新しい行が挿入されるとワーカーを起動し、`oban_leaders` の TTL カラム付き INSERT … ON CONFLICT で単一リーダーを選出し、SELECT … FOR UPDATE SKIP LOCKED により複数のワーカーが安全にジョブを取得できます。  

OSS 版ではワーカーは単一の asyncio イベントループ内で動作し、個々のジョブを一度に認証します。Pro 版では真の並列処理を実現するプロセスプールディスパッチャ、複数ジョブを同時に挿入または認証するバルク SQL 文、高度なハートビートで停止したプロデューサーを検知、ワークフローオーケストレーション、一意ジョブ検出、および設定可能なタイムアウト後に `executing` 状態のジョブを `available` に戻す「Lifeline」救済プロセスが追加されます。  

両エディションとも、最大試行回数まで指数関数的バックオフ再試行ロジック(≈15 + 2^attempt s + jitter)をサポートし、それ以降はジョブを破棄します。  
リーダー専用 Pruner は設定された `max_age` より古い終了済みジョブを削除します。  

OSS は趣味プロジェクトや初期評価に適しており、Pro はより高スループット・並列性と堅牢な障害処理が必要な場合に推奨されます。

本文

舞台設定

私はElixirでソフトウェアを書き始めてからというもの、ほぼ同じくらいObanを使ってきました。処理ジョブの必須ツールとして常に存在しており、いつもObanはクールだと感じつつ、深掘りしたことはありませんでした。本記事では、Python版Oban(obn‑py)がどのように動作し、コードベースを探索する中で得た知見をまとめます。Elixir版との比較や、一般的な並行処理についても触れつつ紹介します。


表層レベル

Obanはデータベースだけでジョブの投入と実行が可能です。ユーザー作成時に確認メール送信ジョブを同じトランザクションで挿入でき、いずれかが失敗すれば全てロールバックされます。

ほぼすべてのジョブ処理フレームワークと同様に、Obanはキューを持ち、ローカル・グローバルの制限があります。他のツールとは違い、完了したジョブを保持し必要なら結果も保存できます。ビルトインでcronスケジューリングが備わり、ジョブ処理方法を細かく制御できる機能が豊富です。

Obanはオープンソース版 Oban‑py と商用版 Oban‑py‑pro の二種類があります。OSS版には Pro 版で自動的に解消されるいくつかの制限があります:

  • 単一スレッドでの asyncio 実行 – 並列はあるものの真の並列ではないため、CPU 集約ジョブがイベントループをブロックします。
  • バルク挿入なし – ジョブごとに個別に挿入されます。
  • バルク認証なし – 完了時に各ジョブを個別に永続化します。
  • 不正確なリカバリー – 長時間実行中のジョブがプロデューサーがまだ生存している場合でも救済される可能性があります。Pro はより賢いハートビートでプロデューサーの健全性を追跡します。

Pro 版はワークフロー、リレー、ユニークジョブ、スマート並行処理など、別途設定する必要がある機能を追加します。OSS Oban‑py は趣味レベルや Oban の哲学を評価したい場合には十分ですが、より大規模な運用では Pro を選択する価値があります。価格はこれらの機能に対して非常に魅力的です。

ここでは Pro のすべての機能を紹介できませんので、まず基本から始めましょう:ジョブ挿入から実行まで Oban Py が内部でどう動くかを追います。引き続きお付き合いください。


深掘り – ジョブ処理パス

1. ジョブの投入

from oban import job

@job(queue="default")
async def send_email(to: str, subject: str, body: str):
    await smtp.send(to, subject, body)

await send_email.enqueue("[email protected]", "Hello", "World")

ジョブは

oban_jobs
テーブルに
state = 'available'
として挿入されます。Oban はその挿入チャネルで PostgreSQL の NOTIFY を発火します:

# oban.py:414‑419
result = await self._query.insert_jobs(jobs)
queues = {job.queue for job in result if job.state == "available"}
await self._notifier.notify("insert", [{"queue": queue} for queue in queues])

2. 通知の受信とプロデューサーの起動

各ノードはそのチャネルを LISTEN しているため、通知を受け取ると Stager が目覚めます。Stager は実際に走らせるキューだけを扱い、該当キューが自ノードで走っている場合のみプロデューサーへ通知します:

# _stager.py:95‑99
async def _on_notification(self, channel: str, payload: dict) -> None:
    queue = payload["queue"]
    if queue in self._producers:
        self._producers[queue].notify()

notify()
は asyncio.Event をセットし、プロデューサーの待機ループを抜けてジョブをワーカーへ送る準備ができたことを知らせます。

# _producer.py:244‑262
async def _loop(self) -> None:
    while True:
        try:
            await asyncio.wait_for(self._notified.wait(), timeout=1.0)
        except asyncio.TimeoutError:
            continue
        except asyncio.CancelledError:
            break

        self._notified.clear()
        try:
            await self._debounce()
            await self._produce()
        except asyncio.CancelledError:
            break
        except Exception:
            logger.exception("Error in producer for queue %s", self._queue)

3. ジョブ取得とロック

プロデューサーは既存の完了(ack)を永続化してキュー制限に従い、新規ジョブを取得します。取得時に

executing
に状態遷移させます:

-- fetch_jobs.sql (simplified)
WITH locked_jobs AS (
  SELECT priority, scheduled_at, id
  FROM oban_jobs
  WHERE state = 'available' AND queue = %(queue)s
  ORDER BY priority ASC, scheduled_at ASC, id ASC
  LIMIT %(demand)s
  FOR UPDATE SKIP LOCKED
)
UPDATE oban_jobs oj
SET attempt = oj.attempt + 1,
    attempted_at = timezone('UTC', now()),
    attempted_by = %(attempted_by)s,
    state = 'executing'
FROM locked_jobs
WHERE oj.id = locked_jobs.id;

FOR UPDATE SKIP LOCKED
は選択した行をロックし、既にロック済みの行はスキップします。これが並行処理で不可欠です。


4. ジョブ実行

取得したジョブは即座に非同期タスクとしてディスパッチされます:

jobs = await self._get_jobs()
for job in jobs:
    task = self._dispatcher.dispatch(self, job)
    task.add_done_callback(lambda _, job_id=job.id: self._on_job_complete(job_id))
    self._running_jobs[job.id] = (job, task)

add_done_callback
は成功・失敗を問わず完了処理を行うために使用します。

ディスパッチャはジョブの実行方法を制御します。Pro 版では真の並列化を実現するためプロセスプールがローカルディスパッチャを置き換えます。


5. エグゼキュータと結果処理

エグゼキュータはワーカークラスを解決し、

process()
を呼び出します。戻り値に応じて結果をパターンマッチで判定します:

# _executor.py:73‑83
async def _process(self) -> None:
    self.worker = resolve_worker(self.job.worker)()
    self.result = await self.worker.process(self.job)

# _executor.py:95‑133
match result:
    case Exception() as error:
        # Retry or discard based on attempt count
    case Cancel(reason=reason):
        # Mark cancelled
    case Snooze(seconds=seconds):
        # Reschedule with decremented attempt
    case _:
        # Completed successfully

実行が完了すると結果は ACK 用にキューへ入れられます:

# _producer.py:315
self._pending_acks.append(executor.action)

完成コールバックはプロデューサーを再び起動し、さらにジョブ取得とバッチACKを行います。


5 つの主要ステップ

Insert → Notify → Fetch (with locking) → Execute → Ack

これがコードから完了までのホットパスです。背景プロセス、エラー処理、リトライ、周期ジョブ(cron)などについては次回に続きます。


背景プロセス – 隠れた働き

リーダー選出

クラスターではすべてのノードがジョブをプルしたり救済したりしないよう、単一リーダーが決定されます:

# _leader.py:107‑113
async def _election(self) -> None:
    self._is_leader = await self._query.attempt_leadership(
        self._name, self._node, int(self._interval), self._is_leader)

データベースはまず期限切れのリーダーを削除します:

DELETE FROM oban_leaders
WHERE expires_at < timezone('UTC', now());

現在ノードがリーダーなら自ら再選出し、そうでなければ新規リーダーとして挿入を試みます。リーダーはリース保持のため通常より二倍速く更新します:

# _leader.py:101‑105
sleep_duration = self._interval / 2 if self._is_leader else self._interval

ノードが正常に停止すると、辞任通知を送ります:

# _leader.py:83‑87
if self._is_leader:
    payload = {"action": "resign", "node": self._node, "name": self._name}
    await self._notifier.notify("leader", payload)
    await self._query.resign_leader(self._name, self._node)

リーダー選出は PostgreSQL の

INSERT … ON CONFLICT
と TTL ベースのレースで完結し、Raft 等の外部協調プロトコルを必要としません。


ライフライン – 孤立ジョブの救済

ワーカーがクラッシュしたりコンテナが停止した場合、実行中ジョブが永遠に止まる恐れがあります。リーダーのみが

lifeline
を起動して救済します:

# _lifeline.py:73‑77
async def _rescue(self) -> None:
    if not self._leader.is_leader:
        return
    await use_ext("lifeline.rescue", _rescue, self._query, self._rescue_after)

救済は単純に時間ベースで行われ、

executing
状態のジョブが
rescue_after
(デフォルト 5 分)を超えたら戻します。Pro バージョンではプロデューサーの健全性も確認するため、真に長時間実行中のジョブは救済されません。

-- rescue_jobs.sql (simplified)
UPDATE oban_jobs
SET state = CASE WHEN attempt >= max_attempts THEN 'discarded' ELSE 'available' END,
    meta  = CASE WHEN attempt >= max_attempts THEN meta ELSE meta || jsonb_build_object('rescued', coalesce((meta->>'rescued')::int,0)+1) END
WHERE state = 'executing'
  AND attempted_at < timezone('UTC', now()) - make_interval(secs => %(rescue_after)s);

meta.rescued
カウンタで救済回数を追跡できます。


プルーナー – 古いジョブのクリーンアップ

oban_jobs
テーブルはプラグイン無しでは無限に膨張します。リーダーのみが
max_age
(デフォルト 1 日)より古い終端状態のジョブを削除します:

-- prune_jobs.sql
WITH jobs_to_delete AS (
  SELECT id FROM oban_jobs
  WHERE (state = 'completed' AND completed_at <= timezone('UTC', now()) - make_interval(secs => %(max_age)s))
     OR (state = 'cancelled' AND cancelled_at <= timezone('UTC', now()) - make_interval(secs => %(max_age)s))
     OR (state = 'discarded' AND discarded_at <= timezone('UTC', now()) - make_interval(secs => %(max_age)s))
  ORDER BY id ASC
  LIMIT %(limit)s
)
DELETE FROM oban_jobs WHERE id IN (SELECT id FROM jobs_to_delete);

LIMIT
は長時間実行される削除が他の操作をブロックしないようにします。


リトライ & バックオフ

ジョブで例外が発生した場合、エグゼキュータはその運命を決定します:

# _executor.py:96‑109
match result:
    case Exception() as error:
        if self.job.attempt >= self.job.max_attempts:
            self.action = AckAction(job=self.job, state="discarded",
                                    error=self._format_error(error))
        else:
            self.action = AckAction(job=self.job, state="retryable",
                                    error=self._format_error(error),
                                    schedule_in=self._retry_backoff())

デフォルトのバックオフは「ジッター付き制限済み指数関数的増加」で、スレッド群が同時に再試行する現象(thundering herd)を防ぎます:

# _backoff.py:66‑87
def jittery_clamped(attempt: int, max_attempts: int, *, clamped_max: int = 20) -> int:
    if max_attempts <= clamped_max:
        clamped_attempt = attempt
    else:
        clamped_attempt = round(attempt / max_attempts * clamped_max)
    time = exponential(clamped_attempt, mult=1, max_pow=100, min_pad=15)
    return jitter(time, mode="inc")

式は 15 + 2^attempt 秒で、最大10 % のジッターが付与されます。例:試行 1 は約17秒、5 は約47秒、10 は約1039秒(≈17分)です。高

max_attempts
を設定するとスケールダウンし、年単位の遅延を防ぎます。ワーカーはカスタムバックオフで上書き可能です。


まとめ – 学びとポイント

  • PostgreSQL が重役:
    FOR UPDATE SKIP LOCKED
    (並行取得)、LISTEN/NOTIFY(リアルタイム通知)、
    ON CONFLICT
    (リーダー選出)。Redis や ZooKeeper は不要。
  • Oban‑py は並行はあるものの真の並列化はなし。asyncio で多数ジョブを同時実行できるがイベントループは単一スレッド。CPU 集約タスクには Pro のプロセスプールがおすすめ。
  • リーダー選出はシンプルかつ効果的:TTL ベースの
    INSERT … ON CONFLICT
    。リーダーは通常より二倍速く更新し、死亡時に別ノードが引き継ぎます。プルーナー・救済には十分です。
  • コードベースはクリーンで命名規則も統一されており、読む感覚は良い本を読むようなものです。
  • OSS 版でもかなりの機能が備わっていますが、バルク操作や賢い救済、本格的並列化など Pro が補完します。Pro ライセンスはコストパフォーマンスが高いと感じます。

総評

Oban.py は Elixir 版を忠実に移植したクリーンで構造化されたプロダクトです。Elixir の Oban を懐かしむ方、あるいは外部インフラ(Redis 等)なしで PostgreSQL だけでジョブキューを運用したい Python 開発者には十分価値があります。ぜひ一度試してみてください。

同じ日のほかのニュース

一覧に戻る →

2026/01/28 9:57

**トリニティ・ラージ** オープンな400 B スパースMoEモデル

## Japanese Translation: ``` (combining all key points with clarity):** --- ### Trinity‑Large: A Fast, Open, State‑of‑the‑Art Sparse MoE Language Model Trinity‑Large は、1 つのトークンで約 13 B パラメータ(256 エキスパート、1.56 % ルーティング分率)しか活性化しない 400 B パラメータを持つ sparse mixture‑of‑experts モデルです。10 T、4 T、3 T の三段階で **17 T** のキュレーション済みトークンを使用して訓練されました。プログラミング・STEM・推論・多言語コンテンツをカバーする合成データが用いられ、Momentum‑based エキスパートロードバランシング、1 シーケンスあたりのバランスロス、z‑loss 正則化で LM‑head ロジットを抑制し、効率的な注意機構(HSDP)と 8‑expert 並列処理が採用されました。 **リリースされたバリアント** | バリアント | 説明 | |---------|-------------| | **Trinity‑Large‑Preview** | 軽くポストトレーニングし、チャット対応。創造的執筆・物語作成・ロールプレイ・リアルタイム音声支援・エージェントタスク(OpenCode, Cline, Kilo Code)で優れた性能を発揮します。まだ推論モデルではありません。 | | **Trinity‑Large‑Base** | 完全な 17 T 事前訓練チェックポイント。ベンチマークと研究資源として使用されます。 | | **TrueBase** | 初期の 10 T チェックポイントで、指示データや LR アニーリングが含まれていません。大規模な高品質事前訓練効果を研究するのに最適です。 | 全体の作業―6か月間にわたる4つのモデル―は約 **2,000 万ドル** の費用で、**2048 台の Nvidia B300 GPU** を使用し、**33 日間** にわたって訓練されました。 **性能** - 数学・コーディング・科学的推論・原知識ベンチマークにおいて同等またはそれ以上の性能を示します。 - 推論速度は、同じハードウェア上で比較可能な重みクラスモデルより約 2–3 倍速です。 - ベンチマーク比較(Preview vs. Llama 4 Maverick): - MMLU: 87.2 vs. 85.5 - MMLU‑Pro: 75.2 vs. 80.5 - GPQA‑Diamond: 63.3 vs. 69.8 - AIME 2025: 24.0 vs. 19.3 **技術的詳細** - ネイティブコンテキスト長:**512k トークン**。Preview API はインフラ調整中に 128k と 8‑bit 量子化で動作します。 - モデルと API は Hugging Face、OpenRouter、および Arcee.ai を通じて公開されており、Kilo Code、Cline、OpenCode 用の統合がすぐに利用可能です。 **コミュニティへの関与** チームは Trinity‑Large が最先端レベルでありながら所有権と実際の使用を念頭に置いて設計されていることを強調し、ユーザーに失敗例を報告してもらうことでオープンモデルが継続的に改善できるよう奨励しています。 ```

2026/01/28 9:18

「有名な研究者が、赤ちゃんの中毒事件を隠したのでしょうか?」

2026/01/28 23:32

エアフォイル(2024)