h5pyチャンク・圧縮完全ガイド【パフォーマンス最適化2025年版】

 

h5pyを使った大容量データ処理において、チャンクと圧縮の適切な設定は性能を大きく左右します。本記事では、実践的なチューニング手法とベストプラクティスを詳しく解説します。

目次

  1. h5pyチャンクの基本概念と設定方法
  2. 圧縮アルゴリズムの選択と最適化
  3. パフォーマンス向上のための実践テクニック
  4. 大容量データ処理の効率化手法
  5. 実践的なベンチマークと最適設定

h5pyチャンクの基本概念と設定方法 {#chunk-basics}

チャンクとは

チャンクは、HDF5ファイル内でデータを分割して格納する単位です。適切なチャンクサイズにより、読み書き性能が大幅に向上します。

import h5py
import numpy as np

# 基本的なチャンク設定
with h5py.File('data.h5', 'w') as f:
    # チャンク有効化(自動サイズ)
    dset = f.create_dataset('auto_chunk', (1000, 1000), 
                           chunks=True, dtype='f4')
    
    # 手動チャンクサイズ指定
    dset2 = f.create_dataset('manual_chunk', (1000, 1000), 
                            chunks=(100, 100), dtype='f4')

最適なチャンクサイズの決定

# データアクセスパターンに基づくチャンク設定
def create_optimized_dataset(file, name, shape, access_pattern='row'):
    if access_pattern == 'row':
        # 行単位アクセス用チャンク
        chunks = (1, shape[1]) if len(shape) == 2 else (1,) + shape[1:]
    elif access_pattern == 'column':
        # 列単位アクセス用チャンク
        chunks = (shape[0], 1) if len(shape) == 2 else (shape[0], 1) + shape[2:]
    else:
        # バランス型チャンク(推奨サイズ:1MB程度)
        chunk_size = 1024 * 1024 // 4  # 4バイトfloat想定
        chunks = True
    
    return file.create_dataset(name, shape, chunks=chunks, dtype='f4')

チャンクキャッシュの活用

# チャンクキャッシュサイズの最適化
with h5py.File('data.h5', 'r', rdcc_nbytes=1024*1024*50) as f:
    # 50MBのチャンクキャッシュを設定
    dset = f['dataset']
    data = dset[:100, :100]  # 高速アクセス

圧縮アルゴリズムの選択と最適化 {#compression-optimization}

主要圧縮アルゴリズムの比較

# gzip圧縮(汎用性重視)
with h5py.File('gzip_data.h5', 'w') as f:
    dset = f.create_dataset('gzip_data', (1000, 1000), 
                           compression='gzip', compression_opts=9,
                           chunks=True, dtype='f4')

# lzf圧縮(速度重視)
with h5py.File('lzf_data.h5', 'w') as f:
    dset = f.create_dataset('lzf_data', (1000, 1000), 
                           compression='lzf', chunks=True, dtype='f4')

# szip圧縮(科学データ特化)
with h5py.File('szip_data.h5', 'w') as f:
    dset = f.create_dataset('szip_data', (1000, 1000), 
                           compression='szip', chunks=True, dtype='f4')

フィルターチェーンの活用

# シャッフル + 圧縮の組み合わせ
with h5py.File('filtered_data.h5', 'w') as f:
    # シャッフルフィルターで圧縮率向上
    dset = f.create_dataset('filtered', (1000, 1000),
                           compression='gzip', compression_opts=6,
                           shuffle=True, fletcher32=True,
                           chunks=(100, 100), dtype='f4')

圧縮レベルの最適化

def find_optimal_compression(data, algorithms=['gzip', 'lzf']):
    """最適な圧縮設定を検索"""
    results = {}
    
    for alg in algorithms:
        with h5py.File(f'test_{alg}.h5', 'w') as f:
            if alg == 'gzip':
                # gzipは圧縮レベル1-9をテスト
                for level in range(1, 10, 2):
                    dset = f.create_dataset(f'data_l{level}', data.shape,
                                          compression=alg, compression_opts=level,
                                          chunks=True)
                    dset[:] = data
            else:
                dset = f.create_dataset('data', data.shape,
                                      compression=alg, chunks=True)
                dset[:] = data
    
    return results

パフォーマンス向上のための実践テクニック {#performance-techniques}

並列I/Oの実装

import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

def parallel_write_chunks(filename, data_chunks, chunk_coords):
    """並列チャンク書き込み"""
    with h5py.File(filename, 'w', libver='latest') as f:
        dset = f.create_dataset('data', (1000, 1000), 
                               chunks=(100, 100), dtype='f4')
        
        # チャンク単位で並列書き込み
        for chunk_data, (start_row, start_col) in zip(data_chunks, chunk_coords):
            end_row = start_row + chunk_data.shape[0]
            end_col = start_col + chunk_data.shape[1]
            dset[start_row:end_row, start_col:end_col] = chunk_data

メモリ効率的な大容量データ処理

def process_large_hdf5(filename, chunk_size=1000):
    """メモリ効率的な大容量ファイル処理"""
    with h5py.File(filename, 'r') as f:
        dset = f['large_dataset']
        
        # チャンク単位で処理
        for i in range(0, dset.shape[0], chunk_size):
            end_idx = min(i + chunk_size, dset.shape[0])
            chunk = dset[i:end_idx]
            
            # 処理実行(例:統計計算)
            result = np.mean(chunk, axis=1)
            yield result

動的リサイズとチャンク最適化

def create_expandable_dataset(filename, initial_shape, max_shape):
    """拡張可能なデータセット作成"""
    with h5py.File(filename, 'w') as f:
        # 動的リサイズ対応
        dset = f.create_dataset('expandable', initial_shape,
                               maxshape=max_shape,
                               chunks=True, compression='gzip',
                               dtype='f4')
        
        # データ追加例
        new_data = np.random.random((100, initial_shape[1]))
        dset.resize((initial_shape[0] + 100, initial_shape[1]))
        dset[-100:] = new_data

大容量データ処理の効率化手法 {#large-data-handling}

ストリーミング処理の実装

class HDF5Streamer:
    def __init__(self, filename, dataset_name, chunk_size=10000):
        self.filename = filename
        self.dataset_name = dataset_name
        self.chunk_size = chunk_size
    
    def __iter__(self):
        with h5py.File(self.filename, 'r') as f:
            dset = f[self.dataset_name]
            for i in range(0, dset.shape[0], self.chunk_size):
                yield dset[i:i+self.chunk_size]

# 使用例
streamer = HDF5Streamer('large_file.h5', 'data')
for chunk in streamer:
    result = np.sum(chunk)  # チャンク単位処理

インデックス最適化

def create_indexed_dataset(filename, data, index_column):
    """インデックス付きデータセット作成"""
    with h5py.File(filename, 'w') as f:
        # データ本体
        dset = f.create_dataset('data', data.shape, 
                               chunks=True, compression='gzip')
        dset[:] = data
        
        # インデックス作成
        sorted_indices = np.argsort(data[:, index_column])
        idx_dset = f.create_dataset('index', sorted_indices.shape,
                                   chunks=True, dtype='i8')
        idx_dset[:] = sorted_indices

階層データ構造の活用

def create_hierarchical_structure(filename, time_series_data):
    """階層構造による効率的データ組織化"""
    with h5py.File(filename, 'w') as f:
        # 年月日による階層構造
        for year in time_series_data.keys():
            year_grp = f.create_group(f'year_{year}')
            
            for month, month_data in time_series_data[year].items():
                dset = year_grp.create_dataset(f'month_{month:02d}', 
                                             month_data.shape,
                                             chunks=True, compression='lzf')
                dset[:] = month_data
                
                # メタデータ追加
                dset.attrs['start_date'] = f'{year}-{month:02d}-01'

実践的なベンチマークと最適設定 {#benchmarks}

パフォーマンステスト関数

import time

def benchmark_compression(data, chunk_sizes, compressions):
    """圧縮・チャンク設定のベンチマーク"""
    results = []
    
    for chunk_size in chunk_sizes:
        for comp in compressions:
            start_time = time.time()
            
            filename = f'bench_{comp}_{chunk_size}.h5'
            with h5py.File(filename, 'w') as f:
                dset = f.create_dataset('data', data.shape,
                                       chunks=(chunk_size, chunk_size),
                                       compression=comp, dtype='f4')
                dset[:] = data
            
            write_time = time.time() - start_time
            file_size = os.path.getsize(filename)
            
            results.append({
                'compression': comp,
                'chunk_size': chunk_size,
                'write_time': write_time,
                'file_size': file_size
            })
    
    return results

最適設定の推奨値

def get_recommended_settings(data_shape, access_pattern='balanced'):
    """データ特性に基づく推奨設定"""
    total_elements = np.prod(data_shape)
    
    if total_elements < 1e6:  # 小容量データ
        return {
            'chunks': True,
            'compression': 'lzf',
            'shuffle': False
        }
    elif total_elements < 1e9:  # 中容量データ
        return {
            'chunks': (min(1000, data_shape[0]), min(1000, data_shape[1])),
            'compression': 'gzip',
            'compression_opts': 6,
            'shuffle': True
        }
    else:  # 大容量データ
        return {
            'chunks': (100, 100),
            'compression': 'gzip',
            'compression_opts': 4,
            'shuffle': True,
            'fletcher32': True
        }

ファイル形式最適化

def optimize_file_format(input_file, output_file):
    """既存ファイルの最適化"""
    with h5py.File(input_file, 'r') as src:
        with h5py.File(output_file, 'w') as dst:
            for name, dataset in src.items():
                # 最適設定で再作成
                optimized_dset = dst.create_dataset(
                    name, dataset.shape,
                    chunks=True, compression='gzip',
                    compression_opts=6, shuffle=True,
                    dtype=dataset.dtype
                )
                
                # チャンク単位でコピー
                chunk_size = 10000
                for i in range(0, dataset.shape[0], chunk_size):
                    optimized_dset[i:i+chunk_size] = dataset[i:i+chunk_size]

トラブルシューティングとデバッグ

性能問題の診断

def diagnose_performance(filename, dataset_name):
    """パフォーマンス問題の診断"""
    with h5py.File(filename, 'r') as f:
        dset = f[dataset_name]
        
        print(f"データセット形状: {dset.shape}")
        print(f"チャンク設定: {dset.chunks}")
        print(f"圧縮方式: {dset.compression}")
        print(f"ファイルサイズ: {os.path.getsize(filename) / 1024**2:.1f} MB")
        
        # アクセスパターンテスト
        start_time = time.time()
        _ = dset[0:100, :]  # 行アクセス
        row_time = time.time() - start_time
        
        start_time = time.time()
        _ = dset[:, 0:100]  # 列アクセス
        col_time = time.time() - start_time
        
        print(f"行アクセス時間: {row_time:.3f}s")
        print(f"列アクセス時間: {col_time:.3f}s")

メモリ使用量の監視

import psutil

def monitor_memory_usage(func, *args, **kwargs):
    """メモリ使用量監視付き関数実行"""
    process = psutil.Process()
    initial_memory = process.memory_info().rss / 1024**2
    
    result = func(*args, **kwargs)
    
    peak_memory = process.memory_info().rss / 1024**2
    print(f"メモリ使用量: {peak_memory - initial_memory:.1f} MB")
    
    return result

まとめ

h5pyでのチャンクと圧縮の効果的な活用により、大容量データ処理の性能を大幅に向上させることができます。

重要なポイント:

チャンク設定のベストプラクティス

  • アクセスパターンに合わせたチャンクサイズ選択
  • 1MB程度のチャンクサイズが一般的に最適
  • 動的リサイズが必要な場合は事前設計が重要

圧縮アルゴリズムの選択基準

  • gzip:汎用性と圧縮率のバランス重視
  • lzf:高速処理が必要な場合
  • szip:科学データに特化した圧縮

パフォーマンス最適化技術

  • シャッフルフィルターによる圧縮率向上
  • 並列I/Oによる処理速度向上
  • ストリーミング処理によるメモリ効率化

適切な設定により、ファイルサイズを50-90%削減し、処理速度を2-10倍向上させることが可能です。データの特性と用途に応じて、これらの技術を組み合わせて活用しましょう。

■プロンプトだけでオリジナルアプリを開発・公開してみた!!

■AI時代の第一歩!「AI駆動開発コース」はじめました!

テックジム東京本校で先行開始。

■テックジム東京本校

「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。

<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。

<月1開催>放送作家による映像ディレクター養成講座

<オンライン無料>ゼロから始めるPython爆速講座