Python heapqモジュール完全マスターガイド – ヒープ操作から優先度付きキューまで
Pythonのheapqモジュールは、ヒープ(heap)データ構造を効率的に操作するための標準ライブラリです。最小値の高速取得、優先度付きキュー、上位N個の要素取得など、様々な場面で威力を発揮します。この記事では、heapqモジュールの基本的な使い方から高度な応用テクニックまで、実例とともに詳しく解説します。
1. heapqモジュールの基本概念
ヒープは最小ヒープとして実装されており、親ノードが子ノードより常に小さい(または等しい)完全二分木です。
import heapq
# リストをヒープに変換
data = [3, 7, 4, 1, 8, 2, 9]
heapq.heapify(data)
print(data) # [1, 7, 2, 3, 8, 4, 9] - ヒープ構造に変換される
2. 基本的なヒープ操作
要素の追加(heappush)
import heapq
heap = []
heapq.heappush(heap, 5)
heapq.heappush(heap, 2)
heapq.heappush(heap, 8)
heapq.heappush(heap, 1)
print(heap) # [1, 2, 8, 5]
最小値の取得と削除(heappop)
import heapq
heap = [1, 2, 8, 5]
min_value = heapq.heappop(heap)
print(f"最小値: {min_value}") # 最小値: 1
print(f"残りのヒープ: {heap}") # [2, 5, 8]
要素の追加と削除を同時に(heappushpop)
import heapq
heap = [2, 5, 8]
result = heapq.heappushpop(heap, 3)
print(f"削除された値: {result}") # 削除された値: 2
print(f"ヒープ: {heap}") # [3, 5, 8]
3. 上位・下位N個の要素取得
nlargest – 最大のN個を取得
import heapq
numbers = [1, 8, 2, 23, 7, 4, 18, 23, 42, 37, 2]
top_3 = heapq.nlargest(3, numbers)
print(top_3) # [42, 37, 23]
nsmallest – 最小のN個を取得
import heapq
numbers = [1, 8, 2, 23, 7, 4, 18, 23, 42, 37, 2]
bottom_3 = heapq.nsmallest(3, numbers)
print(bottom_3) # [1, 2, 2]
key引数を使った条件付き取得
import heapq
students = [
{"name": "Alice", "score": 85},
{"name": "Bob", "score": 92},
{"name": "Charlie", "score": 78},
{"name": "Diana", "score": 96}
]
top_2 = heapq.nlargest(2, students, key=lambda x: x["score"])
print(top_2) # [{"name": "Diana", "score": 96}, {"name": "Bob", "score": 92}]
4. 優先度付きキューの実装
基本的な優先度付きキュー
import heapq
class PriorityQueue:
def __init__(self):
self._queue = []
self._index = 0
def push(self, item, priority):
heapq.heappush(self._queue, (priority, self._index, item))
self._index += 1
def pop(self):
return heapq.heappop(self._queue)[-1]
def is_empty(self):
return len(self._queue) == 0
# 使用例
pq = PriorityQueue()
pq.push("低優先度タスク", 3)
pq.push("高優先度タスク", 1)
pq.push("中優先度タスク", 2)
while not pq.is_empty():
task = pq.pop()
print(task) # 高優先度タスク → 中優先度タスク → 低優先度タスク
5. タスクスケジューリングの実装
import heapq
import time
class TaskScheduler:
def __init__(self):
self._tasks = []
def schedule(self, func, delay, *args):
execute_time = time.time() + delay
heapq.heappush(self._tasks, (execute_time, func, args))
def run_pending(self):
current_time = time.time()
while self._tasks and self._tasks[0][0] <= current_time:
_, func, args = heapq.heappop(self._tasks)
func(*args)
# 使用例
def print_message(msg):
print(f"実行: {msg}")
scheduler = TaskScheduler()
scheduler.schedule(print_message, 1, "1秒後のタスク")
scheduler.schedule(print_message, 0.5, "0.5秒後のタスク")
time.sleep(1.5)
scheduler.run_pending()
6. 最大ヒープの実装
heapqは最小ヒープのみサポートしているため、最大ヒープは値を負数にして実現します。
import heapq
class MaxHeap:
def __init__(self):
self._heap = []
def push(self, item):
heapq.heappush(self._heap, -item)
def pop(self):
return -heapq.heappop(self._heap)
def peek(self):
return -self._heap[0] if self._heap else None
# 使用例
max_heap = MaxHeap()
max_heap.push(3)
max_heap.push(7)
max_heap.push(1)
max_heap.push(9)
print(max_heap.pop()) # 9(最大値)
print(max_heap.pop()) # 7
7. K番目の要素を効率的に取得
import heapq
def find_kth_largest(nums, k):
"""K番目に大きい要素を取得"""
return heapq.nlargest(k, nums)[-1]
def find_kth_smallest(nums, k):
"""K番目に小さい要素を取得"""
return heapq.nsmallest(k, nums)[-1]
numbers = [3, 1, 4, 1, 5, 9, 2, 6, 5]
print(find_kth_largest(numbers, 3)) # 5(3番目に大きい)
print(find_kth_smallest(numbers, 3)) # 2(3番目に小さい)
8. ストリーミングデータでの上位N個追跡
import heapq
class TopKTracker:
def __init__(self, k):
self.k = k
self._heap = []
def add(self, value):
if len(self._heap) < self.k:
heapq.heappush(self._heap, value)
elif value > self._heap[0]:
heapq.heapreplace(self._heap, value)
def get_top_k(self):
return sorted(self._heap, reverse=True)
# 使用例
tracker = TopKTracker(3)
for value in [1, 5, 3, 8, 2, 7, 4, 6]:
tracker.add(value)
print(f"現在の上位3位: {tracker.get_top_k()}")
9. マージされたソート済みリスト
import heapq
def merge_sorted_lists(*lists):
"""複数のソート済みリストをマージ"""
merged = []
heap = []
# 各リストの最初の要素をヒープに追加
for i, lst in enumerate(lists):
if lst:
heapq.heappush(heap, (lst[0], i, 0))
while heap:
value, list_idx, element_idx = heapq.heappop(heap)
merged.append(value)
# 次の要素があれば追加
if element_idx + 1 < len(lists[list_idx]):
next_value = lists[list_idx][element_idx + 1]
heapq.heappush(heap, (next_value, list_idx, element_idx + 1))
return merged
# 使用例
list1 = [1, 4, 7]
list2 = [2, 5, 8]
list3 = [3, 6, 9]
result = merge_sorted_lists(list1, list2, list3)
print(result) # [1, 2, 3, 4, 5, 6, 7, 8, 9]
10. 移動平均の効率的な計算
import heapq
from collections import deque
class SlidingWindowMedian:
def __init__(self, window_size):
self.window_size = window_size
self.window = deque()
self.small_heap = [] # 最大ヒープ(負数で実現)
self.large_heap = [] # 最小ヒープ
def add_number(self, num):
self.window.append(num)
if len(self.window) > self.window_size:
self.window.popleft()
# 中央値計算のためのヒープバランス調整
self._balance_heaps()
def get_median(self):
if len(self.window) % 2 == 1:
return -self.small_heap[0]
else:
return (-self.small_heap[0] + self.large_heap[0]) / 2
def _balance_heaps(self):
# 簡略化された実装
sorted_window = sorted(self.window)
mid = len(sorted_window) // 2
self.small_heap = [-x for x in sorted_window[:mid+1]]
self.large_heap = sorted_window[mid+1:]
heapq.heapify(self.small_heap)
heapq.heapify(self.large_heap)
# 使用例
median_tracker = SlidingWindowMedian(3)
for num in [1, 2, 3, 4, 5]:
median_tracker.add_number(num)
print(f"中央値: {median_tracker.get_median()}")
11. CPU使用率監視システム
import heapq
import random
import time
class SystemMonitor:
def __init__(self, alert_threshold=5):
self.alert_threshold = alert_threshold
self.high_usage_heap = [] # 高CPU使用率のプロセスを追跡
def report_cpu_usage(self, process_id, cpu_percent):
if cpu_percent >= 80: # 高CPU使用率の閾値
heapq.heappush(self.high_usage_heap, (-cpu_percent, process_id))
# 上位5個のみ保持
if len(self.high_usage_heap) > self.alert_threshold:
heapq.heappop(self.high_usage_heap)
def get_top_cpu_processes(self):
return [(-usage, pid) for usage, pid in self.high_usage_heap]
# 使用例
monitor = SystemMonitor()
for i in range(10):
process_id = f"proc_{i}"
cpu_usage = random.randint(60, 100)
monitor.report_cpu_usage(process_id, cpu_usage)
print(f"{process_id}: {cpu_usage}%")
print("\n高CPU使用率プロセス(上位5個):")
for usage, pid in sorted(monitor.get_top_cpu_processes(), reverse=True):
print(f"{pid}: {usage}%")
12. 経路探索アルゴリズム(ダイクストラ法)
import heapq
def dijkstra(graph, start):
"""ダイクストラ法による最短経路探索"""
distances = {node: float('inf') for node in graph}
distances[start] = 0
pq = [(0, start)]
visited = set()
while pq:
current_distance, current = heapq.heappop(pq)
if current in visited:
continue
visited.add(current)
for neighbor, weight in graph[current].items():
distance = current_distance + weight
if distance < distances[neighbor]:
distances[neighbor] = distance
heapq.heappush(pq, (distance, neighbor))
return distances
# 使用例
graph = {
'A': {'B': 4, 'C': 2},
'B': {'C': 1, 'D': 5},
'C': {'D': 8, 'E': 10},
'D': {'E': 2},
'E': {}
}
distances = dijkstra(graph, 'A')
print(distances) # {'A': 0, 'B': 4, 'C': 2, 'D': 9, 'E': 11}
13. イベント駆動シミュレーション
import heapq
import random
class Event:
def __init__(self, time, event_type, data=None):
self.time = time
self.event_type = event_type
self.data = data
def __lt__(self, other):
return self.time < other.time
class EventSimulator:
def __init__(self):
self.event_queue = []
self.current_time = 0
def schedule_event(self, delay, event_type, data=None):
event_time = self.current_time + delay
event = Event(event_time, event_type, data)
heapq.heappush(self.event_queue, event)
def run_simulation(self, duration):
self.schedule_event(random.uniform(0, 1), "customer_arrival")
while self.event_queue and self.current_time < duration:
event = heapq.heappop(self.event_queue)
self.current_time = event.time
if event.event_type == "customer_arrival":
print(f"時刻 {self.current_time:.2f}: 顧客到着")
# 次の顧客到着をスケジュール
self.schedule_event(random.uniform(0.5, 2.0), "customer_arrival")
# 使用例
simulator = EventSimulator()
simulator.run_simulation(10) # 10時間単位のシミュレーション
14. メモリ効率的な大規模データ処理
import heapq
def process_large_dataset(data_generator, k):
"""大規模データセットから上位K個を効率的に取得"""
heap = []
for item in data_generator():
if len(heap) < k:
heapq.heappush(heap, item)
elif item > heap[0]:
heapq.heapreplace(heap, item)
return sorted(heap, reverse=True)
def large_number_generator():
"""大量の数値を生成するジェネレータ"""
for i in range(1000000):
yield random.randint(1, 10000)
# 使用例
import random
top_100 = process_large_dataset(large_number_generator, 100)
print(f"上位100個の最大値: {top_100[:5]}")
15. パフォーマンス比較
import heapq
import time
import random
def compare_performance():
"""heapqとsorted()のパフォーマンス比較"""
data = [random.randint(1, 10000) for _ in range(100000)]
k = 100
# heapq.nlargest()
start = time.time()
heap_result = heapq.nlargest(k, data)
heap_time = time.time() - start
# sorted()
start = time.time()
sorted_result = sorted(data, reverse=True)[:k]
sorted_time = time.time() - start
print(f"heapq.nlargest(): {heap_time:.4f}秒")
print(f"sorted(): {sorted_time:.4f}秒")
print(f"heapqは{sorted_time/heap_time:.1f}倍高速")
compare_performance()
16. ベストプラクティスと注意点
ヒープの不変条件を保つ
import heapq
# 良い例:heapq関数を使用
heap = []
heapq.heappush(heap, 5)
heapq.heappush(heap, 2)
# 悪い例:直接リストを操作(ヒープ不変条件が破れる)
# heap.append(1) # これはNG
# ヒープを直接操作した後は再構築
heap.append(1) # 直接追加してしまった場合
heapq.heapify(heap) # 再構築が必要
タプルを使った複合キー
import heapq
# 優先度とタイムスタンプの複合キー
tasks = []
heapq.heappush(tasks, (1, 1001, "高優先度タスク"))
heapq.heappush(tasks, (1, 1002, "同優先度だが後のタスク"))
heapq.heappush(tasks, (2, 1000, "低優先度タスク"))
while tasks:
priority, timestamp, task = heapq.heappop(tasks)
print(f"優先度{priority}, 時刻{timestamp}: {task}")
まとめ
Python heapqモジュールは、効率的なデータ構造操作を可能にする強力なツールです。最小値の高速取得、優先度付きキュー、上位N個の要素取得など、様々な用途で活用できます。
特に大規模データの処理やリアルタイムシステムにおいて、heapqの O(log n) の時間計算量は大きなメリットをもたらします。適切に活用することで、パフォーマンスの高いアプリケーションを構築できるでしょう。
ただし、ヒープの不変条件を保つことや、最大ヒープの実装には注意が必要です。これらのポイントを理解して、効果的にheapqモジュールを活用しましょう。
■プロンプトだけでオリジナルアプリを開発・公開してみた!!
■AI時代の第一歩!「AI駆動開発コース」はじめました!
テックジム東京本校で先行開始。
■テックジム東京本校
「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。
<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。
<月1開催>放送作家による映像ディレクター養成講座
<オンライン無料>ゼロから始めるPython爆速講座