PythonでWeb画像ダウンロード!個別・一括取得の完全ガイド

 

PythonでWebサイトから画像やファイルをダウンロードする処理は、データ収集や自動化で重要なスキルです。本記事では、個別ダウンロードから一括取得まで、実践的なサンプルコード付きで詳しく解説します。

基本的な単一ファイルダウンロード

requestsライブラリを使用

import requests

# 単一画像のダウンロード
def download_image(url, filename):
    response = requests.get(url)
    with open(filename, 'wb') as f:
        f.write(response.content)
    print(f'ダウンロード完了: {filename}')

# 使用例
url = 'https://example.com/image.jpg'
download_image(url, 'downloaded_image.jpg')

エラーハンドリング付きダウンロード

import requests

def safe_download(url, filename):
    try:
        response = requests.get(url)
        response.raise_for_status()  # HTTPエラーをチェック
        
        with open(filename, 'wb') as f:
            f.write(response.content)
        print(f'ダウンロード成功: {filename}')
        return True
    except requests.exceptions.RequestException as e:
        print(f'ダウンロードエラー: {e}')
        return False

# 使用例
safe_download('https://example.com/image.jpg', 'safe_image.jpg')

進歩的なダウンロード(進捗表示付き)

チャンク単位でのダウンロード

import requests
from pathlib import Path

def download_with_progress(url, filename):
    response = requests.get(url, stream=True)
    total_size = int(response.headers.get('content-length', 0))
    
    with open(filename, 'wb') as f:
        downloaded = 0
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
            downloaded += len(chunk)
            if total_size > 0:
                percent = (downloaded / total_size) * 100
                print(f'\r進捗: {percent:.1f}%', end='')
    
    print(f'\nダウンロード完了: {filename}')

# 使用例
download_with_progress('https://example.com/large_image.jpg', 'large_image.jpg')

tqdmを使った視覚的進捗表示

import requests
from tqdm import tqdm

def download_with_tqdm(url, filename):
    response = requests.get(url, stream=True)
    total_size = int(response.headers.get('content-length', 0))
    
    with open(filename, 'wb') as f, tqdm(
        desc=filename,
        total=total_size,
        unit='B',
        unit_scale=True
    ) as pbar:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
            pbar.update(len(chunk))

# 使用例
download_with_tqdm('https://example.com/large.jpg', 'large.jpg')

複数ファイルの一括ダウンロード

URLリストからの一括取得

import requests
from pathlib import Path
import time

def batch_download(urls, download_dir='downloads'):
    # ダウンロードディレクトリ作成
    Path(download_dir).mkdir(exist_ok=True)
    
    for i, url in enumerate(urls):
        try:
            filename = f'image_{i:03d}.jpg'
            filepath = Path(download_dir) / filename
            
            response = requests.get(url)
            response.raise_for_status()
            
            with open(filepath, 'wb') as f:
                f.write(response.content)
            
            print(f'ダウンロード完了 ({i+1}/{len(urls)}): {filename}')
            time.sleep(1)  # サーバー負荷軽減のための待機
            
        except Exception as e:
            print(f'エラー URL {url}: {e}')

# 使用例
urls = [
    'https://example.com/image1.jpg',
    'https://example.com/image2.jpg',
    'https://example.com/image3.jpg'
]
batch_download(urls)

並列ダウンロード(ThreadPoolExecutor)

import requests
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

def parallel_download_worker(url, filepath):
    try:
        response = requests.get(url)
        response.raise_for_status()
        
        with open(filepath, 'wb') as f:
            f.write(response.content)
        
        return f'成功: {filepath.name}'
    except Exception as e:
        return f'失敗: {filepath.name} - {e}'

def parallel_batch_download(urls, download_dir='parallel_downloads', max_workers=5):
    Path(download_dir).mkdir(exist_ok=True)
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for i, url in enumerate(urls):
            filename = f'parallel_{i:03d}.jpg'
            filepath = Path(download_dir) / filename
            future = executor.submit(parallel_download_worker, url, filepath)
            futures.append(future)
        
        for future in as_completed(futures):
            result = future.result()
            print(result)

# 使用例
urls = ['https://example.com/img1.jpg', 'https://example.com/img2.jpg']
parallel_batch_download(urls)

URLから自動的にファイル名を取得

import requests
from pathlib import Path
from urllib.parse import urlparse, unquote
import os

