Pythonには「繰り返し処理」を効率よく実行する仕組みがあり、その中心にあるのがイテレータ(iterator)です。
リストやタプル、文字列などを1つずつ順に取り出して処理する際に使われます。
一見すると「for文だけで十分では?」と思うかもしれません。
しかし、イテレータを理解することで、より柔軟で効率的なコードが書けるようになります。
特に大量のデータを扱う際や、メモリ効率を重視したプログラムでは必須の知識です。
この記事では、初心者にも分かるように、イテレータの基本から応用、実践的な使い方のコツまで解説します。
イテレータとは?基本概念を理解しよう

イテレータの定義
イテレータ(iterator)とは、「要素を1つずつ順番に返すことができるオブジェクト」です。
Pythonにおける繰り返し処理の基盤となる重要な概念で、以下の特徴を持ちます:
イテレータの主な特徴:
- 順次アクセス:要素を1つずつ順番に取得
- 遅延評価:必要な時にのみ値を生成
- メモリ効率:全ての要素を一度にメモリに保持しない
- 一回限り:一度使い切ると再利用できない
- 状態保持:現在の位置を記憶している
イテラブルとイテレータの違い
Pythonでは、イテラブル(iterable)とイテレータ(iterator)という2つの概念を区別することが重要です。
概念 | 説明 | 例 | 特徴 |
---|---|---|---|
イテラブル | 繰り返し可能なオブジェクト | list, tuple, str, dict | 何度でも繰り返し可能 |
イテレータ | 実際に要素を返すオブジェクト | iter()で生成されるオブジェクト | 一度使うと消耗する |
基本的な例:
# イテラブルオブジェクト
numbers = [1, 2, 3, 4, 5] # リストはイテラブル
print(f"numbersの型: {type(numbers)}")
# イテレータオブジェクト
iterator = iter(numbers) # iter()でイテレータに変換
print(f"iteratorの型: {type(iterator)}")
# イテレータから要素を取得
print(next(iterator)) # 出力: 1
print(next(iterator)) # 出力: 2
print(next(iterator)) # 出力: 3
出力結果:
numbersの型: <class 'list'>
iteratorの型: <class 'list_iterator'>
1
2
3
イテレータのライフサイクル
イテレータは「使い捨て」のオブジェクトです。その動作を詳しく見てみましょう:
# イテラブルから複数のイテレータを作成
fruits = ['apple', 'banana', 'cherry']
# 1つ目のイテレータ
iter1 = iter(fruits)
print("1つ目のイテレータ:")
print(next(iter1)) # apple
print(next(iter1)) # banana
# 2つ目のイテレータ(独立している)
iter2 = iter(fruits)
print("\n2つ目のイテレータ:")
print(next(iter2)) # apple(最初から開始)
# 1つ目のイテレータは状態を保持
print("\n1つ目のイテレータ(続き):")
print(next(iter1)) # cherry
# イテレータが尽きた場合
try:
print(next(iter1)) # StopIteration例外が発生
except StopIteration:
print("イテレータが尽きました")
出力結果:
1つ目のイテレータ:
apple
banana
2つ目のイテレータ:
apple
1つ目のイテレータ(続き):
cherry
イテレータが尽きました
イテレータの基本的な使い方

