PythonでWeb画像ダウンロード!個別・一括取得の完全ガイド
PythonでWebサイトから画像やファイルをダウンロードする処理は、データ収集や自動化で重要なスキルです。本記事では、個別ダウンロードから一括取得まで、実践的なサンプルコード付きで詳しく解説します。
基本的な単一ファイルダウンロード
requestsライブラリを使用
import requests
# 単一画像のダウンロード
def download_image(url, filename):
response = requests.get(url)
with open(filename, 'wb') as f:
f.write(response.content)
print(f'ダウンロード完了: {filename}')
# 使用例
url = 'https://example.com/image.jpg'
download_image(url, 'downloaded_image.jpg')
エラーハンドリング付きダウンロード
import requests
def safe_download(url, filename):
try:
response = requests.get(url)
response.raise_for_status() # HTTPエラーをチェック
with open(filename, 'wb') as f:
f.write(response.content)
print(f'ダウンロード成功: {filename}')
return True
except requests.exceptions.RequestException as e:
print(f'ダウンロードエラー: {e}')
return False
# 使用例
safe_download('https://example.com/image.jpg', 'safe_image.jpg')
進歩的なダウンロード(進捗表示付き)
チャンク単位でのダウンロード
import requests
from pathlib import Path
def download_with_progress(url, filename):
response = requests.get(url, stream=True)
total_size = int(response.headers.get('content-length', 0))
with open(filename, 'wb') as f:
downloaded = 0
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
percent = (downloaded / total_size) * 100
print(f'\r進捗: {percent:.1f}%', end='')
print(f'\nダウンロード完了: {filename}')
# 使用例
download_with_progress('https://example.com/large_image.jpg', 'large_image.jpg')
tqdmを使った視覚的進捗表示
import requests
from tqdm import tqdm
def download_with_tqdm(url, filename):
response = requests.get(url, stream=True)
total_size = int(response.headers.get('content-length', 0))
with open(filename, 'wb') as f, tqdm(
desc=filename,
total=total_size,
unit='B',
unit_scale=True
) as pbar:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
pbar.update(len(chunk))
# 使用例
download_with_tqdm('https://example.com/large.jpg', 'large.jpg')
複数ファイルの一括ダウンロード
URLリストからの一括取得
import requests
from pathlib import Path
import time
def batch_download(urls, download_dir='downloads'):
# ダウンロードディレクトリ作成
Path(download_dir).mkdir(exist_ok=True)
for i, url in enumerate(urls):
try:
filename = f'image_{i:03d}.jpg'
filepath = Path(download_dir) / filename
response = requests.get(url)
response.raise_for_status()
with open(filepath, 'wb') as f:
f.write(response.content)
print(f'ダウンロード完了 ({i+1}/{len(urls)}): {filename}')
time.sleep(1) # サーバー負荷軽減のための待機
except Exception as e:
print(f'エラー URL {url}: {e}')
# 使用例
urls = [
'https://example.com/image1.jpg',
'https://example.com/image2.jpg',
'https://example.com/image3.jpg'
]
batch_download(urls)
並列ダウンロード(ThreadPoolExecutor)
import requests
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
def parallel_download_worker(url, filepath):
try:
response = requests.get(url)
response.raise_for_status()
with open(filepath, 'wb') as f:
f.write(response.content)
return f'成功: {filepath.name}'
except Exception as e:
return f'失敗: {filepath.name} - {e}'
def parallel_batch_download(urls, download_dir='parallel_downloads', max_workers=5):
Path(download_dir).mkdir(exist_ok=True)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for i, url in enumerate(urls):
filename = f'parallel_{i:03d}.jpg'
filepath = Path(download_dir) / filename
future = executor.submit(parallel_download_worker, url, filepath)
futures.append(future)
for future in as_completed(futures):
result = future.result()
print(result)
# 使用例
urls = ['https://example.com/img1.jpg', 'https://example.com/img2.jpg']
parallel_batch_download(urls)
URLから自動的にファイル名を取得
import requests
from pathlib import Path
from urllib.parse import urlparse, unquote
import os
def smart_download(url, download_dir='downloads'):
Path(download_dir).mkdir(exist_ok=True)
# URLからファイル名を抽出
parsed_url = urlparse(url)
filename = unquote(Path(parsed_url.path).name)
# ファイル名が取得できない場合の処理
if not filename or filename == '/':
# Content-Dispositionヘッダーを確認
head_response = requests.head(url)
content_disp = head_response.headers.get('content-disposition', '')
if 'filename=' in content_disp:
filename = content_disp.split('filename=')[-1].strip('"')
else:
filename = 'downloaded_file'
filepath = Path(download_dir) / filename
try:
response = requests.get(url)
response.raise_for_status()
with open(filepath, 'wb') as f:
f.write(response.content)
print(f'ダウンロード完了: {filepath}')
return filepath
except Exception as e:
print(f'ダウンロードエラー: {e}')
return None
# 使用例
smart_download('https://example.com/photos/sunset.jpg')
Webページから画像URLを抽出してダウンロード
BeautifulSoupを使用したスクレイピング
import requests
from bs4 import BeautifulSoup
from pathlib import Path
from urllib.parse import urljoin, urlparse
import time
def scrape_and_download_images(webpage_url, download_dir='scraped_images',
img_extensions=('.jpg', '.jpeg', '.png', '.gif', '.webp')):
Path(download_dir).mkdir(exist_ok=True)
try:
# Webページを取得
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(webpage_url, headers=headers)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# 画像タグを取得
img_tags = soup.find_all('img')
downloaded_count = 0
for i, img in enumerate(img_tags):
img_url = img.get('src') or img.get('data-src') # lazy loadingも考慮
if not img_url:
continue
# 相対URLを絶対URLに変換
img_url = urljoin(webpage_url, img_url)
# 画像ファイル拡張子チェック
parsed_url = urlparse(img_url)
if not any(parsed_url.path.lower().endswith(ext) for ext in img_extensions):
continue
try:
# ファイル名生成
original_name = Path(parsed_url.path).name
filename = f'scraped_{i:03d}_{original_name}' if original_name else f'scraped_{i:03d}.jpg'
filepath = Path(download_dir) / filename
# 画像ダウンロード
img_response = requests.get(img_url, headers=headers)
img_response.raise_for_status()
with open(filepath, 'wb') as f:
f.write(img_response.content)
print(f'画像ダウンロード完了: {filename}')
downloaded_count += 1
time.sleep(0.5) # 負荷軽減
except Exception as e:
print(f'画像ダウンロードエラー {img_url}: {e}')
print(f'合計 {downloaded_count} 個の画像をダウンロードしました')
return downloaded_count
except Exception as e:
print(f'Webページ取得エラー: {e}')
return 0
# 使用例
scrape_and_download_images('https://example.com/gallery')
非同期ダウンロードで高速化
asyncioとaioHttpを使用
import asyncio
import aiohttp
from pathlib import Path
import aiofiles
async def async_download_single(session, url, filepath):
try:
async with session.get(url) as response:
if response.status == 200:
async with aiofiles.open(filepath, 'wb') as f:
async for chunk in response.content.iter_chunked(8192):
await f.write(chunk)
print(f'✓ 非同期ダウンロード完了: {filepath.name}')
return True
else:
print(f'✗ HTTPエラー {response.status}: {url}')
return False
except Exception as e:
print(f'✗ エラー {url}: {e}')
return False
async def async_batch_download(urls, download_dir='async_downloads', max_concurrent=10):
Path(download_dir).mkdir(exist_ok=True)
connector = aiohttp.TCPConnector(limit=max_concurrent)
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers={'User-Agent': 'Mozilla/5.0'}
) as session:
semaphore = asyncio.Semaphore(max_concurrent)
async def download_with_semaphore(url, filepath):
async with semaphore:
return await async_download_single(session, url, filepath)
tasks = []
for i, url in enumerate(urls):
filename = f'async_image_{i:03d}.jpg'
filepath = Path(download_dir) / filename
task = asyncio.create_task(download_with_semaphore(url, filepath))
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
success_count = sum(1 for result in results if result is True)
print(f'非同期ダウンロード完了: {success_count}/{len(urls)} 成功')
# 使用例
async def main():
urls = [
'https://example.com/img1.jpg',
'https://example.com/img2.jpg',
'https://example.com/img3.jpg'
]
await async_batch_download(urls)
# asyncio.run(main())
ファイルタイプの自動判別
Content-Typeヘッダーとmagic numberを使用
import requests
from pathlib import Path
import mimetypes
def download_with_auto_extension(url, base_filename, download_dir='downloads'):
Path(download_dir).mkdir(exist_ok=True)
try:
response = requests.get(url, stream=True)
response.raise_for_status()
# Content-Typeから拡張子を決定
content_type = response.headers.get('content-type', '').split(';')[0]
extension = mimetypes.guess_extension(content_type)
if not extension:
# フォールバック用の拡張子マップ
ext_map = {
'image/jpeg': '.jpg',
'image/png': '.png',
'image/gif': '.gif',
'image/webp': '.webp',
'application/pdf': '.pdf',
'text/plain': '.txt'
}
extension = ext_map.get(content_type, '.bin')
filename = base_filename + extension
filepath = Path(download_dir) / filename
# ファイルシグネチャ(magic number)による追加検証
first_chunk = next(response.iter_content(chunk_size=8192))
# よく使われるファイルシグネチャ
signatures = {
b'\xff\xd8\xff': '.jpg',
b'\x89PNG\r\n\x1a\n': '.png',
b'GIF87a': '.gif',
b'GIF89a': '.gif',
b'RIFF': '.webp',
b'%PDF': '.pdf'
}
for sig, ext in signatures.items():
if first_chunk.startswith(sig):
if not filename.endswith(ext):
filename = base_filename + ext
filepath = Path(download_dir) / filename
break
with open(filepath, 'wb') as f:
f.write(first_chunk)
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f'ダウンロード完了: {filepath} (タイプ: {content_type})')
return filepath
except Exception as e:
print(f'ダウンロードエラー: {e}')
return None
# 使用例
download_with_auto_extension('https://example.com/unknown', 'mystery_file')
重複ダウンロードの回避
ファイルハッシュを使った重複チェック
import requests
from pathlib import Path
import hashlib
import json
class DuplicateChecker:
def __init__(self, hash_db_file='download_hashes.json'):
self.hash_db_file = hash_db_file
self.hash_db = self._load_hash_db()
def _load_hash_db(self):
try:
with open(self.hash_db_file, 'r') as f:
return json.load(f)
except FileNotFoundError:
return {}
def _save_hash_db(self):
with open(self.hash_db_file, 'w') as f:
json.dump(self.hash_db, f, indent=2)
def _calculate_hash(self, content):
return hashlib.md5(content).hexdigest()
def download_if_unique(self, url, filepath, force_redownload=False):
filepath = Path(filepath)
if filepath.exists() and not force_redownload:
print(f'ファイルは既に存在します: {filepath}')
return filepath
try:
response = requests.get(url)
response.raise_for_status()
content = response.content
# コンテンツのハッシュを計算
content_hash = self._calculate_hash(content)
# 重複チェック
if content_hash in self.hash_db and not force_redownload:
existing_file = self.hash_db[content_hash]
print(f'同一コンテンツが既にダウンロード済み: {existing_file}')
return Path(existing_file)
# ディレクトリが存在しない場合は作成
filepath.parent.mkdir(parents=True, exist_ok=True)
with open(filepath, 'wb') as f:
f.write(content)
# ハッシュデータベースに記録
self.hash_db[content_hash] = str(filepath)
self._save_hash_db()
print(f'新規ダウンロード完了: {filepath}')
return filepath
except Exception as e:
print(f'ダウンロードエラー: {e}')
return None
# 使用例
checker = DuplicateChecker()
checker.download_if_unique('https://example.com/image.jpg', 'cache/image1.jpg')
checker.download_if_unique('https://example.com/same_image.jpg', 'cache/image2.jpg') # 重複なら警告
セッション管理とヘッダー設定
カスタムヘッダーとCookie管理
import requests
from pathlib import Path
class AdvancedDownloader:
def __init__(self, download_dir='downloads'):
self.download_dir = Path(download_dir)
self.download_dir.mkdir(exist_ok=True)
self.session = requests.Session()
self._setup_session()
def _setup_session(self):
# デフォルトヘッダー設定
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'ja,en-US;q=0.7,en;q=0.3',
'Accept-Encoding': 'gzip, deflate, br',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
})
def set_headers(self, headers):
"""カスタムヘッダーを設定"""
self.session.headers.update(headers)
def set_cookies(self, cookies):
"""Cookieを設定"""
for key, value in cookies.items():
self.session.cookies.set(key, value)
def download(self, url, filename=None, referer=None):
"""高度な設定でダウンロード実行"""
if filename is None:
filename = Path(url).name or 'downloaded_file'
filepath = self.download_dir / filename
# 一時的にRefererヘッダーを設定
headers = {}
if referer:
headers['Referer'] = referer
try:
response = self.session.get(url, headers=headers, stream=True)
response.raise_for_status()
# ファイルサイズ取得
file_size = int(response.headers.get('content-length', 0))
with open(filepath, 'wb') as f:
downloaded = 0
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
downloaded += len(chunk)
if file_size > 0:
percent = (downloaded / file_size) * 100
print(f'\r{filename}: {percent:.1f}%', end='', flush=True)
print(f'\n✓ ダウンロード完了: {filename} ({downloaded} bytes)')
return filepath
except Exception as e:
print(f'\n✗ ダウンロードエラー: {e}')
return None
def __del__(self):
"""セッションをクローズ"""
if hasattr(self, 'session'):
self.session.close()
# 使用例
downloader = AdvancedDownloader('advanced_downloads')
# カスタムヘッダー設定
downloader.set_headers({
'Authorization': 'Bearer your_token_here',
'Custom-Header': 'custom_value'
})
# Cookie設定
downloader.set_cookies({
'session_id': 'abc123',
'user_pref': 'value'
})
# Referer付きダウンロード
downloader.download(
'https://example.com/protected.jpg',
'protected_image.jpg',
referer='https://example.com/gallery'
)
ダウンロード統計とログ機能
詳細なログとレポート機能
import requests
from pathlib import Path
from datetime import datetime
import json
import csv
import time
class DownloadManager:
def __init__(self, download_dir='downloads'):
self.download_dir = Path(download_dir)
self.download_dir.mkdir(exist_ok=True)
self.stats = {
'success': 0,
'failed': 0,
'skipped': 0,
'total_bytes': 0,
'start_time': None,
'end_time': None
}
self.log = []
self.session = requests.Session()
def start_session(self):
"""ダウンロードセッション開始"""
self.stats['start_time'] = datetime.now()
print(f"ダウンロードセッション開始: {self.stats['start_time'].strftime('%Y-%m-%d %H:%M:%S')}")
def download(self, url, filename=None, skip_if_exists=True):
"""単一ファイルのダウンロード"""
if filename is None:
filename = Path(url).name or f'file_{int(time.time())}'
filepath = self.download_dir / filename
# 既存ファイルのスキップチェック
if skip_if_exists and filepath.exists():
log_entry = self._create_log_entry(url, filename, 'skipped', 0, '既存ファイル')
self.stats['skipped'] += 1
print(f'⏭ スキップ: {filename} (既存)')
return filepath
start_time = time.time()
try:
response = self.session.get(url, stream=True)
response.raise_for_status()
file_size = int(response.headers.get('content-length', 0))
with open(filepath, 'wb') as f:
downloaded_bytes = 0
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
downloaded_bytes += len(chunk)
download_time = time.time() - start_time
speed = downloaded_bytes / download_time if download_time > 0 else 0
log_entry = self._create_log_entry(
url, filename, 'success', downloaded_bytes,
f'{speed/1024:.1f} KB/s', download_time
)
self.stats['success'] += 1
self.stats['total_bytes'] += downloaded_bytes
print(f'✓ 成功: {filename} ({self._format_bytes(downloaded_bytes)}, {speed/1024:.1f} KB/s)')
return filepath
except Exception as e:
download_time = time.time() - start_time
log_entry = self._create_log_entry(
url, filename, 'failed', 0, str(e), download_time
)
self.stats['failed'] += 1
print(f'✗ 失敗: {filename} - {e}')
return None
finally:
self.log.append(log_entry)
def _create_log_entry(self, url, filename, status, size, note='', duration=0):
"""ログエントリの作成"""
return {
'timestamp': datetime.now().isoformat(),
'url': url,
'filename': filename,
'status': status,
'size': size,
'duration': round(duration, 2),
'note': note
}
def batch_download(self, urls, delay=0.5):
"""一括ダウンロード"""
self.start_session()
for i, url in enumerate(urls, 1):
print(f'\n[{i}/{len(urls)}] ダウンロード中...')
self.download(url)
if delay > 0 and i < len(urls):
time.sleep(delay)
self.end_session()
def end_session(self):
"""ダウンロードセッション終了"""
self.stats['end_time'] = datetime.now()
duration = (self.stats['end_time'] - self.stats['start_time']).total_seconds()
print(f"\n{'='*50}")
print("ダウンロードセッション完了")
print(f"{'='*50}")
print(f"開始時刻: {self.stats['start_time'].strftime('%Y-%m-%d %H:%M:%S')}")
print(f"終了時刻: {self.stats['end_time'].strftime('%Y-%m-%d %H:%M:%S')}")
print(f"実行時間: {duration:.1f}秒")
print(f"成功: {self.stats['success']} 件")
print(f"失敗: {self.stats['failed']} 件")
print(f"スキップ: {self.stats['skipped']} 件")
print(f"総ダウンロード量: {self._format_bytes(self.stats['total_bytes'])}")
if duration > 0:
avg_speed = self.stats['total_bytes'] / duration
print(f"平均速度: {avg_speed/1024:.1f} KB/s")
def save_log(self, format='json', filename=None):
"""ログの保存"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
if format.lower() == 'json':
filename = filename or f'download_log_{timestamp}.json'
with open(filename, 'w', encoding='utf-8') as f:
json.dump({
'stats': self.stats,
'downloads': self.log
}, f, ensure_ascii=False, indent=2, default=str)
elif format.lower() == 'csv':
filename = filename or f'download_log_{timestamp}.csv'
with open(filename, 'w', newline='', encoding='utf-8') as f:
if self.log:
writer = csv.DictWriter(f, fieldnames=self.log[0].keys())
writer.writeheader()
writer.writerows(self.log)
print(f'ログを保存しました: {filename}')
return filename
def _format_bytes(self, bytes_size):
"""バイト数を読みやすい形式に変換"""
for unit in ['B', 'KB', 'MB', 'GB']:
if bytes_size < 1024.0:
return f"{bytes_size:.1f} {unit}"
bytes_size /= 1024.0
return f"{bytes_size:.1f} TB"
def generate_report(self):
"""詳細レポートの生成"""
if not self.log:
print("ダウンロードログがありません")
return
# 成功率計算
total_attempts = len(self.log)
success_rate = (self.stats['success'] / total_attempts) * 100 if total_attempts > 0 else 0
# ファイルサイズ統計
successful_downloads = [entry for entry in self.log if entry['status'] == 'success']
if successful_downloads:
file_sizes = [entry['size'] for entry in successful_downloads]
avg_file_size = sum(file_sizes) / len(file_sizes)
max_file_size = max(file_sizes)
min_file_size = min(file_sizes)
else:
avg_file_size = max_file_size = min_file_size = 0
# エラー分析
failed_downloads = [entry for entry in self.log if entry['status'] == 'failed']
error_types = {}
for entry in failed_downloads:
error = entry.get('note', 'Unknown Error')
error_types[error] = error_types.get(error, 0) + 1
print(f"\n{'='*60}")
print("詳細レポート")
print(f"{'='*60}")
print(f"総試行数: {total_attempts}")
print(f"成功率: {success_rate:.1f}%")
print(f"平均ファイルサイズ: {self._format_bytes(avg_file_size)}")
print(f"最大ファイルサイズ: {self._format_bytes(max_file_size)}")
print(f"最小ファイルサイズ: {self._format_bytes(min_file_size)}")
if error_types:
print(f"\nエラー内訳:")
for error, count in sorted(error_types.items(), key=lambda x: x[1], reverse=True):
print(f" {error}: {count}件")
# 使用例
dm = DownloadManager('managed_downloads')
# URLリスト
urls = [
'https://httpbin.org/image/jpeg',
'https://httpbin.org/image/png',
'https://httpbin.org/status/404', # エラーテスト用
'https://httpbin.org/image/webp'
]
# 一括ダウンロード実行
dm.batch_download(urls, delay=1.0)
# レポート生成
dm.generate_report()
# ログ保存
dm.save_log('json')
dm.save_log('csv')
高度な機能とカスタマイゼーション
レート制限とリトライ機能
import requests
from pathlib import Path
import time
import random
from functools import wraps
def retry_on_failure(max_retries=3, backoff_factor=1.0):
"""デコレータ:失敗時のリトライ機能"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries:
raise e
wait_time = backoff_factor * (2 ** attempt) + random.uniform(0, 1)
print(f"リトライ {attempt + 1}/{max_retries} - {wait_time:.1f}秒後に再試行")
time.sleep(wait_time)
return None
return wrapper
return decorator
class RateLimitedDownloader:
def __init__(self, requests_per_second=1.0, download_dir='rate_limited'):
self.min_interval = 1.0 / requests_per_second
self.last_request_time = 0
self.download_dir = Path(download_dir)
self.download_dir.mkdir(exist_ok=True)
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (compatible; RateLimitedBot/1.0)'
})
def _wait_for_rate_limit(self):
"""レート制限に基づく待機"""
current_time = time.time()
time_since_last = current_time - self.last_request_time
if time_since_last < self.min_interval:
wait_time = self.min_interval - time_since_last
print(f"レート制限待機: {wait_time:.1f}秒")
time.sleep(wait_time)
self.last_request_time = time.time()
@retry_on_failure(max_retries=3, backoff_factor=2.0)
def download(self, url, filename=None):
"""レート制限とリトライ付きダウンロード"""
self._wait_for_rate_limit()
if filename is None:
filename = Path(url).name or f'file_{int(time.time())}'
filepath = self.download_dir / filename
response = self.session.get(url, stream=True, timeout=30)
response.raise_for_status()
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✓ ダウンロード完了: {filename}")
return filepath
# 使用例
downloader = RateLimitedDownloader(requests_per_second=0.5) # 2秒に1回
プロキシサポート付きダウンローダー
import requests
from pathlib import Path
import itertools
class ProxyDownloader:
def __init__(self, proxies=None, download_dir='proxy_downloads'):
self.proxies = proxies or []
self.proxy_cycle = itertools.cycle(self.proxies) if self.proxies else None
self.download_dir = Path(download_dir)
self.download_dir.mkdir(exist_ok=True)
self.current_proxy = None
def _get_next_proxy(self):
"""次のプロキシを取得"""
if self.proxy_cycle:
self.current_proxy = next(self.proxy_cycle)
print(f"プロキシ使用: {self.current_proxy}")
return {'http': self.current_proxy, 'https': self.current_proxy}
return None
def download_with_proxy(self, url, filename=None, max_proxy_attempts=None):
"""プロキシ経由でダウンロード"""
if filename is None:
filename = Path(url).name or f'proxy_file_{int(time.time())}'
filepath = self.download_dir / filename
if not self.proxies:
# プロキシなしでダウンロード
return self._direct_download(url, filepath)
# プロキシを使用してダウンロード
proxy_attempts = max_proxy_attempts or len(self.proxies)
for attempt in range(proxy_attempts):
proxy_config = self._get_next_proxy()
try:
response = requests.get(
url,
proxies=proxy_config,
timeout=30,
stream=True
)
response.raise_for_status()
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✓ プロキシ経由ダウンロード成功: {filename}")
return filepath
except Exception as e:
print(f"✗ プロキシエラー ({self.current_proxy}): {e}")
if attempt == proxy_attempts - 1:
print("すべてのプロキシで失敗、直接接続を試行")
return self._direct_download(url, filepath)
return None
def _direct_download(self, url, filepath):
"""直接ダウンロード(プロキシなし)"""
try:
response = requests.get(url, timeout=30, stream=True)
response.raise_for_status()
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✓ 直接ダウンロード成功: {filepath.name}")
return filepath
except Exception as e:
print(f"✗ 直接ダウンロード失敗: {e}")
return None
# 使用例
proxy_list = [
'http://proxy1.example.com:8080',
'http://proxy2.example.com:8080',
'socks5://proxy3.example.com:1080'
]
downloader = ProxyDownloader(proxies=proxy_list)
downloader.download_with_proxy('https://httpbin.org/image/jpeg', 'proxy_image.jpg')
メタデータ取得とファイル検証
import requests
from pathlib import Path
import hashlib
import json
from datetime import datetime
class MetadataDownloader:
def __init__(self, download_dir='metadata_downloads'):
self.download_dir = Path(download_dir)
self.download_dir.mkdir(exist_ok=True)
def get_file_metadata(self, url):
"""ダウンロード前のメタデータ取得"""
try:
# HEADリクエストでメタデータ取得
head_response = requests.head(url, allow_redirects=True)
head_response.raise_for_status()
metadata = {
'url': url,
'final_url': head_response.url,
'status_code': head_response.status_code,
'headers': dict(head_response.headers),
'file_size': int(head_response.headers.get('content-length', 0)),
'content_type': head_response.headers.get('content-type', ''),
'last_modified': head_response.headers.get('last-modified', ''),
'etag': head_response.headers.get('etag', ''),
'server': head_response.headers.get('server', ''),
'retrieved_at': datetime.now().isoformat()
}
return metadata
except Exception as e:
return {'url': url, 'error': str(e), 'retrieved_at': datetime.now().isoformat()}
def download_with_verification(self, url, filename=None, expected_hash=None,
hash_algorithm='md5'):
"""ハッシュ検証付きダウンロード"""
if filename is None:
filename = Path(url).name or f'verified_file_{int(time.time())}'
filepath = self.download_dir / filename
metadata_file = filepath.with_suffix(filepath.suffix + '.meta.json')
# メタデータ取得
metadata = self.get_file_metadata(url)
try:
# ファイルダウンロード
response = requests.get(url, stream=True)
response.raise_for_status()
# ハッシュ計算用オブジェクト
hash_func = getattr(hashlib, hash_algorithm.lower())()
downloaded_bytes = 0
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
hash_func.update(chunk)
downloaded_bytes += len(chunk)
# ハッシュ値取得
calculated_hash = hash_func.hexdigest()
# メタデータ更新
metadata.update({
'downloaded_at': datetime.now().isoformat(),
'actual_size': downloaded_bytes,
'calculated_hash': calculated_hash,
'hash_algorithm': hash_algorithm,
'verification_status': 'pending'
})
# ハッシュ検証
if expected_hash:
if calculated_hash.lower() == expected_hash.lower():
metadata['verification_status'] = 'verified'
print(f"✓ ハッシュ検証成功: {filename}")
else:
metadata['verification_status'] = 'failed'
print(f"✗ ハッシュ検証失敗: {filename}")
print(f" 期待値: {expected_hash}")
print(f" 実際値: {calculated_hash}")
else:
metadata['verification_status'] = 'no_expected_hash'
# サイズ検証
expected_size = metadata.get('file_size', 0)
if expected_size > 0 and downloaded_bytes != expected_size:
print(f"⚠ サイズ不一致: 期待 {expected_size}, 実際 {downloaded_bytes}")
metadata['size_verification'] = 'failed'
else:
metadata['size_verification'] = 'passed'
# メタデータファイル保存
with open(metadata_file, 'w', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
print(f"✓ ダウンロード完了: {filename}")
print(f" ファイルサイズ: {downloaded_bytes} bytes")
print(f" {hash_algorithm.upper()}ハッシュ: {calculated_hash}")
return {
'filepath': filepath,
'metadata_file': metadata_file,
'metadata': metadata
}
except Exception as e:
metadata['download_error'] = str(e)
metadata['download_failed_at'] = datetime.now().isoformat()
# エラー情報もメタデータに保存
with open(metadata_file, 'w', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
print(f"✗ ダウンロードエラー: {e}")
return None
# 使用例
downloader = MetadataDownloader()
# 事前にメタデータのみ取得
metadata = downloader.get_file_metadata('https://httpbin.org/image/jpeg')
print("メタデータ:", json.dumps(metadata, indent=2, ensure_ascii=False))
# ハッシュ検証付きダウンロード
result = downloader.download_with_verification(
'https://httpbin.org/image/jpeg',
'verified_image.jpg',
hash_algorithm='sha256'
)
パフォーマンス最適化とモニタリング
リアルタイム統計表示
import requests
from pathlib import Path
import threading
import time
import queue
from datetime import datetime, timedelta
class PerformanceMonitor:
def __init__(self, update_interval=1.0):
self.update_interval = update_interval
self.stats = {
'downloads_in_progress': 0,
'completed_downloads': 0,
'failed_downloads': 0,
'total_bytes_downloaded': 0,
'current_speed': 0,
'average_speed': 0,
'start_time': None,
'last_update': None
}
self.speed_history = []
self.monitoring = False
self.monitor_thread = None
self.stats_lock = threading.Lock()
def start_monitoring(self):
"""モニタリング開始"""
self.monitoring = True
self.stats['start_time'] = datetime.now()
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
def stop_monitoring(self):
"""モニタリング停止"""
self.monitoring = False
if self.monitor_thread:
self.monitor_thread.join()
def _monitor_loop(self):
"""モニタリングループ"""
while self.monitoring:
self._update_display()
time.sleep(self.update_interval)
def _update_display(self):
"""統計表示更新"""
with self.stats_lock:
current_time = datetime.now()
if self.stats['start_time']:
elapsed = (current_time - self.stats['start_time']).total_seconds()
if elapsed > 0:
self.stats['average_speed'] = self.stats['total_bytes_downloaded'] / elapsed
# クリアして統計表示
print('\033[2J\033[H', end='') # 画面クリア
print("="*60)
print("📊 ダウンロード統計(リアルタイム)")
print("="*60)
print(f"進行中: {self.stats['downloads_in_progress']}")
print(f"完了: {self.stats['completed_downloads']}")
print(f"失敗: {self.stats['failed_downloads']}")
print(f"総ダウンロード量: {self._format_bytes(self.stats['total_bytes_downloaded'])}")
print(f"現在の速度: {self._format_bytes(self.stats['current_speed'])}/秒")
print(f"平均速度: {self._format_bytes(self.stats['average_speed'])}/秒")
if self.stats['start_time']:
elapsed = datetime.now() - self.stats['start_time']
print(f"経過時間: {str(elapsed).split('.')[0]}")
print("="*60)
def _format_bytes(self, bytes_size):
"""バイト数フォーマット"""
for unit in ['B', 'KB', 'MB', 'GB']:
if bytes_size < 1024.0:
return f"{bytes_size:.1f} {unit}"
bytes_size /= 1024.0
return f"{bytes_size:.1f} TB"
def update_stats(self, **kwargs):
"""統計更新"""
with self.stats_lock:
for key, value in kwargs.items():
if key in self.stats:
if key == 'total_bytes_downloaded':
self.stats[key] += value
else:
self.stats[key] = value
class MonitoredDownloader:
def __init__(self, download_dir='monitored_downloads'):
self.download_dir = Path(download_dir)
self.download_dir.mkdir(exist_ok=True)
self.monitor = PerformanceMonitor()
self.session = requests.Session()
def start_session(self):
"""ダウンロードセッション開始"""
self.monitor.start_monitoring()
def end_session(self):
"""ダウンロードセッション終了"""
self.monitor.stop_monitoring()
def download(self, url, filename=None):
"""モニタリング付きダウンロード"""
if filename is None:
filename = Path(url).name or f'monitored_{int(time.time())}'
filepath = self.download_dir / filename
# 進行中カウント増加
self.monitor.update_stats(downloads_in_progress=
self.monitor.stats['downloads_in_progress'] + 1)
try:
start_time = time.time()
response = self.session.get(url, stream=True)
response.raise_for_status()
file_size = int(response.headers.get('content-length', 0))
downloaded_bytes = 0
last_speed_update = start_time
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
downloaded_bytes += len(chunk)
# 速度計算(1秒ごと)
current_time = time.time()
if current_time - last_speed_update >= 1.0:
speed = downloaded_bytes / (current_time - start_time)
self.monitor.update_stats(current_speed=speed)
last_speed_update = current_time
# 完了統計更新
self.monitor.update_stats(
downloads_in_progress=self.monitor.stats['downloads_in_progress'] - 1,
completed_downloads=self.monitor.stats['completed_downloads'] + 1,
total_bytes_downloaded=downloaded_bytes,
current_speed=0
)
return filepath
except Exception as e:
# エラー統計更新
self.monitor.update_stats(
downloads_in_progress=self.monitor.stats['downloads_in_progress'] - 1,
failed_downloads=self.monitor.stats['failed_downloads'] + 1,
current_speed=0
)
raise e
# 使用例
def demo_monitored_download():
downloader = MonitoredDownloader()
downloader.start_session()
try:
urls = [
'https://httpbin.org/image/jpeg',
'https://httpbin.org/image/png',
'https://httpbin.org/bytes/1024000' # 1MB のテストファイル
]
# 並列ダウンロードのデモ
import threading
def download_worker(url):
try:
downloader.download(url)
except Exception as e:
print(f"ダウンロードエラー: {e}")
threads = []
for url in urls:
thread = threading.Thread(target=download_worker, args=(url,))
thread.start()
threads.append(thread)
# すべてのスレッド完了を待機
for thread in threads:
thread.join()
time.sleep(3) # 最終統計表示のための待機
finally:
downloader.end_session()
# demo_monitored_download()
まとめ
PythonでWebからファイルをダウンロードする方法は多岐にわたります。基本的なrequestsライブラリから始まり、用途に応じて以下の機能を組み合わせることで、プロダクションレベルのダウンロードシステムが構築できます:
主要機能一覧
- 基本ダウンロード:
requestsを使った単純なファイル取得 - エラーハンドリング: HTTPエラーや接続エラーの適切な処理
- 進捗表示: リアルタイムでのダウンロード進捗表示
- 一括処理: 複数ファイルの効率的なダウンロード
- 並列処理: ThreadPoolExecutorや非同期処理による高速化
- メタデータ管理: ファイル情報の取得と検証
- 重複回避: ハッシュ値による同一ファイルの検出
- レート制限: サーバー負荷を考慮した制御
- プロキシサポート: 企業環境での利用
- 統計とログ: 詳細な実行記録と分析
ベストプラクティス
- 適切なUser-Agentの設定: Webサイトによってはbotを拒否する場合があります
- レート制限の実装: サーバーに負荷をかけないよう配慮
- エラーハンドリング: ネットワークエラーやHTTPエラーの適切な処理
- リソース管理: セッションやファイルハンドルの適切なクローズ
- 進捗表示: 大容量ファイルダウンロード時のユーザビリティ向上
これらの機能を組み合わせることで、堅牢で効率的なファイルダウンロードシステムを構築できます。用途に応じて必要な機能を選択し、段階的に実装していくことをお勧めします。
■プロンプトだけでオリジナルアプリを開発・公開してみた!!
■AI時代の第一歩!「AI駆動開発コース」はじめました!
テックジム東京本校で先行開始。
■テックジム東京本校
「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。
<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。
<月1開催>放送作家による映像ディレクター養成講座
<オンライン無料>ゼロから始めるPython爆速講座
