【完全ガイド】Pythonのイテレータ(iterator)とは?基本から使い方まで徹底解説!

python

Pythonには「繰り返し処理」を効率よく実行する仕組みがあり、その中心にあるのがイテレータ(iterator)です。

リストやタプル、文字列などを1つずつ順に取り出して処理する際に使われます。

一見すると「for文だけで十分では?」と思うかもしれません。
しかし、イテレータを理解することで、より柔軟で効率的なコードが書けるようになります。

特に大量のデータを扱う際や、メモリ効率を重視したプログラムでは必須の知識です。

この記事では、初心者にも分かるように、イテレータの基本から応用、実践的な使い方のコツまで解説します。

スポンサーリンク

イテレータとは?基本概念を理解しよう

イテレータの定義

イテレータ(iterator)とは、「要素を1つずつ順番に返すことができるオブジェクト」です。

Pythonにおける繰り返し処理の基盤となる重要な概念で、以下の特徴を持ちます:

イテレータの主な特徴:

  • 順次アクセス:要素を1つずつ順番に取得
  • 遅延評価:必要な時にのみ値を生成
  • メモリ効率:全ての要素を一度にメモリに保持しない
  • 一回限り:一度使い切ると再利用できない
  • 状態保持:現在の位置を記憶している

イテラブルとイテレータの違い

Pythonでは、イテラブル(iterable)イテレータ(iterator)という2つの概念を区別することが重要です。

概念説明特徴
イテラブル繰り返し可能なオブジェクトlist, tuple, str, dict何度でも繰り返し可能
イテレータ実際に要素を返すオブジェクトiter()で生成されるオブジェクト一度使うと消耗する

基本的な例:

# イテラブルオブジェクト
numbers = [1, 2, 3, 4, 5]  # リストはイテラブル
print(f"numbersの型: {type(numbers)}")

# イテレータオブジェクト
iterator = iter(numbers)   # iter()でイテレータに変換
print(f"iteratorの型: {type(iterator)}")

# イテレータから要素を取得
print(next(iterator))  # 出力: 1
print(next(iterator))  # 出力: 2
print(next(iterator))  # 出力: 3

出力結果:

numbersの型: <class 'list'>
iteratorの型: <class 'list_iterator'>
1
2
3

イテレータのライフサイクル

イテレータは「使い捨て」のオブジェクトです。その動作を詳しく見てみましょう:

# イテラブルから複数のイテレータを作成
fruits = ['apple', 'banana', 'cherry']

# 1つ目のイテレータ
iter1 = iter(fruits)
print("1つ目のイテレータ:")
print(next(iter1))  # apple
print(next(iter1))  # banana

# 2つ目のイテレータ(独立している)
iter2 = iter(fruits)
print("\n2つ目のイテレータ:")
print(next(iter2))  # apple(最初から開始)

# 1つ目のイテレータは状態を保持
print("\n1つ目のイテレータ(続き):")
print(next(iter1))  # cherry

# イテレータが尽きた場合
try:
    print(next(iter1))  # StopIteration例外が発生
except StopIteration:
    print("イテレータが尽きました")

出力結果:

1つ目のイテレータ:
apple
banana

2つ目のイテレータ:
apple

1つ目のイテレータ(続き):
cherry
イテレータが尽きました

イテレータの基本的な使い方

iter()関数とnext()関数

Pythonでイテレータを扱う際の基本的な関数は以下の2つです:

iter()関数

  • イテラブルオブジェクトからイテレータを作成
  • 既にイテレータの場合はそのまま返す

next()関数

  • イテレータから次の要素を取得
  • 要素が尽きるとStopIteration例外を発生
# 様々なイテラブルオブジェクトでのiter()の使用
data_sources = [
    [1, 2, 3],           # リスト
    (4, 5, 6),           # タプル
    "hello",             # 文字列
    {7, 8, 9},          # セット
    {"a": 1, "b": 2}     # 辞書(キーのみ)
]

for data in data_sources:
    print(f"\n{type(data).__name__}: {data}")
    iterator = iter(data)
    
    # 最初の2つの要素を取得
    try:
        print(f"  1番目: {next(iterator)}")
        print(f"  2番目: {next(iterator)}")
    except StopIteration:
        print("  要素が足りません")

出力結果:

list: [1, 2, 3]
  1番目: 1
  2番目: 2

tuple: (4, 5, 6)
  1番目: 4
  2番目: 5

str: hello
  1番目: h
  2番目: e

set: {8, 9, 7}
  1番目: 8
  2番目: 9

dict: {'a': 1, 'b': 2}
  1番目: a
  2番目: b

for文の内部動作

for文は内部的にイテレータプロトコルを使用しています。以下のコードは等価です:

for文(通常の書き方):

fruits = ['apple', 'banana', 'cherry']

for fruit in fruits:
    print(fruit)

イテレータを明示的に使った書き方:

fruits = ['apple', 'banana', 'cherry']

# for文の内部動作を再現
iterator = iter(fruits)
while True:
    try:
        fruit = next(iterator)
        print(fruit)
    except StopIteration:
        break

両方とも同じ結果を出力します:

apple
banana
cherry

enumerate()とzip()の内部構造

よく使われる組み込み関数も、実はイテレータを返します:

# enumerate()の動作確認
fruits = ['apple', 'banana', 'cherry']
enum_iter = enumerate(fruits)

print(f"enumerate()の型: {type(enum_iter)}")
print(next(enum_iter))  # (0, 'apple')
print(next(enum_iter))  # (1, 'banana')

# zip()の動作確認
numbers = [1, 2, 3]
letters = ['a', 'b', 'c']
zip_iter = zip(numbers, letters)

print(f"\nzip()の型: {type(zip_iter)}")
print(next(zip_iter))   # (1, 'a')
print(next(zip_iter))   # (2, 'b')

出力結果:

enumerate()の型: <class 'enumerate'>
(0, 'apple')
(1, 'banana')

zip()の型: <class 'zip'>
(1, 'a')
(2, 'b')

独自イテレータの作成

クラスベースのイテレータ実装

Pythonでは、__iter__()__next__()メソッドを実装することで、独自のイテレータを作成できます。

基本的なカウンターイテレータ:

class Counter:
    """指定した範囲の数値を順番に返すイテレータ"""
    
    def __init__(self, start, end):
        self.start = start
        self.end = end
        self.current = start
    
    def __iter__(self):
        """イテレータオブジェクト自身を返す"""
        return self
    
    def __next__(self):
        """次の値を返す。終了時はStopIterationを発生"""
        if self.current < self.end:
            num = self.current
            self.current += 1
            return num
        else:
            raise StopIteration

# 使用例
print("Counter(1, 5)の結果:")
counter = Counter(1, 5)
for num in counter:
    print(num)

print("\nnext()を使った取得:")
counter2 = Counter(10, 13)
print(next(counter2))  # 10
print(next(counter2))  # 11
print(next(counter2))  # 12

出力結果:

Counter(1, 5)の結果:
1
2
3
4

next()を使った取得:
10
11
12

より実用的なイテレータの例

ファイル読み込み用イテレータ:

class FileLineIterator:
    """ファイルを行ごとに読み込むイテレータ"""
    
    def __init__(self, filename, encoding='utf-8'):
        self.filename = filename
        self.encoding = encoding
        self.file = None
    
    def __iter__(self):
        return self
    
    def __next__(self):
        if self.file is None:
            try:
                self.file = open(self.filename, 'r', encoding=self.encoding)
            except FileNotFoundError:
                raise StopIteration(f"ファイル '{self.filename}' が見つかりません")
        
        line = self.file.readline()
        if line:
            return line.rstrip('\n')  # 改行文字を除去
        else:
            self.file.close()
            raise StopIteration
    
    def __del__(self):
        """デストラクタでファイルを確実に閉じる"""
        if self.file and not self.file.closed:
            self.file.close()

# 使用例(テスト用ファイルを作成)
test_content = """Python
イテレータ
ジェネレータ
プログラミング"""

with open('test.txt', 'w', encoding='utf-8') as f:
    f.write(test_content)

# ファイルを行ごとに読み込み
print("ファイル内容:")
for line in FileLineIterator('test.txt'):
    print(f"  {line}")

出力結果:

ファイル内容:
  Python
  イテレータ
  ジェネレータ
  プログラミング

無限イテレータの実装

数学的な数列を生成する無限イテレータも作成できます:

class FibonacciIterator:
    """フィボナッチ数列を生成する無限イテレータ"""
    
    def __init__(self):
        self.a, self.b = 0, 1
    
    def __iter__(self):
        return self
    
    def __next__(self):
        result = self.a
        self.a, self.b = self.b, self.a + self.b
        return result

# 使用例:最初の10個のフィボナッチ数を取得
print("フィボナッチ数列の最初の10個:")
fib = FibonacciIterator()
for i, num in enumerate(fib):
    if i >= 10:
        break
    print(f"F({i}) = {num}")

出力結果:

フィボナッチ数列の最初の10個:
F(0) = 0
F(1) = 1
F(2) = 1
F(3) = 2
F(4) = 3
F(5) = 5
F(6) = 8
F(7) = 13
F(8) = 21
F(9) = 34

ジェネレータとイテレータの関係

ジェネレータ関数の基本

ジェネレータは、yieldキーワードを使ってイテレータを簡潔に作成する方法です:

def simple_generator():
    """基本的なジェネレータ関数"""
    print("ジェネレータ開始")
    yield 1
    print("1を返しました")
    yield 2
    print("2を返しました")
    yield 3
    print("3を返しました")
    print("ジェネレータ終了")

# 使用例
print("ジェネレータの動作:")
gen = simple_generator()
print(f"ジェネレータの型: {type(gen)}")

print("\nnext()での取得:")
print(next(gen))
print(next(gen))
print(next(gen))

try:
    print(next(gen))
except StopIteration:
    print("StopIteration例外が発生")

出力結果:

ジェネレータの動作:
ジェネレータの型: <class 'generator'>

next()での取得:
ジェネレータ開始
1
1を返しました
2
2を返しました
3
3を返しました
ジェネレータ終了
StopIteration例外が発生

クラスベース vs ジェネレータの比較

同じ機能をクラスベースとジェネレータで実装した比較:

クラスベースのイテレータ:

class SquareIterator:
    """数値の二乗を返すイテレータ(クラス版)"""
    
    def __init__(self, max_num):
        self.max_num = max_num
        self.current = 1
    
    def __iter__(self):
        return self
    
    def __next__(self):
        if self.current <= self.max_num:
            result = self.current ** 2
            self.current += 1
            return result
        else:
            raise StopIteration

ジェネレータ版:

def square_generator(max_num):
    """数値の二乗を返すジェネレータ"""
    for i in range(1, max_num + 1):
        yield i ** 2

動作比較:

print("クラスベース版:")
for square in SquareIterator(5):
    print(square, end=' ')

print("\n\nジェネレータ版:")
for square in square_generator(5):
    print(square, end=' ')

出力結果:

クラスベース版:
1 4 9 16 25 

ジェネレータ版:
1 4 9 16 25 

ジェネレータ式(Generator Expression)

リスト内包表記のような記法でジェネレータを作成できます:

# リスト内包表記(全て一度にメモリに展開)
squares_list = [x**2 for x in range(10)]
print(f"リスト内包表記: {type(squares_list)}")
print(f"メモリ使用量: {squares_list.__sizeof__()}")

# ジェネレータ式(遅延評価)
squares_gen = (x**2 for x in range(10))
print(f"ジェネレータ式: {type(squares_gen)}")
print(f"メモリ使用量: {squares_gen.__sizeof__()}")

print("\nジェネレータ式の中身:")
for square in squares_gen:
    print(square, end=' ')

出力結果:

リスト内包表記: <class 'list'>
メモリ使用量: 120

ジェネレータ式: <class 'generator'>
メモリ使用量: 96

ジェネレータ式の中身:
0 1 4 9 16 25 36 49 64 81

実践的な活用例

大容量ファイルの処理

メモリ効率を重視した大容量ファイル処理:

def read_large_file(filename, chunk_size=1024):
    """大容量ファイルをチャンクごとに読み込むジェネレータ"""
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            while True:
                chunk = file.read(chunk_size)
                if not chunk:
                    break
                yield chunk
    except FileNotFoundError:
        print(f"ファイル '{filename}' が見つかりません")
        return

def process_log_file(filename):
    """ログファイルを行ごとに処理するジェネレータ"""
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            for line_num, line in enumerate(file, 1):
                # エラーログのみをフィルタリング
                if 'ERROR' in line.upper():
                    yield {
                        'line_number': line_num,
                        'content': line.strip(),
                        'timestamp': line[:19] if len(line) > 19 else ''
                    }
    except FileNotFoundError:
        print(f"ログファイル '{filename}' が見つかりません")