iter()関数とnext()関数
Pythonでイテレータを扱う際の基本的な関数は以下の2つです:
iter()関数
- イテラブルオブジェクトからイテレータを作成
- 既にイテレータの場合はそのまま返す
next()関数
- イテレータから次の要素を取得
- 要素が尽きるとStopIteration例外を発生
# 様々なイテラブルオブジェクトでのiter()の使用
data_sources = [
[1, 2, 3], # リスト
(4, 5, 6), # タプル
"hello", # 文字列
{7, 8, 9}, # セット
{"a": 1, "b": 2} # 辞書(キーのみ)
]
for data in data_sources:
print(f"\n{type(data).__name__}: {data}")
iterator = iter(data)
# 最初の2つの要素を取得
try:
print(f" 1番目: {next(iterator)}")
print(f" 2番目: {next(iterator)}")
except StopIteration:
print(" 要素が足りません")
出力結果:
list: [1, 2, 3]
1番目: 1
2番目: 2
tuple: (4, 5, 6)
1番目: 4
2番目: 5
str: hello
1番目: h
2番目: e
set: {8, 9, 7}
1番目: 8
2番目: 9
dict: {'a': 1, 'b': 2}
1番目: a
2番目: b
for文の内部動作
for文は内部的にイテレータプロトコルを使用しています。以下のコードは等価です:
for文(通常の書き方):
fruits = ['apple', 'banana', 'cherry']
for fruit in fruits:
print(fruit)
イテレータを明示的に使った書き方:
fruits = ['apple', 'banana', 'cherry']
# for文の内部動作を再現
iterator = iter(fruits)
while True:
try:
fruit = next(iterator)
print(fruit)
except StopIteration:
break
両方とも同じ結果を出力します:
apple
banana
cherry
enumerate()とzip()の内部構造
よく使われる組み込み関数も、実はイテレータを返します:
# enumerate()の動作確認
fruits = ['apple', 'banana', 'cherry']
enum_iter = enumerate(fruits)
print(f"enumerate()の型: {type(enum_iter)}")
print(next(enum_iter)) # (0, 'apple')
print(next(enum_iter)) # (1, 'banana')
# zip()の動作確認
numbers = [1, 2, 3]
letters = ['a', 'b', 'c']
zip_iter = zip(numbers, letters)
print(f"\nzip()の型: {type(zip_iter)}")
print(next(zip_iter)) # (1, 'a')
print(next(zip_iter)) # (2, 'b')
出力結果:
enumerate()の型: <class 'enumerate'>
(0, 'apple')
(1, 'banana')
zip()の型: <class 'zip'>
(1, 'a')
(2, 'b')
独自イテレータの作成

クラスベースのイテレータ実装
Pythonでは、__iter__()
と__next__()
メソッドを実装することで、独自のイテレータを作成できます。
基本的なカウンターイテレータ:
class Counter:
"""指定した範囲の数値を順番に返すイテレータ"""
def __init__(self, start, end):
self.start = start
self.end = end
self.current = start
def __iter__(self):
"""イテレータオブジェクト自身を返す"""
return self
def __next__(self):
"""次の値を返す。終了時はStopIterationを発生"""
if self.current < self.end:
num = self.current
self.current += 1
return num
else:
raise StopIteration
# 使用例
print("Counter(1, 5)の結果:")
counter = Counter(1, 5)
for num in counter:
print(num)
print("\nnext()を使った取得:")
counter2 = Counter(10, 13)
print(next(counter2)) # 10
print(next(counter2)) # 11
print(next(counter2)) # 12
出力結果:
Counter(1, 5)の結果:
1
2
3
4
next()を使った取得:
10
11
12
より実用的なイテレータの例
ファイル読み込み用イテレータ:
class FileLineIterator:
"""ファイルを行ごとに読み込むイテレータ"""
def __init__(self, filename, encoding='utf-8'):
self.filename = filename
self.encoding = encoding
self.file = None
def __iter__(self):
return self
def __next__(self):
if self.file is None:
try:
self.file = open(self.filename, 'r', encoding=self.encoding)
except FileNotFoundError:
raise StopIteration(f"ファイル '{self.filename}' が見つかりません")
line = self.file.readline()
if line:
return line.rstrip('\n') # 改行文字を除去
else:
self.file.close()
raise StopIteration
def __del__(self):
"""デストラクタでファイルを確実に閉じる"""
if self.file and not self.file.closed:
self.file.close()
# 使用例(テスト用ファイルを作成)
test_content = """Python
イテレータ
ジェネレータ
プログラミング"""
with open('test.txt', 'w', encoding='utf-8') as f:
f.write(test_content)
# ファイルを行ごとに読み込み
print("ファイル内容:")
for line in FileLineIterator('test.txt'):
print(f" {line}")
出力結果:
ファイル内容:
Python
イテレータ
ジェネレータ
プログラミング
無限イテレータの実装
数学的な数列を生成する無限イテレータも作成できます:
class FibonacciIterator:
"""フィボナッチ数列を生成する無限イテレータ"""
def __init__(self):
self.a, self.b = 0, 1
def __iter__(self):
return self
def __next__(self):
result = self.a
self.a, self.b = self.b, self.a + self.b
return result
# 使用例:最初の10個のフィボナッチ数を取得
print("フィボナッチ数列の最初の10個:")
fib = FibonacciIterator()
for i, num in enumerate(fib):
if i >= 10:
break
print(f"F({i}) = {num}")
出力結果:
フィボナッチ数列の最初の10個:
F(0) = 0
F(1) = 1
F(2) = 1
F(3) = 2
F(4) = 3
F(5) = 5
F(6) = 8
F(7) = 13
F(8) = 21
F(9) = 34
ジェネレータとイテレータの関係

