Pythonのfunctools
モジュールは、関数型プログラミングの要素を取り入れた便利な機能を提供しています。
「関数を加工する関数」や「パフォーマンスを向上させるデコレータ」など、プログラムをより効率的で読みやすくするためのツールが揃っています。
この記事では、functools
に含まれる主な関数とその使い方を、初心者にもわかりやすく解説します。
「関数型プログラミングは難しそう…」と思うかもしれませんが、実は日常的なプログラミングで非常に役立つ機能ばかりです。
適切に使えば、コードの重複を減らし、パフォーマンスを向上させ、保守性を高めることができます。
実際のコード例とともに、各関数の使いどころや注意点も紹介するので、ぜひ最後まで読んで実践してみてください。
functools モジュールの概要と重要性

functools の基本的な役割
関数を操作するための道具箱
- 既存の関数を加工して新しい関数を作る
- 関数の実行結果をキャッシュして高速化
- デコレータの作成を支援
- クラスの機能を自動的に拡張
なぜ functools が重要なのか
- コードの再利用性が向上
- パフォーマンスの改善が簡単に実現
- プログラムの保守性が向上
- Pythonらしい簡潔なコードが書ける
Python における関数型プログラミング
関数型プログラミングの考え方
- 関数を値として扱う(第一級関数)
- 副作用を避けて純粋な関数を作る
- 関数の合成により複雑な処理を構築
- 不変性を重視したデータ処理
functools が提供する価値
- 関数型の考え方を手軽に導入
- オブジェクト指向との適切な組み合わせ
- Pythonの標準的なコーディングスタイルに適合
functools の各機能を理解することで、より洗練されたPythonコードが書けるようになります。まずは、最もよく使われる関数から見ていきましょう。
partial() – 引数を固定して新しい関数を作る

partial() の基本的な使い方
基本概念 partial()
は、既存の関数に一部の引数をあらかじめ設定して、新しい関数を作る機能です。
from functools import partial
# 基本的な例
def multiply(x, y):
return x * y
# yを10に固定した新しい関数を作成
multiply_by_10 = partial(multiply, y=10)
print(multiply_by_10(5)) # 50 (5 * 10)
print(multiply_by_10(3)) # 30 (3 * 10)
実用的な活用例
設定値を固定したAPI呼び出し
import requests
from functools import partial
# 基本のAPI呼び出し関数
def api_request(endpoint, base_url, headers=None, timeout=30):
url = f"{base_url}/{endpoint}"
return requests.get(url, headers=headers, timeout=timeout)
# 特定のAPIサーバー用の関数を作成
github_api = partial(
api_request,
base_url="https://api.github.com",
headers={"Accept": "application/vnd.github+json"},
timeout=10
)
# 使用時はエンドポイントだけ指定
user_data = github_api("user")
repos_data = github_api("user/repos")
数学関数の特殊化
import math
from functools import partial
# 汎用の累乗関数
def power(base, exponent):
return base ** exponent
# よく使う関数を作成
square = partial(power, exponent=2)
cube = partial(power, exponent=3)
print(square(4)) # 16
print(cube(3)) # 27
# 対数関数の特殊化
log10 = partial(math.log, base=10)
print(log10(100)) # 2.0
GUI アプリケーションでの活用
イベントハンドラーの簡素化
import tkinter as tk
from functools import partial
class Calculator:
def __init__(self):
self.root = tk.Tk()
self.result = tk.StringVar()
self.setup_ui()
def add_number(self, number):
current = self.result.get()
self.result.set(current + str(number))
def setup_ui(self):
# 結果表示
entry = tk.Entry(self.root, textvariable=self.result)
entry.pack()
# 数字ボタンを動的に作成
for i in range(10):
# partial を使ってボタンごとに異なる数字を設定
btn = tk.Button(
self.root,
text=str(i),
command=partial(self.add_number, i)
)
btn.pack(side=tk.LEFT)
データ処理での応用
リスト処理の関数化
from functools import partial
# 汎用フィルタリング関数
def filter_by_threshold(data, threshold, comparison):
if comparison == 'greater':
return [x for x in data if x > threshold]
elif comparison == 'less':
return [x for x in data if x < threshold]
else:
return [x for x in data if x == threshold]
# 特定の条件でフィルタリングする関数を作成
filter_high = partial(filter_by_threshold, threshold=50, comparison='greater')
filter_low = partial(filter_by_threshold, threshold=20, comparison='less')
data = [10, 25, 35, 55, 75, 15, 85]
print(filter_high(data)) # [55, 75, 85]
print(filter_low(data)) # [10, 15]
partial の注意点と制限
可変引数との組み合わせ
def flexible_func(*args, **kwargs):
print(f"args: {args}")
print(f"kwargs: {kwargs}")
# partial でキーワード引数を固定
fixed_func = partial(flexible_func, x=10, y=20)
fixed_func(1, 2, z=30)
# args: (1, 2)
# kwargs: {'x': 10, 'y': 20, 'z': 30}
引数の順序に注意
def divide(a, b):
return a / b
# 正しい使い方
half = partial(divide, b=2)
print(half(10)) # 5.0
# 間違いやすい例
# divide_by = partial(divide, 2) # aが2に固定される
# print(divide_by(10)) # 0.2 (2/10)
partial は関数の引数を部分的に固定することで、より専用性の高い関数を簡単に作れる便利な機能です。次は、メソッド版である partialmethod について見てみましょう。
partialmethod() – クラスメソッドの引数を固定