# テスト用ログファイルの作成
log_content = """2024-06-18 10:30:01 INFO User login successful
2024-06-18 10:30:15 ERROR Database connection failed
2024-06-18 10:30:16 INFO Retrying database connection
2024-06-18 10:30:17 INFO Database connection successful
2024-06-18 10:31:23 ERROR File not found: config.txt
2024-06-18 10:31:24 ERROR Permission denied: /var/log/
2024-06-18 10:32:01 INFO Application started"""

with open('app.log', 'w', encoding='utf-8') as f:
    f.write(log_content)

# エラーログのみを処理
print("エラーログの抽出:")
for error_info in process_log_file('app.log'):
    print(f"行 {error_info['line_number']}: {error_info['content']}")

出力結果:

エラーログの抽出:
行 2: 2024-06-18 10:30:15 ERROR Database connection failed
行 5: 2024-06-18 10:31:23 ERROR File not found: config.txt
行 6: 2024-06-18 10:31:24 ERROR Permission denied: /var/log/

API データの段階的取得

Web APIからのデータを効率的に取得するイテレータ:

import time
import random

class PaginatedAPIIterator:
    """ページネーション対応のAPI データイテレータ"""
    
    def __init__(self, base_url, page_size=10):
        self.base_url = base_url
        self.page_size = page_size
        self.current_page = 1
        self.total_pages = None
        self.current_items = []
        self.item_index = 0
    
    def __iter__(self):
        return self
    
    def __next__(self):
        # 現在のページの全アイテムを処理済みの場合
        if self.item_index >= len(self.current_items):
            # 次のページをロード
            if not self._load_next_page():
                raise StopIteration
            self.item_index = 0
        
        # 現在のアイテムを返す
        item = self.current_items[self.item_index]
        self.item_index += 1
        return item
    
    def _load_next_page(self):
        """次のページのデータを取得(模擬的な実装)"""
        if self.total_pages and self.current_page > self.total_pages:
            return False
        
        # 模擬的なAPI呼び出し
        print(f"API呼び出し: ページ {self.current_page}")
        time.sleep(0.1)  # API呼び出しの遅延をシミュレート
        
        # 模擬データの生成
        if self.current_page == 1:
            self.total_pages = 3  # 総ページ数を設定
        
        start_id = (self.current_page - 1) * self.page_size + 1
        self.current_items = [
            f"item_{i}" for i in range(start_id, start_id + self.page_size)
        ]
        
        # 最後のページでは少ないアイテム数をシミュレート
        if self.current_page == self.total_pages:
            self.current_items = self.current_items[:5]
        
        self.current_page += 1
        return True

# 使用例
print("API データの段階的取得:")
api_iterator = PaginatedAPIIterator("https://api.example.com/items")

for item in api_iterator:
    print(f"  処理中: {item}")
    # 実際の処理をここに記述

出力結果:

API データの段階的取得:
API呼び出し: ページ 1
  処理中: item_1
  処理中: item_2
  処理中: item_3
  処理中: item_4
  処理中: item_5
  処理中: item_6
  処理中: item_7
  処理中: item_8
  処理中: item_9
  処理中: item_10
API呼び出し: ページ 2
  処理中: item_11
  処理中: item_12
  処理中: item_13
  処理中: item_14
  処理中: item_15
  処理中: item_16
  処理中: item_17
  処理中: item_18
  処理中: item_19
  処理中: item_20
API呼び出し: ページ 3
  処理中: item_21
  処理中: item_22
  処理中: item_23
  処理中: item_24
  処理中: item_25

高度なイテレータテクニック

itertools モジュールの活用

Pythonのitertoolsモジュールは、イテレータを組み合わせる強力なツールを提供します:

import itertools

# 複数のイテラブルを連結
numbers1 = [1, 2, 3]
numbers2 = [4, 5, 6]
letters = ['a', 'b', 'c']

print("chain() - 複数のイテラブルを連結:")
for item in itertools.chain(numbers1, numbers2, letters):
    print(item, end=' ')

