Pythonでコードを書いていて、「同じような関数を何度も書いている」「計算に時間がかかりすぎる」「デコレータを作ったけど関数の情報が消えてしまう」といった経験はありませんか?
そんな問題を解決してくれるのが「functools(ファンクツールズ)」モジュールです。
これは、関数をより効率的で柔軟に扱うためのツールが詰まった、Python標準ライブラリの隠れた名モジュールです。
この記事では、functoolsの主要機能を実例とともに詳しく解説し、あなたのPythonコーディングスキルを一段階上げるお手伝いをします。
functoolsモジュールの基本

functoolsとは
functools
は、高階関数(関数を操作する関数)と関数に対する操作のためのユーティリティを提供するモジュールです。
関数型プログラミングの考え方を取り入れ、コードをより洗練されたものにできます。
基本的なインポート
import functools
# または
from functools import partial, lru_cache, wraps
functoolsの主要機能一覧
機能 | 用途 | Python版本 |
---|---|---|
partial | 関数の一部引数を固定 | 2.5+ |
lru_cache | 関数結果のキャッシュ | 3.2+ |
wraps | デコレータでの関数情報保持 | 2.5+ |
reduce | リストの要素を順次処理 | 3.0+ |
singledispatch | 型によるメソッド分岐 | 3.4+ |
cached_property | プロパティのキャッシュ | 3.8+ |
total_ordering | 比較演算子の自動生成 | 2.7+ |
partial:関数の一部引数を固定する
基本的な使い方
partial
は、既存の関数から一部の引数を事前に設定した新しい関数を作成します。
from functools import partial
# 基本例:べき乗関数
def power(base, exponent):
"""baseをexponent乗する"""
return base ** exponent
# 平方(2乗)を計算する関数を作成
square = partial(power, exponent=2)
print(square(4)) # → 16
print(square(5)) # → 25
# 立方(3乗)を計算する関数を作成
cube = partial(power, exponent=3)
print(cube(3)) # → 27
print(cube(4)) # → 64
# 位置引数での固定も可能
double = partial(power, 2) # baseを2に固定
print(double(3)) # → 8 (2の3乗)
実践的な活用例
1. データ処理での活用
from functools import partial
import json
def process_data(data, multiplier=1, offset=0, formatter=str):
"""データを処理する汎用関数"""
processed = [(x * multiplier + offset) for x in data]
return [formatter(x) for x in processed]
# 異なる処理パターンを関数として定義
data = [1, 2, 3, 4, 5]
# 2倍にして文字列化
double_str = partial(process_data, multiplier=2, formatter=str)
print(double_str(data)) # → ['2', '4', '6', '8', '10']
# 10倍してJSONの数値として整形
ten_times_json = partial(process_data, multiplier=10, formatter=lambda x: f'"{x}"')
print(ten_times_json(data)) # → ['"10"', '"20"', '"30"', '"40"', '"50"']
# 平均との差を計算
def calc_diff_from_average(data):
avg = sum(data) / len(data)
return partial(process_data, offset=-avg, formatter=lambda x: round(x, 2))
diff_func = calc_diff_from_average(data)
print(diff_func(data)) # → [-2.0, -1.0, 0.0, 1.0, 2.0]
2. イベントハンドラでの活用
from functools import partial
def handle_button_click(button_id, action_type, user_data):
"""ボタンクリックを処理する汎用関数"""
print(f"ボタン {button_id} がクリックされました")
print(f"アクション: {action_type}")
print(f"ユーザーデータ: {user_data}")
# 特定のボタン用のハンドラを作成
save_handler = partial(handle_button_click, "save", "save_document")
delete_handler = partial(handle_button_click, "delete", "delete_document")
print_handler = partial(handle_button_click, "print", "print_document")
# 使用例
save_handler({"document_id": 123})
# → ボタン save がクリックされました
# → アクション: save_document
# → ユーザーデータ: {'document_id': 123}
3. 設定パターンの作成
from functools import partial
import logging
def log_message(level, message, extra_info=None):
"""ログメッセージを出力する汎用関数"""
log_entry = f"[{level}] {message}"
if extra_info:
log_entry += f" | 追加情報: {extra_info}"
print(log_entry)
# 各ログレベル用の関数を作成
log_info = partial(log_message, "INFO")
log_warning = partial(log_message, "WARNING")
log_error = partial(log_message, "ERROR")
# 使用例
log_info("アプリケーションが開始されました")
log_warning("設定ファイルが見つかりません", {"config_path": "/etc/app.conf"})
log_error("データベース接続エラー", {"db_host": "localhost"})
partialの高度な使い方
from functools import partial
# partialオブジェクトの属性を確認
def multiply(a, b, c=1):
return a * b * c
partial_func = partial(multiply, 2, c=3)
print(f"元の関数: {partial_func.func}") # → <function multiply at 0x...>
print(f"固定された引数: {partial_func.args}") # → (2,)
print(f"固定されたキーワード引数: {partial_func.keywords}") # → {'c': 3}
# partial同士の組み合わせ
multiply_by_2 = partial(multiply, 2)
multiply_by_2_and_3 = partial(multiply_by_2, c=3)
print(multiply_by_2_and_3(5)) # → 30 (2 * 5 * 3)
lru_cache:関数の結果をキャッシュして高速化