def smart_download(url, download_dir='downloads'):
    Path(download_dir).mkdir(exist_ok=True)
    
    # URLからファイル名を抽出
    parsed_url = urlparse(url)
    filename = unquote(Path(parsed_url.path).name)
    
    # ファイル名が取得できない場合の処理
    if not filename or filename == '/':
        # Content-Dispositionヘッダーを確認
        head_response = requests.head(url)
        content_disp = head_response.headers.get('content-disposition', '')
        if 'filename=' in content_disp:
            filename = content_disp.split('filename=')[-1].strip('"')
        else:
            filename = 'downloaded_file'
    
    filepath = Path(download_dir) / filename
    
    try:
        response = requests.get(url)
        response.raise_for_status()
        
        with open(filepath, 'wb') as f:
            f.write(response.content)
        
        print(f'ダウンロード完了: {filepath}')
        return filepath
    except Exception as e:
        print(f'ダウンロードエラー: {e}')
        return None

# 使用例
smart_download('https://example.com/photos/sunset.jpg')

Webページから画像URLを抽出してダウンロード

BeautifulSoupを使用したスクレイピング

import requests
from bs4 import BeautifulSoup
from pathlib import Path
from urllib.parse import urljoin, urlparse
import time

def scrape_and_download_images(webpage_url, download_dir='scraped_images', 
                             img_extensions=('.jpg', '.jpeg', '.png', '.gif', '.webp')):
    Path(download_dir).mkdir(exist_ok=True)
    
    try:
        # Webページを取得
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(webpage_url, headers=headers)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        
        # 画像タグを取得
        img_tags = soup.find_all('img')
        downloaded_count = 0
        
        for i, img in enumerate(img_tags):
            img_url = img.get('src') or img.get('data-src')  # lazy loadingも考慮
            if not img_url:
                continue
            
            # 相対URLを絶対URLに変換
            img_url = urljoin(webpage_url, img_url)
            
            # 画像ファイル拡張子チェック
            parsed_url = urlparse(img_url)
            if not any(parsed_url.path.lower().endswith(ext) for ext in img_extensions):
                continue
            
            try:
                # ファイル名生成
                original_name = Path(parsed_url.path).name
                filename = f'scraped_{i:03d}_{original_name}' if original_name else f'scraped_{i:03d}.jpg'
                filepath = Path(download_dir) / filename
                
                # 画像ダウンロード
                img_response = requests.get(img_url, headers=headers)
                img_response.raise_for_status()
                
                with open(filepath, 'wb') as f:
                    f.write(img_response.content)
                
                print(f'画像ダウンロード完了: {filename}')
                downloaded_count += 1
                time.sleep(0.5)  # 負荷軽減
                
            except Exception as e:
                print(f'画像ダウンロードエラー {img_url}: {e}')
        
        print(f'合計 {downloaded_count} 個の画像をダウンロードしました')
        return downloaded_count
        
    except Exception as e:
        print(f'Webページ取得エラー: {e}')
        return 0

# 使用例
scrape_and_download_images('https://example.com/gallery')

非同期ダウンロードで高速化

asyncioとaioHttpを使用

import asyncio
import aiohttp
from pathlib import Path
import aiofiles

async def async_download_single(session, url, filepath):
    try:
        async with session.get(url) as response:
            if response.status == 200:
                async with aiofiles.open(filepath, 'wb') as f:
                    async for chunk in response.content.iter_chunked(8192):
                        await f.write(chunk)
                print(f'✓ 非同期ダウンロード完了: {filepath.name}')
                return True
            else:
                print(f'✗ HTTPエラー {response.status}: {url}')
                return False
    except Exception as e:
        print(f'✗ エラー {url}: {e}')
        return False

async def async_batch_download(urls, download_dir='async_downloads', max_concurrent=10):
    Path(download_dir).mkdir(exist_ok=True)
    
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    timeout = aiohttp.ClientTimeout(total=30)
    
    async with aiohttp.ClientSession(
        connector=connector, 
        timeout=timeout,
        headers={'User-Agent': 'Mozilla/5.0'}
    ) as session:
        
        semaphore = asyncio.Semaphore(max_concurrent)
        
        async def download_with_semaphore(url, filepath):
            async with semaphore:
                return await async_download_single(session, url, filepath)
        
        tasks = []
        for i, url in enumerate(urls):
            filename = f'async_image_{i:03d}.jpg'
            filepath = Path(download_dir) / filename
            task = asyncio.create_task(download_with_semaphore(url, filepath))
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        success_count = sum(1 for result in results if result is True)
        print(f'非同期ダウンロード完了: {success_count}/{len(urls)} 成功')

# 使用例
async def main():
    urls = [
        'https://example.com/img1.jpg',
        'https://example.com/img2.jpg',
        'https://example.com/img3.jpg'
    ]
    await async_batch_download(urls)

# asyncio.run(main())

ファイルタイプの自動判別

Content-Typeヘッダーとmagic numberを使用

import requests
from pathlib import Path
import mimetypes

