センサーデータによる電力需要予測AI|スマートグリッド機械学習完全ガイド

 

概要

電力需要の正確な予測は、電力の安定供給とエネルギー効率の最適化に不可欠です。IoTセンサーから収集される多様なデータを機械学習で分析することで、従来手法を上回る高精度な電力需要予測が実現できます。本記事では、センサーデータを活用した電力需要予測システムの構築方法を詳しく解説します。

電力需要予測の重要性

なぜセンサーデータが重要なのか

電力需要は多様な要因で変動するため、包括的なデータ収集が必要です:

  • 気象条件: 温度、湿度、日照量による空調・照明需要の変化
  • 人流データ: オフィスビル、商業施設の利用状況
  • 設備稼働状況: 製造業の生産ラインや大型機器の運転状態
  • 生活パターン: 住宅地域の電力使用パターン

活用するセンサーデータ

  1. 気象センサー

    • 外気温、湿度、気圧
    • 日射量、風速、雲量
    • 降水量
  2. 人流センサー

    • 建物への入退室データ
    • 交通量データ
    • GPS位置情報
  3. 設備センサー

    • スマートメーター
    • 機器稼働センサー
    • 温度・振動センサー

電力需要予測システムの実装

データ前処理とモデル構築

import pandas as pd
import numpy as np
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

# サンプルセンサーデータの生成
def create_sensor_data():
    np.random.seed(42)
    dates = pd.date_range('2023-01-01', '2023-12-31', freq='H')
    n_hours = len(dates)
    
    # 時間特徴量
    hours = dates.hour
    days = dates.dayofweek
    months = dates.month
    
    # 基準需要パターン(時間、曜日、季節による変動)
    hourly_pattern = 50 + 30 * np.sin(2 * np.pi * hours / 24) + \
                     20 * np.cos(2 * np.pi * hours / 24)
    
    seasonal_pattern = 20 * np.sin(2 * np.pi * np.arange(n_hours) / (24*365))
    
    # 平日・休日パターン
    weekday_effect = np.where(days < 5, 20, -15)
    
    base_demand = 1000 + hourly_pattern + seasonal_pattern + weekday_effect
    
    data = {
        'datetime': dates,
        'temperature': 20 + 15 * np.sin(2 * np.pi * np.arange(n_hours) / (24*365)) + \
                      np.random.normal(0, 3, n_hours),
        'humidity': 50 + 20 * np.sin(2 * np.pi * np.arange(n_hours) / (24*180)) + \
                   np.random.normal(0, 5, n_hours),
        'solar_radiation': np.maximum(0, 500 + 400 * np.sin(2 * np.pi * hours / 24) + \
                                     np.random.normal(0, 50, n_hours)),
        'occupancy_rate': 0.3 + 0.5 * (hours >= 9) * (hours <= 18) * (days < 5) + \
                         np.random.normal(0, 0.1, n_hours),
        'equipment_load': 200 + 150 * (days < 5) * (hours >= 8) * (hours <= 17) + \
                         np.random.normal(0, 20, n_hours),
        'power_demand': base_demand + np.random.normal(0, 50, n_hours)
    }
    
    df = pd.DataFrame(data)
    
    # 気温と需要の関係を強化(冷暖房負荷)
    cooling_load = np.maximum(0, (df['temperature'] - 25) * 15)
    heating_load = np.maximum(0, (18 - df['temperature']) * 10)
    df['power_demand'] += cooling_load + heating_load
    
    # 日射量と需要の逆相関(太陽光発電効果)
    df['power_demand'] -= df['solar_radiation'] * 0.3
    
    # 占有率と設備負荷の影響
    df['power_demand'] += df['occupancy_rate'] * 200 + df['equipment_load'] * 0.5
    
    return df