partialmethod() の基本概念
partialmethod と partial の違い
partial()
: 通常の関数用partialmethod()
: クラスのメソッド用に最適化
from functools import partialmethod
class Calculator:
def operation(self, a, b, op):
if op == 'add':
return a + b
elif op == 'subtract':
return a - b
elif op == 'multiply':
return a * b
elif op == 'divide':
return a / b if b != 0 else None
# 特定の操作用のメソッドを自動生成
add = partialmethod(operation, op='add')
subtract = partialmethod(operation, op='subtract')
multiply = partialmethod(operation, op='multiply')
divide = partialmethod(operation, op='divide')
# 使用例
calc = Calculator()
print(calc.add(5, 3)) # 8
print(calc.multiply(4, 7)) # 28
実際のアプリケーション例
ファイル処理クラスでの活用
import json
import csv
from functools import partialmethod
class FileProcessor:
def save_data(self, data, filename, file_format, encoding='utf-8'):
if file_format == 'json':
with open(filename, 'w', encoding=encoding) as f:
json.dump(data, f, ensure_ascii=False, indent=2)
elif file_format == 'csv':
with open(filename, 'w', encoding=encoding, newline='') as f:
if data and isinstance(data[0], dict):
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
# 形式ごとの専用メソッドを作成
save_json = partialmethod(save_data, file_format='json')
save_csv = partialmethod(save_data, file_format='csv')
save_json_utf8 = partialmethod(save_data, file_format='json', encoding='utf-8')
# 使用例
processor = FileProcessor()
data = [{'name': '田中', 'age': 30}, {'name': '佐藤', 'age': 25}]
processor.save_json(data, 'users.json')
processor.save_csv(data, 'users.csv')
HTTP クライアントでの応用
REST API クライアントの実装
import requests
from functools import partialmethod
class APIClient:
def __init__(self, base_url, auth_token=None):
self.base_url = base_url
self.headers = {'Authorization': f'Bearer {auth_token}'} if auth_token else {}
def request(self, endpoint, method='GET', data=None, params=None):
url = f"{self.base_url}/{endpoint.lstrip('/')}"
response = requests.request(
method=method,
url=url,
headers=self.headers,
json=data,
params=params
)
response.raise_for_status()
return response.json()
# HTTP メソッドごとの専用メソッド
get = partialmethod(request, method='GET')
post = partialmethod(request, method='POST')
put = partialmethod(request, method='PUT')
delete = partialmethod(request, method='DELETE')
# 使用例
api = APIClient('https://api.example.com', 'your-token')
# 簡潔なメソッド呼び出し
users = api.get('users')
new_user = api.post('users', data={'name': '新規ユーザー'})
updated_user = api.put('users/123', data={'name': '更新ユーザー'})
api.delete('users/123')
データベース操作での活用
クエリビルダーの実装
from functools import partialmethod
class QueryBuilder:
def __init__(self, table_name):
self.table_name = table_name
def query(self, operation, columns='*', where=None, order_by=None, limit=None):
sql_parts = []
if operation == 'SELECT':
sql_parts.append(f"SELECT {columns} FROM {self.table_name}")
elif operation == 'DELETE':
sql_parts.append(f"DELETE FROM {self.table_name}")
if where:
sql_parts.append(f"WHERE {where}")
if order_by:
sql_parts.append(f"ORDER BY {order_by}")
if limit:
sql_parts.append(f"LIMIT {limit}")
return ' '.join(sql_parts)
# 操作ごとの専用メソッド
select = partialmethod(query, operation='SELECT')
delete = partialmethod(query, operation='DELETE')
# 使用例
users_query = QueryBuilder('users')
# 簡潔なクエリ生成
print(users_query.select(columns='name, email', where='age > 20'))
# SELECT name, email FROM users WHERE age > 20
print(users_query.delete(where='status = "inactive"'))
# DELETE FROM users WHERE status = "inactive"
クラスデコレータとの組み合わせ
自動メソッド生成
from functools import partialmethod
def auto_crud_methods(cls):
"""CRUD操作用のメソッドを自動生成するデコレータ"""
crud_operations = ['create', 'read', 'update', 'delete']
for operation in crud_operations:
method = partialmethod(cls.execute_operation, operation=operation)
setattr(cls, operation, method)
return cls
@auto_crud_methods
class UserService:
def execute_operation(self, data=None, user_id=None, operation=None):
return f"{operation.upper()} operation: data={data}, id={user_id}"
# 使用例
service = UserService()
print(service.create(data={'name': 'John'}))
print(service.read(user_id=123))
print(service.update(data={'name': 'Jane'}, user_id=123))
print(service.delete(user_id=123))
partialmethod は、クラス設計において重複したメソッド定義を避け、保守性の高いコードを書くために非常に有用です。
次は、リスト処理でよく使われる reduce() について説明します。
reduce() – リストを一つの値に集約