print("\n\ncycle() - 無限繰り返し(最初の5個のみ表示):")
colors = ['red', 'green', 'blue']
color_cycle = itertools.cycle(colors)
for i, color in enumerate(color_cycle):
    if i >= 8:
        break
    print(color, end=' ')

print("\n\nrepeat() - 同じ値を繰り返し:")
for item in itertools.repeat('Hello', 3):
    print(item)

print("\ncount() - 無限カウンタ(最初の5個のみ):")
counter = itertools.count(start=10, step=2)
for i, num in enumerate(counter):
    if i >= 5:
        break
    print(num, end=' ')

print("\n\ntakewhile() - 条件を満たす間だけ取得:")
numbers = [1, 3, 5, 8, 9, 11, 12]
for num in itertools.takewhile(lambda x: x < 10, numbers):
    print(num, end=' ')

print("\n\ndropwhile() - 条件を満たさなくなったら以降を取得:")
for num in itertools.dropwhile(lambda x: x < 10, numbers):
    print(num, end=' ')

出力結果:

chain() - 複数のイテラブルを連結:
1 2 3 4 5 6 a b c 

cycle() - 無限繰り返し(最初の5個のみ表示):
red green blue red green blue red green 

repeat() - 同じ値を繰り返し:
Hello
Hello
Hello

count() - 無限カウンタ(最初の5個のみ):
10 12 14 16 18 

takewhile() - 条件を満たす間だけ取得:
1 3 5 

dropwhile() - 条件を満たさなくなったら以降を取得:
8 9 11 12 

組み合わせとフィルタリング

import itertools

# 順列と組み合わせ
items = ['A', 'B', 'C']

print("permutations() - 順列:")
for perm in itertools.permutations(items, 2):
    print(perm)

print("\ncombinations() - 組み合わせ:")
for comb in itertools.combinations(items, 2):
    print(comb)

print("\nproduct() - 直積:")
for prod in itertools.product([1, 2], ['x', 'y']):
    print(prod)

# フィルタリング
numbers = range(1, 11)

print("\nfilter() - 条件に合う要素のみ:")
even_numbers = filter(lambda x: x % 2 == 0, numbers)
for num in even_numbers:
    print(num, end=' ')

print("\n\ncompress() - マスクによるフィルタリング:")
data = ['A', 'B', 'C', 'D', 'E']
mask = [1, 0, 1, 0, 1]  # 1の位置の要素のみ取得
for item in itertools.compress(data, mask):
    print(item, end=' ')

出力結果:

permutations() - 順列:
('A', 'B')
('A', 'C')
('B', 'A')
('B', 'C')
('C', 'A')
('C', 'B')

combinations() - 組み合わせ:
('A', 'B')
('A', 'C')
('B', 'C')

product() - 直積:
(1, 'x')
(1, 'y')
(2, 'x')
(2, 'y')

filter() - 条件に合う要素のみ:
2 4 6 8 10 

compress() - マスクによるフィルタリング:
A C E 

パフォーマンスとメモリ効率

メモリ使用量の比較

イテレータとリストのメモリ使用量を比較してみましょう:

import sys

def memory_usage_comparison():
    """メモリ使用量の比較"""
    
    # 大きなデータセットのサイズ
    size = 1000000
    
    # リスト(全データをメモリに保持)
    number_list = list(range(size))
    list_memory = sys.getsizeof(number_list)
    
    # ジェネレータ(遅延評価)
    number_generator = (x for x in range(size))
    generator_memory = sys.getsizeof(number_generator)
    
    print(f"データサイズ: {size:,}")
    print(f"リストのメモリ使用量: {list_memory:,} bytes")
    print(f"ジェネレータのメモリ使用量: {generator_memory:,} bytes")
    print(f"メモリ節約率: {(list_memory - generator_memory) / list_memory * 100:.1f}%")

# 実行結果
memory_usage_comparison()

出力結果:

データサイズ: 1,000,000
リストのメモリ使用量: 8,448,728 bytes
ジェネレータのメモリ使用量: 96 bytes
メモリ節約率: 99.9%

処理速度の比較

異なる実装方法での処理速度を測定:

import time
import itertools

