Python heapqモジュール完全マスターガイド – ヒープ操作から優先度付きキューまで

 

Pythonのheapqモジュールは、ヒープ(heap)データ構造を効率的に操作するための標準ライブラリです。最小値の高速取得、優先度付きキュー、上位N個の要素取得など、様々な場面で威力を発揮します。この記事では、heapqモジュールの基本的な使い方から高度な応用テクニックまで、実例とともに詳しく解説します。

1. heapqモジュールの基本概念

ヒープは最小ヒープとして実装されており、親ノードが子ノードより常に小さい(または等しい)完全二分木です。

import heapq

# リストをヒープに変換
data = [3, 7, 4, 1, 8, 2, 9]
heapq.heapify(data)
print(data)  # [1, 7, 2, 3, 8, 4, 9] - ヒープ構造に変換される

2. 基本的なヒープ操作

要素の追加(heappush)

import heapq

heap = []
heapq.heappush(heap, 5)
heapq.heappush(heap, 2)
heapq.heappush(heap, 8)
heapq.heappush(heap, 1)
print(heap)  # [1, 2, 8, 5]

最小値の取得と削除(heappop)

import heapq

heap = [1, 2, 8, 5]
min_value = heapq.heappop(heap)
print(f"最小値: {min_value}")  # 最小値: 1
print(f"残りのヒープ: {heap}")  # [2, 5, 8]

要素の追加と削除を同時に(heappushpop)

import heapq

heap = [2, 5, 8]
result = heapq.heappushpop(heap, 3)
print(f"削除された値: {result}")  # 削除された値: 2
print(f"ヒープ: {heap}")         # [3, 5, 8]

3. 上位・下位N個の要素取得

nlargest – 最大のN個を取得

import heapq

numbers = [1, 8, 2, 23, 7, 4, 18, 23, 42, 37, 2]
top_3 = heapq.nlargest(3, numbers)
print(top_3)  # [42, 37, 23]

nsmallest – 最小のN個を取得

import heapq

numbers = [1, 8, 2, 23, 7, 4, 18, 23, 42, 37, 2]
bottom_3 = heapq.nsmallest(3, numbers)
print(bottom_3)  # [1, 2, 2]

key引数を使った条件付き取得

import heapq

students = [
    {"name": "Alice", "score": 85},
    {"name": "Bob", "score": 92},
    {"name": "Charlie", "score": 78},
    {"name": "Diana", "score": 96}
]

top_2 = heapq.nlargest(2, students, key=lambda x: x["score"])
print(top_2)  # [{"name": "Diana", "score": 96}, {"name": "Bob", "score": 92}]

4. 優先度付きキューの実装

基本的な優先度付きキュー

import heapq

class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._index = 0
    
    def push(self, item, priority):
        heapq.heappush(self._queue, (priority, self._index, item))
        self._index += 1
    
    def pop(self):
        return heapq.heappop(self._queue)[-1]
    
    def is_empty(self):
        return len(self._queue) == 0

# 使用例
pq = PriorityQueue()
pq.push("低優先度タスク", 3)
pq.push("高優先度タスク", 1)
pq.push("中優先度タスク", 2)

while not pq.is_empty():
    task = pq.pop()
    print(task)  # 高優先度タスク → 中優先度タスク → 低優先度タスク

5. タスクスケジューリングの実装

import heapq
import time

class TaskScheduler:
    def __init__(self):
        self._tasks = []
    
    def schedule(self, func, delay, *args):
        execute_time = time.time() + delay
        heapq.heappush(self._tasks, (execute_time, func, args))
    
    def run_pending(self):
        current_time = time.time()
        while self._tasks and self._tasks[0][0] <= current_time:
            _, func, args = heapq.heappop(self._tasks)
            func(*args)

# 使用例
def print_message(msg):
    print(f"実行: {msg}")

scheduler = TaskScheduler()
scheduler.schedule(print_message, 1, "1秒後のタスク")
scheduler.schedule(print_message, 0.5, "0.5秒後のタスク")

time.sleep(1.5)
scheduler.run_pending()

6. 最大ヒープの実装

heapqは最小ヒープのみサポートしているため、最大ヒープは値を負数にして実現します。

import heapq

class MaxHeap:
    def __init__(self):
        self._heap = []
    
    def push(self, item):
        heapq.heappush(self._heap, -item)
    
    def pop(self):
        return -heapq.heappop(self._heap)
    
    def peek(self):
        return -self._heap[0] if self._heap else None

# 使用例
max_heap = MaxHeap()
max_heap.push(3)
max_heap.push(7)
max_heap.push(1)
max_heap.push(9)

print(max_heap.pop())  # 9(最大値)
print(max_heap.pop())  # 7

7. K番目の要素を効率的に取得

import heapq

def find_kth_largest(nums, k):
    """K番目に大きい要素を取得"""
    return heapq.nlargest(k, nums)[-1]

