データ連携処理のリファクタリングメモ

GCSからデータを取得してアプリケーションのDBに登録する処理で、扱うファイルのサイズが増えたときに備えて処理を効率化できるようにリファクタリングを行いました。
対応しながら学んだことをメモします。

改善内容

これまでは、日付ごとにフォルダを分けてそのフォルダに存在する全てのファイルを一度にメモリに読み込み、まとめて処理していました。
この構造を以下のように変更しました。

Before

  • 日付フォルダないのファイルを全てダウンロード→メモリ上で一括処理

After

ファイルを1つずつダウンロード→処理後にメモリを解放→次のファイルへ進む

実装のイメージは以下のような感じ.

# Before
all_files = download_all_files_from_gcs(date_folder)
for file in all_files:
    process(file)

# After
list_files_in_gcs = list_files(date_folter)
for file_name in list_files_in_gcs:
    with download_file_stream(file_name) as f:
        process(f)

このリファクタリングにより、メモリ使用量を一定に保ちながら処理できるようになりました。

対応しながら気づいた課題

洗い替え処理のタイミング

このデータ連携自体は洗い替えを行う仕様なのですが、PR作成時では1ファイルごとの登録処理の後にDELETEを行うロジックになっていました。
この結果、複数ファイルがある場合に最後のファイルのデータしか残らない挙動になっていました。

これを修正するために全ファイルのINSERTが完了した後に1回だけDELETEを行うように修正しました。
しかしこの変更で新たに、日付フォルダが存在しない場合でもDELETE処理が行われGCSにデータがないときにDBのレコードが空になる問題が生じました。
GCSには日次でファイルがアップロードされる仕組みになっているのですが、そのパイプライン処理で問題が起きた場合にアプリケーションのレコードに影響が及ぶことを避けるため、登録されたレコードが1件もない場合は削除を行わないように修正しました。

メモリ使用量の検証不足.

1ファイルずつ処理することでメモリ使用量は減るだろうという推測はしていましたが、それをローカルで検証まではしていませんでした。
処理が効率化される根拠を持てるようにメモリの使用量を調べる方法を探したところ、psutilモジュールを使うと実行中のプロセスのメモリ使用量を取得できることをがわかったので後からPRに情報を追記しました。
開発の早い段階でメモリの挙動を検証しておけばリファクタリングの効果を定量的に確認できたと思います。

コード例

import psutil, os

process = psutil.Process(os.getpid())
print(f"Memory usage: {process.memory_info().rss / 1024 ** 2:.2f} MB")

ログ設計.

dev環境にデプロイして処理を動かしてみるとCloudWatchで確認できるログが冗長なものになっていました。
開発の段階でこういうログがほしいというイメージを自分の中でもっておき、考えが固まった上でLLMを活用してログを出力する実装をするのが良さそうに思いました。