# 特徴量エンジニアリング
def create_time_features(df):
    df = df.copy()
    df['hour'] = df['datetime'].dt.hour
    df['day_of_week'] = df['datetime'].dt.dayofweek
    df['month'] = df['datetime'].dt.month
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)
    df['is_business_hour'] = ((df['hour'] >= 8) & (df['hour'] <= 18) & 
                              (df['day_of_week'] < 5)).astype(int)
    
    # 周期的特徴量
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    df['day_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7)
    df['day_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7)
    
    # ラグ特徴量
    df['demand_lag_1h'] = df['power_demand'].shift(1)
    df['demand_lag_24h'] = df['power_demand'].shift(24)
    df['demand_lag_168h'] = df['power_demand'].shift(168)  # 1週間前
    
    # 移動平均
    df['demand_ma_24h'] = df['power_demand'].rolling(24).mean()
    df['temp_ma_3h'] = df['temperature'].rolling(3).mean()
    
    return df.dropna()

df = create_sensor_data()
df = create_time_features(df)

LSTM(Long Short-Term Memory)モデルの実装

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from sklearn.preprocessing import MinMaxScaler

# LSTM用のデータ準備
def prepare_lstm_data(df, sequence_length=24):
    """
    時系列データをLSTM用の形式に変換
    """
    feature_cols = ['temperature', 'humidity', 'solar_radiation', 
                   'occupancy_rate', 'equipment_load', 'hour_sin', 'hour_cos',
                   'day_sin', 'day_cos', 'is_weekend', 'is_business_hour']
    
    # データの正規化
    scaler_X = MinMaxScaler()
    scaler_y = MinMaxScaler()
    
    X_scaled = scaler_X.fit_transform(df[feature_cols])
    y_scaled = scaler_y.fit_transform(df[['power_demand']])
    
    X, y = [], []
    for i in range(sequence_length, len(X_scaled)):
        X.append(X_scaled[i-sequence_length:i])
        y.append(y_scaled[i])
    
    return np.array(X), np.array(y), scaler_X, scaler_y

# LSTMモデルの構築
def build_lstm_model(input_shape):
    model = Sequential([
        LSTM(50, return_sequences=True, input_shape=input_shape),
        Dropout(0.2),
        LSTM(50, return_sequences=False),
        Dropout(0.2),
        Dense(25),
        Dense(1)
    ])
    
    model.compile(optimizer='adam', loss='mse', metrics=['mae'])
    return model

# LSTM訓練の実行
X, y, scaler_X, scaler_y = prepare_lstm_data(df)

# 訓練・テストデータの分割(時系列なので最後の20%をテスト用)
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# モデル訓練
lstm_model = build_lstm_model((X.shape[1], X.shape[2]))
history = lstm_model.fit(X_train, y_train, epochs=50, batch_size=32, 
                        validation_split=0.2, verbose=0)

# 予測と評価
y_pred_lstm = lstm_model.predict(X_test)
y_pred_lstm = scaler_y.inverse_transform(y_pred_lstm)
y_test_actual = scaler_y.inverse_transform(y_test)

lstm_mae = mean_absolute_error(y_test_actual, y_pred_lstm)
print(f"LSTM MAE: {lstm_mae:.2f} kW")

# ランダムフォレストとの比較
feature_cols_rf = ['temperature', 'humidity', 'solar_radiation', 'occupancy_rate',
                   'equipment_load', 'hour', 'day_of_week', 'month', 'is_weekend',
                   'is_business_hour', 'hour_sin', 'hour_cos', 'day_sin', 'day_cos',
                   'demand_lag_1h', 'demand_lag_24h', 'temp_ma_3h']

X_rf = df[feature_cols_rf].iloc[168:]  # ラグ特徴量のため168時間後から開始
y_rf = df['power_demand'].iloc[168:]

# 時系列分割での交差検証
tscv = TimeSeriesSplit(n_splits=5)
rf_model = RandomForestRegressor(n_estimators=100, random_state=42)

rf_scores = []
for train_idx, test_idx in tscv.split(X_rf):
    X_train_rf, X_test_rf = X_rf.iloc[train_idx], X_rf.iloc[test_idx]
    y_train_rf, y_test_rf = y_rf.iloc[train_idx], y_rf.iloc[test_idx]
    
    rf_model.fit(X_train_rf, y_train_rf)
    y_pred_rf = rf_model.predict(X_test_rf)
    
    mae = mean_absolute_error(y_test_rf, y_pred_rf)
    rf_scores.append(mae)

print(f"Random Forest 平均 MAE: {np.mean(rf_scores):.2f} ± {np.std(rf_scores):.2f} kW")

リアルタイム需要予測システム

class PowerDemandPredictor:
    def __init__(self, lstm_model, rf_model, scalers):
        self.lstm_model = lstm_model
        self.rf_model = rf_model
        self.scaler_X, self.scaler_y = scalers
        self.sequence_data = []
    
    def update_sensor_data(self, sensor_reading):
        """
        新しいセンサーデータでシステムを更新
        """
        processed_data = self.preprocess_sensor_data(sensor_reading)
        self.sequence_data.append(processed_data)
        
        # 過去24時間分のデータを保持
        if len(self.sequence_data) > 24:
            self.sequence_data.pop(0)
    
    def preprocess_sensor_data(self, sensor_reading):
        """
        センサーデータの前処理
        """
        # 時間特徴量の計算
        current_time = sensor_reading['timestamp']
        hour = current_time.hour
        day_of_week = current_time.dayofweek
        
        processed = {
            'temperature': sensor_reading['temperature'],
            'humidity': sensor_reading['humidity'],
            'solar_radiation': sensor_reading['solar_radiation'],
            'occupancy_rate': sensor_reading['occupancy_rate'],
            'equipment_load': sensor_reading['equipment_load'],
            'hour_sin': np.sin(2 * np.pi * hour / 24),
            'hour_cos': np.cos(2 * np.pi * hour / 24),
            'day_sin': np.sin(2 * np.pi * day_of_week / 7),
            'day_cos': np.cos(2 * np.pi * day_of_week / 7),
            'is_weekend': 1 if day_of_week >= 5 else 0,
            'is_business_hour': 1 if (8 <= hour <= 18 and day_of_week < 5) else 0
        }
        
        return processed
    
    def predict_next_hour(self):
        """
        次の1時間の電力需要を予測
        """
        if len(self.sequence_data) < 24:
            return None, "十分なデータがありません"
        
        # LSTM用データ準備
        feature_cols = ['temperature', 'humidity', 'solar_radiation', 
                       'occupancy_rate', 'equipment_load', 'hour_sin', 'hour_cos',
                       'day_sin', 'day_cos', 'is_weekend', 'is_business_hour']
        
        sequence = np.array([[data[col] for col in feature_cols] 
                            for data in self.sequence_data[-24:]])
        sequence_scaled = self.scaler_X.transform(sequence)
        sequence_reshaped = sequence_scaled.reshape(1, 24, len(feature_cols))
        
        # LSTM予測
        lstm_pred_scaled = self.lstm_model.predict(sequence_reshaped, verbose=0)
        lstm_pred = self.scaler_y.inverse_transform(lstm_pred_scaled)[0][0]
        
        return lstm_pred, "予測完了"
    
    def predict_day_ahead(self, future_weather_data):
        """
        24時間先までの需要予測
        """
        predictions = []
        current_sequence = self.sequence_data[-24:].copy()
        
        for hour_ahead in range(24):
            # 将来の気象データを使用
            future_data = future_weather_data[hour_ahead].copy()
            future_processed = self.preprocess_sensor_data(future_data)
            
            # 設備負荷と占有率の予測(簡略化)
            future_processed['equipment_load'] = self.estimate_equipment_load(
                future_data['timestamp']
            )
            future_processed['occupancy_rate'] = self.estimate_occupancy(
                future_data['timestamp']
            )
            
            # LSTM予測
            sequence = np.array([[data[col] for col in 
                                ['temperature', 'humidity', 'solar_radiation', 
                                 'occupancy_rate', 'equipment_load', 'hour_sin', 'hour_cos',
                                 'day_sin', 'day_cos', 'is_weekend', 'is_business_hour']] 
                                for data in current_sequence])
            
            sequence_scaled = self.scaler_X.transform(sequence)
            sequence_reshaped = sequence_scaled.reshape(1, 24, len(sequence[0]))
            
            pred_scaled = self.lstm_model.predict(sequence_reshaped, verbose=0)
            pred = self.scaler_y.inverse_transform(pred_scaled)[0][0]
            
            predictions.append({
                'timestamp': future_data['timestamp'],
                'predicted_demand': pred
            })
            
            # 予測値を次の予測に使用
            current_sequence.append(future_processed)
            current_sequence.pop(0)
        
        return predictions
    
    def estimate_equipment_load(self, timestamp):
        """
        設備負荷の推定(簡略版)
        """
        hour = timestamp.hour
        day_of_week = timestamp.dayofweek
        
        if day_of_week < 5 and 8 <= hour <= 17:  # 平日営業時間
            return 300 + np.random.normal(0, 20)
        else:
            return 150 + np.random.normal(0, 15)
    
    def estimate_occupancy(self, timestamp):
        """
        建物占有率の推定(簡略版)
        """
        hour = timestamp.hour
        day_of_week = timestamp.dayofweek
        
        if day_of_week < 5 and 9 <= hour <= 18:  # 平日営業時間
            return 0.7 + np.random.normal(0, 0.1)
        elif day_of_week >= 5 and 10 <= hour <= 16:  # 休日
            return 0.3 + np.random.normal(0, 0.1)
        else:
            return 0.1 + np.random.normal(0, 0.05)

# 使用例
predictor = PowerDemandPredictor(lstm_model, rf_model, (scaler_X, scaler_y))

# サンプルセンサーデータ
sample_reading = {
    'timestamp': pd.Timestamp('2023-12-01 14:00:00'),
    'temperature': 22.5,
    'humidity': 55.0,
    'solar_radiation': 450.0,
    'occupancy_rate': 0.8,
    'equipment_load': 320.0
}

predictor.update_sensor_data(sample_reading)
next_hour_demand, status = predictor.predict_next_hour()

if next_hour_demand:
    print(f"次の1時間の予測需要: {next_hour_demand:.0f} kW")

異常検知システムの統合

from sklearn.ensemble import IsolationForest

class AnomalyDetector:
    def __init__(self, contamination=0.1):
        self.model = IsolationForest(contamination=contamination, random_state=42)
        self.is_fitted = False
    
    def fit(self, normal_data):
        """
        正常データでモデルを訓練
        """
        self.model.fit(normal_data)
        self.is_fitted = True
    
    def detect_anomaly(self, current_data, predicted_demand):
        """
        現在のセンサーデータと予測需要の異常を検知
        """
        if not self.is_fitted:
            return False, "モデルが訓練されていません"
        
        # 特徴量ベクトルの作成
        features = [
            current_data['temperature'],
            current_data['humidity'],
            current_data['solar_radiation'],
            current_data['occupancy_rate'],
            current_data['equipment_load'],
            predicted_demand
        ]
        
        # 異常スコアの計算
        anomaly_score = self.model.decision_function([features])[0]
        is_anomaly = self.model.predict([features])[0] == -1
        
        return is_anomaly, anomaly_score

# 異常検知の統合例
class IntegratedPowerSystem:
    def __init__(self, predictor, anomaly_detector):
        self.predictor = predictor
        self.anomaly_detector = anomaly_detector
        self.alert_threshold = -0.3
    
    def process_sensor_data(self, sensor_reading):
        """
        センサーデータの処理と監視
        """
        # 需要予測
        self.predictor.update_sensor_data(sensor_reading)
        predicted_demand, status = self.predictor.predict_next_hour()
        
        if not predicted_demand:
            return {"status": "error", "message": status}
        
        # 異常検知
        is_anomaly, anomaly_score = self.anomaly_detector.detect_anomaly(
            sensor_reading, predicted_demand
        )
        
        # アラート判定
        alert_level = "normal"
        if is_anomaly or anomaly_score < self.alert_threshold:
            alert_level = "warning"
        
        if anomaly_score < -0.5:
            alert_level = "critical"
        
        result = {
            "status": "success",
            "predicted_demand": predicted_demand,
            "is_anomaly": is_anomaly,
            "anomaly_score": anomaly_score,
            "alert_level": alert_level,
            "timestamp": sensor_reading['timestamp']
        }
        
        return result

# システムの使用例
anomaly_detector = AnomalyDetector()

# 正常データでの訓練(実際のデータを使用)
normal_features = df[['temperature', 'humidity', 'solar_radiation', 
                     'occupancy_rate', 'equipment_load', 'power_demand']].values
anomaly_detector.fit(normal_features)

integrated_system = IntegratedPowerSystem(predictor, anomaly_detector)

# リアルタイム処理
result = integrated_system.process_sensor_data(sample_reading)
print(f"予測結果: {result}")

性能向上のための最適化

ハイパーパラメータチューニング

from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import GradientBoostingRegressor

# グリッドサーチによる最適化
param_grid = {
    'n_estimators': [100, 200, 300],
    'learning_rate': [0.05, 0.1, 0.15],
    'max_depth': [3, 5, 7]
}

gb_model = GradientBoostingRegressor(random_state=42)
grid_search = GridSearchCV(gb_model, param_grid, cv=3, scoring='neg_mean_absolute_error')

# 最適パラメータでのモデル訓練
X_train_final = X_rf.iloc[:-1000]
y_train_final = y_rf.iloc[:-1000]

grid_search.fit(X_train_final, y_train_final)
best_model = grid_search.best_estimator_

print(f"最適パラメータ: {grid_search.best_params_}")
print(f"最適スコア: {-grid_search.best_score_:.2f}")

モデルアンサンブル

class EnsemblePowerPredictor:
    def __init__(self, models, weights=None):
        self.models = models
        self.weights = weights or [1.0] * len(models)
    
    def predict(self, X):
        """
        複数モデルの加重平均予測
        """
        predictions = []
        for model in self.models:
            pred = model.predict(X)
            predictions.append(pred)
        
        # 加重平均
        ensemble_pred = np.average(predictions, axis=0, weights=self.weights)
        return ensemble_pred

# アンサンブルモデルの作成
ensemble_predictor = EnsemblePowerPredictor([
    rf_model,
    best_model
], weights=[0.4, 0.6])

# アンサンブル予測の評価
X_test_final = X_rf.iloc[-1000:]
y_test_final = y_rf.iloc[-1000:]

ensemble_pred = ensemble_predictor.predict(X_test_final)
ensemble_mae = mean_absolute_error(y_test_final, ensemble_pred)

print(f"アンサンブル MAE: {ensemble_mae:.2f} kW")

まとめ

センサーデータを活用した電力需要予測システムは、従来の統計的手法を大幅に上回る精度を実現できます。LSTMやランダムフォレストなどの機械学習手法を組み合わせることで、複雑な需要パターンを正確に捉えることが可能です。

リアルタイム予測と異常検知を統合したシステムにより、電力供給の最適化とリスク管理を同時に実現できます。継続的なデータ収集とモデル改善により、スマートグリッドの中核技術として活用が期待されます。

■テックジム「AIエンジニア養成コース」

■プロンプトだけでオリジナルアプリを開発・公開してみた!!

■AI時代の第一歩!「AI駆動開発コース」はじめました!

テックジム東京本校で先行開始。

■テックジム東京本校

「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。

<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。

<月1開催>放送作家による映像ディレクター養成講座

<オンライン無料>ゼロから始めるPython爆速講座