def find_kth_smallest(nums, k):
    """K番目に小さい要素を取得"""
    return heapq.nsmallest(k, nums)[-1]

numbers = [3, 1, 4, 1, 5, 9, 2, 6, 5]
print(find_kth_largest(numbers, 3))   # 5(3番目に大きい)
print(find_kth_smallest(numbers, 3))  # 2(3番目に小さい)

8. ストリーミングデータでの上位N個追跡

import heapq

class TopKTracker:
    def __init__(self, k):
        self.k = k
        self._heap = []
    
    def add(self, value):
        if len(self._heap) < self.k:
            heapq.heappush(self._heap, value)
        elif value > self._heap[0]:
            heapq.heapreplace(self._heap, value)
    
    def get_top_k(self):
        return sorted(self._heap, reverse=True)

# 使用例
tracker = TopKTracker(3)
for value in [1, 5, 3, 8, 2, 7, 4, 6]:
    tracker.add(value)
    print(f"現在の上位3位: {tracker.get_top_k()}")

9. マージされたソート済みリスト

import heapq

def merge_sorted_lists(*lists):
    """複数のソート済みリストをマージ"""
    merged = []
    heap = []
    
    # 各リストの最初の要素をヒープに追加
    for i, lst in enumerate(lists):
        if lst:
            heapq.heappush(heap, (lst[0], i, 0))
    
    while heap:
        value, list_idx, element_idx = heapq.heappop(heap)
        merged.append(value)
        
        # 次の要素があれば追加
        if element_idx + 1 < len(lists[list_idx]):
            next_value = lists[list_idx][element_idx + 1]
            heapq.heappush(heap, (next_value, list_idx, element_idx + 1))
    
    return merged

# 使用例
list1 = [1, 4, 7]
list2 = [2, 5, 8]
list3 = [3, 6, 9]
result = merge_sorted_lists(list1, list2, list3)
print(result)  # [1, 2, 3, 4, 5, 6, 7, 8, 9]

10. 移動平均の効率的な計算

import heapq
from collections import deque

class SlidingWindowMedian:
    def __init__(self, window_size):
        self.window_size = window_size
        self.window = deque()
        self.small_heap = []  # 最大ヒープ(負数で実現)
        self.large_heap = []  # 最小ヒープ
    
    def add_number(self, num):
        self.window.append(num)
        
        if len(self.window) > self.window_size:
            self.window.popleft()
        
        # 中央値計算のためのヒープバランス調整
        self._balance_heaps()
    
    def get_median(self):
        if len(self.window) % 2 == 1:
            return -self.small_heap[0]
        else:
            return (-self.small_heap[0] + self.large_heap[0]) / 2
    
    def _balance_heaps(self):
        # 簡略化された実装
        sorted_window = sorted(self.window)
        mid = len(sorted_window) // 2
        self.small_heap = [-x for x in sorted_window[:mid+1]]
        self.large_heap = sorted_window[mid+1:]
        heapq.heapify(self.small_heap)
        heapq.heapify(self.large_heap)

# 使用例
median_tracker = SlidingWindowMedian(3)
for num in [1, 2, 3, 4, 5]:
    median_tracker.add_number(num)
    print(f"中央値: {median_tracker.get_median()}")

11. CPU使用率監視システム

import heapq
import random
import time

class SystemMonitor:
    def __init__(self, alert_threshold=5):
        self.alert_threshold = alert_threshold
        self.high_usage_heap = []  # 高CPU使用率のプロセスを追跡
    
    def report_cpu_usage(self, process_id, cpu_percent):
        if cpu_percent >= 80:  # 高CPU使用率の閾値
            heapq.heappush(self.high_usage_heap, (-cpu_percent, process_id))
            
            # 上位5個のみ保持
            if len(self.high_usage_heap) > self.alert_threshold:
                heapq.heappop(self.high_usage_heap)
    
    def get_top_cpu_processes(self):
        return [(-usage, pid) for usage, pid in self.high_usage_heap]

# 使用例
monitor = SystemMonitor()
for i in range(10):
    process_id = f"proc_{i}"
    cpu_usage = random.randint(60, 100)
    monitor.report_cpu_usage(process_id, cpu_usage)
    print(f"{process_id}: {cpu_usage}%")

print("\n高CPU使用率プロセス(上位5個):")
for usage, pid in sorted(monitor.get_top_cpu_processes(), reverse=True):
    print(f"{pid}: {usage}%")

12. 経路探索アルゴリズム(ダイクストラ法)

import heapq

def dijkstra(graph, start):
    """ダイクストラ法による最短経路探索"""
    distances = {node: float('inf') for node in graph}
    distances[start] = 0
    pq = [(0, start)]
    visited = set()
    
    while pq:
        current_distance, current = heapq.heappop(pq)
        
        if current in visited:
            continue
        
        visited.add(current)
        
        for neighbor, weight in graph[current].items():
            distance = current_distance + weight
            
            if distance < distances[neighbor]:
                distances[neighbor] = distance
                heapq.heappush(pq, (distance, neighbor))
    
    return distances