ジェネレータ関数の基本
ジェネレータは、yield
キーワードを使ってイテレータを簡潔に作成する方法です:
def simple_generator():
"""基本的なジェネレータ関数"""
print("ジェネレータ開始")
yield 1
print("1を返しました")
yield 2
print("2を返しました")
yield 3
print("3を返しました")
print("ジェネレータ終了")
# 使用例
print("ジェネレータの動作:")
gen = simple_generator()
print(f"ジェネレータの型: {type(gen)}")
print("\nnext()での取得:")
print(next(gen))
print(next(gen))
print(next(gen))
try:
print(next(gen))
except StopIteration:
print("StopIteration例外が発生")
出力結果:
ジェネレータの動作:
ジェネレータの型: <class 'generator'>
next()での取得:
ジェネレータ開始
1
1を返しました
2
2を返しました
3
3を返しました
ジェネレータ終了
StopIteration例外が発生
クラスベース vs ジェネレータの比較
同じ機能をクラスベースとジェネレータで実装した比較:
クラスベースのイテレータ:
class SquareIterator:
"""数値の二乗を返すイテレータ(クラス版)"""
def __init__(self, max_num):
self.max_num = max_num
self.current = 1
def __iter__(self):
return self
def __next__(self):
if self.current <= self.max_num:
result = self.current ** 2
self.current += 1
return result
else:
raise StopIteration
ジェネレータ版:
def square_generator(max_num):
"""数値の二乗を返すジェネレータ"""
for i in range(1, max_num + 1):
yield i ** 2
動作比較:
print("クラスベース版:")
for square in SquareIterator(5):
print(square, end=' ')
print("\n\nジェネレータ版:")
for square in square_generator(5):
print(square, end=' ')
出力結果:
クラスベース版:
1 4 9 16 25
ジェネレータ版:
1 4 9 16 25
ジェネレータ式(Generator Expression)
リスト内包表記のような記法でジェネレータを作成できます:
# リスト内包表記(全て一度にメモリに展開)
squares_list = [x**2 for x in range(10)]
print(f"リスト内包表記: {type(squares_list)}")
print(f"メモリ使用量: {squares_list.__sizeof__()}")
# ジェネレータ式(遅延評価)
squares_gen = (x**2 for x in range(10))
print(f"ジェネレータ式: {type(squares_gen)}")
print(f"メモリ使用量: {squares_gen.__sizeof__()}")
print("\nジェネレータ式の中身:")
for square in squares_gen:
print(square, end=' ')
出力結果:
リスト内包表記: <class 'list'>
メモリ使用量: 120
ジェネレータ式: <class 'generator'>
メモリ使用量: 96
ジェネレータ式の中身:
0 1 4 9 16 25 36 49 64 81
実践的な活用例
大容量ファイルの処理
メモリ効率を重視した大容量ファイル処理:
def read_large_file(filename, chunk_size=1024):
"""大容量ファイルをチャンクごとに読み込むジェネレータ"""
try:
with open(filename, 'r', encoding='utf-8') as file:
while True:
chunk = file.read(chunk_size)
if not chunk:
break
yield chunk
except FileNotFoundError:
print(f"ファイル '{filename}' が見つかりません")
return
def process_log_file(filename):
"""ログファイルを行ごとに処理するジェネレータ"""
try:
with open(filename, 'r', encoding='utf-8') as file:
for line_num, line in enumerate(file, 1):
# エラーログのみをフィルタリング
if 'ERROR' in line.upper():
yield {
'line_number': line_num,
'content': line.strip(),
'timestamp': line[:19] if len(line) > 19 else ''
}
except FileNotFoundError:
print(f"ログファイル '{filename}' が見つかりません")
# テスト用ログファイルの作成
log_content = """2024-06-18 10:30:01 INFO User login successful
2024-06-18 10:30:15 ERROR Database connection failed
2024-06-18 10:30:16 INFO Retrying database connection
2024-06-18 10:30:17 INFO Database connection successful
2024-06-18 10:31:23 ERROR File not found: config.txt
2024-06-18 10:31:24 ERROR Permission denied: /var/log/
2024-06-18 10:32:01 INFO Application started"""
with open('app.log', 'w', encoding='utf-8') as f:
f.write(log_content)
# エラーログのみを処理
print("エラーログの抽出:")
for error_info in process_log_file('app.log'):
print(f"行 {error_info['line_number']}: {error_info['content']}")
出力結果:
エラーログの抽出:
行 2: 2024-06-18 10:30:15 ERROR Database connection failed
行 5: 2024-06-18 10:31:23 ERROR File not found: config.txt
行 6: 2024-06-18 10:31:24 ERROR Permission denied: /var/log/
API データの段階的取得
Web APIからのデータを効率的に取得するイテレータ:
import time
import random
class PaginatedAPIIterator:
"""ページネーション対応のAPI データイテレータ"""
def __init__(self, base_url, page_size=10):
self.base_url = base_url
self.page_size = page_size
self.current_page = 1
self.total_pages = None
self.current_items = []
self.item_index = 0
def __iter__(self):
return self
def __next__(self):
# 現在のページの全アイテムを処理済みの場合
if self.item_index >= len(self.current_items):
# 次のページをロード
if not self._load_next_page():
raise StopIteration
self.item_index = 0
# 現在のアイテムを返す
item = self.current_items[self.item_index]
self.item_index += 1
return item
def _load_next_page(self):
"""次のページのデータを取得(模擬的な実装)"""
if self.total_pages and self.current_page > self.total_pages:
return False
# 模擬的なAPI呼び出し
print(f"API呼び出し: ページ {self.current_page}")
time.sleep(0.1) # API呼び出しの遅延をシミュレート
# 模擬データの生成
if self.current_page == 1:
self.total_pages = 3 # 総ページ数を設定
start_id = (self.current_page - 1) * self.page_size + 1
self.current_items = [
f"item_{i}" for i in range(start_id, start_id + self.page_size)
]
# 最後のページでは少ないアイテム数をシミュレート
if self.current_page == self.total_pages:
self.current_items = self.current_items[:5]
self.current_page += 1
return True
# 使用例
print("API データの段階的取得:")
api_iterator = PaginatedAPIIterator("https://api.example.com/items")
for item in api_iterator:
print(f" 処理中: {item}")
# 実際の処理をここに記述
出力結果:
API データの段階的取得:
API呼び出し: ページ 1
処理中: item_1
処理中: item_2
処理中: item_3
処理中: item_4
処理中: item_5
処理中: item_6
処理中: item_7
処理中: item_8
処理中: item_9
処理中: item_10
API呼び出し: ページ 2
処理中: item_11
処理中: item_12
処理中: item_13
処理中: item_14
処理中: item_15
処理中: item_16
処理中: item_17
処理中: item_18
処理中: item_19
処理中: item_20
API呼び出し: ページ 3
処理中: item_21
処理中: item_22
処理中: item_23
処理中: item_24
処理中: item_25
高度なイテレータテクニック

