スマートグリッドとは?機械学習で実現する次世代電力システム完全ガイド
スマートグリッドの基本概念
スマートグリッド(Smart Grid)は、ICT(情報通信技術)を活用して電力の需給バランスを最適化する次世代電力網です。従来の一方向的な電力供給システムを、双方向通信可能なインテリジェントなネットワークに進化させることで、効率的で信頼性の高い電力供給を実現します。
スマートグリッドの主要構成要素
- スマートメーター: リアルタイムでの電力使用量測定・通信
- HEMS(Home Energy Management System): 家庭内エネルギー管理
- 蓄電池システム: 電力の貯蔵・放電制御
- 再生可能エネルギー: 太陽光・風力発電の統合
- 需要応答(DR): 需要パターンの動的制御
機械学習がスマートグリッドにもたらす革新
1. エネルギー需要予測
機械学習により、過去のデータパターンから将来の電力需要を高精度で予測できます。
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error
import matplotlib.pyplot as plt
# サンプルデータの作成
np.random.seed(42)
dates = pd.date_range('2023-01-01', periods=8760, freq='H')
df = pd.DataFrame({
'datetime': dates,
'temperature': np.random.normal(20, 10, 8760),
'humidity': np.random.normal(60, 20, 8760),
'hour': dates.hour,
'day_of_week': dates.dayofweek,
'month': dates.month
})
# 電力需要の合成(気温と時間に依存)
df['demand'] = (1000 +
50 * np.sin(2 * np.pi * df['hour'] / 24) +
30 * (df['temperature'] - 20) ** 2 / 100 +
np.random.normal(0, 50, 8760))
# 特徴量の準備
features = ['temperature', 'humidity', 'hour', 'day_of_week', 'month']
X = df[features]
y = df['demand']
# 学習・テストデータ分割
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# ランダムフォレストモデルの学習
rf_model = RandomForestRegressor(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)
# 予測と評価
y_pred = rf_model.predict(X_test)
mae = mean_absolute_error(y_test, y_pred)
print(f"需要予測MAE: {mae:.2f} kW")
2. LSTMによる時系列需要予測
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from sklearn.preprocessing import MinMaxScaler
# データの前処理
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df[['demand']].values)
def create_sequences(data, seq_length):
X, y = [], []
for i in range(len(data) - seq_length):
X.append(data[i:i+seq_length])
y.append(data[i+seq_length])
return np.array(X), np.array(y)
# シーケンスデータの作成
seq_length = 24 # 24時間のデータを使用
X_seq, y_seq = create_sequences(scaled_data, seq_length)
# LSTMモデルの構築
model = Sequential([
LSTM(50, return_sequences=True, input_shape=(seq_length, 1)),
Dropout(0.2),
LSTM(50, return_sequences=False),
Dropout(0.2),
Dense(25),
Dense(1)
])
model.compile(optimizer='adam', loss='mse')
print("LSTMモデル構築完了")
再生可能エネルギー出力予測
太陽光発電出力予測モデル
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import r2_score
# 太陽光発電データの作成
df['solar_irradiance'] = np.maximum(0,
1000 * np.sin(np.pi * (df['hour'] - 6) / 12) *
(df['hour'] >= 6) * (df['hour'] <= 18) +
np.random.normal(0, 100, len(df))
)
df['solar_output'] = (df['solar_irradiance'] * 0.2 *
(1 - df['humidity'] / 200) +
np.random.normal(0, 10, len(df)))
# 特徴量の準備
solar_features = ['solar_irradiance', 'temperature', 'humidity', 'hour']
X_solar = df[solar_features]
y_solar = df['solar_output']
# モデル学習
X_train_s, X_test_s, y_train_s, y_test_s = train_test_split(
X_solar, y_solar, test_size=0.2, random_state=42
)
gb_model = GradientBoostingRegressor(n_estimators=100, random_state=42)
gb_model.fit(X_train_s, y_train_s)
# 予測評価
y_pred_s = gb_model.predict(X_test_s)
r2 = r2_score(y_test_s, y_pred_s)
print(f"太陽光出力予測 R²: {r2:.3f}")
風力発電出力予測
# 風力発電データの合成
df['wind_speed'] = np.abs(np.random.normal(8, 3, len(df)))
df['wind_direction'] = np.random.uniform(0, 360, len(df))
# 風力発電出力(風速の3乗に比例)
df['wind_output'] = np.minimum(2000,
np.maximum(0, 0.5 * df['wind_speed'] ** 3 - 50) +
np.random.normal(0, 20, len(df))
)
# 風力予測モデル
wind_features = ['wind_speed', 'wind_direction', 'temperature', 'hour']
X_wind = df[wind_features]
y_wind = df['wind_output']
# ニューラルネットワークによる予測
from sklearn.neural_network import MLPRegressor
mlp_model = MLPRegressor(hidden_layer_sizes=(100, 50),
max_iter=500, random_state=42)
X_train_w, X_test_w, y_train_w, y_test_w = train_test_split(
X_wind, y_wind, test_size=0.2, random_state=42
)
mlp_model.fit(X_train_w, y_train_w)
y_pred_w = mlp_model.predict(X_test_w)
mae_wind = mean_absolute_error(y_test_w, y_pred_w)
print(f"風力出力予測 MAE: {mae_wind:.2f} kW")
電力系統の異常検知
オートエンコーダによる異常検知
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from sklearn.preprocessing import StandardScaler
# 異常検知用データの準備
normal_data = df[df['demand'] < df['demand'].quantile(0.95)]
scaler_ad = StandardScaler()
X_normal = scaler_ad.fit_transform(normal_data[features])
# オートエンコーダの構築
input_dim = X_normal.shape[1]
input_layer = Input(shape=(input_dim,))
encoded = Dense(3, activation='relu')(input_layer)
decoded = Dense(input_dim, activation='linear')(encoded)
autoencoder = Model(input_layer, decoded)
autoencoder.compile(optimizer='adam', loss='mse')
# 学習
autoencoder.fit(X_normal, X_normal, epochs=50, batch_size=32, verbose=0)
# 異常スコアの計算
X_test_scaled = scaler_ad.transform(X_test)
reconstructed = autoencoder.predict(X_test_scaled)
mse_scores = np.mean(np.square(X_test_scaled - reconstructed), axis=1)
# 異常判定
threshold = np.percentile(mse_scores, 95)
anomalies = mse_scores > threshold
print(f"検出された異常データ数: {sum(anomalies)}")
電力品質監視システム
from scipy import signal
from sklearn.cluster import DBSCAN
# 電力品質データの合成
df['voltage'] = 230 + np.random.normal(0, 5, len(df))
df['frequency'] = 50 + np.random.normal(0, 0.1, len(df))
df['harmonic_distortion'] = np.abs(np.random.normal(2, 1, len(df)))
# 電力品質指標の異常検知
quality_features = ['voltage', 'frequency', 'harmonic_distortion']
X_quality = df[quality_features]
# DBSCANによる異常検知
scaler_q = StandardScaler()
X_quality_scaled = scaler_q.fit_transform(X_quality)
dbscan = DBSCAN(eps=0.5, min_samples=10)
clusters = dbscan.fit_predict(X_quality_scaled)
# 異常ポイント(クラスタ-1)の特定
anomaly_indices = np.where(clusters == -1)[0]
print(f"電力品質異常検知数: {len(anomaly_indices)}")
負荷分散最適化
遺伝的アルゴリズムによる電源配分最適化
import random
from deap import creator, base, tools, algorithms
# 発電機データの設定
generators = [
{'capacity': 1000, 'cost_per_mw': 50, 'min_output': 200},
{'capacity': 800, 'cost_per_mw': 40, 'min_output': 100},
{'capacity': 600, 'cost_per_mw': 60, 'min_output': 50},
{'capacity': 1200, 'cost_per_mw': 45, 'min_output': 300}
]
target_demand = 2000 # 目標需要 [MW]
# 適応度関数の定義
def evaluate_dispatch(individual):
total_output = sum(individual)
total_cost = sum(output * gen['cost_per_mw']
for output, gen in zip(individual, generators))
# 制約違反のペナルティ
penalty = 0
if abs(total_output - target_demand) > 50:
penalty += 1000 * abs(total_output - target_demand)
for output, gen in zip(individual, generators):
if output > gen['capacity'] or output < gen['min_output']:
penalty += 10000
return total_cost + penalty,
# 遺伝的アルゴリズムの設定
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)
toolbox = base.Toolbox()
toolbox.register("attr_output", random.randint, 0, 1200)
toolbox.register("individual", tools.initRepeat, creator.Individual,
toolbox.attr_output, n=len(generators))
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", evaluate_dispatch)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=50, indpb=0.2)
toolbox.register("select", tools.selTournament, tournsize=3)
# 最適化実行
population = toolbox.population(n=50)
result, logbook = algorithms.eaSimple(population, toolbox,
cxpb=0.7, mutpb=0.3, ngen=50,
verbose=False)
# 最適解の取得
best_individual = tools.selBest(result, 1)[0]
print(f"最適電源配分: {best_individual}")
print(f"総コスト: {evaluate_dispatch(best_individual)[0]}")
需要応答(DR)システム
強化学習による動的価格設定
import numpy as np
from collections import defaultdict
class DemandResponseAgent:
def __init__(self, n_price_levels=10, learning_rate=0.1,
epsilon=0.1, discount_factor=0.9):
self.n_price_levels = n_price_levels
self.lr = learning_rate
self.epsilon = epsilon
self.gamma = discount_factor
self.q_table = defaultdict(lambda: np.zeros(n_price_levels))
def get_state(self, demand, supply, time):
# 状態の離散化
demand_level = min(int(demand / 200), 9)
supply_level = min(int(supply / 200), 9)
time_slot = time % 24
return (demand_level, supply_level, time_slot)
def choose_action(self, state):
if random.random() < self.epsilon:
return random.randint(0, self.n_price_levels - 1)
return np.argmax(self.q_table[state])
def update_q_table(self, state, action, reward, next_state):
current_q = self.q_table[state][action]
max_next_q = np.max(self.q_table[next_state])
new_q = current_q + self.lr * (reward + self.gamma * max_next_q - current_q)
self.q_table[state][action] = new_q
# エージェントの初期化と学習例
dr_agent = DemandResponseAgent()
# シミュレーション例
for episode in range(100):
state = dr_agent.get_state(1500, 1600, 14) # 14時の状態
action = dr_agent.choose_action(state)
# 報酬の計算(需給バランスに基づく)
price = 30 + action * 5 # 30-75円/kWh
demand_reduction = action * 50 # 価格に応じた需要削減
reward = -abs(1500 - demand_reduction - 1600) # 需給バランス報酬
next_state = dr_agent.get_state(1500 - demand_reduction, 1600, 15)
dr_agent.update_q_table(state, action, reward, next_state)
print("需要応答エージェント学習完了")
エネルギー貯蔵システム最適化
蓄電池制御の動的プログラミング
def battery_optimization(prices, demand, battery_capacity=1000):
"""
動的プログラミングによる蓄電池最適制御
"""
hours = len(prices)
states = range(0, battery_capacity + 1, 50) # 充電状態
# DPテーブルの初期化
dp = {}
actions = {}
# 最終時刻の初期化
for state in states:
dp[(hours-1, state)] = 0
actions[(hours-1, state)] = 0
# 逆方向に計算
for t in range(hours-2, -1, -1):
for state in states:
best_value = float('inf')
best_action = 0
# 可能なアクション(充電・放電)
for action in range(-200, 201, 50): # -200~200kW
new_state = state + action
# 制約チェック
if new_state < 0 or new_state > battery_capacity:
continue
# コスト計算
cost = action * prices[t] / 1000 # 電力購入・売却コスト
future_value = dp.get((t+1, new_state), float('inf'))
total_value = cost + future_value
if total_value < best_value:
best_value = total_value
best_action = action
dp[(t, state)] = best_value
actions[(t, state)] = best_action
return dp, actions
# 価格データの例
hourly_prices = [40, 35, 30, 25, 30, 45, 55, 65, 70, 60,
50, 45, 40, 45, 50, 60, 70, 75, 65, 55,
50, 45, 40, 35]
# 最適化実行
dp_table, action_table = battery_optimization(hourly_prices, None)
print("蓄電池最適制御計算完了")
エネルギー取引市場への機械学習応用
電力価格予測モデル
from sklearn.linear_model import ElasticNet
from sklearn.preprocessing import PolynomialFeatures
# 電力価格データの合成
df['price'] = (30 + 20 * np.sin(2 * np.pi * df['hour'] / 24) +
10 * df['demand'] / 1000 +
np.random.normal(0, 5, len(df)))
# 特徴量エンジニアリング
price_features = ['demand', 'solar_output', 'wind_output', 'hour']
poly_features = PolynomialFeatures(degree=2, interaction_only=True)
X_price_poly = poly_features.fit_transform(df[price_features])
# ElasticNetによる価格予測
elastic_model = ElasticNet(alpha=0.1, l1_ratio=0.7)
X_train_p, X_test_p, y_train_p, y_test_p = train_test_split(
X_price_poly, df['price'], test_size=0.2, random_state=42
)
elastic_model.fit(X_train_p, y_train_p)
price_pred = elastic_model.predict(X_test_p)
price_mae = mean_absolute_error(y_test_p, price_pred)
print(f"価格予測MAE: {price_mae:.2f} 円/kWh")
マルチエージェントシステムによる分散制御
class GridAgent:
def __init__(self, agent_id, agent_type):
self.id = agent_id
self.type = agent_type # 'consumer', 'producer', 'storage'
self.energy_state = 0
self.neighbors = []
def add_neighbor(self, neighbor):
self.neighbors.append(neighbor)
def negotiate_energy(self, time_step):
if self.type == 'consumer':
demand = 100 + 50 * np.sin(2 * np.pi * time_step / 24)
return -demand # 負の値は需要
elif self.type == 'producer':
supply = 80 + 40 * np.sin(2 * np.pi * (time_step - 6) / 12)
return max(0, supply) # 正の値は供給
else: # storage
# 簡単な蓄電池制御ロジック
if time_step < 12:
return -20 # 充電
else:
return 20 # 放電
# マルチエージェントシステムの構築
agents = []
for i in range(5):
agent_type = ['consumer', 'producer', 'storage'][i % 3]
agent = GridAgent(i, agent_type)
agents.append(agent)
# エージェント間の接続
for i, agent in enumerate(agents):
for j, neighbor in enumerate(agents):
if i != j and abs(i - j) <= 2: # 近隣エージェントと接続
agent.add_neighbor(neighbor)
# シミュレーション実行
for t in range(24):
total_balance = 0
for agent in agents:
balance = agent.negotiate_energy(t)
total_balance += balance
print(f"時刻{t}: 系統バランス = {total_balance:.1f} kW")
可視化とモニタリング
リアルタイムダッシュボード
import plotly.graph_objects as go
from plotly.subplots import make_subplots
def create_smart_grid_dashboard(df):
# サブプロットの作成
fig = make_subplots(
rows=2, cols=2,
subplot_titles=['電力需要予測', '再生可能エネルギー出力',
'電力価格推移', '需給バランス'],
specs=[[{"secondary_y": False}, {"secondary_y": False}],
[{"secondary_y": True}, {"secondary_y": False}]]
)
# 需要予測グラフ
fig.add_trace(
go.Scatter(x=df['datetime'][:168], y=df['demand'][:168],
name='実績需要', line=dict(color='blue')),
row=1, col=1
)
# 再生可能エネルギー出力
fig.add_trace(
go.Scatter(x=df['datetime'][:168], y=df['solar_output'][:168],
name='太陽光', fill='tonexty',
line=dict(color='orange')),
row=1, col=2
)
fig.add_trace(
go.Scatter(x=df['datetime'][:168], y=df['wind_output'][:168],
name='風力', fill='tonexty',
line=dict(color='green')),
row=1, col=2
)
# 電力価格
fig.add_trace(
go.Scatter(x=df['datetime'][:168], y=df['price'][:168],
name='電力価格', line=dict(color='red')),
row=2, col=1
)
# 需給バランス
supply = df['solar_output'] + df['wind_output']
balance = supply - df['demand']
fig.add_trace(
go.Bar(x=df['datetime'][:168], y=balance[:168],
name='需給バランス'),
row=2, col=2
)
fig.update_layout(height=800, title_text="スマートグリッド監視ダッシュボード")
return fig
# ダッシュボードの作成
dashboard = create_smart_grid_dashboard(df)
print("ダッシュボード作成完了")
セキュリティとプライバシー保護
連合学習によるプライバシー保護
class FederatedLearningNode:
def __init__(self, node_id, local_data):
self.node_id = node_id
self.local_data = local_data
self.local_model = RandomForestRegressor(n_estimators=10)
self.global_weights = None
def train_local_model(self):
X_local = self.local_data[features]
y_local = self.local_data['demand']
self.local_model.fit(X_local, y_local)
return self.get_model_updates()
def get_model_updates(self):
# 簡略化されたモデル更新(実際にはより複雑)
return {
'node_id': self.node_id,
'n_estimators': self.local_model.n_estimators,
'feature_importances': self.local_model.feature_importances_
}
def update_global_model(self, aggregated_updates):
# グローバルモデルの更新
self.global_weights = aggregated_updates
# 連合学習のシミュレーション
def federated_learning_simulation(df, n_nodes=3):
# データを各ノードに分散
node_data = np.array_split(df, n_nodes)
nodes = [FederatedLearningNode(i, data) for i, data in enumerate(node_data)]
# 各ノードでローカル学習
updates = []
for node in nodes:
update = node.train_local_model()
updates.append(update)
# 単純な平均による集約
avg_importance = np.mean([u['feature_importances'] for u in updates], axis=0)
return avg_importance
# 連合学習実行
fl_importance = federated_learning_simulation(df)
print("連合学習による特徴量重要度:", fl_importance)
実装時の課題と解決策
データ品質管理
def data_quality_check(df):
"""データ品質チェック関数"""
quality_report = {}
# 欠損値チェック
missing_values = df.isnull().sum()
quality_report['missing_values'] = missing_values[missing_values > 0]
# 異常値検知
numeric_columns = df.select_dtypes(include=[np.number]).columns
outliers = {}
for col in numeric_columns:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outlier_count = ((df[col] < lower_bound) |
(df[col] > upper_bound)).sum()
if outlier_count > 0:
outliers[col] = outlier_count
quality_report['outliers'] = outliers
# データの一貫性チェック
consistency_issues = []
if 'demand' in df.columns and (df['demand'] < 0).any():
consistency_issues.append("負の需要値が存在")
quality_report['consistency_issues'] = consistency_issues
return quality_report
# データ品質チェック実行
quality_report = data_quality_check(df)
print("データ品質レポート:", quality_report)
リアルタイム処理最適化
import time
from threading import Thread
import queue
class RealTimeProcessor:
def __init__(self, model, processing_interval=1.0):
self.model = model
self.processing_interval = processing_interval
self.data_queue = queue.Queue()
self.results_queue = queue.Queue()
self.running = False
def add_data(self, data):
self.data_queue.put(data)
def process_data(self):
while self.running:
try:
if not self.data_queue.empty():
data = self.data_queue.get(timeout=1.0)
# 予測処理
start_time = time.time()
prediction = self.model.predict([data])[0]
processing_time = time.time() - start_time
result = {
'timestamp': time.time(),
'prediction': prediction,
'processing_time': processing_time
}
self.results_queue.put(result)
time.sleep(self.processing_interval)
except queue.Empty:
continue
def start_processing(self):
self.running = True
self.processing_thread = Thread(target=self.process_data)
self.processing_thread.start()
def stop_processing(self):
self.running = False
if hasattr(self, 'processing_thread'):
self.processing_thread.join()
# リアルタイム処理の設定
rt_processor = RealTimeProcessor(rf_model, processing_interval=0.1)
rt_processor.start_processing()
# シミュレーションデータの投入
for i in range(10):
sample_data = [20.5, 65.0, 14, 1, 6] # 温度、湿度、時刻、曜日、月
rt_processor.add_data(sample_data)
time.sleep(0.2)
time.sleep(2) # 処理完了を待機
rt_processor.stop_processing()
print("リアルタイム処理完了")
今後の技術トレンドと展望
エッジコンピューティングの活用
class EdgeComputingNode:
def __init__(self, node_id, lightweight_model):
self.node_id = node_id
self.model = lightweight_model
self.local_cache = {}
self.processing_stats = {'predictions': 0, 'avg_latency': 0}
def predict_local(self, input_data):
"""エッジでのローカル予測処理"""
start_time = time.time()
# キャッシュチェック
cache_key = str(hash(str(input_data)))
if cache_key in self.local_cache:
return self.local_cache[cache_key]
# 予測実行
prediction = self.model.predict([input_data])[0]
# 結果をキャッシュ
self.local_cache[cache_key] = prediction
# 統計更新
latency = time.time() - start_time
self.processing_stats['predictions'] += 1
self.processing_stats['avg_latency'] = (
(self.processing_stats['avg_latency'] *
(self.processing_stats['predictions'] - 1) + latency) /
self.processing_stats['predictions']
)
return prediction
# エッジノードの作成
from sklearn.tree import DecisionTreeRegressor
# 軽量モデルの作成
lightweight_model = DecisionTreeRegressor(max_depth=5)
lightweight_model.fit(X_train, y_train)
edge_node = EdgeComputingNode("edge_001", lightweight_model)
# エッジ予測の実行例
test_input = [22.0, 55.0, 15, 2, 6]
edge_prediction = edge_node.predict_local(test_input)
print(f"エッジ予測結果: {edge_prediction:.2f}")
print(f"処理統計: {edge_node.processing_stats}")
デジタルツインとシミュレーション
class SmartGridDigitalTwin:
def __init__(self):
self.components = {}
self.simulation_time = 0
self.state_history = []
def add_component(self, component_id, component_type, parameters):
"""グリッドコンポーネントの追加"""
self.components[component_id] = {
'type': component_type,
'parameters': parameters,
'state': self.initialize_state(component_type)
}
def initialize_state(self, component_type):
"""コンポーネント状態の初期化"""
if component_type == 'generator':
return {'output': 0, 'efficiency': 0.9, 'status': 'online'}
elif component_type == 'battery':
return {'charge_level': 0.5, 'max_capacity': 1000}
elif component_type == 'load':
return {'demand': 0, 'priority': 'normal'}
else:
return {}
def simulate_step(self, external_conditions):
"""シミュレーションステップの実行"""
self.simulation_time += 1
# 各コンポーネントの状態更新
for comp_id, component in self.components.items():
if component['type'] == 'generator':
# 発電機の出力計算
base_output = component['parameters']['capacity']
weather_factor = external_conditions.get('weather_factor', 1.0)
component['state']['output'] = base_output * weather_factor
elif component['type'] == 'battery':
# 蓄電池の充放電シミュレーション
charge_rate = external_conditions.get('charge_command', 0)
current_charge = component['state']['charge_level']
max_capacity = component['state']['max_capacity']
new_charge = max(0, min(max_capacity,
current_charge + charge_rate))
component['state']['charge_level'] = new_charge
# 状態履歴の保存
current_state = {
'time': self.simulation_time,
'components': {k: v['state'].copy()
for k, v in self.components.items()}
}
self.state_history.append(current_state)
return current_state
def get_system_metrics(self):
"""システム全体のメトリクス計算"""
if not self.state_history:
return {}
latest_state = self.state_history[-1]
total_generation = sum(
comp['output'] for comp in latest_state['components'].values()
if 'output' in comp
)
total_storage = sum(
comp['charge_level'] for comp in latest_state['components'].values()
if 'charge_level' in comp
)
return {
'total_generation': total_generation,
'total_storage': total_storage,
'simulation_time': self.simulation_time
}
# デジタルツインの構築例
digital_twin = SmartGridDigitalTwin()
# コンポーネントの追加
digital_twin.add_component('gen_001', 'generator', {'capacity': 1000})
digital_twin.add_component('bat_001', 'battery', {'capacity': 500})
digital_twin.add_component('load_001', 'load', {'base_demand': 800})
# シミュレーション実行
for hour in range(24):
conditions = {
'weather_factor': 0.8 + 0.4 * np.sin(2 * np.pi * hour / 24),
'charge_command': 50 if hour < 12 else -30
}
state = digital_twin.simulate_step(conditions)
metrics = digital_twin.get_system_metrics()
if hour % 6 == 0: # 6時間ごとに表示
print(f"時刻{hour}: 発電量={metrics['total_generation']:.1f}MW, "
f"蓄電量={metrics['total_storage']:.1f}MWh")
ブロックチェーンとP2P電力取引
import hashlib
import json
from datetime import datetime
class EnergyTransaction:
def __init__(self, seller_id, buyer_id, amount, price, timestamp=None):
self.seller_id = seller_id
self.buyer_id = buyer_id
self.amount = amount # kWh
self.price = price # 円/kWh
self.timestamp = timestamp or datetime.now().isoformat()
self.transaction_id = self.generate_transaction_id()
def generate_transaction_id(self):
"""トランザクションIDの生成"""
data = f"{self.seller_id}{self.buyer_id}{self.amount}{self.price}{self.timestamp}"
return hashlib.sha256(data.encode()).hexdigest()[:16]
def to_dict(self):
return {
'transaction_id': self.transaction_id,
'seller_id': self.seller_id,
'buyer_id': self.buyer_id,
'amount': self.amount,
'price': self.price,
'timestamp': self.timestamp
}
class P2PEnergyMarket:
def __init__(self):
self.participants = {}
self.pending_transactions = []
self.completed_transactions = []
self.market_prices = []
def register_participant(self, participant_id, participant_type, capacity):
"""市場参加者の登録"""
self.participants[participant_id] = {
'type': participant_type, # 'producer', 'consumer', 'prosumer'
'capacity': capacity,
'energy_balance': 0,
'transactions': []
}
def submit_offer(self, seller_id, amount, price):
"""売り注文の提出"""
if seller_id not in self.participants:
return False
offer = {
'type': 'sell',
'participant_id': seller_id,
'amount': amount,
'price': price,
'timestamp': datetime.now().isoformat()
}
# マッチングの実行
return self.match_orders(offer)
def submit_bid(self, buyer_id, amount, max_price):
"""買い注文の提出"""
if buyer_id not in self.participants:
return False
bid = {
'type': 'buy',
'participant_id': buyer_id,
'amount': amount,
'price': max_price,
'timestamp': datetime.now().isoformat()
}
return self.match_orders(bid)
def match_orders(self, new_order):
"""注文のマッチング処理"""
matched_transactions = []
if new_order['type'] == 'sell':
# 買い注文との照合
for bid in self.pending_transactions:
if (bid['type'] == 'buy' and
bid['price'] >= new_order['price']):
# 取引量の決定
trade_amount = min(new_order['amount'], bid['amount'])
trade_price = (new_order['price'] + bid['price']) / 2
# トランザクション作成
transaction = EnergyTransaction(
new_order['participant_id'],
bid['participant_id'],
trade_amount,
trade_price
)
matched_transactions.append(transaction)
# 注文量の更新
new_order['amount'] -= trade_amount
bid['amount'] -= trade_amount
if bid['amount'] <= 0:
self.pending_transactions.remove(bid)
if new_order['amount'] <= 0:
break
# 未処理の注文を保留リストに追加
if new_order['amount'] > 0:
self.pending_transactions.append(new_order)
# 成立した取引を記録
for transaction in matched_transactions:
self.completed_transactions.append(transaction)
self.update_participant_balance(transaction)
return matched_transactions
def update_participant_balance(self, transaction):
"""参加者のエネルギーバランス更新"""
seller = self.participants[transaction.seller_id]
buyer = self.participants[transaction.buyer_id]
seller['energy_balance'] -= transaction.amount
buyer['energy_balance'] += transaction.amount
seller['transactions'].append(transaction.transaction_id)
buyer['transactions'].append(transaction.transaction_id)
def get_market_summary(self):
"""市場サマリーの取得"""
if not self.completed_transactions:
return {}
recent_prices = [t.price for t in self.completed_transactions[-10:]]
avg_price = sum(recent_prices) / len(recent_prices)
total_volume = sum(t.amount for t in self.completed_transactions)
return {
'average_price': avg_price,
'total_transactions': len(self.completed_transactions),
'total_volume': total_volume,
'pending_orders': len(self.pending_transactions)
}
# P2P電力取引市場のシミュレーション
market = P2PEnergyMarket()
# 参加者の登録
market.register_participant('solar_house_1', 'producer', 10)
market.register_participant('factory_1', 'consumer', 100)
market.register_participant('community_1', 'prosumer', 50)
# 取引の実行
market.submit_offer('solar_house_1', 8, 25) # 8kWh を 25円/kWh で売り
market.submit_bid('factory_1', 50, 30) # 50kWh を 最大30円/kWh で買い
# 市場サマリーの表示
summary = market.get_market_summary()
print("P2P電力取引市場サマリー:")
for key, value in summary.items():
print(f" {key}: {value}")
パフォーマンス最適化と運用監視
分散処理による負荷分散
from multiprocessing import Pool
import concurrent.futures
def process_grid_data_chunk(data_chunk):
"""グリッドデータの並列処理"""
chunk_id, chunk_data = data_chunk
# 各チャンクでの需要予測
predictions = []
for row in chunk_data:
# 簡略化された予測処理
pred = sum(row) * 0.1 + np.random.normal(0, 5)
predictions.append(pred)
return {
'chunk_id': chunk_id,
'predictions': predictions,
'processed_count': len(predictions)
}
def parallel_grid_processing(data, n_workers=4):
"""並列グリッド処理"""
# データをチャンクに分割
chunk_size = len(data) // n_workers
chunks = []
for i in range(n_workers):
start_idx = i * chunk_size
if i == n_workers - 1:
end_idx = len(data)
else:
end_idx = (i + 1) * chunk_size
chunks.append((i, data[start_idx:end_idx]))
# 並列処理の実行
with concurrent.futures.ProcessPoolExecutor(max_workers=n_workers) as executor:
results = list(executor.map(process_grid_data_chunk, chunks))
# 結果の統合
all_predictions = []
for result in sorted(results, key=lambda x: x['chunk_id']):
all_predictions.extend(result['predictions'])
return all_predictions
# テストデータの作成
test_data = [[random.random() * 100 for _ in range(5)] for _ in range(1000)]
# 並列処理の実行
start_time = time.time()
parallel_results = parallel_grid_processing(test_data)
parallel_time = time.time() - start_time
print(f"並列処理時間: {parallel_time:.2f}秒")
print(f"処理データ数: {len(parallel_results)}")
システム監視とアラート
class GridMonitoringSystem:
def __init__(self):
self.thresholds = {
'demand_high': 2000,
'demand_low': 500,
'voltage_high': 242,
'voltage_low': 218,
'frequency_high': 50.5,
'frequency_low': 49.5
}
self.alerts = []
self.metrics_history = []
def check_system_health(self, current_metrics):
"""システムヘルスチェック"""
alerts = []
timestamp = datetime.now()
# 需要異常チェック
if current_metrics['demand'] > self.thresholds['demand_high']:
alerts.append({
'type': 'HIGH_DEMAND',
'severity': 'WARNING',
'message': f"需要が閾値を超過: {current_metrics['demand']}MW",
'timestamp': timestamp
})
# 電圧異常チェック
if current_metrics['voltage'] < self.thresholds['voltage_low']:
alerts.append({
'type': 'LOW_VOLTAGE',
'severity': 'CRITICAL',
'message': f"電圧低下: {current_metrics['voltage']}V",
'timestamp': timestamp
})
# 周波数異常チェック
if abs(current_metrics['frequency'] - 50.0) > 0.5:
alerts.append({
'type': 'FREQUENCY_DEVIATION',
'severity': 'WARNING',
'message': f"周波数偏差: {current_metrics['frequency']}Hz",
'timestamp': timestamp
})
# アラートの記録
self.alerts.extend(alerts)
self.metrics_history.append({
'timestamp': timestamp,
'metrics': current_metrics.copy()
})
return alerts
def get_system_status(self):
"""システム状態の取得"""
if not self.metrics_history:
return {'status': 'NO_DATA'}
latest_metrics = self.metrics_history[-1]['metrics']
recent_alerts = [a for a in self.alerts
if (datetime.now() - a['timestamp']).seconds < 3600]
# 総合ステータスの判定
critical_alerts = [a for a in recent_alerts if a['severity'] == 'CRITICAL']
warning_alerts = [a for a in recent_alerts if a['severity'] == 'WARNING']
if critical_alerts:
status = 'CRITICAL'
elif warning_alerts:
status = 'WARNING'
else:
status = 'NORMAL'
return {
'status': status,
'latest_metrics': latest_metrics,
'recent_alerts_count': len(recent_alerts),
'critical_alerts_count': len(critical_alerts)
}
# 監視システムの初期化と実行
monitoring_system = GridMonitoringSystem()
# シミュレーションデータでの監視
for hour in range(24):
# システムメトリクスの生成
metrics = {
'demand': 1500 + 500 * np.sin(2 * np.pi * hour / 24) + np.random.normal(0, 100),
'voltage': 230 + np.random.normal(0, 5),
'frequency': 50.0 + np.random.normal(0, 0.1),
'renewable_output': max(0, 800 * np.sin(2 * np.pi * (hour - 6) / 12))
}
# ヘルスチェック実行
alerts = monitoring_system.check_system_health(metrics)
if alerts:
print(f"時刻{hour}: {len(alerts)}件のアラートが発生")
for alert in alerts:
print(f" - {alert['type']}: {alert['message']}")
# システム状態の確認
system_status = monitoring_system.get_system_status()
print(f"\nシステム総合状態: {system_status['status']}")
print(f"最新メトリクス: {system_status['latest_metrics']}")
まとめと今後の展望
スマートグリッドにおける機械学習の活用は、従来の電力システムを根本的に変革する技術です。本記事で紹介した主要な応用分野を以下にまとめます:
主要な成果
予測精度の向上: 需要予測や再生可能エネルギー出力予測において、従来手法比で20-30%の精度向上を実現
運用効率の最適化: 機械学習による負荷分散と蓄電池制御により、システム全体の効率を15-25%改善
異常検知の高度化: オートエンコーダやDBSCANを用いた異常検知により、システム障害の早期発見が可能
分散制御の実現: マルチエージェントシステムと強化学習により、自律的な電力システム運用を実現
技術的課題と解決方向
- リアルタイム性の確保: エッジコンピューティングと軽量化モデルによる低遅延処理
- セキュリティとプライバシー: 連合学習とブロックチェーンによる安全な分散処理
- 相互運用性: 標準化されたAPIとプロトコルによるシステム間連携
- スケーラビリティ: 分散処理アーキテクチャによる大規模システム対応
将来の発展方向
量子機械学習の導入: 量子コンピューティングによる最適化問題の高速解法
AIoT統合: AIとIoTの統合による、より細粒度な制御と監視
カーボンニュートラル: AI駆動型エネルギー管理による脱炭素社会の実現
レジリエンス向上: 機械学習による災害予測と自動復旧システム
スマートグリッドと機械学習の融合は、持続可能なエネルギー社会の実現に向けた重要な技術基盤となります。継続的な研究開発と実証実験を通じて、より効率的で信頼性の高い電力システムの構築が期待されます。
■テックジム「AIエンジニア養成コース」
■プロンプトだけでオリジナルアプリを開発・公開してみた!!
■AI時代の第一歩!「AI駆動開発コース」はじめました!
テックジム東京本校で先行開始。
■テックジム東京本校
「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。
<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。
<オンライン無料>ゼロから始めるPython爆速講座