reduce() の基本的な仕組み
reduce の動作原理 連続した要素を2引数関数で順次処理し、最終的に1つの値を得る機能です。
from functools import reduce
# 基本的な例:数値の合計
numbers = [1, 2, 3, 4, 5]
total = reduce(lambda x, y: x + y, numbers)
print(total) # 15
# 同じ処理の詳細な動作
# Step 1: 1 + 2 = 3
# Step 2: 3 + 3 = 6
# Step 3: 6 + 4 = 10
# Step 4: 10 + 5 = 15
初期値を指定した reduce
from functools import reduce
# 初期値ありの例
numbers = [1, 2, 3, 4, 5]
total_with_initial = reduce(lambda x, y: x + y, numbers, 100)
print(total_with_initial) # 115 (100 + 1 + 2 + 3 + 4 + 5)
# 空のリストでも安全
empty_list = []
safe_total = reduce(lambda x, y: x + y, empty_list, 0)
print(safe_total) # 0
実用的な reduce の活用例
リストの中の最大値・最小値
from functools import reduce
# 最大値を見つける
numbers = [23, 45, 12, 67, 34, 89, 15]
maximum = reduce(lambda x, y: x if x > y else y, numbers)
print(maximum) # 89
# より読みやすい書き方
def get_max(a, b):
return a if a > b else b
maximum = reduce(get_max, numbers)
print(maximum) # 89
# 組み込み関数との比較
print(max(numbers)) # 89 (通常はこちらを使用)
辞書のマージ
from functools import reduce
# 複数の辞書を統合
dicts = [
{'a': 1, 'b': 2},
{'c': 3, 'd': 4},
{'e': 5, 'f': 6}
]
merged = reduce(lambda x, y: {**x, **y}, dicts, {})
print(merged) # {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6}
# より実用的な例:設定の統合
default_config = {'debug': False, 'timeout': 30}
user_configs = [
{'debug': True},
{'timeout': 60, 'retries': 3},
{'log_level': 'INFO'}
]
final_config = reduce(
lambda base, update: {**base, **update},
user_configs,
default_config
)
print(final_config)
# {'debug': True, 'timeout': 60, 'retries': 3, 'log_level': 'INFO'}
複雑なデータ処理での応用
ネストした構造の平坦化
from functools import reduce
# 2次元リストの平坦化
nested_lists = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
flattened = reduce(lambda acc, lst: acc + lst, nested_lists, [])
print(flattened) # [1, 2, 3, 4, 5, 6, 7, 8, 9]
# より複雑な例:辞書のリストから特定の値を抽出して統合
products = [
{'name': 'りんご', 'tags': ['果物', '赤い', '甘い']},
{'name': 'トマト', 'tags': ['野菜', '赤い', '栄養豊富']},
{'name': 'バナナ', 'tags': ['果物', '黄色い', '甘い']}
]
all_tags = reduce(
lambda acc, product: acc.union(set(product['tags'])),
products,
set()
)
print(all_tags) # {'果物', '赤い', '甘い', '野菜', '栄養豊富', '黄色い'}
数値計算での応用
from functools import reduce
import operator
# 階乗の計算
def factorial(n):
if n <= 1:
return 1
return reduce(operator.mul, range(1, n + 1))
print(factorial(5)) # 120 (5! = 5×4×3×2×1)
# 文字列の処理
words = ['Python', 'は', '素晴らしい', 'プログラミング', '言語', 'です']
sentence = reduce(lambda acc, word: acc + ' ' + word, words)
print(sentence) # Python は 素晴らしい プログラミング 言語 です
# よりエレガントな文字列結合
sentence2 = reduce(operator.add, [word + ' ' for word in words]).strip()
print(sentence2) # Python は 素晴らしい プログラミング 言語 です
reduce の代替手段との比較
いつ reduce を使うべきか
from functools import reduce
# sum() で十分な場合
numbers = [1, 2, 3, 4, 5]
print(sum(numbers)) # 15 (推奨)
print(reduce(lambda x, y: x + y, numbers)) # 15 (不要)
# all() で十分な場合
booleans = [True, True, False, True]
print(all(booleans)) # False (推奨)
print(reduce(lambda x, y: x and y, booleans)) # False (不要)
# reduce が適している場合
custom_operation = reduce(
lambda acc, item: acc * 2 + item,
[1, 2, 3],
0
)
print(custom_operation) # 11 (0*2+1=1, 1*2+2=4, 4*2+3=11)
reduce は強力な機能ですが、多くの場合は専用の組み込み関数(sum、max、min など)の方が読みやすくて効率的です。独自の累積処理が必要な場合に使用しましょう。
キャッシュ機能 – lru_cache(), cache(), cached_property()
lru_cache() – 関数結果のキャッシュ
基本的な使い方
from functools import lru_cache
@lru_cache(maxsize=128)
def expensive_function(n):
"""時間のかかる計算をシミュレート"""
print(f"計算中: {n}")
result = sum(i * i for i in range(n))
return result
# 初回は計算が実行される
print(expensive_function(1000)) # "計算中: 1000" が表示される
print(expensive_function(1000)) # キャッシュから取得(表示されない)
print(expensive_function(2000)) # "計算中: 2000" が表示される
フィボナッチ数列での劇的な高速化
from functools import lru_cache
import time
# キャッシュなしの非効率な実装
def fibonacci_slow(n):
if n < 2:
return n
return fibonacci_slow(n - 1) + fibonacci_slow(n - 2)
# キャッシュありの効率的な実装
@lru_cache(maxsize=None)
def fibonacci_fast(n):
if n < 2:
return n
return fibonacci_fast(n - 1) + fibonacci_fast(n - 2)
# 性能比較
start = time.time()
result_slow = fibonacci_slow(30)
time_slow = time.time() - start
start = time.time()
result_fast = fibonacci_fast(30)
time_fast = time.time() - start
print(f"結果: {result_slow} (両方同じ)")
print(f"キャッシュなし: {time_slow:.3f}秒")
print(f"キャッシュあり: {time_fast:.3f}秒")
print(f"高速化倍率: {time_slow/time_fast:.1f}倍")
cache() – 無制限キャッシュ (Python 3.9+)
cache() の簡潔な使い方
from functools import cache
@cache # lru_cache(maxsize=None) と同等
def prime_check(n):
"""素数判定(簡単な実装)"""
if n < 2:
return False
for i in range(2, int(n ** 0.5) + 1):
if n % i == 0:
return False
return True
# 一度計算した結果は永続的にキャッシュされる
print(prime_check(97)) # True (計算実行)
print(prime_check(97)) # True (キャッシュから取得)
cached_property() – クラス属性のキャッシュ
重い計算を行うプロパティの最適化
from functools import cached_property
import time
class DataAnalyzer:
def __init__(self, data):
self.data = data
@cached_property
def expensive_analysis(self):
"""時間のかかる分析処理"""
print("分析実行中...")
time.sleep(1) # 重い処理をシミュレート
return {
'mean': sum(self.data) / len(self.data),
'max': max(self.data),
'min': min(self.data)
}
@cached_property
def data_summary(self):
"""分析結果に基づくサマリー"""
analysis = self.expensive_analysis
return f"平均: {analysis['mean']:.2f}, 範囲: {analysis['min']}-{analysis['max']}"
# 使用例
analyzer = DataAnalyzer([1, 5, 3, 9, 2, 7, 4])
# 初回アクセス時のみ計算される
print(analyzer.expensive_analysis) # "分析実行中..." が表示
print(analyzer.data_summary) # キャッシュを利用
# 再アクセス時はキャッシュから取得
print(analyzer.expensive_analysis) # 即座に結果が返る
実際のアプリケーションでの活用
Web API レスポンスのキャッシュ
from functools import lru_cache
import requests
import time
class APIClient:
@lru_cache(maxsize=100)
def get_user_data(self, user_id):
"""ユーザーデータをAPIから取得(キャッシュ付き)"""
print(f"API呼び出し: user_id={user_id}")
# 実際のAPI呼び出しの代わり
time.sleep(0.5) # ネットワーク遅延をシミュレート
return {'id': user_id, 'name': f'User{user_id}', 'email': f'user{user_id}@example.com'}
def get_cache_info(self):
"""キャッシュの統計情報"""
return self.get_user_data.cache_info()
def clear_cache(self):
"""キャッシュをクリア"""
self.get_user_data.cache_clear()
# 使用例
client = APIClient()
# 初回はAPI呼び出し
user1 = client.get_user_data(123)
print(f"取得: {user1}")
# 2回目はキャッシュから取得
user1_cached = client.get_user_data(123)
print(f"キャッシュ: {user1_cached}")
# キャッシュ情報の確認
print(client.get_cache_info())
# CacheInfo(hits=1, misses=1, maxsize=100, currsize=1)
キャッシュ管理のベストプラクティス
適切なキャッシュサイズの設定
from functools import lru_cache
# メモリ使用量を考慮したサイズ設定
@lru_cache(maxsize=1000) # 中程度のキャッシュサイズ
def moderate_cache_function(param):
return expensive_computation(param)
@lru_cache(maxsize=32) # 小さなキャッシュサイズ
def small_cache_function(param):
return quick_but_frequent_computation(param)
条件付きキャッシュの実装
from functools import lru_cache
def conditional_cache(func):
"""特定の条件でのみキャッシュを使用するデコレータ"""
cached_func = lru_cache(maxsize=128)(func)
def wrapper(*args, **kwargs):
# 引数に基づいてキャッシュを使うかどうか決定
if len(args) > 0 and args[0] > 1000:
return cached_func(*args, **kwargs)
else:
return func(*args, **kwargs)
return wrapper
@conditional_cache
def smart_function(n):
"""大きな値の場合のみキャッシュを使用"""
return sum(range(n))
キャッシュ機能は適切に使用すれば大幅なパフォーマンス向上をもたらしますが、メモリ使用量とのバランスを考慮することが重要です。
wraps() と update_wrapper() – デコレータのメタ情報保持