itertools モジュールの活用
Pythonのitertools
モジュールは、イテレータを組み合わせる強力なツールを提供します:
import itertools
# 複数のイテラブルを連結
numbers1 = [1, 2, 3]
numbers2 = [4, 5, 6]
letters = ['a', 'b', 'c']
print("chain() - 複数のイテラブルを連結:")
for item in itertools.chain(numbers1, numbers2, letters):
print(item, end=' ')
print("\n\ncycle() - 無限繰り返し(最初の5個のみ表示):")
colors = ['red', 'green', 'blue']
color_cycle = itertools.cycle(colors)
for i, color in enumerate(color_cycle):
if i >= 8:
break
print(color, end=' ')
print("\n\nrepeat() - 同じ値を繰り返し:")
for item in itertools.repeat('Hello', 3):
print(item)
print("\ncount() - 無限カウンタ(最初の5個のみ):")
counter = itertools.count(start=10, step=2)
for i, num in enumerate(counter):
if i >= 5:
break
print(num, end=' ')
print("\n\ntakewhile() - 条件を満たす間だけ取得:")
numbers = [1, 3, 5, 8, 9, 11, 12]
for num in itertools.takewhile(lambda x: x < 10, numbers):
print(num, end=' ')
print("\n\ndropwhile() - 条件を満たさなくなったら以降を取得:")
for num in itertools.dropwhile(lambda x: x < 10, numbers):
print(num, end=' ')
出力結果:
chain() - 複数のイテラブルを連結:
1 2 3 4 5 6 a b c
cycle() - 無限繰り返し(最初の5個のみ表示):
red green blue red green blue red green
repeat() - 同じ値を繰り返し:
Hello
Hello
Hello
count() - 無限カウンタ(最初の5個のみ):
10 12 14 16 18
takewhile() - 条件を満たす間だけ取得:
1 3 5
dropwhile() - 条件を満たさなくなったら以降を取得:
8 9 11 12
組み合わせとフィルタリング
import itertools
# 順列と組み合わせ
items = ['A', 'B', 'C']
print("permutations() - 順列:")
for perm in itertools.permutations(items, 2):
print(perm)
print("\ncombinations() - 組み合わせ:")
for comb in itertools.combinations(items, 2):
print(comb)
print("\nproduct() - 直積:")
for prod in itertools.product([1, 2], ['x', 'y']):
print(prod)
# フィルタリング
numbers = range(1, 11)
print("\nfilter() - 条件に合う要素のみ:")
even_numbers = filter(lambda x: x % 2 == 0, numbers)
for num in even_numbers:
print(num, end=' ')
print("\n\ncompress() - マスクによるフィルタリング:")
data = ['A', 'B', 'C', 'D', 'E']
mask = [1, 0, 1, 0, 1] # 1の位置の要素のみ取得
for item in itertools.compress(data, mask):
print(item, end=' ')
出力結果:
permutations() - 順列:
('A', 'B')
('A', 'C')
('B', 'A')
('B', 'C')
('C', 'A')
('C', 'B')
combinations() - 組み合わせ:
('A', 'B')
('A', 'C')
('B', 'C')
product() - 直積:
(1, 'x')
(1, 'y')
(2, 'x')
(2, 'y')
filter() - 条件に合う要素のみ:
2 4 6 8 10
compress() - マスクによるフィルタリング:
A C E
パフォーマンスとメモリ効率
メモリ使用量の比較
イテレータとリストのメモリ使用量を比較してみましょう:
import sys
def memory_usage_comparison():
"""メモリ使用量の比較"""
# 大きなデータセットのサイズ
size = 1000000
# リスト(全データをメモリに保持)
number_list = list(range(size))
list_memory = sys.getsizeof(number_list)
# ジェネレータ(遅延評価)
number_generator = (x for x in range(size))
generator_memory = sys.getsizeof(number_generator)
print(f"データサイズ: {size:,}")
print(f"リストのメモリ使用量: {list_memory:,} bytes")
print(f"ジェネレータのメモリ使用量: {generator_memory:,} bytes")
print(f"メモリ節約率: {(list_memory - generator_memory) / list_memory * 100:.1f}%")
# 実行結果
memory_usage_comparison()
出力結果:
データサイズ: 1,000,000
リストのメモリ使用量: 8,448,728 bytes
ジェネレータのメモリ使用量: 96 bytes
メモリ節約率: 99.9%
処理速度の比較
異なる実装方法での処理速度を測定:
import time
import itertools
def timing_comparison():
"""処理速度の比較"""
def time_function(func, name, *args):
start_time = time.time()
result = func(*args)
# イテレータの場合は実際に消費する
if hasattr(result, '__iter__') and not isinstance(result, (list, tuple, str)):
list(result)
end_time = time.time()
print(f"{name}: {end_time - start_time:.4f}秒")
size = 100000
# 1. リスト内包表記 vs ジェネレータ式
print("平方数の生成(サイズ: {:,}):".format(size))
time_function(lambda: [x**2 for x in range(size)], "リスト内包表記")
time_function(lambda: (x**2 for x in range(size)), "ジェネレータ式")
# 2. フィルタリング比較
numbers = list(range(size))
print(f"\n偶数のフィルタリング(サイズ: {len(numbers):,}):")
time_function(lambda: [x for x in numbers if x % 2 == 0], "リスト内包表記")
time_function(lambda: (x for x in numbers if x % 2 == 0), "ジェネレータ式")
time_function(lambda: filter(lambda x: x % 2 == 0, numbers), "filter()関数")
timing_comparison()
実践的なパフォーマンス最適化例
def process_data_efficiently(data_source):
"""大量データを効率的に処理する例"""
def validate_record(record):
"""レコードの妥当性チェック"""
return len(record) >= 3 and record[0].isdigit()
def transform_record(record):
"""レコードの変換処理"""
return {
'id': int(record[0]),
'name': record[1].strip().title(),
'value': float(record[2]) if record[2].replace('.', '').isdigit() else 0.0
}
# パイプライン処理(遅延評価)
valid_records = filter(validate_record, data_source)
transformed_records = map(transform_record, valid_records)
return transformed_records
# テストデータの生成
test_data = [
['1', 'alice', '100.5'],
['2', 'bob', '200.0'],
['invalid', 'charlie', '300'], # 無効なレコード
['3', 'david', '150.75'],
['', 'eve', '250'], # 無効なレコード
['4', 'frank', '180.25']
]
print("効率的なデータ処理:")
processed_data = process_data_efficiently(test_data)
# 実際に処理を実行(遅延評価のため、ここで初めて処理される)
for record in processed_data:
print(f"ID: {record['id']}, 名前: {record['name']}, 値: {record['value']}")
出力結果:
効率的なデータ処理:
ID: 1, 名前: Alice, 値: 100.5
ID: 2, 名前: Bob, 値: 200.0
ID: 3, 名前: David, 値: 150.75
ID: 4, 名前: Frank, 値: 180.25
実践的な応用パターン
バッチ処理パターン
大量データを一定サイズずつ処理するパターン:
def batch_iterator(iterable, batch_size):
"""イテラブルを指定サイズのバッチに分割するジェネレータ"""
iterator = iter(iterable)
while True:
batch = list(itertools.islice(iterator, batch_size))
if not batch:
break
yield batch
def process_in_batches(data, batch_size=3):
"""データをバッチ単位で処理"""
print(f"バッチサイズ {batch_size} でデータを処理:")
for batch_num, batch in enumerate(batch_iterator(data, batch_size), 1):
print(f" バッチ {batch_num}: {batch}")
# 実際の処理をここに記述
time.sleep(0.1) # 処理時間のシミュレート
# 使用例
data = list(range(1, 11))
process_in_batches(data)
出力結果:
バッチサイズ 3 でデータを処理:
バッチ 1: [1, 2, 3]
バッチ 2: [4, 5, 6]
バッチ 3: [7, 8, 9]
バッチ 4: [10]
ストリーミング処理パターン
リアルタイムデータストリームの処理:
import random
import time
from collections import deque
class DataStream:
"""模擬的なデータストリーム"""
def __init__(self, data_source):
self.data_source = data_source
self.index = 0
def __iter__(self):
return self
def __next__(self):
if self.index < len(self.data_source):
data = self.data_source[self.index]
self.index += 1
time.sleep(0.1) # ストリームの遅延をシミュレート
return data
else:
raise StopIteration
class MovingAverageCalculator:
"""移動平均を計算するイテレータ"""
def __init__(self, data_stream, window_size=3):
self.data_stream = iter(data_stream)
self.window_size = window_size
self.window = deque(maxlen=window_size)
def __iter__(self):
return self
def __next__(self):
try:
value = next(self.data_stream)
self.window.append(value)
if len(self.window) == self.window_size:
return {
'value': value,
'moving_average': sum(self.window) / len(self.window),
'window': list(self.window)
}
else:
return {
'value': value,
'moving_average': None,
'window': list(self.window)
}
except StopIteration:
raise
# 使用例
stream_data = [10, 15, 20, 25, 30, 35, 40, 45, 50]
data_stream = DataStream(stream_data)
ma_calculator = MovingAverageCalculator(data_stream, window_size=3)
print("ストリーミングデータの移動平均:")
for result in ma_calculator:
if result['moving_average'] is not None:
print(f"値: {result['value']:2d}, 移動平均: {result['moving_average']:5.1f}, "
f"ウィンドウ: {result['window']}")
else:
print(f"値: {result['value']:2d}, 移動平均: N/A , "
f"ウィンドウ: {result['window']}")
エラーハンドリングパターン
堅牢なイテレータの実装:
class RobustIterator:
"""エラー処理機能付きイテレータ"""
def __init__(self, data_source, error_handler=None):
self.data_source = data_source
self.error_handler = error_handler or self._default_error_handler
self.index = 0
self.error_count = 0
self.processed_count = 0
def _default_error_handler(self, error, item, index):
"""デフォルトのエラーハンドラ"""
print(f"エラー発生 (インデックス {index}): {error}")
return None # エラー時はNoneを返す
def __iter__(self):
return self
def __next__(self):
while self.index < len(self.data_source):
item = self.data_source[self.index]
current_index = self.index
self.index += 1
try:
# 処理を実行(意図的にエラーを発生させる可能性がある)
result = self._process_item(item)
self.processed_count += 1
return result
except Exception as e:
self.error_count += 1
error_result = self.error_handler(e, item, current_index)
if error_result is not None:
return error_result
# エラーハンドラがNoneを返した場合は次の要素に進む
continue
# 処理完了時の統計情報
print(f"\n処理完了: 成功 {self.processed_count}件, "
f"エラー {self.error_count}件")
raise StopIteration
def _process_item(self, item):
"""アイテムの処理(エラーが発生する可能性がある)"""
if isinstance(item, str) and item.startswith('error'):
raise ValueError(f"処理エラー: {item}")
if isinstance(item, (int, float)):
return item ** 2
elif isinstance(item, str):
return item.upper()
else:
raise TypeError(f"サポートされていない型: {type(item)}")
# テストデータ(エラーを含む)
test_data = [1, 2, 'hello', 'error_item', 3, None, 'world', 'error_again', 4]
# カスタムエラーハンドラ
def custom_error_handler(error, item, index):
print(f"⚠️ 警告 (インデックス {index}): {item} -> {error}")
return f"ERROR_HANDLED_{index}" # エラー時の代替値
print("堅牢なイテレータのテスト:")
robust_iter = RobustIterator(test_data, custom_error_handler)
for result in robust_iter:
print(f"結果: {result}")
デバッグとトラブルシューティング