def download_with_auto_extension(url, base_filename, download_dir='downloads'):
    Path(download_dir).mkdir(exist_ok=True)
    
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()
        
        # Content-Typeから拡張子を決定
        content_type = response.headers.get('content-type', '').split(';')[0]
        extension = mimetypes.guess_extension(content_type)
        
        if not extension:
            # フォールバック用の拡張子マップ
            ext_map = {
                'image/jpeg': '.jpg',
                'image/png': '.png',
                'image/gif': '.gif',
                'image/webp': '.webp',
                'application/pdf': '.pdf',
                'text/plain': '.txt'
            }
            extension = ext_map.get(content_type, '.bin')
        
        filename = base_filename + extension
        filepath = Path(download_dir) / filename
        
        # ファイルシグネチャ(magic number)による追加検証
        first_chunk = next(response.iter_content(chunk_size=8192))
        
        # よく使われるファイルシグネチャ
        signatures = {
            b'\xff\xd8\xff': '.jpg',
            b'\x89PNG\r\n\x1a\n': '.png',
            b'GIF87a': '.gif',
            b'GIF89a': '.gif',
            b'RIFF': '.webp',
            b'%PDF': '.pdf'
        }
        
        for sig, ext in signatures.items():
            if first_chunk.startswith(sig):
                if not filename.endswith(ext):
                    filename = base_filename + ext
                    filepath = Path(download_dir) / filename
                break
        
        with open(filepath, 'wb') as f:
            f.write(first_chunk)
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        
        print(f'ダウンロード完了: {filepath} (タイプ: {content_type})')
        return filepath
        
    except Exception as e:
        print(f'ダウンロードエラー: {e}')
        return None

# 使用例
download_with_auto_extension('https://example.com/unknown', 'mystery_file')

重複ダウンロードの回避

ファイルハッシュを使った重複チェック

import requests
from pathlib import Path
import hashlib
import json

class DuplicateChecker:
    def __init__(self, hash_db_file='download_hashes.json'):
        self.hash_db_file = hash_db_file
        self.hash_db = self._load_hash_db()
    
    def _load_hash_db(self):
        try:
            with open(self.hash_db_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {}
    
    def _save_hash_db(self):
        with open(self.hash_db_file, 'w') as f:
            json.dump(self.hash_db, f, indent=2)
    
    def _calculate_hash(self, content):
        return hashlib.md5(content).hexdigest()
    
    def download_if_unique(self, url, filepath, force_redownload=False):
        filepath = Path(filepath)
        
        if filepath.exists() and not force_redownload:
            print(f'ファイルは既に存在します: {filepath}')
            return filepath
        
        try:
            response = requests.get(url)
            response.raise_for_status()
            content = response.content
            
            # コンテンツのハッシュを計算
            content_hash = self._calculate_hash(content)
            
            # 重複チェック
            if content_hash in self.hash_db and not force_redownload:
                existing_file = self.hash_db[content_hash]
                print(f'同一コンテンツが既にダウンロード済み: {existing_file}')
                return Path(existing_file)
            
            # ディレクトリが存在しない場合は作成
            filepath.parent.mkdir(parents=True, exist_ok=True)
            
            with open(filepath, 'wb') as f:
                f.write(content)
            
            # ハッシュデータベースに記録
            self.hash_db[content_hash] = str(filepath)
            self._save_hash_db()
            
            print(f'新規ダウンロード完了: {filepath}')
            return filepath
            
        except Exception as e:
            print(f'ダウンロードエラー: {e}')
            return None

# 使用例
checker = DuplicateChecker()
checker.download_if_unique('https://example.com/image.jpg', 'cache/image1.jpg')
checker.download_if_unique('https://example.com/same_image.jpg', 'cache/image2.jpg')  # 重複なら警告

セッション管理とヘッダー設定

カスタムヘッダーとCookie管理

import requests
from pathlib import Path

class AdvancedDownloader:
    def __init__(self, download_dir='downloads'):
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(exist_ok=True)
        self.session = requests.Session()
        self._setup_session()
    
    def _setup_session(self):
        # デフォルトヘッダー設定
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'ja,en-US;q=0.7,en;q=0.3',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })
    
    def set_headers(self, headers):
        """カスタムヘッダーを設定"""
        self.session.headers.update(headers)
    
    def set_cookies(self, cookies):
        """Cookieを設定"""
        for key, value in cookies.items():
            self.session.cookies.set(key, value)
    
    def download(self, url, filename=None, referer=None):
        """高度な設定でダウンロード実行"""
        if filename is None:
            filename = Path(url).name or 'downloaded_file'
        
        filepath = self.download_dir / filename
        
        # 一時的にRefererヘッダーを設定
        headers = {}
        if referer:
            headers['Referer'] = referer
        
        try:
            response = self.session.get(url, headers=headers, stream=True)
            response.raise_for_status()
            
            # ファイルサイズ取得
            file_size = int(response.headers.get('content-length', 0))
            
            with open(filepath, 'wb') as f:
                downloaded = 0
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    downloaded += len(chunk)
                    
                    if file_size > 0:
                        percent = (downloaded / file_size) * 100
                        print(f'\r{filename}: {percent:.1f}%', end='', flush=True)
            
            print(f'\n✓ ダウンロード完了: {filename} ({downloaded} bytes)')
            return filepath
            
        except Exception as e:
            print(f'\n✗ ダウンロードエラー: {e}')
            return None
    
    def __del__(self):
        """セッションをクローズ"""
        if hasattr(self, 'session'):
            self.session.close()