# 使用例
graph = {
    'A': {'B': 4, 'C': 2},
    'B': {'C': 1, 'D': 5},
    'C': {'D': 8, 'E': 10},
    'D': {'E': 2},
    'E': {}
}

distances = dijkstra(graph, 'A')
print(distances)  # {'A': 0, 'B': 4, 'C': 2, 'D': 9, 'E': 11}

13. イベント駆動シミュレーション

import heapq
import random

class Event:
    def __init__(self, time, event_type, data=None):
        self.time = time
        self.event_type = event_type
        self.data = data
    
    def __lt__(self, other):
        return self.time < other.time

class EventSimulator:
    def __init__(self):
        self.event_queue = []
        self.current_time = 0
    
    def schedule_event(self, delay, event_type, data=None):
        event_time = self.current_time + delay
        event = Event(event_time, event_type, data)
        heapq.heappush(self.event_queue, event)
    
    def run_simulation(self, duration):
        self.schedule_event(random.uniform(0, 1), "customer_arrival")
        
        while self.event_queue and self.current_time < duration:
            event = heapq.heappop(self.event_queue)
            self.current_time = event.time
            
            if event.event_type == "customer_arrival":
                print(f"時刻 {self.current_time:.2f}: 顧客到着")
                # 次の顧客到着をスケジュール
                self.schedule_event(random.uniform(0.5, 2.0), "customer_arrival")

# 使用例
simulator = EventSimulator()
simulator.run_simulation(10)  # 10時間単位のシミュレーション

14. メモリ効率的な大規模データ処理

import heapq

def process_large_dataset(data_generator, k):
    """大規模データセットから上位K個を効率的に取得"""
    heap = []
    
    for item in data_generator():
        if len(heap) < k:
            heapq.heappush(heap, item)
        elif item > heap[0]:
            heapq.heapreplace(heap, item)
    
    return sorted(heap, reverse=True)

def large_number_generator():
    """大量の数値を生成するジェネレータ"""
    for i in range(1000000):
        yield random.randint(1, 10000)

# 使用例
import random
top_100 = process_large_dataset(large_number_generator, 100)
print(f"上位100個の最大値: {top_100[:5]}")

15. パフォーマンス比較

import heapq
import time
import random

def compare_performance():
    """heapqとsorted()のパフォーマンス比較"""
    data = [random.randint(1, 10000) for _ in range(100000)]
    k = 100
    
    # heapq.nlargest()
    start = time.time()
    heap_result = heapq.nlargest(k, data)
    heap_time = time.time() - start
    
    # sorted()
    start = time.time()
    sorted_result = sorted(data, reverse=True)[:k]
    sorted_time = time.time() - start
    
    print(f"heapq.nlargest(): {heap_time:.4f}秒")
    print(f"sorted(): {sorted_time:.4f}秒")
    print(f"heapqは{sorted_time/heap_time:.1f}倍高速")

compare_performance()

16. ベストプラクティスと注意点

ヒープの不変条件を保つ

import heapq

# 良い例:heapq関数を使用
heap = []
heapq.heappush(heap, 5)
heapq.heappush(heap, 2)

# 悪い例:直接リストを操作(ヒープ不変条件が破れる)
# heap.append(1)  # これはNG

# ヒープを直接操作した後は再構築
heap.append(1)  # 直接追加してしまった場合
heapq.heapify(heap)  # 再構築が必要

タプルを使った複合キー

import heapq

# 優先度とタイムスタンプの複合キー
tasks = []
heapq.heappush(tasks, (1, 1001, "高優先度タスク"))
heapq.heappush(tasks, (1, 1002, "同優先度だが後のタスク"))
heapq.heappush(tasks, (2, 1000, "低優先度タスク"))

while tasks:
    priority, timestamp, task = heapq.heappop(tasks)
    print(f"優先度{priority}, 時刻{timestamp}: {task}")

まとめ

Python heapqモジュールは、効率的なデータ構造操作を可能にする強力なツールです。最小値の高速取得、優先度付きキュー、上位N個の要素取得など、様々な用途で活用できます。

特に大規模データの処理やリアルタイムシステムにおいて、heapqの O(log n) の時間計算量は大きなメリットをもたらします。適切に活用することで、パフォーマンスの高いアプリケーションを構築できるでしょう。

ただし、ヒープの不変条件を保つことや、最大ヒープの実装には注意が必要です。これらのポイントを理解して、効果的にheapqモジュールを活用しましょう。

■プロンプトだけでオリジナルアプリを開発・公開してみた!!

■AI時代の第一歩!「AI駆動開発コース」はじめました!

テックジム東京本校で先行開始。

■テックジム東京本校

「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。

<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。

<月1開催>放送作家による映像ディレクター養成講座

<オンライン無料>ゼロから始めるPython爆速講座