基本的な使い方
lru_cache
は、関数の実行結果をメモリにキャッシュし、同じ引数で再度呼び出されたときにキャッシュから結果を返します。
from functools import lru_cache
import time
# キャッシュなしの関数
def fibonacci_slow(n):
"""フィボナッチ数列(キャッシュなし)"""
if n < 2:
return n
return fibonacci_slow(n-1) + fibonacci_slow(n-2)
# キャッシュありの関数
@lru_cache(maxsize=None)
def fibonacci_fast(n):
"""フィボナッチ数列(キャッシュあり)"""
if n < 2:
return n
return fibonacci_fast(n-1) + fibonacci_fast(n-2)
# パフォーマンス比較
def performance_test():
print("=== パフォーマンステスト ===")
# キャッシュなし
start = time.time()
result_slow = fibonacci_slow(30)
time_slow = time.time() - start
# キャッシュあり
start = time.time()
result_fast = fibonacci_fast(30)
time_fast = time.time() - start
print(f"結果: {result_slow} (両方とも同じ)")
print(f"キャッシュなし: {time_slow:.4f}秒")
print(f"キャッシュあり: {time_fast:.6f}秒")
print(f"高速化倍率: {time_slow / time_fast:.0f}倍")
performance_test()
キャッシュサイズの設定
from functools import lru_cache
# 小さなキャッシュサイズ
@lru_cache(maxsize=5)
def expensive_calculation(n):
"""計算コストの高い処理をシミュレート"""
print(f"計算実行: {n}")
time.sleep(0.1) # 重い処理をシミュレート
return n ** 2
# テスト
print("=== キャッシュサイズテスト ===")
for i in range(3):
print(f"実行 {i+1}回目:")
for num in [1, 2, 3, 4, 5, 6]: # 6個のうち、最後の1個でキャッシュアウトが発生
result = expensive_calculation(num)
print(f" {num} -> {result}")
print()
# キャッシュ情報の確認
print(f"キャッシュ情報: {expensive_calculation.cache_info()}")
実践的な活用例
1. Webスクレイピングでのキャッシュ
from functools import lru_cache
import requests
import time
@lru_cache(maxsize=100)
def fetch_url_content(url):
"""URLの内容を取得(キャッシュ付き)"""
print(f"実際にリクエスト送信: {url}")
# 実際のHTTPリクエストの代わりにシミュレート
time.sleep(0.5) # ネットワーク遅延をシミュレート
return f"Content from {url}"
def demonstrate_url_caching():
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page1", # 再度同じURL
"https://example.com/page3",
"https://example.com/page2", # 再度同じURL
]
print("=== URLキャッシュのデモ ===")
start_time = time.time()
for url in urls:
content = fetch_url_content(url)
print(f"取得完了: {content}")
total_time = time.time() - start_time
print(f"\n総実行時間: {total_time:.2f}秒")
print(f"キャッシュ統計: {fetch_url_content.cache_info()}")
demonstrate_url_caching()
2. データベースクエリのキャッシュ
from functools import lru_cache
import sqlite3
import time
# データベース接続のシミュレート
def simulate_db_query(query, param):
"""データベースクエリをシミュレート"""
print(f"DB実行: {query} with param={param}")
time.sleep(0.1) # DB遅延をシミュレート
return f"Result for {param}"
@lru_cache(maxsize=50)
def get_user_data(user_id):
"""ユーザーデータを取得(キャッシュ付き)"""
return simulate_db_query("SELECT * FROM users WHERE id = ?", user_id)
@lru_cache(maxsize=20)
def get_user_posts(user_id):
"""ユーザーの投稿を取得(キャッシュ付き)"""
return simulate_db_query("SELECT * FROM posts WHERE user_id = ?", user_id)
def demonstrate_db_caching():
print("=== データベースキャッシュのデモ ===")
# 同じユーザーのデータを複数回取得
user_ids = [1, 2, 1, 3, 2, 1]
for user_id in user_ids:
user_data = get_user_data(user_id)
user_posts = get_user_posts(user_id)
print(f"ユーザー{user_id}: データ取得完了")
print(f"\nユーザーデータのキャッシュ: {get_user_data.cache_info()}")
print(f"投稿データのキャッシュ: {get_user_posts.cache_info()}")
demonstrate_db_caching()
キャッシュの管理
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_function(n):
print(f"実際に計算: {n}")
return n * 2
def cache_management_demo():
print("=== キャッシュ管理のデモ ===")
# キャッシュの蓄積
for i in range(5):
result = cached_function(i)
print(f"結果: {result}")
print(f"初期キャッシュ状態: {cached_function.cache_info()}")
# 同じ値での再実行(キャッシュヒット)
print("\n再実行(キャッシュから取得):")
for i in range(3):
result = cached_function(i)
print(f"再実行後: {cached_function.cache_info()}")
# キャッシュクリア
cached_function.cache_clear()
print(f"クリア後: {cached_function.cache_info()}")
# 再度実行(キャッシュなし)
print("\nクリア後の再実行:")
result = cached_function(1)
print(f"クリア後の実行: {cached_function.cache_info()}")
cache_management_demo()
wraps:デコレータで元の関数情報を保持