# 使用例
downloader = AdvancedDownloader('advanced_downloads')

# カスタムヘッダー設定
downloader.set_headers({
    'Authorization': 'Bearer your_token_here',
    'Custom-Header': 'custom_value'
})

# Cookie設定
downloader.set_cookies({
    'session_id': 'abc123',
    'user_pref': 'value'
})

# Referer付きダウンロード
downloader.download(
    'https://example.com/protected.jpg',
    'protected_image.jpg',
    referer='https://example.com/gallery'
)

ダウンロード統計とログ機能

詳細なログとレポート機能

import requests
from pathlib import Path
from datetime import datetime
import json
import csv
import time

class DownloadManager:
    def __init__(self, download_dir='downloads'):
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(exist_ok=True)
        self.stats = {
            'success': 0,
            'failed': 0,
            'skipped': 0,
            'total_bytes': 0,
            'start_time': None,
            'end_time': None
        }
        self.log = []
        self.session = requests.Session()
    
    def start_session(self):
        """ダウンロードセッション開始"""
        self.stats['start_time'] = datetime.now()
        print(f"ダウンロードセッション開始: {self.stats['start_time'].strftime('%Y-%m-%d %H:%M:%S')}")
    
    def download(self, url, filename=None, skip_if_exists=True):
        """単一ファイルのダウンロード"""
        if filename is None:
            filename = Path(url).name or f'file_{int(time.time())}'
        
        filepath = self.download_dir / filename
        
        # 既存ファイルのスキップチェック
        if skip_if_exists and filepath.exists():
            log_entry = self._create_log_entry(url, filename, 'skipped', 0, '既存ファイル')
            self.stats['skipped'] += 1
            print(f'⏭  スキップ: {filename} (既存)')
            return filepath
        
        start_time = time.time()
        
        try:
            response = self.session.get(url, stream=True)
            response.raise_for_status()
            
            file_size = int(response.headers.get('content-length', 0))
            
            with open(filepath, 'wb') as f:
                downloaded_bytes = 0
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    downloaded_bytes += len(chunk)
            
            download_time = time.time() - start_time
            speed = downloaded_bytes / download_time if download_time > 0 else 0
            
            log_entry = self._create_log_entry(
                url, filename, 'success', downloaded_bytes, 
                f'{speed/1024:.1f} KB/s', download_time
            )
            
            self.stats['success'] += 1
            self.stats['total_bytes'] += downloaded_bytes
            
            print(f'✓ 成功: {filename} ({self._format_bytes(downloaded_bytes)}, {speed/1024:.1f} KB/s)')
            return filepath
            
        except Exception as e:
            download_time = time.time() - start_time
            log_entry = self._create_log_entry(
                url, filename, 'failed', 0, str(e), download_time
            )
            
            self.stats['failed'] += 1
            print(f'✗ 失敗: {filename} - {e}')
            return None
        
        finally:
            self.log.append(log_entry)
    
    def _create_log_entry(self, url, filename, status, size, note='', duration=0):
        """ログエントリの作成"""
        return {
            'timestamp': datetime.now().isoformat(),
            'url': url,
            'filename': filename,
            'status': status,
            'size': size,
            'duration': round(duration, 2),
            'note': note
        }
    
    def batch_download(self, urls, delay=0.5):
        """一括ダウンロード"""
        self.start_session()
        
        for i, url in enumerate(urls, 1):
            print(f'\n[{i}/{len(urls)}] ダウンロード中...')
            self.download(url)
            
            if delay > 0 and i < len(urls):
                time.sleep(delay)
        
        self.end_session()
    
    def end_session(self):
        """ダウンロードセッション終了"""
        self.stats['end_time'] = datetime.now()
        duration = (self.stats['end_time'] - self.stats['start_time']).total_seconds()
        
        print(f"\n{'='*50}")
        print("ダウンロードセッション完了")
        print(f"{'='*50}")
        print(f"開始時刻: {self.stats['start_time'].strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"終了時刻: {self.stats['end_time'].strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"実行時間: {duration:.1f}秒")
        print(f"成功: {self.stats['success']} 件")
        print(f"失敗: {self.stats['failed']} 件")
        print(f"スキップ: {self.stats['skipped']} 件")
        print(f"総ダウンロード量: {self._format_bytes(self.stats['total_bytes'])}")
        
        if duration > 0:
            avg_speed = self.stats['total_bytes'] / duration
            print(f"平均速度: {avg_speed/1024:.1f} KB/s")
    
    def save_log(self, format='json', filename=None):
        """ログの保存"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        
        if format.lower() == 'json':
            filename = filename or f'download_log_{timestamp}.json'
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump({
                    'stats': self.stats,
                    'downloads': self.log
                }, f, ensure_ascii=False, indent=2, default=str)
        
        elif format.lower() == 'csv':
            filename = filename or f'download_log_{timestamp}.csv'
            with open(filename, 'w', newline='', encoding='utf-8') as f:
                if self.log:
                    writer = csv.DictWriter(f, fieldnames=self.log[0].keys())
                    writer.writeheader()
                    writer.writerows(self.log)
        
        print(f'ログを保存しました: {filename}')
        return filename
    
    def _format_bytes(self, bytes_size):
        """バイト数を読みやすい形式に変換"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if bytes_size < 1024.0:
                return f"{bytes_size:.1f} {unit}"
            bytes_size /= 1024.0
        return f"{bytes_size:.1f} TB"
    
    def generate_report(self):
        """詳細レポートの生成"""
        if not self.log:
            print("ダウンロードログがありません")
            return
        
        # 成功率計算
        total_attempts = len(self.log)
        success_rate = (self.stats['success'] / total_attempts) * 100 if total_attempts > 0 else 0
        
        # ファイルサイズ統計
        successful_downloads = [entry for entry in self.log if entry['status'] == 'success']
        if successful_downloads:
            file_sizes = [entry['size'] for entry in successful_downloads]
            avg_file_size = sum(file_sizes) / len(file_sizes)
            max_file_size = max(file_sizes)
            min_file_size = min(file_sizes)
        else:
            avg_file_size = max_file_size = min_file_size = 0
        
        # エラー分析
        failed_downloads = [entry for entry in self.log if entry['status'] == 'failed']
        error_types = {}
        for entry in failed_downloads:
            error = entry.get('note', 'Unknown Error')
            error_types[error] = error_types.get(error, 0) + 1
        
        print(f"\n{'='*60}")
        print("詳細レポート")
        print(f"{'='*60}")
        print(f"総試行数: {total_attempts}")
        print(f"成功率: {success_rate:.1f}%")
        print(f"平均ファイルサイズ: {self._format_bytes(avg_file_size)}")
        print(f"最大ファイルサイズ: {self._format_bytes(max_file_size)}")
        print(f"最小ファイルサイズ: {self._format_bytes(min_file_size)}")
        
        if error_types:
            print(f"\nエラー内訳:")
            for error, count in sorted(error_types.items(), key=lambda x: x[1], reverse=True):
                print(f"  {error}: {count}件")

# 使用例
dm = DownloadManager('managed_downloads')

# URLリスト
urls = [
    'https://httpbin.org/image/jpeg',
    'https://httpbin.org/image/png',
    'https://httpbin.org/status/404',  # エラーテスト用
    'https://httpbin.org/image/webp'
]

# 一括ダウンロード実行
dm.batch_download(urls, delay=1.0)

# レポート生成
dm.generate_report()

# ログ保存
dm.save_log('json')
dm.save_log('csv')

高度な機能とカスタマイゼーション

レート制限とリトライ機能

import requests
from pathlib import Path
import time
import random
from functools import wraps

def retry_on_failure(max_retries=3, backoff_factor=1.0):
    """デコレータ:失敗時のリトライ機能"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries:
                        raise e
                    
                    wait_time = backoff_factor * (2 ** attempt) + random.uniform(0, 1)
                    print(f"リトライ {attempt + 1}/{max_retries} - {wait_time:.1f}秒後に再試行")
                    time.sleep(wait_time)
            return None
        return wrapper
    return decorator

class RateLimitedDownloader:
    def __init__(self, requests_per_second=1.0, download_dir='rate_limited'):
        self.min_interval = 1.0 / requests_per_second
        self.last_request_time = 0
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(exist_ok=True)
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; RateLimitedBot/1.0)'
        })
    
    def _wait_for_rate_limit(self):
        """レート制限に基づく待機"""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        
        if time_since_last < self.min_interval:
            wait_time = self.min_interval - time_since_last
            print(f"レート制限待機: {wait_time:.1f}秒")
            time.sleep(wait_time)
        
        self.last_request_time = time.time()
    
    @retry_on_failure(max_retries=3, backoff_factor=2.0)
    def download(self, url, filename=None):
        """レート制限とリトライ付きダウンロード"""
        self._wait_for_rate_limit()
        
        if filename is None:
            filename = Path(url).name or f'file_{int(time.time())}'
        
        filepath = self.download_dir / filename
        
        response = self.session.get(url, stream=True, timeout=30)
        response.raise_for_status()
        
        with open(filepath, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        
        print(f"✓ ダウンロード完了: {filename}")
        return filepath

# 使用例
downloader = RateLimitedDownloader(requests_per_second=0.5)  # 2秒に1回

プロキシサポート付きダウンローダー

import requests
from pathlib import Path
import itertools

class ProxyDownloader:
    def __init__(self, proxies=None, download_dir='proxy_downloads'):
        self.proxies = proxies or []
        self.proxy_cycle = itertools.cycle(self.proxies) if self.proxies else None
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(exist_ok=True)
        self.current_proxy = None
    
    def _get_next_proxy(self):
        """次のプロキシを取得"""
        if self.proxy_cycle:
            self.current_proxy = next(self.proxy_cycle)
            print(f"プロキシ使用: {self.current_proxy}")
            return {'http': self.current_proxy, 'https': self.current_proxy}
        return None
    
    def download_with_proxy(self, url, filename=None, max_proxy_attempts=None):
        """プロキシ経由でダウンロード"""
        if filename is None:
            filename = Path(url).name or f'proxy_file_{int(time.time())}'
        
        filepath = self.download_dir / filename
        
        if not self.proxies:
            # プロキシなしでダウンロード
            return self._direct_download(url, filepath)
        
        # プロキシを使用してダウンロード
        proxy_attempts = max_proxy_attempts or len(self.proxies)
        
        for attempt in range(proxy_attempts):
            proxy_config = self._get_next_proxy()
            
            try:
                response = requests.get(
                    url, 
                    proxies=proxy_config,
                    timeout=30,
                    stream=True
                )
                response.raise_for_status()
                
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                
                print(f"✓ プロキシ経由ダウンロード成功: {filename}")
                return filepath
                
            except Exception as e:
                print(f"✗ プロキシエラー ({self.current_proxy}): {e}")
                if attempt == proxy_attempts - 1:
                    print("すべてのプロキシで失敗、直接接続を試行")
                    return self._direct_download(url, filepath)
        
        return None
    
    def _direct_download(self, url, filepath):
        """直接ダウンロード(プロキシなし)"""
        try:
            response = requests.get(url, timeout=30, stream=True)
            response.raise_for_status()
            
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            
            print(f"✓ 直接ダウンロード成功: {filepath.name}")
            return filepath
            
        except Exception as e:
            print(f"✗ 直接ダウンロード失敗: {e}")
            return None

# 使用例
proxy_list = [
    'http://proxy1.example.com:8080',
    'http://proxy2.example.com:8080',
    'socks5://proxy3.example.com:1080'
]

downloader = ProxyDownloader(proxies=proxy_list)
downloader.download_with_proxy('https://httpbin.org/image/jpeg', 'proxy_image.jpg')

メタデータ取得とファイル検証

import requests
from pathlib import Path
import hashlib
import json
from datetime import datetime

class MetadataDownloader:
    def __init__(self, download_dir='metadata_downloads'):
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(exist_ok=True)
    
    def get_file_metadata(self, url):
        """ダウンロード前のメタデータ取得"""
        try:
            # HEADリクエストでメタデータ取得
            head_response = requests.head(url, allow_redirects=True)
            head_response.raise_for_status()
            
            metadata = {
                'url': url,
                'final_url': head_response.url,
                'status_code': head_response.status_code,
                'headers': dict(head_response.headers),
                'file_size': int(head_response.headers.get('content-length', 0)),
                'content_type': head_response.headers.get('content-type', ''),
                'last_modified': head_response.headers.get('last-modified', ''),
                'etag': head_response.headers.get('etag', ''),
                'server': head_response.headers.get('server', ''),
                'retrieved_at': datetime.now().isoformat()
            }
            
            return metadata
            
        except Exception as e:
            return {'url': url, 'error': str(e), 'retrieved_at': datetime.now().isoformat()}
    
    def download_with_verification(self, url, filename=None, expected_hash=None, 
                                 hash_algorithm='md5'):
        """ハッシュ検証付きダウンロード"""
        if filename is None:
            filename = Path(url).name or f'verified_file_{int(time.time())}'
        
        filepath = self.download_dir / filename
        metadata_file = filepath.with_suffix(filepath.suffix + '.meta.json')
        
        # メタデータ取得
        metadata = self.get_file_metadata(url)
        
        try:
            # ファイルダウンロード
            response = requests.get(url, stream=True)
            response.raise_for_status()
            
            # ハッシュ計算用オブジェクト
            hash_func = getattr(hashlib, hash_algorithm.lower())()
            downloaded_bytes = 0
            
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    hash_func.update(chunk)
                    downloaded_bytes += len(chunk)
            
            # ハッシュ値取得
            calculated_hash = hash_func.hexdigest()
            
            # メタデータ更新
            metadata.update({
                'downloaded_at': datetime.now().isoformat(),
                'actual_size': downloaded_bytes,
                'calculated_hash': calculated_hash,
                'hash_algorithm': hash_algorithm,
                'verification_status': 'pending'
            })
            
            # ハッシュ検証
            if expected_hash:
                if calculated_hash.lower() == expected_hash.lower():
                    metadata['verification_status'] = 'verified'
                    print(f"✓ ハッシュ検証成功: {filename}")
                else:
                    metadata['verification_status'] = 'failed'
                    print(f"✗ ハッシュ検証失敗: {filename}")
                    print(f"  期待値: {expected_hash}")
                    print(f"  実際値: {calculated_hash}")
            else:
                metadata['verification_status'] = 'no_expected_hash'
            
            # サイズ検証
            expected_size = metadata.get('file_size', 0)
            if expected_size > 0 and downloaded_bytes != expected_size:
                print(f"⚠ サイズ不一致: 期待 {expected_size}, 実際 {downloaded_bytes}")
                metadata['size_verification'] = 'failed'
            else:
                metadata['size_verification'] = 'passed'
            
            # メタデータファイル保存
            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, ensure_ascii=False, indent=2)
            
            print(f"✓ ダウンロード完了: {filename}")
            print(f"  ファイルサイズ: {downloaded_bytes} bytes")
            print(f"  {hash_algorithm.upper()}ハッシュ: {calculated_hash}")
            
            return {
                'filepath': filepath,
                'metadata_file': metadata_file,
                'metadata': metadata
            }
            
        except Exception as e:
            metadata['download_error'] = str(e)
            metadata['download_failed_at'] = datetime.now().isoformat()
            
            # エラー情報もメタデータに保存
            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, ensure_ascii=False, indent=2)
            
            print(f"✗ ダウンロードエラー: {e}")
            return None

# 使用例
downloader = MetadataDownloader()

# 事前にメタデータのみ取得
metadata = downloader.get_file_metadata('https://httpbin.org/image/jpeg')
print("メタデータ:", json.dumps(metadata, indent=2, ensure_ascii=False))

# ハッシュ検証付きダウンロード
result = downloader.download_with_verification(
    'https://httpbin.org/image/jpeg',
    'verified_image.jpg',
    hash_algorithm='sha256'
)

パフォーマンス最適化とモニタリング

リアルタイム統計表示

import requests
from pathlib import Path
import threading
import time
import queue
from datetime import datetime, timedelta

class PerformanceMonitor:
    def __init__(self, update_interval=1.0):
        self.update_interval = update_interval
        self.stats = {
            'downloads_in_progress': 0,
            'completed_downloads': 0,
            'failed_downloads': 0,
            'total_bytes_downloaded': 0,
            'current_speed': 0,
            'average_speed': 0,
            'start_time': None,
            'last_update': None
        }
        self.speed_history = []
        self.monitoring = False
        self.monitor_thread = None
        self.stats_lock = threading.Lock()
    
    def start_monitoring(self):
        """モニタリング開始"""
        self.monitoring = True
        self.stats['start_time'] = datetime.now()
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """モニタリング停止"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
    
    def _monitor_loop(self):
        """モニタリングループ"""
        while self.monitoring:
            self._update_display()
            time.sleep(self.update_interval)
    
    def _update_display(self):
        """統計表示更新"""
        with self.stats_lock:
            current_time = datetime.now()
            
            if self.stats['start_time']:
                elapsed = (current_time - self.stats['start_time']).total_seconds()
                
                if elapsed > 0:
                    self.stats['average_speed'] = self.stats['total_bytes_downloaded'] / elapsed
            
            # クリアして統計表示
            print('\033[2J\033[H', end='')  # 画面クリア
            print("="*60)
            print("📊 ダウンロード統計(リアルタイム)")
            print("="*60)
            print(f"進行中: {self.stats['downloads_in_progress']}")
            print(f"完了: {self.stats['completed_downloads']}")
            print(f"失敗: {self.stats['failed_downloads']}")
            print(f"総ダウンロード量: {self._format_bytes(self.stats['total_bytes_downloaded'])}")
            print(f"現在の速度: {self._format_bytes(self.stats['current_speed'])}/秒")
            print(f"平均速度: {self._format_bytes(self.stats['average_speed'])}/秒")
            
            if self.stats['start_time']:
                elapsed = datetime.now() - self.stats['start_time']
                print(f"経過時間: {str(elapsed).split('.')[0]}")
            
            print("="*60)
    
    def _format_bytes(self, bytes_size):
        """バイト数フォーマット"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if bytes_size < 1024.0:
                return f"{bytes_size:.1f} {unit}"
            bytes_size /= 1024.0
        return f"{bytes_size:.1f} TB"
    
    def update_stats(self, **kwargs):
        """統計更新"""
        with self.stats_lock:
            for key, value in kwargs.items():
                if key in self.stats:
                    if key == 'total_bytes_downloaded':
                        self.stats[key] += value
                    else:
                        self.stats[key] = value

class MonitoredDownloader:
    def __init__(self, download_dir='monitored_downloads'):
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(exist_ok=True)
        self.monitor = PerformanceMonitor()
        self.session = requests.Session()
    
    def start_session(self):
        """ダウンロードセッション開始"""
        self.monitor.start_monitoring()
    
    def end_session(self):
        """ダウンロードセッション終了"""
        self.monitor.stop_monitoring()
    
    def download(self, url, filename=None):
        """モニタリング付きダウンロード"""
        if filename is None:
            filename = Path(url).name or f'monitored_{int(time.time())}'
        
        filepath = self.download_dir / filename
        
        # 進行中カウント増加
        self.monitor.update_stats(downloads_in_progress=
            self.monitor.stats['downloads_in_progress'] + 1)
        
        try:
            start_time = time.time()
            response = self.session.get(url, stream=True)
            response.raise_for_status()
            
            file_size = int(response.headers.get('content-length', 0))
            downloaded_bytes = 0
            last_speed_update = start_time
            
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    downloaded_bytes += len(chunk)
                    
                    # 速度計算(1秒ごと)
                    current_time = time.time()
                    if current_time - last_speed_update >= 1.0:
                        speed = downloaded_bytes / (current_time - start_time)
                        self.monitor.update_stats(current_speed=speed)
                        last_speed_update = current_time
            
            # 完了統計更新
            self.monitor.update_stats(
                downloads_in_progress=self.monitor.stats['downloads_in_progress'] - 1,
                completed_downloads=self.monitor.stats['completed_downloads'] + 1,
                total_bytes_downloaded=downloaded_bytes,
                current_speed=0
            )
            
            return filepath
            
        except Exception as e:
            # エラー統計更新
            self.monitor.update_stats(
                downloads_in_progress=self.monitor.stats['downloads_in_progress'] - 1,
                failed_downloads=self.monitor.stats['failed_downloads'] + 1,
                current_speed=0
            )
            raise e

# 使用例
def demo_monitored_download():
    downloader = MonitoredDownloader()
    downloader.start_session()
    
    try:
        urls = [
            'https://httpbin.org/image/jpeg',
            'https://httpbin.org/image/png',
            'https://httpbin.org/bytes/1024000'  # 1MB のテストファイル
        ]
        
        # 並列ダウンロードのデモ
        import threading
        
        def download_worker(url):
            try:
                downloader.download(url)
            except Exception as e:
                print(f"ダウンロードエラー: {e}")
        
        threads = []
        for url in urls:
            thread = threading.Thread(target=download_worker, args=(url,))
            thread.start()
            threads.append(thread)
        
        # すべてのスレッド完了を待機
        for thread in threads:
            thread.join()
        
        time.sleep(3)  # 最終統計表示のための待機
        
    finally:
        downloader.end_session()

# demo_monitored_download()

まとめ

PythonでWebからファイルをダウンロードする方法は多岐にわたります。基本的なrequestsライブラリから始まり、用途に応じて以下の機能を組み合わせることで、プロダクションレベルのダウンロードシステムが構築できます:

主要機能一覧

  • 基本ダウンロード: requestsを使った単純なファイル取得
  • エラーハンドリング: HTTPエラーや接続エラーの適切な処理
  • 進捗表示: リアルタイムでのダウンロード進捗表示
  • 一括処理: 複数ファイルの効率的なダウンロード
  • 並列処理: ThreadPoolExecutorや非同期処理による高速化
  • メタデータ管理: ファイル情報の取得と検証
  • 重複回避: ハッシュ値による同一ファイルの検出
  • レート制限: サーバー負荷を考慮した制御
  • プロキシサポート: 企業環境での利用
  • 統計とログ: 詳細な実行記録と分析

ベストプラクティス

  1. 適切なUser-Agentの設定: Webサイトによってはbotを拒否する場合があります
  2. レート制限の実装: サーバーに負荷をかけないよう配慮
  3. エラーハンドリング: ネットワークエラーやHTTPエラーの適切な処理
  4. リソース管理: セッションやファイルハンドルの適切なクローズ
  5. 進捗表示: 大容量ファイルダウンロード時のユーザビリティ向上

これらの機能を組み合わせることで、堅牢で効率的なファイルダウンロードシステムを構築できます。用途に応じて必要な機能を選択し、段階的に実装していくことをお勧めします。

■プロンプトだけでオリジナルアプリを開発・公開してみた!!

■AI時代の第一歩!「AI駆動開発コース」はじめました!

テックジム東京本校で先行開始。

■テックジム東京本校

「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。

<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。

<月1開催>放送作家による映像ディレクター養成講座

<オンライン無料>ゼロから始めるPython爆速講座