
2026/01/13 23:45
**Show HN:** *FastScheduler* – Decorator‑first Python task scheduler with async support → **HNで紹介**:FastScheduler ― デコレータ優先のPythonタスクスケジューラ、非同期対応
RSS: https://news.ycombinator.com/rss
要約▶
Japanese Translation:
FastSchedulerは、開発者が最小限のコードで非同期タスクをスケジュールできる軽量Pythonライブラリです。
@scheduler.every(10).secondsや@scheduler.daily.at("14:30")といったデコレータを使用すると、インターバル型・時間ベース型・cron型・ワンタイムジョブを定義でき、オプションでタイムゾーン設定(tz="America/New_York"や.tz()チェーン可能)も行えます。
自動リトライ(指数バックオフ付き)、タイムアウト設定、pause/ resume / cancel アクション、“no‑catch‑up” モード(欠落した実行をスキップ)、履歴保持制限(
max_history、history_retention_days)およびデッドレターキューサイズ(max_dead_letters)をサポートしています。Scheduler の状態は fastscheduler_state.json に永続化され、デッドレタージョブは _dead_letters.json に保存されます。
インストールは pip で簡単に行えます。[cron] などの extras を指定すると cron 構文がサポートされ、[fastapi] を追加すると
/scheduler/ に FastAPI ベースのリアルタイムダッシュボードが提供されます。ダッシュボードは SSE(Server‑Sent Events)でライブ更新を行い、ジョブステータス・リスト・履歴・イベント用の REST エンドポイントも公開します。
FastScheduler の API には
start()、stop(wait=True, timeout=30)、get_jobs()、get_job(id)、get_history()、get_statistics()、pause_job(id)、resume_job(id)、cancel_job(id) があり、コンテキストマネージャとしても使用可能です。
典型的な使用例としては、インターバルジョブ、非同期タイムゾーンジョブ(タイムアウト付き)、リトライ付き cron ジョブ、週次ジョブ、および寿命フックを使った FastAPI アプリへの統合があります。
本文
FastScheduler(ファストスケジューラ)
FastScheduler は、Python 用の軽量で非同期に優しいタスクスケジューラです。タイムゾーン・cron 式・永続状態をサポートし、FastAPI が提供するリアルタイムダッシュボードと連携します。
主な特徴
| 🎯 | デコレータベースのシンプル API – 1 行でタスクをスケジュール |
| ⚡ | をネイティブにサポート |
| 🕐 | タイムゾーン対応 – 任意のタイムゾーンでジョブ実行 |
| 📅 | cron 式による複雑なスケジュール設定 |
| 💾 | 永続状態 – 再起動後も残存し、欠落したジョブを処理 |
| 🎨 | FastAPI ダッシュボードでリアルタイム監視 |
| 🔄 | 失敗時の自動再試行(指数関数的バックオフ) |
| ⏱️ | ジョブタイムアウト – 長時間実行中のタスクを強制終了 |
| ⏸️ | 削除せずに一時停止/再開 |
| 📋 | デッドレターキュー – 失敗したジョブを追跡・デバッグ |
インストール
# 基本インストール pip install fastscheduler # FastAPI ダッシュボード付き pip install fastscheduler[fastapi] # cron サポート付き pip install fastscheduler[cron] # すべてのオプションを有効にする pip install fastscheduler[all]
クイックスタート
from fastscheduler import FastScheduler scheduler = FastScheduler(quiet=True) @scheduler.every(10).seconds def task(): print("Task executed") @scheduler.daily.at("14:30") async def daily_task(): print("Daily task at 2:30 PM") scheduler.start()
スケジューリングオプション
インターバルベース
@scheduler.every(10).seconds @scheduler.every(5).minutes @scheduler.every(2).hours @scheduler.every(1).days
時間ベース
@scheduler.daily.at("09:00") # 毎朝9時 @scheduler.hourly.at(":30") # 毎時間30分に @scheduler.weekly.monday.at("10:00") # 月曜10時 @scheduler.weekly.weekdays.at("09:00") # 平日9時 @scheduler.weekly.weekends.at("12:00") # 週末正午
cron 式(pip install fastscheduler[cron]
が必要)
pip install fastscheduler[cron]@scheduler.cron("0 9 * * MON-FRI") def market_open(): ... @scheduler.cron("*/15 * * * *") def frequent_check(): ... @scheduler.cron("0 0 1 * *") def monthly_report(): ...
一度だけ実行
@scheduler.once(60) def delayed_task(): ... @scheduler.at("2024-12-25 00:00:00") def christmas_task(): ...
タイムゾーンサポート
# tz パラメータ @scheduler.daily.at("09:00", tz="America/New_York") def nyc_morning(): ... # チェーン可能な .tz() @scheduler.weekly.monday.tz("Europe/London").at("09:00") def london_standup(): ... # cron でも @scheduler.cron("0 9 * * MON-FRI").tz("Asia/Tokyo") def tokyo_market(): ...
代表的なタイムゾーンは
UTC、America/New_York、Europe/London、Asia/Tokyo 等です。
ジョブ制御
タイムアウト
@scheduler.every(1).minutes.timeout(30) def quick_task(): ... @scheduler.daily.at("02:00").timeout(3600) # 最大1時間 def nightly_backup(): ...
再試行
@scheduler.every(5).minutes.retries(5) def flaky_api_call(): ... # 指数関数的バックオフ:2s, 4s, 8s …
カッチアップをスキップ
@scheduler.every(1).hours.no_catch_up() def hourly_stats(): ...
一時停止 / 再開 / キャンセル
scheduler.pause_job("job_0") scheduler.resume_job("job_0") scheduler.cancel_job("job_0") scheduler.cancel_job_by_name("my_task")
FastAPI 連携
from fastapi import FastAPI from fastscheduler import FastScheduler from fastscheduler.fastapi_integration import create_scheduler_routes app = FastAPI() scheduler = FastScheduler(quiet=True) app.include_router(create_scheduler_routes(scheduler)) @scheduler.every(30).seconds def background_task(): print("Background work") scheduler.start()
ダッシュボードは
http://localhost:8000/scheduler/ で確認できます。
API エンドポイント
| エンドポイント | メソッド | 説明 |
|---|---|---|
| GET | ダッシュボード UI |
| GET | スケジューラの状態 |
| GET | すべてのジョブ一覧 |
| GET | 指定ジョブ取得 |
| POST | ジョブを一時停止 |
| POST | ジョブを再開 |
| POST | ジョブをキャンセル |
| GET | 実行履歴 |
| GET | デッドレターキュー |
| DELETE | キュークリア |
| GET | SSE イベントストリーム |
設定
scheduler = FastScheduler( state_file="scheduler.json", # 永続ファイル quiet=True, # ログ抑制 auto_start=False, # 手動起動 max_history=5000, # 履歴最大件数 max_workers=20, # 同時ジョブ数 history_retention_days=8, # 古い履歴削除期間 max_dead_letters=500, # デッドレターキューサイズ )
履歴保持
(件数上限)max_history
(時間上限;0 で無効化)history_retention_days
デッドレターキュー
に保存*_dead_letters.json- 最新の
件を詳細付きで保持max_dead_letters
モニタリング & プログラム的アクセス
jobs = scheduler.get_jobs() job = scheduler.get_job("job_0") history = scheduler.get_history(limit=100) stats = scheduler.get_statistics() scheduler.print_status()
コンテキストマネージャーで使用可能:
with FastScheduler(quiet=True) as scheduler: @scheduler.every(5).seconds def task(): print("Running") time.sleep(30) # スケジューラは自動実行 # 退出時に停止
状態永続化
- ジョブ定義・スケジュール・履歴・統計・ジョブカウンタをディスクへ保存
- 再起動時:ジョブ復元、欠落した実行を算出し、必要ならキャッチアップジョブを実行(
で除外可)no_catch_up()
完全な例
import asyncio, time from fastscheduler import FastScheduler scheduler = FastScheduler(quiet=True) @scheduler.every(10).seconds def heartbeat(): print(f"[{time.strftime('%H:%M:%S')}] ❤️ Heartbeat") @scheduler.daily.at("09:00", tz="America/New_York").timeout(60) async def morning_report(): print("Generating report...") await asyncio.sleep(5) print("Report sent!") @scheduler.cron("*/5 * * * *").retries(3) def check_api(): print("Checking API health") @scheduler.weekly.monday.at("10:00") def weekly_standup(): print("Time for standup!") scheduler.start() try: while True: time.sleep(60) scheduler.print_status() except KeyboardInterrupt: scheduler.stop()
FastAPI とライフサイクル
from contextlib import asynccontextmanager from fastapi import FastAPI from fastscheduler import FastScheduler from fastscheduler.fastapi_integration import create_scheduler_routes scheduler = FastScheduler(quiet=True) @asynccontextmanager async def lifespan(app: FastAPI): scheduler.start() yield scheduler.stop(wait=True) app = FastAPI(lifespan=lifespan) app.include_router(create_scheduler_routes(scheduler)) @scheduler.every(30).seconds def background_job(): print("Working...")
API 参照
| メソッド | 説明 |
|---|---|
| スケジューラを開始 |
| Graceful 停止 |
| すべてのジョブ取得 |
| 指定ジョブ取得 |
| 実行履歴 |
| ランタイム統計 |
| 失敗ジョブキュー |
| キューをクリア |
| ジョブ停止 |
| ジョブ再開 |
| ジョブキャンセル・削除 |
| 関数名でジョブ全て削除 |
| コンソールに状態表示 |
ライセンス
MIT © 2024
貢献歓迎!GitHub で issue や PR を作成してください。