
2025/12/26 17:52
**Pandas with Rows(2022)**
RSS: https://news.ycombinator.com/rss
要約▶
Japanese Translation:
(欠落した詳細と明確化を統合)**
要約
この記事では、1億2000万件のレコードから米国で平均国内フライト遅延が最大の5つの空港を見つけるためのいくつかの手法をベンチマークしています。
- ベースライン: すべての年次CSVに対して
を使用すると MemoryError が発生します。pandas.concat - 純Pythonストリーミング: 空港ごとのカウント/遅延をメモリ内で保持(約1 MB)し、実行時間は約7分です。
- PyPy: 同じコードを PyPy で実行すると RAM が約40 MB使用され、完了までに約4 m 40 s(CPython の約⅔)かかります。
- 最適化されたpandas:
、Origin
、Year
、Month
、DayofMonth
、CRSDepTime
だけを読み込み、それらを効率的な dtype(DepTime
、category
)にキャストします。これで実行時間が約2 m 45 sに短縮され、結合時のピークは8.1 GBです。CSV のロードに 80 % 以上が費やされています。uint16/uint8 - Pandas + PyArrow エンジン:
に切り替えると実行時間が約1 m 10 s(元の約42 %)に短縮され、ピークメモリは8.1 GBを維持します。マルチスレッドは役立ちますが、ディスク I/O がボトルネックです。engine='pyarrow' - 直接 PyArrow API: 各年次 CSV を Arrow テーブル(
)に読み込み、convert_options
で結合し、pandas に変換すると約50 sの実行時間になり、ピーク RAM は7.5 GBです(pandas/PyArrow のピークの93 %)。pyarrow.concat_tables - 年ごとのアプローチ: 年を1年ずつ処理し、空港ごとの合計/カウントを集約してから年全体で縮減すると、実行時間は約37 s、ピーク RAM はわずか900 MB(完全ロード法の12 %)です。
- 並列 pandas(8 ワーカー): 各 CSV を並列に読み込む multiprocessing プールを使用すると実行時間が約53 s(単一コア pandas の約1/3)まで短縮されますが、ディスク I/O に制限されます。プロセスごとのピーク RAM は約400 MBです。
データセットはハーバード大学の「Data Expo 2009: Airline on time data」(1987‑2008年)から取得され、22 個の CSV ファイルで約13 GBに圧縮されています。テストは単一 Linux マシン(i7‑8550U、16 GB LPDDR3 RAM、NVMe SSD、スワップなし)上で Arch Linux + KDE Plasma 環境で実行されました。
結論: メモリ制限はストリーミングや年ごとの処理によって回避できます。パフォーマンス向上は PyPy、PyArrow エンジン、または直接 Arrow API によって得られます。並列化はわずかなスピードアップを提供しますが、最終的にはディスクスループットに制限されます。これらの知見は、大規模航空データセットを扱うデータサイエンティストや航空会社/規制当局が遅延パターンを分析する際に役立ちます。
本文
問題
米国内線で平均(算術平均)遅延が最大のアメリカ空港トップ 5を見つけたい。
データ
- データセット:Harvard Dataverse の Data Expo 2009 – Airline on‑time data
- 米国内のすべて商業フライト(到着・出発)の詳細。1987年10月〜2008年4月。
- 約1億2,000万件を22個のCSVファイル(各年1ファイル)+無視する4個の補助CSVで構成。
- 圧縮時サイズ ≈ 13 GB。
環境
| コンポーネント | スペック |
|---|---|
| CPU | Intel® Core™ i7‑8550U @ 1.80 GHz |
| RAM | 16 GB LPDDR3、スワップ無し |
| ディスク | NVMe TOSHIBA 512 GB (ext4) |
| OS | Linux 5.19.9 (Arch)、KDE Plasma、Konsole 1セッション |
| ソフトウェア | CPython 3.10.6、PyPy 7.3.9(Python 3.9)、pandas 1.4.4、PyArrow 9.0.0 |
単純なアプローチ
import pandas as pd df = pd.concat( pd.read_csv(f'{year}.csv') for year in range(1987, 2009) )
すべてのデータをメモリに読み込もうとすると
MemoryError(またはカーネルが終了)になる。RAM に収まらないため。
純Pythonでの解決策
CSV をストリームし、空港ごとの集計だけを保持する。
import csv, datetime, heapq, operator USE_COLS = ('Origin', 'Year', 'Month', 'DayofMonth', 'CRSDepTime', 'DepTime') airports = {} for year in range(1987, 2009): with open(f'../data/{year}.csv', errors='ignore') as f: reader = csv.reader(f) header = {name: pos for pos, name in enumerate(next(reader)) if name in USE_COLS} for row in reader: if (row[header['CRSDepTime']] == 'NA' or row[header['DepTime']] == 'NA'): continue y, m, d = (int(row[header['Year']]), int(row[header['Month']]), int(row[header['DayofMonth']])) try: sched = datetime.datetime(y, m, d, int(row[header['CRSDepTime']][:-2] or '0'), int(row[header['CRSDepTime']][-2:])) act = datetime.datetime(y, m, d, int(row[header['DepTime']][:-2] or '0'), int(row[header['DepTime']][-2:])) except ValueError: continue delay = (act - sched).total_seconds() / 3600.0 if delay < -2.0: delay = 24.0 - delay key = row[header['Origin']] airports.setdefault(key, [0, 0.0]) airports[key][0] += 1 airports[key][1] += delay # 平均遅延でトップ5 top5 = dict(heapq.nlargest( 5, ((a, total / count) for a, (count, total) in airports.items()), operator.itemgetter(1) )) print(top5)
- メモリ:約 1 MB
- 時間:≈ 7 分
PyPy
同じスクリプトを PyPy で実行すると、メモリは ≈ 40 MB、終了時間は ≈ 4 m 40 s(CPython の約 ⅔)になる。
pandas – メモリ効率の良いアプローチ
import pandas as pd LOAD_COLS = ('Origin', 'Year', 'Month', 'DayofMonth', 'CRSDepTime', 'DepTime') df = pd.concat( pd.read_csv(f'../data/{fname}.csv', usecols=LOAD_COLS, encoding_errors='ignore', dtype={'Origin': 'category', 'Year': 'uint16', 'Month': 'uint8', 'DayofMonth': 'uint8', 'CRSDepTime': 'uint16', 'DepTime': 'UInt16'}) for fname in range(1987, 2009), ignore_index=True ) date = pd.to_datetime(df[['Year', 'Month', 'DayofMonth']].rename(columns={'DayofMonth':'Day'})) df['scheduled_dep'] = date + pd.to_timedelta( (df['CRSDepTime']//100)*60 + (df['CRSDepTime']%100), unit='minutes') df['actual_dep'] = date + pd.to_timedelta( (df['DepTime']//100)*60 + (df['DepTime']%100), unit='minutes') df = df[['Origin', 'scheduled_dep', 'actual_dep']] df['delay'] = (df['actual_dep'] - df['scheduled_dep']).dt.total_seconds() / 3600 df['delay'] = df['delay'].where(df['delay'] > -2, 24 - df['delay']) print(df.groupby('Origin')['delay'] .mean() .sort_values(ascending=False) .head(5))
- ピークメモリ:≈ 8.1 GB(
時)pd.concat - 時間:約 2 分 45 秒
pandas + PyArrow エンジン
df = pd.concat( pd.read_csv(f'../data/{fname}.csv', usecols=LOAD_COLS, encoding_errors='ignore', dtype={'Origin': 'category', 'Year': 'uint16', 'Month': 'uint8', 'DayofMonth': 'uint8', 'CRSDepTime': 'uint16', 'DepTime': 'UInt16'}, engine='pyarrow') for fname in range(1987, 2009), ignore_index=True )
- ピークメモリ:同上
- 時間:≈ 1 分 10 秒(通常の pandas 実行時間の約 42%)
直接 PyArrow
import pyarrow as pa, pyarrow.csv as pacsv COLUMN_TYPES = { 'Origin': pa.dictionary(pa.int32(), pa.string()), 'Year': pa.uint16(), 'Month': pa.uint8(), 'DayofMonth': pa.uint8(), 'CRSDepTime': pa.uint16(), 'DepTime': pa.uint16() } tables = [] for year in range(1987, 2009): tables.append( pacsv.read_csv(f'../data/{year}.csv', convert_options=pa.csv.ConvertOptions( include_columns=COLUMN_TYPES, column_types=COLUMN_TYPES)) ) df = pa.concat_tables(tables).to_pandas()
- ピークメモリ:≈ 7.5 GB
- 時間:約 50 秒
PyArrow 年別(逐次集計)
import functools, pyarrow as pa, pyarrow.csv as pacsv, pandas as pd COLUMN_TYPES = {...} # 上と同じ results = [] for year in range(1987, 2009): df = pacsv.read_csv( f'../data/{year}.csv', convert_options=pa.csv.ConvertOptions( include_columns=COLUMN_TYPES, column_types=COLUMN_TYPES) ).to_pandas() date = pd.to_datetime(df[['Year', 'Month', 'DayofMonth']].rename(columns={'DayofMonth':'Day'})) df['scheduled_dep'] = date + pd.to_timedelta((df['CRSDepTime']//100)*60 + (df['CRSDepTime']%100), unit='minutes') df['actual_dep'] = date + pd.to_timedelta((df['DepTime']//100)*60 + (df['DepTime']%100), unit='minutes') df = df[['Origin', 'scheduled_dep', 'actual_dep']] df['delay'] = (df['actual_dep'] - df['scheduled_dep']).dt.total_seconds() / 3600 df['delay'] = df['delay'].where(df['delay'] > -2, 24 - df['delay']) results.append(df.groupby('Origin')['delay'].agg(['sum', 'count'])) # 年間を統合 df = functools.reduce(lambda x, y: x.add(y, fill_value=0), results) df['mean'] = df['sum'] / df['count'] print(df['mean'].sort_values(ascending=False).head(5))
- ピークメモリ:≈ 900 MB
- 時間:約 37 秒
pandas 年別+マルチプロセッシング
import functools, multiprocessing as mp, pandas as pd LOAD_COLS = ('Origin', 'Year', 'Month', 'DayofMonth', 'CRSDepTime', 'DepTime') def read_one_csv(year): df = pd.read_csv(f'../data/{year}.csv', engine='c', usecols=LOAD_COLS, encoding_errors='ignore', dtype={'Origin': 'category', 'Year': 'uint16', 'Month': 'uint8', 'DayofMonth': 'uint8', 'CRSDepTime': 'uint16', 'DepTime': 'UInt16'}) date = pd.to_datetime(df[['Year', 'Month', 'DayofMonth']].rename(columns={'DayofMonth':'Day'})) df['scheduled_dep'] = date + pd.to_timedelta((df['CRSDepTime']//100)*60 + (df['CRSDepTime']%100), unit='minutes') df['actual_dep'] = date + pd.to_timedelta((df['DepTime']//100)*60 + (df['DepTime']%100), unit='minutes') df = df[['Origin', 'scheduled_dep', 'actual_dep']] df['delay'] = (df['actual_dep'] - df['scheduled_dep']).dt.total_seconds() / 3600 df['delay'] = df['delay'].where(df['delay'] > -2, 24 - df['delay']) return df.groupby('Origin')['delay'].agg(['sum', 'count']) pool = mp.Pool() df = functools.reduce(lambda x, y: x.add(y, fill_value=0), pool.map(read_one_csv, range(1987, 2009))) df['mean'] = df['sum'] / df['count'] print(df['mean'].sort_values(ascending=False).head(5))
- プロセスあたりピークメモリ:≈ 400 MB
- 総時間:約 53 秒(通常の pandas 実行時間の約 ⅓)
- PyArrow に対する速度向上はディスク I/O がボトルネックになるため、ワーカーを増やしすぎると逆に遅くなる。
要点まとめ
| アプローチ | ピーク RAM | 時間 |
|---|---|---|
| 純Python | ~1 MB | 7 min |
| PyPy (スクリプト) | ~40 MB | 4 m 40 s |
| pandas (C エンジン、全データ) | 8.1 GB | 2 min 45 秒 |
| pandas + PyArrow エンジン | 8.1 GB | 1 分 10 秒 |
| Direct PyArrow | 7.5 GB | 50 s |
| PyArrow 年別(逐次集計) | 900 MB | 37 s |
| pandas 年別+マルチプロセッシング | ~400 MB/プロセス | 53 秒 |
学び
- ストリーミング/逐次集計はメモリを低く保ち、時に一括読み込みより高速になる。
- PyArrow のマルチスレッド CSV リーダーはデフォルト pandas エンジンより大幅に速い。
- マルチプロセッシングは CPU がボトルネックのときだけ有効で、I/O が支配的なら効果が薄い。
さらに読む
- High Performance Python – Python コードを高速化する実践ガイド。
- 私の講演「Demystifying pandas internals」(PyData London 2018)。
- Telegram チャンネル https://t.me/datapythonista でクイックノートやミームを共有しています。