問題の理解
def bad_decorator(func):
"""wrapsを使わないデコレータ(悪い例)"""
def wrapper(*args, **kwargs):
print("実行前の処理")
result = func(*args, **kwargs)
print("実行後の処理")
return result
return wrapper
@bad_decorator
def greet(name):
"""挨拶を表示する関数"""
print(f"こんにちは、{name}さん")
def demonstrate_problem():
print("=== wrapsを使わない場合の問題 ===")
print(f"関数名: {greet.__name__}") # wrapper になってしまう
print(f"ドキュメント: {greet.__doc__}") # None になってしまう
print(f"モジュール: {greet.__module__}")
print()
demonstrate_problem()
wrapsによる解決
from functools import wraps
def good_decorator(func):
"""wrapsを使ったデコレータ(良い例)"""
@wraps(func)
def wrapper(*args, **kwargs):
print("実行前の処理")
result = func(*args, **kwargs)
print("実行後の処理")
return result
return wrapper
@good_decorator
def greet_fixed(name):
"""挨拶を表示する関数(修正版)"""
print(f"こんにちは、{name}さん")
def demonstrate_solution():
print("=== wrapsを使った場合の解決 ===")
print(f"関数名: {greet_fixed.__name__}") # greet_fixed
print(f"ドキュメント: {greet_fixed.__doc__}") # 元のドキュメントが保持される
print(f"モジュール: {greet_fixed.__module__}")
print()
demonstrate_solution()
実践的なデコレータの作成
1. 実行時間測定デコレータ
from functools import wraps
import time
def measure_time(func):
"""関数の実行時間を測定するデコレータ"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
execution_time = end_time - start_time
print(f"{func.__name__} の実行時間: {execution_time:.4f}秒")
return result
return wrapper
@measure_time
def slow_calculation(n):
"""時間のかかる計算をシミュレート"""
total = 0
for i in range(n):
total += i ** 2
return total
# テスト
result = slow_calculation(100000)
print(f"計算結果: {result}")
2. 引数検証デコレータ
from functools import wraps
def validate_types(**expected_types):
"""引数の型を検証するデコレータ"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 位置引数の検証
import inspect
sig = inspect.signature(func)
bound_args = sig.bind(*args, **kwargs)
bound_args.apply_defaults()
for param_name, value in bound_args.arguments.items():
if param_name in expected_types:
expected_type = expected_types[param_name]
if not isinstance(value, expected_type):
raise TypeError(
f"{func.__name__}の引数'{param_name}'は{expected_type.__name__}型である必要があります。"
f"実際の型: {type(value).__name__}"
)
return func(*args, **kwargs)
return wrapper
return decorator
@validate_types(name=str, age=int, score=float)
def process_user_data(name, age, score=0.0):
"""ユーザーデータを処理する関数"""
print(f"名前: {name}, 年齢: {age}, スコア: {score}")
# 正常な使用
process_user_data("太郎", 25, 85.5)
# エラーになる使用例
try:
process_user_data("太郎", "25歳", 85.5) # ageが文字列
except TypeError as e:
print(f"エラー: {e}")
3. リトライ機能付きデコレータ
from functools import wraps
import time
import random
def retry(max_attempts=3, delay=1, exceptions=(Exception,)):
"""関数の実行をリトライするデコレータ"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_attempts - 1:
print(f"{func.__name__} 失敗 (試行 {attempt + 1}/{max_attempts}): {e}")
time.sleep(delay)
else:
print(f"{func.__name__} 最終的に失敗: {e}")
raise last_exception
return wrapper
return decorator
@retry(max_attempts=3, delay=0.5)
def unreliable_api_call():
"""不安定なAPI呼び出しをシミュレート"""
if random.random() < 0.7: # 70%の確率で失敗
raise ConnectionError("API接続に失敗しました")
return "API呼び出し成功!"
# テスト
try:
result = unreliable_api_call()
print(result)
except ConnectionError as e:
print(f"最終的なエラー: {e}")
reduce:リストの要素を順次処理

基本的な使い方
from functools import reduce
# 基本例:リストの合計
numbers = [1, 2, 3, 4, 5]
total = reduce(lambda x, y: x + y, numbers)
print(f"合計: {total}") # → 15
# 初期値を指定
total_with_initial = reduce(lambda x, y: x + y, numbers, 10)
print(f"初期値ありの合計: {total_with_initial}") # → 25
# 文字列の連結
words = ["Python", "is", "awesome"]
sentence = reduce(lambda x, y: x + " " + y, words)
print(f"文章: {sentence}") # → Python is awesome
実践的な活用例
1. 辞書の統合
from functools import reduce
def merge_dicts(dicts):
"""複数の辞書を統合する"""
def merge_two_dicts(dict1, dict2):
result = dict1.copy()
result.update(dict2)
return result
return reduce(merge_two_dicts, dicts, {})
# テスト
user_profiles = [
{"name": "太郎", "age": 25},
{"city": "東京", "job": "エンジニア"},
{"hobby": "プログラミング", "skill": "Python"}
]
merged_profile = merge_dicts(user_profiles)
print(f"統合されたプロフィール: {merged_profile}")
# → {'name': '太郎', 'age': 25, 'city': '東京', 'job': 'エンジニア', 'hobby': 'プログラミング', 'skill': 'Python'}
2. ネストしたリストの平坦化
from functools import reduce
def flatten_list(nested_list):
"""ネストしたリストを平坦化する"""
return reduce(lambda x, y: x + y, nested_list, [])
# テスト
nested_data = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
flattened = flatten_list(nested_data)
print(f"平坦化後: {flattened}") # → [1, 2, 3, 4, 5, 6, 7, 8, 9]
# より複雑な例
mixed_data = [["a", "b"], ["c"], ["d", "e", "f"]]
flattened_mixed = flatten_list(mixed_data)
print(f"文字列の平坦化: {flattened_mixed}") # → ['a', 'b', 'c', 'd', 'e', 'f']
3. 複雑な集計処理
from functools import reduce
def analyze_sales_data(sales_records):
"""売上データを分析する"""
def accumulate_sales(acc, record):
# 累積データを更新
acc['total_amount'] += record['amount']
acc['total_quantity'] += record['quantity']
# 地域別集計
region = record['region']
if region not in acc['by_region']:
acc['by_region'][region] = {'amount': 0, 'quantity': 0}
acc['by_region'][region]['amount'] += record['amount']
acc['by_region'][region]['quantity'] += record['quantity']
# 最高売上の記録
if record['amount'] > acc['max_sale']['amount']:
acc['max_sale'] = record.copy()
return acc
initial_acc = {
'total_amount': 0,
'total_quantity': 0,
'by_region': {},
'max_sale': {'amount': 0}
}
return reduce(accumulate_sales, sales_records, initial_acc)
# テストデータ
sales_data = [
{'region': '東京', 'amount': 15000, 'quantity': 5, 'product': 'ノートPC'},
{'region': '大阪', 'amount': 8000, 'quantity': 2, 'product': 'タブレット'},
{'region': '東京', 'amount': 25000, 'quantity': 1, 'product': 'デスクトップPC'},
{'region': '福岡', 'amount': 12000, 'quantity': 3, 'product': 'スマートフォン'},
{'region': '大阪', 'amount': 18000, 'quantity': 4, 'product': 'モニター'},
]
analysis_result = analyze_sales_data(sales_data)
print("=== 売上分析結果 ===")
print(f"総売上: ¥{analysis_result['total_amount']:,}")
print(f"総販売数: {analysis_result['total_quantity']}個")
print(f"最高売上: {analysis_result['max_sale']['product']} (¥{analysis_result['max_sale']['amount']:,})")
print("地域別売上:")
for region, data in analysis_result['by_region'].items():
print(f" {region}: ¥{data['amount']:,} ({data['quantity']}個)")
singledispatch:型による処理の分岐
基本的な使い方
from functools import singledispatch
@singledispatch
def process_data(data):
"""データを処理する汎用関数(デフォルト)"""
return f"未対応の型: {type(data).__name__}"
@process_data.register
def _(data: str):
"""文字列の処理"""
return f"文字列処理: '{data.upper()}'"
@process_data.register
def _(data: int):
"""整数の処理"""
return f"整数処理: {data * 2}"
@process_data.register
def _(data: list):
"""リストの処理"""
return f"リスト処理: 要素数={len(data)}, 合計={sum(data) if all(isinstance(x, (int, float)) for x in data) else 'N/A'}"
@process_data.register
def _(data: dict):
"""辞書の処理"""
return f"辞書処理: キー数={len(data)}, キー={list(data.keys())}"
# テスト
test_data = [
"hello world",
42,
[1, 2, 3, 4, 5],
{"name": "太郎", "age": 25},
3.14 # 未登録の型
]
print("=== singledispatch のテスト ===")
for data in test_data:
result = process_data(data)
print(f"{type(data).__name__}: {result}")
実践的な活用例
1. データ変換システム
from functools import singledispatch
import json
from datetime import datetime
@singledispatch
def to_json_serializable(obj):
"""オブジェクトをJSON互換形式に変換(デフォルト)"""
return str(obj)
@to_json_serializable.register
def _(obj: datetime):
"""datetime型の変換"""
return obj.isoformat()
@to_json_serializable.register
def _(obj: set):
"""set型の変換"""
return list(obj)
@to_json_serializable.register
def _(obj: bytes):
"""bytes型の変換"""
return obj.decode('utf-8', errors='ignore')
@to_json_serializable.register
def _(obj: complex):
"""complex型の変換"""
return {"real": obj.real, "imag": obj.imag}
def safe_json_dumps(data):
"""安全なJSON変換"""
def convert_recursive(obj):
if isinstance(obj, dict):
return {k: convert_recursive(v) for k, v in obj.items()}
elif isinstance(obj, (list, tuple)):
return [convert_recursive(item) for item in obj]
else:
return to_json_serializable(obj)
converted_data = convert_recursive(data)
return json.dumps(converted_data, ensure_ascii=False, indent=2)
# テスト
complex_data = {
"timestamp": datetime.now(),
"tags": {"python", "programming", "tutorial"},
"binary_data": b"Hello World",
"calculation": 3 + 4j,
"nested": {
"more_dates": [datetime(2024, 6, 19), datetime(2024, 12, 25)],
"numbers": (1, 2, 3)
}
}
print("=== JSON変換システムのテスト ===")
json_result = safe_json_dumps(complex_data)
print(json_result)
2. ファイル処理システム
from functools import singledispatch
import os
from pathlib import Path
@singledispatch
def process_file(file_path):
"""ファイルを処理する汎用関数"""
return f"未対応のファイル形式: {file_path}"
@process_file.register
def _(file_path: str):
"""文字列パスの処理"""
return process_file(Path(file_path))
@process_file.register
def _(file_path: Path):
"""Pathオブジェクトの処理"""
if not file_path.exists():
return f"ファイルが存在しません: {file_path}"
suffix = file_path.suffix.lower()
if suffix == '.txt':
return process_text_file(file_path)
elif suffix in ['.jpg', '.png', '.gif']:
return process_image_file(file_path)
elif suffix == '.py':
return process_python_file(file_path)
else:
return f"未対応のファイル形式: {suffix}"
def process_text_file(file_path):
"""テキストファイルの処理"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return {
"type": "テキストファイル",
"size": len(content),
"lines": len(content.splitlines()),
"words": len(content.split())
}
except Exception as e:
return f"テキストファイル読み込みエラー: {e}"
def process_image_file(file_path):
"""画像ファイルの処理"""
file_size = file_path.stat().st_size
return {
"type": "画像ファイル",
"format": file_path.suffix.upper(),
"size_bytes": file_size,
"size_mb": round(file_size / 1024 / 1024, 2)
}
def process_python_file(file_path):
"""Pythonファイルの処理"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
code_lines = [line for line in lines if line.strip() and not line.strip().startswith('#')]
comment_lines = [line for line in lines if line.strip().startswith('#')]
return {
"type": "Pythonファイル",
"total_lines": len(lines),
"code_lines": len(code_lines),
"comment_lines": len(comment_lines),
"imports": len([line for line in lines if line.strip().startswith(('import ', 'from '))])
}
except Exception as e:
return f"Pythonファイル解析エラー: {e}"
# 使用例(実際のファイルがある場合)
sample_files = [
"example.txt",
Path("script.py"),
"image.jpg",
"unknown.xyz"
]
print("\n=== ファイル処理システムのテスト ===")
for file_path in sample_files:
result = process_file(file_path)
print(f"{file_path}: {result}")
cached_property:プロパティのキャッシュ(Python 3.8+)

基本的な使い方
from functools import cached_property
import time
import math
class Circle:
"""円のクラス"""
def __init__(self, radius):
self.radius = radius
print(f"円が作成されました(半径: {radius})")
@cached_property
def area(self):
"""面積を計算(キャッシュ付き)"""
print("面積を計算しています...")
time.sleep(0.1) # 重い計算をシミュレート
return math.pi * self.radius ** 2
@cached_property
def circumference(self):
"""円周を計算(キャッシュ付き)"""
print("円周を計算しています...")
time.sleep(0.1) # 重い計算をシミュレート
return 2 * math.pi * self.radius
@property
def diameter(self):
"""直径(キャッシュなしの通常プロパティ)"""
print("直径を計算しています...")
return 2 * self.radius
def test_cached_property():
print("=== cached_property のテスト ===")
circle = Circle(5)
# 最初のアクセス(計算が実行される)
print(f"面積(1回目): {circle.area:.2f}")
print(f"円周(1回目): {circle.circumference:.2f}")
print(f"直径(1回目): {circle.diameter}")
print("\n--- 2回目のアクセス ---")
# 2回目のアクセス(キャッシュから取得)
print(f"面積(2回目): {circle.area:.2f}") # 計算されない
print(f"円周(2回目): {circle.circumference:.2f}") # 計算されない
print(f"直径(2回目): {circle.diameter}") # 毎回計算される
test_cached_property()
実践的な活用例
1. データ分析クラス
from functools import cached_property
import statistics
import time
class DataAnalyzer:
"""データ分析クラス"""
def __init__(self, data):
self.data = data
print(f"データアナライザが作成されました(データ数: {len(data)})")
@cached_property
def mean(self):
"""平均値(計算コストをシミュレート)"""
print("平均値を計算中...")
time.sleep(0.1)
return statistics.mean(self.data)
@cached_property
def median(self):
"""中央値"""
print("中央値を計算中...")
time.sleep(0.1)
return statistics.median(self.data)
@cached_property
def std_dev(self):
"""標準偏差"""
print("標準偏差を計算中...")
time.sleep(0.1)
return statistics.stdev(self.data) if len(self.data) > 1 else 0
@cached_property
def summary(self):
"""統計サマリー(他のcached_propertyを使用)"""
print("サマリーを作成中...")
return {
"count": len(self.data),
"mean": self.mean,
"median": self.median,
"std_dev": self.std_dev,
"min": min(self.data),
"max": max(self.data)
}
def add_data_point(self, value):
"""データポイントを追加(キャッシュをクリア)"""
self.data.append(value)
# キャッシュされたプロパティをクリア
properties_to_clear = ['mean', 'median', 'std_dev', 'summary']
for prop in properties_to_clear:
if prop in self.__dict__:
delattr(self, prop)
# テスト
def test_data_analyzer():
print("=== データ分析クラスのテスト ===")
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
analyzer = DataAnalyzer(data)
# 最初のアクセス
print(f"平均: {analyzer.mean:.2f}")
print(f"中央値: {analyzer.median:.2f}")
print(f"標準偏差: {analyzer.std_dev:.2f}")
print("\n--- 2回目のアクセス(キャッシュから) ---")
print(f"平均: {analyzer.mean:.2f}")
print(f"サマリー: {analyzer.summary}")
print("\n--- データ追加後 ---")
analyzer.add_data_point(100) # 外れ値を追加
print(f"新しい平均: {analyzer.mean:.2f}") # 再計算される
test_data_analyzer()
2. Webスクレイピングクラス
from functools import cached_property
import time
import random
class WebPageAnalyzer:
"""Webページ分析クラス"""
def __init__(self, url):
self.url = url
print(f"WebページアナライザーInit: {url}")
@cached_property
def page_content(self):
"""ページ内容を取得(実際のHTTPリクエストをシミュレート)"""
print(f"ページを取得中: {self.url}")
time.sleep(1) # ネットワーク遅延をシミュレート
# 実際のリクエストの代わりにダミーデータを返す
return f"Page content from {self.url} " * 100
@cached_property
def word_count(self):
"""単語数をカウント"""
print("単語数を計算中...")
return len(self.page_content.split())
@cached_property
def character_count(self):
"""文字数をカウント"""
print("文字数を計算中...")
return len(self.page_content)
@cached_property
def links(self):
"""リンクを抽出(シミュレート)"""
print("リンクを抽出中...")
time.sleep(0.5)
# 実際のリンク抽出の代わりにダミーリンクを生成
return [f"https://example.com/link{i}" for i in range(random.randint(5, 15))]
@cached_property
def analysis_report(self):
"""分析レポートを生成"""
print("分析レポートを生成中...")
return {
"url": self.url,
"word_count": self.word_count,
"character_count": self.character_count,
"link_count": len(self.links),
"avg_words_per_link": self.word_count / len(self.links) if self.links else 0
}
# テスト
def test_web_analyzer():
print("=== Webページ分析クラスのテスト ===")
analyzer = WebPageAnalyzer("https://example.com")
print("最初のアクセス:")
print(f"単語数: {analyzer.word_count}")
print(f"文字数: {analyzer.character_count}")
print("\n2回目のアクセス(キャッシュから):")
print(f"リンク数: {len(analyzer.links)}")
print(f"レポート: {analyzer.analysis_report}")
test_web_analyzer()
total_ordering:比較演算子の自動生成
基本的な使い方
from functools import total_ordering
@total_ordering
class Student:
"""学生クラス(成績で比較)"""
def __init__(self, name, grade):
self.name = name
self.grade = grade
def __eq__(self, other):
"""等価判定"""
if not isinstance(other, Student):
return NotImplemented
return self.grade == other.grade
def __lt__(self, other):
"""未満判定(これだけ定義すれば他の比較演算子が自動生成される)"""
if not isinstance(other, Student):
return NotImplemented
return self.grade < other.grade
def __repr__(self):
return f"Student('{self.name}', {self.grade})"
# テスト
def test_total_ordering():
print("=== total_ordering のテスト ===")
students = [
Student("太郎", 85),
Student("花子", 92),
Student("次郎", 78),
Student("美咲", 95)
]
print("元の順序:")
for student in students:
print(f" {student}")
# ソート(__lt__と__eq__から他の比較演算子が自動生成される)
sorted_students = sorted(students)
print("\n成績順(昇順):")
for student in sorted_students:
print(f" {student}")
# 各種比較演算子のテスト
print(f"\n比較テスト:")
student1, student2 = students[0], students[1]
print(f"{student1.name} vs {student2.name}:")
print(f" == : {student1 == student2}")
print(f" != : {student1 != student2}")
print(f" < : {student1 < student2}")
print(f" <= : {student1 <= student2}")
print(f" > : {student1 > student2}")
print(f" >= : {student1 >= student2}")
test_total_ordering()
実践的な活用例
1. バージョン比較クラス
from functools import total_ordering
import re
@total_ordering
class Version:
"""ソフトウェアバージョンの比較クラス"""
def __init__(self, version_string):
self.version_string = version_string
self.parts = self._parse_version(version_string)
def _parse_version(self, version_string):
"""バージョン文字列を解析"""
# "1.2.3-alpha" -> [1, 2, 3, "alpha"]
match = re.match(r'^(\d+)\.(\d+)\.(\d+)(?:-(.+))?, version_string)
if not match:
raise ValueError(f"無効なバージョン形式: {version_string}")
major, minor, patch, suffix = match.groups()
parts = [int(major), int(minor), int(patch)]
if suffix:
parts.append(suffix)
return parts
def __eq__(self, other):
if not isinstance(other, Version):
return NotImplemented
return self.parts == other.parts
def __lt__(self, other):
if not isinstance(other, Version):
return NotImplemented
# 数値部分の比較
for i in range(min(3, len(self.parts), len(other.parts))):
if isinstance(self.parts[i], int) and isinstance(other.parts[i], int):
if self.parts[i] != other.parts[i]:
return self.parts[i] < other.parts[i]
# サフィックスの比較
self_has_suffix = len(self.parts) > 3
other_has_suffix = len(other.parts) > 3
if self_has_suffix and not other_has_suffix:
return True # プレリリース版は正式版より小さい
elif not self_has_suffix and other_has_suffix:
return False
elif self_has_suffix and other_has_suffix:
return self.parts[3] < other.parts[3]
return False
def __repr__(self):
return f"Version('{self.version_string}')"
# テスト
def test_version_comparison():
print("=== バージョン比較のテスト ===")
versions = [
Version("2.1.0"),
Version("1.0.0"),
Version("2.0.0"),
Version("2.1.0-beta"),
Version("2.1.0-alpha"),
Version("1.9.5"),
Version("2.0.1")
]
print("ソート前:")
for v in versions:
print(f" {v}")
sorted_versions = sorted(versions)
print("\nソート後:")
for v in sorted_versions:
print(f" {v}")
# 比較例
v1, v2 = Version("2.0.0"), Version("2.0.0-beta")
print(f"\n{v1} > {v2}: {v1 > v2}")
test_version_comparison()
2. 優先度付きタスククラス
from functools import total_ordering
from datetime import datetime, timedelta
@total_ordering
class Task:
"""優先度付きタスククラス"""
PRIORITY_LEVELS = {"low": 1, "medium": 2, "high": 3, "urgent": 4}
def __init__(self, title, priority="medium", due_date=None):
self.title = title
self.priority = priority
self.due_date = due_date
self.created_at = datetime.now()
if priority not in self.PRIORITY_LEVELS:
raise ValueError(f"無効な優先度: {priority}")
@property
def priority_value(self):
return self.PRIORITY_LEVELS[self.priority]
@property
def days_until_due(self):
if self.due_date is None:
return float('inf') # 期限なしは最後
return (self.due_date - datetime.now()).days
def __eq__(self, other):
if not isinstance(other, Task):
return NotImplemented
return (self.priority_value, -self.days_until_due) == (other.priority_value, -other.days_until_due)
def __lt__(self, other):
if not isinstance(other, Task):
return NotImplemented
# 優先度が高いほど先に(降順)
if self.priority_value != other.priority_value:
return self.priority_value > other.priority_value
# 優先度が同じ場合は期限が近いほど先に
return self.days_until_due < other.days_until_due
def __repr__(self):
due_str = self.due_date.strftime("%Y-%m-%d") if self.due_date else "期限なし"
return f"Task('{self.title}', {self.priority}, {due_str})"
# テスト
def test_task_prioritization():
print("=== タスク優先順位のテスト ===")
now = datetime.now()
tasks = [
Task("会議資料作成", "medium", now + timedelta(days=3)),
Task("システム障害対応", "urgent", now + timedelta(hours=2)),
Task("来月の計画書", "low", now + timedelta(days=20)),
Task("クライアント報告", "high", now + timedelta(days=1)),
Task("コードレビュー", "medium", now + timedelta(days=2)),
Task("長期プロジェクト", "low"), # 期限なし
]
print("ソート前:")
for task in tasks:
print(f" {task}")
sorted_tasks = sorted(tasks)
print("\n優先順位順:")
for i, task in enumerate(sorted_tasks, 1):
print(f" {i}. {task}")
test_task_prioritization()
高度な応用例:複数機能の組み合わせ

デコレータファクトリーとキャッシュの組み合わせ
from functools import wraps, lru_cache
import time
import logging
def performance_cache(maxsize=128, typed=False, log_performance=True):
"""パフォーマンス監視機能付きキャッシュデコレータ"""
def decorator(func):
# まずlru_cacheを適用
cached_func = lru_cache(maxsize=maxsize, typed=typed)(func)
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
# キャッシュ情報を事前に取得
cache_info_before = cached_func.cache_info()
# 関数実行
result = cached_func(*args, **kwargs)
# パフォーマンス測定
execution_time = time.time() - start_time
cache_info_after = cached_func.cache_info()
# キャッシュヒットかどうかを判定
cache_hit = cache_info_after.hits > cache_info_before.hits
if log_performance:
hit_status = "HIT" if cache_hit else "MISS"
print(f"{func.__name__} [{hit_status}]: {execution_time:.4f}秒")
if not cache_hit:
print(f" キャッシュ統計: {cache_info_after}")
return result
# キャッシュ管理メソッドを追加
wrapper.cache_info = cached_func.cache_info
wrapper.cache_clear = cached_func.cache_clear
return wrapper
return decorator
@performance_cache(maxsize=50, log_performance=True)
def expensive_fibonacci(n):
"""計算コストの高いフィボナッチ数列"""
if n < 2:
return n
return expensive_fibonacci(n-1) + expensive_fibonacci(n-2)
def test_performance_cache():
print("=== パフォーマンスキャッシュのテスト ===")
# 最初の計算(キャッシュミス)
print("最初の計算:")
result1 = expensive_fibonacci(20)
print(f"結果: {result1}")
print("\n2回目の計算(キャッシュヒット):")
result2 = expensive_fibonacci(20)
print(f"結果: {result2}")
print("\nより大きな値での計算:")
result3 = expensive_fibonacci(25)
print(f"結果: {result3}")
test_performance_cache()
関数合成とpartialの組み合わせ
from functools import partial, reduce
import operator
def compose(*functions):
"""関数を合成する"""
return reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)
def pipeline(*functions):
"""データパイプラインを作成"""
def process_data(data):
return reduce(lambda result, func: func(result), functions, data)
return process_data
# データ処理関数群
def add_tax(rate):
"""税率を加算する関数を返す"""
return lambda price: price * (1 + rate)
def apply_discount(rate):
"""割引率を適用する関数を返す"""
return lambda price: price * (1 - rate)
def round_price(decimals=0):
"""価格を四捨五入する関数を返す"""
return lambda price: round(price, decimals)
def format_currency(symbol="¥"):
"""通貨記号を付ける関数を返す"""
return lambda price: f"{symbol}{price:,.0f}"
def test_function_composition():
print("=== 関数合成とパイプラインのテスト ===")
# 基本価格
base_prices = [1000, 2500, 5000, 8000]
# 価格処理パイプライン1: 10%割引 → 8%税込み → 四捨五入 → 通貨表示
process_sale_price = pipeline(
apply_discount(0.10), # 10%割引
add_tax(0.08), # 8%税込み
round_price(0), # 四捨五入
format_currency("¥") # 通貨表示
)
# 価格処理パイプライン2: 5%割引 → 10%税込み → 通貨表示(ドル)
process_export_price = pipeline(
apply_discount(0.05),
add_tax(0.10),
round_price(2),
format_currency("$")
)
print("国内セール価格:")
for price in base_prices:
processed = process_sale_price(price)
print(f" {price:,}円 → {processed}")
print("\n輸出価格(ドル換算):")
# 為替レート適用のためのpartial
jpy_to_usd = partial(operator.truediv, 1) # 1ドル = 150円と仮定
jpy_to_usd = partial(operator.truediv, b=150)
export_pipeline = pipeline(
lambda price: price / 150, # 為替変換
apply_discount(0.05),
add_tax(0.10),
round_price(2),
format_currency("$")
)
for price in base_prices:
processed = export_pipeline(price)
print(f" {price:,}円 → {processed}")
test_function_composition()
まとめとベストプラクティス
functoolsを効果的に使うためのガイドライン
def functools_best_practices():
"""functoolsのベストプラクティス"""
best_practices = {
"partial": [
"引数の一部を固定して専用関数を作成する際に使用",
"設定パターンやイベントハンドラーの作成に効果的",
"partialオブジェクトの.func, .args, .keywordsで情報確認可能",
"過度の使用は可読性を損なう可能性があるため注意"
],
"lru_cache": [
"計算コストの高い関数に適用",
"maxsizeは用途に応じて適切に設定(None=無制限、128=デフォルト)",
"キャッシュ統計をcache_info()で定期的に確認",
"メモリ使用量に注意し、必要に応じてcache_clear()でクリア",
"可変な引数(list、dictなど)はハッシュ化できないため注意"
],
"wraps": [
"デコレータ作成時は必ず使用",
"元の関数の__name__, __doc__, __module__などを保持",
"デバッグやドキュメント生成に重要",
"functools.WRAPPER_ASSIGNMENTSで追加属性も指定可能"
],
"singledispatch": [
"型による処理分岐が必要な場合に使用",
"if-elif文の羅列よりも保守性が高い",
"新しい型への対応が容易",
"パフォーマンスも良好"
],
"cached_property": [
"計算コストの高いプロパティに使用",
"一度計算したら値が変わらないプロパティに適用",
"データが変更された場合は手動でキャッシュクリアが必要",
"Python 3.8以降で使用可能"
]
}
print("=== functoolsベストプラクティス ===")
for tool, practices in best_practices.items():
print(f"\n{tool}:")
for practice in practices:
print(f" • {practice}")
print("\n=== 共通の注意点 ===")
common_warnings = [
"メモリ使用量を定期的に監視する",
"パフォーマンステストで効果を検証する",
"過度な最適化よりもコードの可読性を重視する",
"チーム開発では使用する機能について共有する",
"Pythonバージョンによる機能の違いに注意する"
]
for warning in common_warnings:
print(f" • {warning}")
functools_best_practices()
パフォーマンス比較とまとめ
import time
from functools import lru_cache, partial
def performance_comparison():
"""functoolsによる性能改善の比較"""
print("=== パフォーマンス比較まとめ ===")
# 1. キャッシュなし vs キャッシュあり
def fibonacci_no_cache(n):
if n < 2: return n
return fibonacci_no_cache(n-1) + fibonacci_no_cache(n-2)
@lru_cache(maxsize=None)
def fibonacci_with_cache(n):
if n < 2: return n
return fibonacci_with_cache(n-1) + fibonacci_with_cache(n-2)
test_n = 35
# キャッシュなしのテスト
start = time.time()
result_no_cache = fibonacci_no_cache(test_n)
time_no_cache = time.time() - start
# キャッシュありのテスト
start = time.time()
result_with_cache = fibonacci_with_cache(test_n)
time_with_cache = time.time() - start
print(f"フィボナッチ数列(n={test_n}):")
print(f" キャッシュなし: {time_no_cache:.4f}秒")
print(f" キャッシュあり: {time_with_cache:.6f}秒")
print(f" 改善倍率: {time_no_cache / time_with_cache:.0f}倍高速化")
# 2. partial使用例
def multiply(a, b, c=1, d=1):
return a * b * c * d
# 通常の関数呼び出し
regular_calls = []
start = time.time()
for i in range(10000):
result = multiply(i, 2, 3, 4)
regular_calls.append(result)
time_regular = time.time() - start
# partial使用
optimized_multiply = partial(multiply, b=2, c=3, d=4)
partial_calls = []
start = time.time()
for i in range(10000):
result = optimized_multiply(i)
partial_calls.append(result)
time_partial = time.time() - start
print(f"\n関数呼び出し(10,000回):")
print(f" 通常の呼び出し: {time_regular:.4f}秒")
print(f" partial使用: {time_partial:.4f}秒")
print(f" 改善率: {((time_regular - time_partial) / time_regular * 100):.1f}%")
performance_comparison()
実際のプロジェクトでの活用例
from functools import lru_cache, partial, wraps, singledispatch
import json
import time
from datetime import datetime
class WebAPIClient:
"""functoolsを活用したWebAPIクライアントの例"""
def __init__(self, base_url, api_key):
self.base_url = base_url
self.api_key = api_key
# partialを使ってAPI呼び出し関数を事前設定
self.get_user = partial(self._api_request, "GET", "/users")
self.get_posts = partial(self._api_request, "GET", "/posts")
self.create_post = partial(self._api_request, "POST", "/posts")
@lru_cache(maxsize=100)
def _api_request(self, method, endpoint, params=None):
"""API リクエスト(キャッシュ付き)"""
print(f"API呼び出し: {method} {self.base_url}{endpoint}")
time.sleep(0.1) # ネットワーク遅延をシミュレート
# 実際のAPIレスポンスをシミュレート
if endpoint == "/users":
return {"users": [{"id": 1, "name": "太郎"}, {"id": 2, "name": "花子"}]}
elif endpoint == "/posts":
return {"posts": [{"id": 1, "title": "投稿1"}, {"id": 2, "title": "投稿2"}]}
else:
return {"status": "success"}
def api_cache_monitor(func):
"""APIキャッシュ監視デコレータ"""
@wraps(func)
def wrapper(self, *args, **kwargs):
print(f"--- {func.__name__} 実行開始 ---")
# キャッシュ情報を取得
if hasattr(self, '_api_request'):
cache_info_before = self._api_request.cache_info()
print(f"キャッシュ状態(実行前): {cache_info_before}")
result = func(self, *args, **kwargs)
if hasattr(self, '_api_request'):
cache_info_after = self._api_request.cache_info()
print(f"キャッシュ状態(実行後): {cache_info_after}")
print(f"--- {func.__name__} 実行完了 ---\n")
return result
return wrapper
class APIService:
"""APIサービスクラス"""
def __init__(self):
self.client = WebAPIClient("https://api.example.com", "secret-key")
@api_cache_monitor
def get_user_dashboard(self, user_id):
"""ユーザーダッシュボードデータを取得"""
users = self.client.get_user()
posts = self.client.get_posts()
return {
"user_data": users,
"recent_posts": posts,
"timestamp": datetime.now().isoformat()
}
def test_real_world_example():
print("=== 実際のプロジェクトでの活用例 ===")
service = APIService()
# 最初の呼び出し
print("1回目の呼び出し(新規API呼び出し):")
dashboard1 = service.get_user_dashboard(1)
# 2回目の呼び出し(キャッシュヒット)
print("2回目の呼び出し(キャッシュから取得):")
dashboard2 = service.get_user_dashboard(1)
# キャッシュ統計の確認
print("最終キャッシュ統計:")
print(service.client._api_request.cache_info())
test_real_world_example()
エラーハンドリングとfunctoolsの組み合わせ
from functools import wraps, lru_cache
import time
import random
def retry_with_cache(max_retries=3, delay=1, exceptions=(Exception,)):
"""リトライ機能付きキャッシュデコレータ"""
def decorator(func):
@lru_cache(maxsize=128)
def cached_func(*args, **kwargs):
"""内部的にキャッシュされる関数"""
return func(*args, **kwargs)
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries):
try:
# まずキャッシュをチェック
return cached_func(*args, **kwargs)
except exceptions as e:
last_exception = e
print(f"{func.__name__} 失敗 (試行 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(delay)
# キャッシュをクリアして再試行
cached_func.cache_clear()
raise last_exception
# キャッシュ管理メソッドを公開
wrapper.cache_info = cached_func.cache_info
wrapper.cache_clear = cached_func.cache_clear
return wrapper
return decorator
@retry_with_cache(max_retries=3, delay=0.5)
def unreliable_data_fetch(data_id):
"""不安定なデータ取得をシミュレート"""
print(f"データ取得試行: ID={data_id}")
# 60%の確率で失敗
if random.random() < 0.6:
raise ConnectionError(f"データID {data_id} の取得に失敗")
return f"データ内容: {data_id}"
def test_error_handling_with_functools():
print("=== エラーハンドリングとfunctoolsの組み合わせ ===")
# 成功するまで複数回試行
try:
result1 = unreliable_data_fetch("user_001")
print(f"成功: {result1}")
except ConnectionError as e:
print(f"最終的に失敗: {e}")
print(f"キャッシュ状態: {unreliable_data_fetch.cache_info()}")
# 同じデータの再取得(キャッシュから)
try:
result2 = unreliable_data_fetch("user_001")
print(f"キャッシュから取得: {result2}")
except ConnectionError as e:
print(f"キャッシュでも失敗: {e}")
test_error_handling_with_functools()
まとめ:functoolsでPythonコードをレベルアップしよう
functoolsは、Pythonの「関数を第一級オブジェクトとして扱う」哲学を体現するモジュールです。
適切に使用することで、コードの効率性、可読性、保守性を大幅に向上させることができます。
重要なポイントのまとめ
- partial: 関数の一部引数を固定して、より使いやすい関数を作成
- lru_cache: 計算結果をキャッシュして処理速度を劇的に改善
- wraps: デコレータ作成時に元の関数情報を保持
- singledispatch: 型による処理分岐で保守性の高いコードを実現
- cached_property: 重い計算を伴うプロパティのパフォーマンス改善
コメント