顧客分析とセグメント別マーケティングAI|機械学習で売上最大化
概要
効果的なマーケティング戦略には、顧客の行動パターンや属性に基づいた適切なセグメンテーションが不可欠です。機械学習を活用した顧客分析により、従来の人口統計学的セグメンテーションを超えた精密な顧客理解と、パーソナライズされたマーケティング施策が実現できます。
顧客セグメンテーションの重要性
なぜ機械学習による顧客分析が必要なのか
現代のマーケティングでは、画一的なアプローチでは顧客の多様なニーズに対応できません:
- 購買行動の複雑化: オンライン・オフラインを横断した顧客行動
- 個人化の要求: パーソナライズされた体験への期待の高まり
- データの多様化: 構造化・非構造化データの統合分析
- リアルタイム対応: 動的な顧客状態の変化への即座の対応
分析対象となる顧客データ
取引データ
- 購買履歴、金額、頻度
- 商品カテゴリ、ブランド選好
- 決済方法、チャネル利用
行動データ
- Webサイト閲覧履歴
- アプリ利用状況
- メール開封・クリック率
属性データ
- 年齢、性別、居住地
- 職業、年収
- 家族構成
機械学習による顧客セグメンテーション
RFM分析とK-meansクラスタリング
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import seaborn as sns
# サンプル顧客データの生成
def create_customer_data():
np.random.seed(42)
n_customers = 2000
# 顧客セグメントの定義
segments = ['高価値顧客', '成長顧客', '新規顧客', '離脱リスク顧客']
segment_props = [0.15, 0.35, 0.30, 0.20]
customer_segments = np.random.choice(segments, n_customers, p=segment_props)
data = []
for i, segment in enumerate(customer_segments):
if segment == '高価値顧客':
recency = np.random.exponential(30) # 最近購入
frequency = np.random.poisson(20) + 10 # 高頻度
monetary = np.random.normal(50000, 15000) # 高額
elif segment == '成長顧客':
recency = np.random.exponential(45)
frequency = np.random.poisson(8) + 5
monetary = np.random.normal(25000, 8000)
elif segment == '新規顧客':
recency = np.random.exponential(15)
frequency = np.random.poisson(2) + 1
monetary = np.random.normal(10000, 5000)
else: # 離脱リスク顧客
recency = np.random.exponential(120) + 60
frequency = np.random.poisson(3) + 1
monetary = np.random.normal(15000, 6000)
data.append({
'customer_id': f'CUST_{i:04d}',
'recency': max(1, recency), # 最後の購入からの日数
'frequency': max(1, frequency), # 購入回数
'monetary': max(100, monetary), # 累計購入金額
'true_segment': segment
})
return pd.DataFrame(data)
# RFMスコアの計算
def calculate_rfm_score(df):
df = df.copy()
# 四分位数によるスコアリング(1-4点)
df['R_score'] = pd.qcut(df['recency'].rank(method='first'), 4,
labels=[4, 3, 2, 1]) # 最近ほど高スコア
df['F_score'] = pd.qcut(df['frequency'].rank(method='first'), 4,
labels=[1, 2, 3, 4]) # 頻度が高いほど高スコア
df['M_score'] = pd.qcut(df['monetary'].rank(method='first'), 4,
labels=[1, 2, 3, 4]) # 金額が高いほど高スコア
# RFMスコアの統合
df['RFM_score'] = df['R_score'].astype(str) + \
df['F_score'].astype(str) + \
df['M_score'].astype(str)
return df
# K-meansクラスタリングの実装
def perform_clustering(df, n_clusters=4):
# 特徴量の標準化
features = ['recency', 'frequency', 'monetary']
X = df[features]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# K-meansクラスタリング
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
df['cluster'] = kmeans.fit_predict(X_scaled)
# クラスター中心の逆変換
cluster_centers = scaler.inverse_transform(kmeans.cluster_centers_)
cluster_df = pd.DataFrame(cluster_centers, columns=features)
cluster_df['cluster'] = range(n_clusters)
return df, kmeans, scaler, cluster_df
# データ生成と分析の実行
df = create_customer_data()
df = calculate_rfm_score(df)
df, kmeans_model, scaler, cluster_centers = perform_clustering(df)
# クラスター別特徴量の分析
print("クラスター別統計:")
cluster_stats = df.groupby('cluster')[['recency', 'frequency', 'monetary']].agg({
'recency': ['mean', 'std'],
'frequency': ['mean', 'std'],
'monetary': ['mean', 'std']
}).round(2)
print(cluster_stats)
# クラスターの可視化
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# RFM各軸でのクラスター分布
for i, feature in enumerate(['recency', 'frequency', 'monetary']):
sns.boxplot(data=df, x='cluster', y=feature, ax=axes[i])
axes[i].set_title(f'{feature.capitalize()} by Cluster')
plt.tight_layout()
plt.show()
高度な顧客行動予測モデル
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
# 追加の顧客行動データを生成
def add_behavioral_features(df):
np.random.seed(42)
df = df.copy()
# Webサイト行動データ
df['page_views'] = np.random.poisson(df['frequency'] * 5) + 1
df['session_duration'] = np.random.exponential(df['monetary'] / 1000) + 2
df['bounce_rate'] = np.random.beta(2, 5) # 0-1の値
# メールマーケティング反応
df['email_open_rate'] = np.random.beta(3, 7)
df['email_click_rate'] = df['email_open_rate'] * np.random.beta(2, 8)
# ソーシャルメディア活動
df['social_engagement'] = np.random.poisson(df['frequency'] / 2)
# 顧客満足度(模擬)
df['satisfaction_score'] = np.random.normal(7, 1.5)
df['satisfaction_score'] = np.clip(df['satisfaction_score'], 1, 10)
return df
# 離脱予測モデル
def create_churn_prediction_model(df):
# 離脱フラグの生成(recencyが高く、satisfactionが低い顧客)
churn_probability = (df['recency'] / df['recency'].max() * 0.6 +
(10 - df['satisfaction_score']) / 10 * 0.4)
df['will_churn'] = (np.random.random(len(df)) < churn_probability).astype(int)
# 特徴量の選択
feature_cols = ['recency', 'frequency', 'monetary', 'page_views',
'session_duration', 'bounce_rate', 'email_open_rate',
'email_click_rate', 'social_engagement', 'satisfaction_score']
X = df[feature_cols]
y = df['will_churn']
# 訓練・テストデータの分割
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42, stratify=y
)
# ランダムフォレストモデルの訓練
churn_model = RandomForestClassifier(n_estimators=100, random_state=42)
churn_model.fit(X_train, y_train)
# 予測と評価
y_pred = churn_model.predict(X_test)
y_pred_proba = churn_model.predict_proba(X_test)[:, 1]
print("離脱予測モデル性能:")
print(classification_report(y_test, y_pred))
# 特徴量重要度
feature_importance = pd.DataFrame({
'feature': feature_cols,
'importance': churn_model.feature_importances_
}).sort_values('importance', ascending=False)
print("\n特徴量重要度:")
print(feature_importance)
return churn_model, feature_cols, y_pred_proba
# 行動データの追加と分析
df = add_behavioral_features(df)
churn_model, feature_cols, churn_probabilities = create_churn_prediction_model(df)
パーソナライズドレコメンデーションシステム
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import TruncatedSVD
# 商品購入履歴の生成
def create_purchase_history(df):
np.random.seed(42)
# 商品カテゴリの定義
categories = ['電子機器', 'ファッション', 'ホーム&キッチン', 'スポーツ',
'本・音楽', '食品', '美容・健康', '自動車用品']
purchase_data = []
for _, customer in df.iterrows():
# 顧客の購入傾向を決定
n_purchases = int(customer['frequency'])
# クラスターに基づく商品嗜好
cluster = customer['cluster']
if cluster == 0: # 高価値顧客
preferred_categories = ['電子機器', 'ファッション', '美容・健康']
elif cluster == 1: # 成長顧客
preferred_categories = ['ホーム&キッチン', 'スポーツ', '本・音楽']
elif cluster == 2: # 新規顧客
preferred_categories = ['食品', 'ファッション', 'ホーム&キッチン']
else: # 離脱リスク顧客
preferred_categories = ['本・音楽', '食品', '自動車用品']
for _ in range(n_purchases):
# 80%の確率で好みのカテゴリから選択
if np.random.random() < 0.8:
category = np.random.choice(preferred_categories)
else:
category = np.random.choice(categories)
purchase_data.append({
'customer_id': customer['customer_id'],
'category': category,
'rating': np.random.randint(3, 6) # 3-5の評価
})
return pd.DataFrame(purchase_data)
# 協調フィルタリングレコメンデーション
class CollaborativeFilteringRecommender:
def __init__(self, n_components=50):
self.svd = TruncatedSVD(n_components=n_components, random_state=42)
self.user_item_matrix = None
self.categories = None
self.users = None
def fit(self, purchase_df):
# ユーザー-アイテム行列の作成
user_item = purchase_df.pivot_table(
index='customer_id',
columns='category',
values='rating',
fill_value=0
)
self.user_item_matrix = user_item
self.categories = user_item.columns.tolist()
self.users = user_item.index.tolist()
# SVDによる次元削減
self.svd.fit(user_item.values)
return self
def get_recommendations(self, customer_id, n_recommendations=3):
if customer_id not in self.users:
return []
user_idx = self.users.index(customer_id)
user_vector = self.user_item_matrix.iloc[user_idx].values.reshape(1, -1)
# SVD変換
user_transformed = self.svd.transform(user_vector)
# 全カテゴリの予測評価
reconstructed = self.svd.inverse_transform(user_transformed)
predicted_ratings = reconstructed[0]
# 未購入カテゴリのみを推薦対象とする
purchased_categories = self.user_item_matrix.iloc[user_idx]
unpurchased_mask = purchased_categories == 0
# 予測評価の高い順にソート
category_scores = pd.Series(predicted_ratings, index=self.categories)
recommendations = category_scores[unpurchased_mask].sort_values(ascending=False)
return recommendations.head(n_recommendations).to_dict()
# レコメンデーションシステムの実装
purchase_df = create_purchase_history(df)
recommender = CollaborativeFilteringRecommender()
recommender.fit(purchase_df)
# サンプル顧客への推薦
sample_customer = df.iloc[0]['customer_id']
recommendations = recommender.get_recommendations(sample_customer)
print(f"顧客 {sample_customer} への推薦:")
for category, score in recommendations.items():
print(f" {category}: {score:.3f}")
セグメント別マーケティング戦略の自動生成
class MarketingStrategyGenerator:
def __init__(self, customer_df, churn_model, recommender):
self.customer_df = customer_df
self.churn_model = churn_model
self.recommender = recommender
def analyze_segment(self, cluster_id):
"""
特定のクラスターの特徴を分析
"""
segment_data = self.customer_df[self.customer_df['cluster'] == cluster_id]
analysis = {
'size': len(segment_data),
'avg_recency': segment_data['recency'].mean(),
'avg_frequency': segment_data['frequency'].mean(),
'avg_monetary': segment_data['monetary'].mean(),
'avg_satisfaction': segment_data['satisfaction_score'].mean(),
'churn_risk': segment_data['will_churn'].mean(),
'email_engagement': segment_data['email_open_rate'].mean(),
'web_engagement': segment_data['page_views'].mean()
}
return analysis
def generate_strategy(self, cluster_id):
"""
クラスター特性に基づくマーケティング戦略を生成
"""
analysis = self.analyze_segment(cluster_id)
# 戦略の基本テンプレート
strategy = {
'cluster_id': cluster_id,
'segment_size': analysis['size'],
'priority_level': self._calculate_priority(analysis),
'communication_channel': self._recommend_channel(analysis),
'campaign_type': self._recommend_campaign_type(analysis),
'messaging_tone': self._recommend_messaging(analysis),
'offer_type': self._recommend_offer(analysis),
'frequency': self._recommend_frequency(analysis)
}
return strategy
def _calculate_priority(self, analysis):
"""
セグメントの優先度を計算
"""
# 高価値・低離脱リスクが最優先
value_score = analysis['avg_monetary'] / 10000 # 正規化
retention_score = 1 - analysis['churn_risk']
size_score = analysis['size'] / 1000 # 正規化
priority_score = value_score * 0.5 + retention_score * 0.3 + size_score * 0.2
if priority_score > 0.7:
return "高"
elif priority_score > 0.4:
return "中"
else:
return "低"
def _recommend_channel(self, analysis):
"""
最適なコミュニケーションチャネルを推薦
"""
if analysis['email_engagement'] > 0.3:
return "メール"
elif analysis['web_engagement'] > 15:
return "Webプッシュ通知"
else:
return "SMS"
def _recommend_campaign_type(self, analysis):
"""
キャンペーンタイプを推薦
"""
if analysis['churn_risk'] > 0.6:
return "離脱防止キャンペーン"
elif analysis['avg_frequency'] < 3:
return "リピート促進キャンペーン"
elif analysis['avg_monetary'] > 30000:
return "VIP向け特別オファー"
else:
return "クロスセル・アップセル"
def _recommend_messaging(self, analysis):
"""
メッセージングトーンを推薦
"""
if analysis['avg_monetary'] > 30000:
return "プレミアム・特別感重視"
elif analysis['avg_satisfaction'] < 6:
return "親しみやすい・サポート重視"
else:
return "価値提案・メリット重視"
def _recommend_offer(self, analysis):
"""
オファータイプを推薦
"""
if analysis['churn_risk'] > 0.6:
return "大幅割引(20-30%)"
elif analysis['avg_frequency'] < 3:
return "送料無料・ポイント2倍"
elif analysis['avg_monetary'] > 30000:
return "限定商品・先行販売"
else:
return "まとめ買い割引"
def _recommend_frequency(self, analysis):
"""
コミュニケーション頻度を推薦
"""
if analysis['email_engagement'] > 0.4:
return "週2回"
elif analysis['email_engagement'] > 0.2:
return "週1回"
else:
return "月2回"
def create_campaign_plan(self):
"""
全クラスター向けの包括的キャンペーン計画を作成
"""
campaign_plan = {}
for cluster_id in self.customer_df['cluster'].unique():
strategy = self.generate_strategy(cluster_id)
# 具体的なアクションプランを追加
action_plan = self._create_action_plan(cluster_id, strategy)
strategy['action_plan'] = action_plan
campaign_plan[f'Cluster_{cluster_id}'] = strategy
return campaign_plan
def _create_action_plan(self, cluster_id, strategy):
"""
具体的なアクションプランを作成
"""
segment_customers = self.customer_df[self.customer_df['cluster'] == cluster_id]
actions = []
# 高リスク顧客への個別アプローチ
high_risk_customers = segment_customers[segment_customers['will_churn'] == 1]
if len(high_risk_customers) > 0:
actions.append({
'action': '個別フォローアップ',
'target_count': len(high_risk_customers),
'timeline': '1週間以内',
'responsible': 'カスタマーサクセス'
})
# 推薦商品キャンペーン
actions.append({
'action': 'パーソナライズド商品推薦',
'target_count': len(segment_customers),
'timeline': '2週間以内',
'responsible': 'マーケティング'
})
# セグメント別コンテンツ作成
actions.append({
'action': 'セグメント特化コンテンツ作成',
'target_count': 1,
'timeline': '1ヶ月以内',
'responsible': 'コンテンツチーム'
})
return actions
# マーケティング戦略の生成
strategy_generator = MarketingStrategyGenerator(df, churn_model, recommender)
campaign_plan = strategy_generator.create_campaign_plan()
# 結果の表示
for cluster_name, strategy in campaign_plan.items():
print(f"\n=== {cluster_name} マーケティング戦略 ===")
print(f"セグメントサイズ: {strategy['segment_size']}人")
print(f"優先度: {strategy['priority_level']}")
print(f"推奨チャネル: {strategy['communication_channel']}")
print(f"キャンペーンタイプ: {strategy['campaign_type']}")
print(f"メッセージング: {strategy['messaging_tone']}")
print(f"オファー: {strategy['offer_type']}")
print(f"配信頻度: {strategy['frequency']}")
print("\nアクションプラン:")
for action in strategy['action_plan']:
print(f" - {action['action']} ({action['target_count']}件, {action['timeline']})")
ROI測定とA/Bテストフレームワーク
class MarketingROIAnalyzer:
def __init__(self):
self.campaign_results = []
def simulate_campaign_results(self, campaign_plan, customer_df):
"""
キャンペーン結果をシミュレート
"""
results = {}
for cluster_name, strategy in campaign_plan.items():
cluster_id = int(cluster_name.split('_')[1])
segment_customers = customer_df[customer_df['cluster'] == cluster_id]
# キャンペーン効果のシミュレート
base_conversion = 0.1 # ベース転換率
# 戦略による効果補正
if strategy['priority_level'] == '高':
conversion_boost = 0.05
elif strategy['priority_level'] == '中':
conversion_boost = 0.03
else:
conversion_boost = 0.01
# チャネル効果
if strategy['communication_channel'] == 'メール':
channel_boost = 0.02
elif strategy['communication_channel'] == 'Webプッシュ通知':
channel_boost = 0.01
else:
channel_boost = 0.005
final_conversion = base_conversion + conversion_boost + channel_boost
# 結果計算
n_targeted = len(segment_customers)
n_conversions = int(n_targeted * final_conversion)
avg_order_value = segment_customers['monetary'].mean() / segment_customers['frequency'].mean()
# コスト計算(簡略化)
cost_per_contact = {'メール': 50, 'Webプッシュ通知': 30, 'SMS': 100}
campaign_cost = n_targeted * cost_per_contact[strategy['communication_channel']]
revenue = n_conversions * avg_order_value
roi = ((revenue - campaign_cost) / campaign_cost) * 100 if campaign_cost > 0 else 0
results[cluster_name] = {
'targeted_customers': n_targeted,
'conversions': n_conversions,
'conversion_rate': final_conversion,
'revenue': revenue,
'cost': campaign_cost,
'roi': roi,
'avg_order_value': avg_order_value
}
return results
def generate_roi_report(self, results):
"""
ROIレポートの生成
"""
total_revenue = sum([r['revenue'] for r in results.values()])
total_cost = sum([r['cost'] for r in results.values()])
total_roi = ((total_revenue - total_cost) / total_cost) * 100
print("=== マーケティングROI レポート ===\n")
for cluster_name, result in results.items():
print(f"{cluster_name}:")
print(f" ターゲット顧客: {result['targeted_customers']:,}人")
print(f" 転換数: {result['conversions']:,}件")
print(f" 転換率: {result['conversion_rate']:.1%}")
print(f" 売上: ¥{result['revenue']:,.0f}")
print(f" コスト: ¥{result['cost']:,.0f}")
print(f" ROI: {result['roi']:.1f}%")
print()
print(f"全体ROI: {total_roi:.1f}%")
print(f"総売上: ¥{total_revenue:,.0f}")
print(f"総コスト: ¥{total_cost:,.0f}")
return {
'total_roi': total_roi,
'total_revenue': total_revenue,
'total_cost': total_cost,
'segment_results': results
}
# ROI分析の実行
roi_analyzer = MarketingROIAnalyzer()
campaign_results = roi_analyzer.simulate_campaign_results(campaign_plan, df)
roi_report = roi_analyzer.generate_roi_report(campaign_results)
継続的改善のためのモニタリングシステム
class CustomerSegmentMonitor:
def __init__(self, initial_model, initial_scaler):
self.model = initial_model
self.scaler = initial_scaler
self.performance_history = []
def update_customer_data(self, new_customer_data):
"""
新しい顧客データでセグメントを更新
"""
# 新データの前処理
features = ['recency', 'frequency', 'monetary']
new_data_scaled = self.scaler.transform(new_customer_data[features])
# 新しいクラスター予測
new_clusters = self.model.predict(new_data_scaled)
new_customer_data['cluster'] = new_clusters
return new_customer_data
def monitor_segment_drift(self, current_data, historical_data):
"""
セグメント分布の変化を監視
"""
current_dist = current_data['cluster'].value_counts(normalize=True).sort_index()
historical_dist = historical_data['cluster'].value_counts(normalize=True).sort_index()
# カイ二乗検定による分布変化の検定
from scipy.stats import chisquare
# 期待頻度の計算
expected = historical_dist * len(current_data)
observed = current_dist * len(current_data)
chi2_stat, p_value = chisquare(observed, expected)
drift_detected = p_value < 0.05
return {
'drift_detected': drift_detected,
'p_value': p_value,
'current_distribution': current_dist.to_dict(),
'historical_distribution': historical_dist.to_dict()
}
def recommend_retraining(self, drift_results, performance_threshold=0.8):
"""
モデル再訓練の推奨判定
"""
if drift_results['drift_detected']:
return True, "セグメント分布に有意な変化が検出されました"
if len(self.performance_history) > 0:
recent_performance = np.mean(self.performance_history[-5:])
if recent_performance < performance_threshold:
return True, f"予測性能が閾値({performance_threshold})を下回りました"
return False, "再訓練は不要です"
# モニタリングシステムの使用例
monitor = CustomerSegmentMonitor(kmeans_model, scaler)
# 新しい顧客データのシミュレート(1ヶ月後)
new_customer_data = create_customer_data()
new_customer_data = add_behavioral_features(new_customer_data)
# セグメント更新
updated_segments = monitor.update_customer_data(new_customer_data)
# ドリフト検出
drift_results = monitor.monitor_segment_drift(updated_segments, df)
should_retrain, reason = monitor.recommend_retraining(drift_results)
print(f"セグメントドリフト検出: {drift_results['drift_detected']}")
print(f"再訓練推奨: {should_retrain} - {reason}")
まとめ
機械学習を活用した顧客分析とセグメンテーションにより、従来のマーケティングアプローチを大幅に改善できます。RFM分析からクラスタリング、離脱予測、パーソナライズドレコメンデーションまで、包括的な顧客理解に基づく戦略的マーケティングが実現可能です。
継続的なモニタリングとモデル更新により、変化する顧客行動に適応し、ROIの最大化を図ることが重要です。データドリブンなアプローチで、より効果的な顧客エンゲージメントと収益向上を実現できます。
■テックジム「AIエンジニア養成コース」
■プロンプトだけでオリジナルアプリを開発・公開してみた!!
■AI時代の第一歩!「AI駆動開発コース」はじめました!
テックジム東京本校で先行開始。
■テックジム東京本校
「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。
<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。
<月1開催>放送作家による映像ディレクター養成講座
<オンライン無料>ゼロから始めるPython爆速講座