デコレータ作成時の問題
wraps() なしの問題のあるデコレータ
def bad_decorator(func):
def wrapper(*args, **kwargs):
print(f"関数 {func.__name__} を実行中...")
return func(*args, **kwargs)
return wrapper
@bad_decorator
def example_function():
"""これは例の関数です"""
return "Hello, World!"
# メタ情報が失われる問題
print(example_function.__name__) # "wrapper" (元の名前ではない)
print(example_function.__doc__) # None (ドキュメントが失われる)
wraps() による正しいデコレータ作成
wraps() を使った適切なデコレータ
from functools import wraps
def good_decorator(func):
@wraps(func) # 元の関数のメタ情報を保持
def wrapper(*args, **kwargs):
print(f"関数 {func.__name__} を実行中...")
return func(*args, **kwargs)
return wrapper
@good_decorator
def example_function():
"""これは例の関数です"""
return "Hello, World!"
# メタ情報が正しく保持される
print(example_function.__name__) # "example_function"
print(example_function.__doc__) # "これは例の関数です"
実用的なデコレータの実装例
実行時間測定デコレータ
import time
from functools import wraps
def timing_decorator(func):
"""関数の実行時間を測定するデコレータ"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} の実行時間: {end_time - start_time:.4f}秒")
return result
return wrapper
@timing_decorator
def slow_calculation(n):
"""重い計算処理の例"""
total = 0
for i in range(n):
total += i ** 2
return total
# 使用例
result = slow_calculation(100000)
# slow_calculation の実行時間: 0.0156秒
リトライ機能付きデコレータ
import time
import random
from functools import wraps
def retry(max_attempts=3, delay=1):
"""失敗時にリトライするデコレータ"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
raise e
print(f"{func.__name__} 失敗 (試行 {attempt + 1}/{max_attempts}): {e}")
time.sleep(delay)
return None
return wrapper
return decorator
@retry(max_attempts=3, delay=0.5)
def unreliable_api_call():
"""不安定なAPI呼び出しをシミュレート"""
if random.random() < 0.7: # 70%の確率で失敗
raise ConnectionError("API接続エラー")
return {"status": "success", "data": "重要なデータ"}
# 使用例
try:
result = unreliable_api_call()
print(f"成功: {result}")
except Exception as e:
print(f"最終的に失敗: {e}")
パラメータ付きデコレータの実装
ログ出力レベル指定デコレータ
import logging
from functools import wraps
def log_calls(level=logging.INFO, include_args=True):
"""関数呼び出しをログ出力するデコレータ"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 引数情報の準備
arg_info = ""
if include_args:
args_str = ', '.join(map(str, args))
kwargs_str = ', '.join(f"{k}={v}" for k, v in kwargs.items())
all_args = ', '.join(filter(None, [args_str, kwargs_str]))
arg_info = f"({all_args})" if all_args else "()"
# 関数実行前のログ
logging.log(level, f"呼び出し: {func.__name__}{arg_info}")
try:
result = func(*args, **kwargs)
logging.log(level, f"完了: {func.__name__} -> {result}")
return result
except Exception as e:
logging.log(logging.ERROR, f"エラー: {func.__name__} -> {e}")
raise
return wrapper
return decorator
# 使用例
logging.basicConfig(level=logging.INFO)
@log_calls(level=logging.DEBUG, include_args=True)
def calculate_area(width, height):
"""長方形の面積を計算"""
return width * height
@log_calls(level=logging.WARNING, include_args=False)
def risky_operation():
"""エラーが発生する可能性のある操作"""
if random.random() < 0.3:
raise ValueError("処理に失敗しました")
return "成功"
update_wrapper() の直接使用
wraps() を使わない低レベルな実装
from functools import update_wrapper
def manual_decorator(func):
def wrapper(*args, **kwargs):
print(f"手動デコレータで {func.__name__} を実行")
return func(*args, **kwargs)
# 手動でメタ情報をコピー
update_wrapper(wrapper, func)
# 追加のカスタム属性
wrapper.original_function = func
wrapper.decoration_time = time.time()
return wrapper
@manual_decorator
def test_function(x, y):
"""テスト用の関数"""
return x + y
# メタ情報とカスタム属性の確認
print(test_function.__name__) # "test_function"
print(test_function.__doc__) # "テスト用の関数"
print(test_function.original_function) # <function test_function at ...>
print(test_function.decoration_time) # タイムスタンプ
デコレータのデバッグ支援
デコレータ情報を保持するヘルパー
from functools import wraps
import inspect
def debug_decorator(func):
"""デバッグ情報を追加するデコレータ"""
@wraps(func)
def wrapper(*args, **kwargs):
# 呼び出し情報を収集
frame = inspect.currentframe()
caller_info = inspect.getframeinfo(frame.f_back)
print(f"=== デバッグ情報 ===")
print(f"関数名: {func.__name__}")
print(f"呼び出し元: {caller_info.filename}:{caller_info.lineno}")
print(f"引数: args={args}, kwargs={kwargs}")
result = func(*args, **kwargs)
print(f"戻り値: {result}")
print(f"=================")
return result
# デバッグ用の追加情報
wrapper._original_func = func
wrapper._debug_info = {
'decorated_at': time.time(),
'source_file': inspect.getfile(func),
'source_line': inspect.getsourcelines(func)[1]
}
return wrapper
@debug_decorator
def sample_function(a, b=10):
"""サンプル関数"""
return a * b
# 使用とデバッグ情報の確認
result = sample_function(5, b=20)
print(f"デバッグ情報: {sample_function._debug_info}")
wraps() の使用は、デコレータを作成する際の基本的なマナーです。デバッグやドキュメント生成ツールが正しく動作するために必須の要素となります。
total_ordering() – 比較演算子の自動生成

total_ordering の基本概念
比較演算子の手動実装の煩わしさ
# total_ordering なしの場合、全ての比較演算子を手動実装
class Student:
def __init__(self, name, grade):
self.name = name
self.grade = grade
def __eq__(self, other):
return self.grade == other.grade
def __lt__(self, other):
return self.grade < other.grade
def __le__(self, other):
return self.grade <= other.grade
def __gt__(self, other):
return self.grade > other.grade
def __ge__(self, other):
return self.grade >= other.grade
def __ne__(self, other):
return self.grade != other.grade
total_ordering による簡素化
from functools import total_ordering
@total_ordering
class Student:
def __init__(self, name, grade):
self.name = name
self.grade = grade
def __eq__(self, other):
if not isinstance(other, Student):
return NotImplemented
return self.grade == other.grade
def __lt__(self, other):
if not isinstance(other, Student):
return NotImplemented
return self.grade < other.grade
def __repr__(self):
return f"Student('{self.name}', {self.grade})"
# 使用例
students = [
Student("田中", 85),
Student("佐藤", 92),
Student("鈴木", 78)
]
# 全ての比較演算子が自動的に利用可能
print(students[0] < students[1]) # True
print(students[1] >= students[2]) # True
print(sorted(students)) # グレード順にソート
実用的なクラス設計例
バージョン情報の比較
from functools import total_ordering
@total_ordering
class Version:
def __init__(self, version_string):
self.major, self.minor, self.patch = map(int, version_string.split('.'))
def __eq__(self, other):
if not isinstance(other, Version):
return NotImplemented
return (self.major, self.minor, self.patch) == (other.major, other.minor, other.patch)
def __lt__(self, other):
if not isinstance(other, Version):
return NotImplemented
return (self.major, self.minor, self.patch) < (other.major, other.minor, other.patch)
def __repr__(self):
return f"Version('{self.major}.{self.minor}.{self.patch}')"
# 使用例
versions = [
Version("1.2.3"),
Version("2.0.0"),
Version("1.10.5"),
Version("1.2.10")
]
print("ソート前:", versions)
print("ソート後:", sorted(versions))
# バージョン比較
current = Version("1.5.0")
required = Version("1.2.0")
print(f"{current} >= {required}: {current >= required}")
価格比較可能な商品クラス
from functools import total_ordering
@total_ordering
class Product:
def __init__(self, name, price, currency="JPY"):
self.name = name
self.price = price
self.currency = currency
def __eq__(self, other):
if not isinstance(other, Product):
return NotImplemented
# 同じ通貨の場合のみ比較可能
if self.currency != other.currency:
raise ValueError(f"異なる通貨での比較はできません: {self.currency} vs {other.currency}")
return self.price == other.price
def __lt__(self, other):
if not isinstance(other, Product):
return NotImplemented
if self.currency != other.currency:
raise ValueError(f"異なる通貨での比較はできません: {self.currency} vs {other.currency}")
return self.price < other.price
def __repr__(self):
return f"Product('{self.name}', {self.price} {self.currency})"
# 使用例
products = [
Product("ノートパソコン", 89800),
Product("マウス", 2500),
Product("キーボード", 12000),
Product("モニター", 35000)
]
# 価格順でソート
print("価格順:")
for product in sorted(products):
print(f" {product}")
# 価格比較
laptop = Product("ゲーミングPC", 150000)
monitor = Product("4Kモニター", 80000)
print(f"{laptop.name} > {monitor.name}: {laptop > monitor}")
複雑な比較ロジックの実装
優先度付きタスククラス
from functools import total_ordering
from datetime import datetime, timedelta
@total_ordering
class Task:
PRIORITY_LEVELS = {'low': 1, 'medium': 2, 'high': 3, 'urgent': 4}
def __init__(self, name, priority='medium', due_date=None):
self.name = name
self.priority = priority
self.due_date = due_date or (datetime.now() + timedelta(days=7))
self.created_at = datetime.now()
def __eq__(self, other):
if not isinstance(other, Task):
return NotImplemented
return (self._priority_value(), self.due_date) == (other._priority_value(), other.due_date)
def __lt__(self, other):
if not isinstance(other, Task):
return NotImplemented
# 優先度が高い方が「大きい」(ソート時に後に来る)
# 期日が近い方が「大きい」(ソート時に後に来る)
self_priority = self._priority_value()
other_priority = other._priority_value()
if self_priority != other_priority:
return self_priority < other_priority
# 優先度が同じ場合は期日で比較(期日が近い方が優先)
return self.due_date > other.due_date
def _priority_value(self):
return self.PRIORITY_LEVELS.get(self.priority, 2)
def __repr__(self):
return f"Task('{self.name}', '{self.priority}', {self.due_date.strftime('%Y-%m-%d')})"
# 使用例
now = datetime.now()
tasks = [
Task("レポート作成", "high", now + timedelta(days=2)),
Task("メール返信", "medium", now + timedelta(days=1)),
Task("書類整理", "low", now + timedelta(days=5)),
Task("会議準備", "urgent", now + timedelta(hours=6)),
Task("システム更新", "high", now + timedelta(days=3))
]
print("優先度順(緊急度の高い順):")
for task in sorted(tasks, reverse=True):
print(f" {task}")
total_ordering の注意点
型チェックの重要性
from functools import total_ordering
@total_ordering
class SafeNumber:
def __init__(self, value):
self.value = value
def __eq__(self, other):
# 適切な型チェック
if not isinstance(other, (SafeNumber, int, float)):
return NotImplemented
other_value = other.value if isinstance(other, SafeNumber) else other
return self.value == other_value
def __lt__(self, other):
if not isinstance(other, (SafeNumber, int, float)):
return NotImplemented
other_value = other.value if isinstance(other, SafeNumber) else other
return self.value < other_value
def __repr__(self):
return f"SafeNumber({self.value})"
# 使用例
num1 = SafeNumber(10)
num2 = SafeNumber(20)
print(num1 < num2) # True
print(num1 < 15) # True
print(num1 == 10) # True
# 不適切な型との比較
try:
print(num1 < "invalid") # NotImplemented が返される
except TypeError as e:
print(f"型エラー: {e}")
total_ordering は比較可能なクラスを簡単に作成できる便利な機能ですが、適切な型チェックと一貫した比較ロジックの実装が重要です。
singledispatch() – 型による関数の分岐

singledispatch の基本概念
従来の型判定による分岐
def process_data_old(data):
"""従来の方法:if文による型判定"""
if isinstance(data, str):
return data.upper()
elif isinstance(data, list):
return [str(item) for item in data]
elif isinstance(data, dict):
return {k: str(v) for k, v in data.items()}
elif isinstance(data, int):
return data * 2
else:
return str(data)
singledispatch による型ベース分岐
from functools import singledispatch
@singledispatch
def process_data(data):
"""デフォルトの処理(型が登録されていない場合)"""
return str(data)
@process_data.register
def _(data: str):
"""文字列の処理"""
return data.upper()
@process_data.register
def _(data: list):
"""リストの処理"""
return [str(item) for item in data]
@process_data.register
def _(data: dict):
"""辞書の処理"""
return {k: str(v) for k, v in data.items()}
@process_data.register(int)
def process_integer(data):
"""整数の処理(関数名を指定する場合)"""
return data * 2
# 使用例
print(process_data("hello")) # "HELLO"
print(process_data([1, 2, 3])) # ["1", "2", "3"]
print(process_data({"a": 1, "b": 2})) # {"a": "1", "b": "2"}
print(process_data(42)) # 84
print(process_data(3.14)) # "3.14" (デフォルト処理)
実用的な応用例
ファイル形式別の処理システム
from functools import singledispatch
import json
import csv
import io
from pathlib import Path
@singledispatch
def save_data(data, filename):
"""デフォルト:テキストファイルとして保存"""
with open(filename, 'w', encoding='utf-8') as f:
f.write(str(data))
@save_data.register
def _(data: dict, filename):
"""辞書データをJSONとして保存"""
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
@save_data.register
def _(data: list, filename):
"""リストデータの保存(辞書のリストはCSV、その他はJSON)"""
if data and isinstance(data[0], dict):
# 辞書のリストはCSVとして保存
with open(filename, 'w', encoding='utf-8', newline='') as f:
if data:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
else:
# その他のリストはJSONとして保存
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
@save_data.register(str)
def save_string_data(data, filename):
"""文字列データをテキストファイルとして保存"""
with open(filename, 'w', encoding='utf-8') as f:
f.write(data)
# 使用例
users = [
{"name": "田中", "age": 30, "city": "東京"},
{"name": "佐藤", "age": 25, "city": "大阪"}
]
config = {"debug": True, "timeout": 30}
log_text = "アプリケーション開始\n処理完了"
save_data(users, "users.csv") # CSVファイルとして保存
save_data(config, "config.json") # JSONファイルとして保存
save_data(log_text, "app.log") # テキストファイルとして保存
データ変換システム
from functools import singledispatch
from decimal import Decimal
from datetime import datetime, date
@singledispatch
def to_json_serializable(obj):
"""JSONシリアライズ可能な形式に変換"""
# デフォルト:文字列化
return str(obj)
@to_json_serializable.register
def _(obj: datetime):
"""datetime を ISO 形式文字列に変換"""
return obj.isoformat()
@to_json_serializable.register
def _(obj: date):
"""date を文字列に変換"""
return obj.strftime('%Y-%m-%d')
@to_json_serializable.register
def _(obj: Decimal):
"""Decimal を float に変換"""
return float(obj)
@to_json_serializable.register
def _(obj: set):
"""set を list に変換"""
return list(obj)
@to_json_serializable.register
def _(obj: dict):
"""辞書の値を再帰的に変換"""
return {k: to_json_serializable(v) for k, v in obj.items()}
@to_json_serializable.register
def _(obj: list):
"""リストの要素を再帰的に変換"""
return [to_json_serializable(item) for item in obj]
# 使用例
complex_data = {
"created_at": datetime.now(),
"birth_date": date(1990, 5, 15),
"price": Decimal("99.99"),
"tags": {"python", "programming", "tutorial"},
"metadata": {
"updated": datetime.now(),
"version": Decimal("1.0")
}
}
serializable_data = to_json_serializable(complex_data)
print(json.dumps(serializable_data, indent=2))
カスタム型での活用
図形の面積計算システム
from functools import singledispatch
import math
class Circle:
def __init__(self, radius):
self.radius = radius
class Rectangle:
def __init__(self, width, height):
self.width = width
self.height = height
class Triangle:
def __init__(self, base, height):
self.base = base
self.height = height
@singledispatch
def calculate_area(shape):
"""面積計算のデフォルト実装"""
raise NotImplementedError(f"面積計算が未対応の図形: {type(shape)}")
@calculate_area.register
def _(shape: Circle):
"""円の面積"""
return math.pi * shape.radius ** 2
@calculate_area.register
def _(shape: Rectangle):
"""長方形の面積"""
return shape.width * shape.height
@calculate_area.register
def _(shape: Triangle):
"""三角形の面積"""
return 0.5 * shape.base * shape.height
# 使用例
shapes = [
Circle(5),
Rectangle(4, 6),
Triangle(8, 3)
]
for shape in shapes:
area = calculate_area(shape)
print(f"{type(shape).__name__}: {area:.2f}")
# 新しい図形型を後から追加
class Square:
def __init__(self, side):
self.side = side
@calculate_area.register
def _(shape: Square):
"""正方形の面積"""
return shape.side ** 2
square = Square(4)
print(f"Square: {calculate_area(square)}")
高度な使用例:ビジターパターンの実装
AST(抽象構文木)の処理
from functools import singledispatch
class NumberNode:
def __init__(self, value):
self.value = value
class BinaryOpNode:
def __init__(self, left, operator, right):
self.left = left
self.operator = operator
self.right = right
class UnaryOpNode:
def __init__(self, operator, operand):
self.operator = operator
self.operand = operand
@singledispatch
def evaluate(node):
"""数式ノードを評価"""
raise NotImplementedError(f"未対応のノード型: {type(node)}")
@evaluate.register
def _(node: NumberNode):
"""数値ノードの評価"""
return node.value
@evaluate.register
def _(node: BinaryOpNode):
"""二項演算ノードの評価"""
left_val = evaluate(node.left)
right_val = evaluate(node.right)
if node.operator == '+':
return left_val + right_val
elif node.operator == '-':
return left_val - right_val
elif node.operator == '*':
return left_val * right_val
elif node.operator == '/':
return left_val / right_val
else:
raise ValueError(f"未対応の演算子: {node.operator}")
@evaluate.register
def _(node: UnaryOpNode):
"""単項演算ノードの評価"""
operand_val = evaluate(node.operand)
if node.operator == '-':
return -operand_val
elif node.operator == '+':
return operand_val
else:
raise ValueError(f"未対応の単項演算子: {node.operator}")
# 使用例: (5 + 3) * (-2)
ast = BinaryOpNode(
left=BinaryOpNode(
left=NumberNode(5),
operator='+',
right=NumberNode(3)
),
operator='*',
right=UnaryOpNode(
operator='-',
operand=NumberNode(2)
)
)
result = evaluate(ast)
print(f"計算結果: {result}") # -16
singledispatch は、型安全な方法で処理を分岐させることができ、コードの拡張性と保守性を大幅に向上させます。新しい型が追加されても既存のコードを変更する必要がないのが大きな利点です。
まとめ:functools を活用した効率的なプログラミング
この記事では、Python の functools モジュールの主要な機能について詳しく解説しました。
各機能の特徴と使いどころ
関数の加工・カスタマイズ
- partial/partialmethod:引数を固定して専用関数を作成
- wraps/update_wrapper:デコレータ作成時のメタ情報保持
パフォーマンス最適化
- lru_cache/cache:関数結果のキャッシュによる高速化
- cached_property:クラス属性の計算結果キャッシュ
データ処理・集約
- reduce:リストを一つの値に集約
- singledispatch:型による処理の分岐
クラス設計の支援
- total_ordering:比較演算子の自動生成
実用的な活用指針
コードの保守性向上
- デコレータ作成時は必ず
@wraps
を使用 - 型ごとの処理分岐は
singledispatch
で整理 - 比較可能なクラスには
total_ordering
を活用
パフォーマンス改善
- 重い計算には適切なキャッシュ機能を適用
- メモリ使用量とのバランスを考慮したキャッシュサイズ設定
関数型プログラミングの導入
partial
で関数の再利用性向上reduce
で宣言的なデータ処理- 副作用を最小限に抑えた純粋な関数設計
コメント