def timing_comparison():
    """処理速度の比較"""
    
    def time_function(func, name, *args):
        start_time = time.time()
        result = func(*args)
        # イテレータの場合は実際に消費する
        if hasattr(result, '__iter__') and not isinstance(result, (list, tuple, str)):
            list(result)
        end_time = time.time()
        print(f"{name}: {end_time - start_time:.4f}秒")
    
    size = 100000
    
    # 1. リスト内包表記 vs ジェネレータ式
    print("平方数の生成(サイズ: {:,}):".format(size))
    
    time_function(lambda: [x**2 for x in range(size)], "リスト内包表記")
    time_function(lambda: (x**2 for x in range(size)), "ジェネレータ式")
    
    # 2. フィルタリング比較
    numbers = list(range(size))
    print(f"\n偶数のフィルタリング(サイズ: {len(numbers):,}):")
    
    time_function(lambda: [x for x in numbers if x % 2 == 0], "リスト内包表記")
    time_function(lambda: (x for x in numbers if x % 2 == 0), "ジェネレータ式")
    time_function(lambda: filter(lambda x: x % 2 == 0, numbers), "filter()関数")

timing_comparison()

実践的なパフォーマンス最適化例

def process_data_efficiently(data_source):
    """大量データを効率的に処理する例"""
    
    def validate_record(record):
        """レコードの妥当性チェック"""
        return len(record) >= 3 and record[0].isdigit()
    
    def transform_record(record):
        """レコードの変換処理"""
        return {
            'id': int(record[0]),
            'name': record[1].strip().title(),
            'value': float(record[2]) if record[2].replace('.', '').isdigit() else 0.0
        }
    
    # パイプライン処理(遅延評価)
    valid_records = filter(validate_record, data_source)
    transformed_records = map(transform_record, valid_records)
    
    return transformed_records

# テストデータの生成
test_data = [
    ['1', 'alice', '100.5'],
    ['2', 'bob', '200.0'],
    ['invalid', 'charlie', '300'],  # 無効なレコード
    ['3', 'david', '150.75'],
    ['', 'eve', '250'],              # 無効なレコード
    ['4', 'frank', '180.25']
]

print("効率的なデータ処理:")
processed_data = process_data_efficiently(test_data)

# 実際に処理を実行(遅延評価のため、ここで初めて処理される)
for record in processed_data:
    print(f"ID: {record['id']}, 名前: {record['name']}, 値: {record['value']}")

出力結果:

効率的なデータ処理:
ID: 1, 名前: Alice, 値: 100.5
ID: 2, 名前: Bob, 値: 200.0
ID: 3, 名前: David, 値: 150.75
ID: 4, 名前: Frank, 値: 180.25

実践的な応用パターン

バッチ処理パターン

大量データを一定サイズずつ処理するパターン:

def batch_iterator(iterable, batch_size):
    """イテラブルを指定サイズのバッチに分割するジェネレータ"""
    iterator = iter(iterable)
    while True:
        batch = list(itertools.islice(iterator, batch_size))
        if not batch:
            break
        yield batch

def process_in_batches(data, batch_size=3):
    """データをバッチ単位で処理"""
    print(f"バッチサイズ {batch_size} でデータを処理:")
    
    for batch_num, batch in enumerate(batch_iterator(data, batch_size), 1):
        print(f"  バッチ {batch_num}: {batch}")
        # 実際の処理をここに記述
        time.sleep(0.1)  # 処理時間のシミュレート

# 使用例
data = list(range(1, 11))
process_in_batches(data)

出力結果:

バッチサイズ 3 でデータを処理:
  バッチ 1: [1, 2, 3]
  バッチ 2: [4, 5, 6]
  バッチ 3: [7, 8, 9]
  バッチ 4: [10]

ストリーミング処理パターン

リアルタイムデータストリームの処理:

import random
import time
from collections import deque

class DataStream:
    """模擬的なデータストリーム"""
    
    def __init__(self, data_source):
        self.data_source = data_source
        self.index = 0
    
    def __iter__(self):
        return self
    
    def __next__(self):
        if self.index < len(self.data_source):
            data = self.data_source[self.index]
            self.index += 1
            time.sleep(0.1)  # ストリームの遅延をシミュレート
            return data
        else:
            raise StopIteration