イテレータの状態確認
def debug_iterator(iterator, name="Iterator"):
"""イテレータの状態をデバッグするヘルパー関数"""
print(f"\n=== {name} のデバッグ情報 ===")
print(f"型: {type(iterator)}")
print(f"イテレータか: {hasattr(iterator, '__iter__') and hasattr(iterator, '__next__')}")
# 安全に次の要素を確認
try:
# teeを使って元のイテレータを保持
iter1, iter2 = itertools.tee(iterator, 2)
first_few = list(itertools.islice(iter1, 3))
print(f"最初の3要素: {first_few}")
return iter2 # 元のイテレータの代わりに返す
except Exception as e:
print(f"エラー: {e}")
return iterator
# 使用例
numbers = [1, 2, 3, 4, 5]
num_iter = iter(numbers)
# デバッグ情報を表示
debugged_iter = debug_iterator(num_iter, "Number Iterator")
# 通常通り使用
print("\n実際の処理:")
for num in debugged_iter:
print(num)
パフォーマンス監視
import time
import functools
def monitor_iterator(iterator_func):
"""イテレータのパフォーマンスを監視するデコレータ"""
@functools.wraps(iterator_func)
def wrapper(*args, **kwargs):
start_time = time.time()
iterator = iterator_func(*args, **kwargs)
class MonitoredIterator:
def __init__(self, wrapped_iterator):
self.wrapped_iterator = wrapped_iterator
self.count = 0
self.start_time = start_time
def __iter__(self):
return self
def __next__(self):
try:
result = next(self.wrapped_iterator)
self.count += 1
return result
except StopIteration:
end_time = time.time()
print(f"\n監視結果:")
print(f" 処理時間: {end_time - self.start_time:.4f}秒")
print(f" 処理件数: {self.count}件")
print(f" スループット: {self.count / (end_time - self.start_time):.2f}件/秒")
raise
return MonitoredIterator(iterator)
return wrapper
# 使用例
@monitor_iterator
def slow_processor(data):
"""時間のかかる処理をシミュレート"""
for item in data:
time.sleep(0.01) # 処理時間をシミュレート
yield item * 2
# テスト実行
data = list(range(10))
print("パフォーマンス監視付き処理:")
for result in slow_processor(data):
print(f"処理結果: {result}")
まとめと実践的なアドバイス
イテレータ使用時のベストプラクティス
1. 適切な選択基準
# メモリ効率を重視する場合:ジェネレータ
def memory_efficient_processing(large_dataset):
for item in large_dataset:
if condition(item):
yield transform(item)
# 再利用が必要な場合:リスト
def reusable_data():
# 一度作成したら何度も使用
return [expensive_computation(x) for x in range(100)]
# 一回限りの処理:イテレータ
def one_time_processing():
# 一度だけ使用するデータ
return (simple_transform(x) for x in data_source)
2. エラーハンドリング
def safe_iteration(iterable):
"""安全な反復処理のパターン"""
iterator = iter(iterable)
while True:
try:
item = next(iterator)
# 処理を実行
yield process_item(item)
except StopIteration:
break
except Exception as e:
# エラーログを出力し、処理を継続
print(f"処理エラー: {e}")
continue
3. パフォーマンス最適化
# 良い例:遅延評価を活用
def optimized_pipeline(data):
# チェーン化された処理
filtered = filter(is_valid, data)
transformed = map(transform, filtered)
processed = map(expensive_operation, transformed)
return processed
# 避けるべき例:中間リストの作成
def unoptimized_pipeline(data):
filtered = [item for item in data if is_valid(item)]
transformed = [transform(item) for item in filtered]
processed = [expensive_operation(item) for item in transformed]
return processed
よくある間違いと対策
間違い1:イテレータの再利用
# ❌ 間違い
iterator = iter(data)
list1 = list(iterator) # データを消費
list2 = list(iterator) # 空のリスト
# ✅ 正しい方法
iterator1, iterator2 = itertools.tee(iter(data), 2)
list1 = list(iterator1)
list2 = list(iterator2)
間違い2:無限イテレータの不適切な使用
# ❌ 危険:無限ループ
infinite_iter = itertools.count()
for num in infinite_iter: # 終了条件なし
print(num)
# ✅ 安全:適切な終了条件
for i, num in enumerate(itertools.count()):
if i >= 10:
break
print(num)
間違い3:メモリ効率を考慮しない実装
# ❌ メモリ非効率
def process_large_file(filename):
return [process_line(line) for line in open(filename)]
# ✅ メモリ効率的
def process_large_file(filename):
with open(filename) as file:
for line in file:
yield process_line(line)
Pythonのイテレータは、効率的で柔軟なプログラムを書くための強力なツールです。
重要なポイント:
✅ メモリ効率:大量データも少ないメモリで処理
✅ 遅延評価:必要な時だけ計算を実行
✅ 組み合わせ可能:複数のイテレータを柔軟に組み合わせ
✅ Pythonic:Pythonらしい美しいコードの実現
コメント