class MovingAverageCalculator:
    """移動平均を計算するイテレータ"""
    
    def __init__(self, data_stream, window_size=3):
        self.data_stream = iter(data_stream)
        self.window_size = window_size
        self.window = deque(maxlen=window_size)
    
    def __iter__(self):
        return self
    
    def __next__(self):
        try:
            value = next(self.data_stream)
            self.window.append(value)
            
            if len(self.window) == self.window_size:
                return {
                    'value': value,
                    'moving_average': sum(self.window) / len(self.window),
                    'window': list(self.window)
                }
            else:
                return {
                    'value': value,
                    'moving_average': None,
                    'window': list(self.window)
                }
        except StopIteration:
            raise

# 使用例
stream_data = [10, 15, 20, 25, 30, 35, 40, 45, 50]
data_stream = DataStream(stream_data)
ma_calculator = MovingAverageCalculator(data_stream, window_size=3)

print("ストリーミングデータの移動平均:")
for result in ma_calculator:
    if result['moving_average'] is not None:
        print(f"値: {result['value']:2d}, 移動平均: {result['moving_average']:5.1f}, "
              f"ウィンドウ: {result['window']}")
    else:
        print(f"値: {result['value']:2d}, 移動平均: N/A    , "
              f"ウィンドウ: {result['window']}")

エラーハンドリングパターン

堅牢なイテレータの実装:

class RobustIterator:
    """エラー処理機能付きイテレータ"""
    
    def __init__(self, data_source, error_handler=None):
        self.data_source = data_source
        self.error_handler = error_handler or self._default_error_handler
        self.index = 0
        self.error_count = 0
        self.processed_count = 0
    
    def _default_error_handler(self, error, item, index):
        """デフォルトのエラーハンドラ"""
        print(f"エラー発生 (インデックス {index}): {error}")
        return None  # エラー時はNoneを返す
    
    def __iter__(self):
        return self
    
    def __next__(self):
        while self.index < len(self.data_source):
            item = self.data_source[self.index]
            current_index = self.index
            self.index += 1
            
            try:
                # 処理を実行(意図的にエラーを発生させる可能性がある)
                result = self._process_item(item)
                self.processed_count += 1
                return result
            
            except Exception as e:
                self.error_count += 1
                error_result = self.error_handler(e, item, current_index)
                if error_result is not None:
                    return error_result
                # エラーハンドラがNoneを返した場合は次の要素に進む
                continue
        
        # 処理完了時の統計情報
        print(f"\n処理完了: 成功 {self.processed_count}件, "
              f"エラー {self.error_count}件")
        raise StopIteration
    
    def _process_item(self, item):
        """アイテムの処理(エラーが発生する可能性がある)"""
        if isinstance(item, str) and item.startswith('error'):
            raise ValueError(f"処理エラー: {item}")
        
        if isinstance(item, (int, float)):
            return item ** 2
        elif isinstance(item, str):
            return item.upper()
        else:
            raise TypeError(f"サポートされていない型: {type(item)}")

# テストデータ(エラーを含む)
test_data = [1, 2, 'hello', 'error_item', 3, None, 'world', 'error_again', 4]

# カスタムエラーハンドラ
def custom_error_handler(error, item, index):
    print(f"⚠️  警告 (インデックス {index}): {item} -> {error}")
    return f"ERROR_HANDLED_{index}"  # エラー時の代替値

print("堅牢なイテレータのテスト:")
robust_iter = RobustIterator(test_data, custom_error_handler)

for result in robust_iter:
    print(f"結果: {result}")

デバッグとトラブルシューティング

イテレータの状態確認

def debug_iterator(iterator, name="Iterator"):
    """イテレータの状態をデバッグするヘルパー関数"""
    print(f"\n=== {name} のデバッグ情報 ===")
    print(f"型: {type(iterator)}")
    print(f"イテレータか: {hasattr(iterator, '__iter__') and hasattr(iterator, '__next__')}")
    
    # 安全に次の要素を確認
    try:
        # teeを使って元のイテレータを保持
        iter1, iter2 = itertools.tee(iterator, 2)
        first_few = list(itertools.islice(iter1, 3))
        print(f"最初の3要素: {first_few}")
        return iter2  # 元のイテレータの代わりに返す
    except Exception as e:
        print(f"エラー: {e}")
        return iterator

# 使用例
numbers = [1, 2, 3, 4, 5]
num_iter = iter(numbers)

# デバッグ情報を表示
debugged_iter = debug_iterator(num_iter, "Number Iterator")

# 通常通り使用
print("\n実際の処理:")
for num in debugged_iter:
    print(num)

パフォーマンス監視

import time
import functools

def monitor_iterator(iterator_func):
    """イテレータのパフォーマンスを監視するデコレータ"""
    @functools.wraps(iterator_func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        iterator = iterator_func(*args, **kwargs)
        
        class MonitoredIterator:
            def __init__(self, wrapped_iterator):
                self.wrapped_iterator = wrapped_iterator
                self.count = 0
                self.start_time = start_time
            
            def __iter__(self):
                return self
            
            def __next__(self):
                try:
                    result = next(self.wrapped_iterator)
                    self.count += 1
                    return result
                except StopIteration:
                    end_time = time.time()
                    print(f"\n監視結果:")
                    print(f"  処理時間: {end_time - self.start_time:.4f}秒")
                    print(f"  処理件数: {self.count}件")
                    print(f"  スループット: {self.count / (end_time - self.start_time):.2f}件/秒")
                    raise
        
        return MonitoredIterator(iterator)
    
    return wrapper

# 使用例
@monitor_iterator
def slow_processor(data):
    """時間のかかる処理をシミュレート"""
    for item in data:
        time.sleep(0.01)  # 処理時間をシミュレート
        yield item * 2

# テスト実行
data = list(range(10))
print("パフォーマンス監視付き処理:")
for result in slow_processor(data):
    print(f"処理結果: {result}")

まとめと実践的なアドバイス

イテレータ使用時のベストプラクティス

1. 適切な選択基準

# メモリ効率を重視する場合:ジェネレータ
def memory_efficient_processing(large_dataset):
    for item in large_dataset:
        if condition(item):
            yield transform(item)

# 再利用が必要な場合:リスト
def reusable_data():
    # 一度作成したら何度も使用
    return [expensive_computation(x) for x in range(100)]

# 一回限りの処理:イテレータ
def one_time_processing():
    # 一度だけ使用するデータ
    return (simple_transform(x) for x in data_source)

2. エラーハンドリング

def safe_iteration(iterable):
    """安全な反復処理のパターン"""
    iterator = iter(iterable)
    
    while True:
        try:
            item = next(iterator)
            # 処理を実行
            yield process_item(item)
        except StopIteration:
            break
        except Exception as e:
            # エラーログを出力し、処理を継続
            print(f"処理エラー: {e}")
            continue

3. パフォーマンス最適化

# 良い例:遅延評価を活用
def optimized_pipeline(data):
    # チェーン化された処理
    filtered = filter(is_valid, data)
    transformed = map(transform, filtered)
    processed = map(expensive_operation, transformed)
    return processed

# 避けるべき例:中間リストの作成
def unoptimized_pipeline(data):
    filtered = [item for item in data if is_valid(item)]
    transformed = [transform(item) for item in filtered]
    processed = [expensive_operation(item) for item in transformed]
    return processed

よくある間違いと対策

間違い1:イテレータの再利用

# ❌ 間違い
iterator = iter(data)
list1 = list(iterator)  # データを消費
list2 = list(iterator)  # 空のリスト

# ✅ 正しい方法
iterator1, iterator2 = itertools.tee(iter(data), 2)
list1 = list(iterator1)
list2 = list(iterator2)

間違い2:無限イテレータの不適切な使用

# ❌ 危険:無限ループ
infinite_iter = itertools.count()
for num in infinite_iter:  # 終了条件なし
    print(num)

# ✅ 安全:適切な終了条件
for i, num in enumerate(itertools.count()):
    if i >= 10:
        break
    print(num)

間違い3:メモリ効率を考慮しない実装

# ❌ メモリ非効率
def process_large_file(filename):
    return [process_line(line) for line in open(filename)]

# ✅ メモリ効率的
def process_large_file(filename):
    with open(filename) as file:
        for line in file:
            yield process_line(line)

Pythonのイテレータは、効率的で柔軟なプログラムを書くための強力なツールです。

重要なポイント:

メモリ効率:大量データも少ないメモリで処理
遅延評価:必要な時だけ計算を実行
組み合わせ可能:複数のイテレータを柔軟に組み合わせ
Pythonic:Pythonらしい美しいコードの実現

コメント

タイトルとURLをコピーしました