【保存容量は大丈夫?】SQLiteのレコード数上限を徹底解説

データベース・SQL

アプリやシステム開発で人気のSQLite(エスキューライト)

「軽量でかんたん」という印象が強いですが、いざプロダクトで使うとなると「レコード数に上限はあるの?」「データが増えたときの動作は大丈夫?」といった疑問が浮かんできます。

この記事では、SQLiteの実際のレコード数制限とについて、実例を交えながらわかりやすく解説します。

スポンサーリンク

SQLiteの理論上の制限値

基本的な制限値一覧

項目制限値備考
最大レコード数実質制限なしROWIDが2^63-1まで
最大ファイルサイズ約281TB2^47バイト
最大カラム数2,000個(デフォルト)コンパイル時変更可能
最大行サイズ約1GB
最大文字列長約1GB
最大BLOB長約1GB

レコード数の実際の計算

ROWIDによる制限

-- SQLiteの内部行ID(ROWID)の最大値
SELECT 9223372036854775807 as 最大ROWID;
-- 結果:9,223,372,036,854,775,807 (約920京件)

これは理論値であり、現実的には以下の要因で制限されます:

  • ストレージ容量
  • メモリ容量
  • パフォーマンス要件
  • システム制限

実用的な容量計算

-- 例:1レコードが平均1KBの場合
-- 281TB ÷ 1KB = 約2,810億件
-- 
-- 例:1レコードが平均100バイトの場合  
-- 281TB ÷ 100バイト = 約28兆件

重要なポイント:理論上は「ほぼ無制限」ですが、実用性を考えると全く別の話になります。

大量レコード時のパフォーマンス影響

レコード数とパフォーマンスの関係

import sqlite3
import time

def performance_test(record_count):
    """レコード数によるパフォーマンステスト"""
    conn = sqlite3.connect(':memory:')
    cursor = conn.cursor()
    
    # テーブル作成
    cursor.execute('''
        CREATE TABLE test_table (
            id INTEGER PRIMARY KEY,
            name TEXT,
            email TEXT,
            created_at TEXT
        )
    ''')
    
    # データ挿入のパフォーマンス測定
    start_time = time.time()
    
    data = [(i, f'user_{i}', f'user_{i}@example.com', '2024-06-19') 
            for i in range(record_count)]
    
    cursor.executemany(
        'INSERT INTO test_table (id, name, email, created_at) VALUES (?, ?, ?, ?)', 
        data
    )
    
    insert_time = time.time() - start_time
    
    # 検索パフォーマンス測定(インデックスなし)
    start_time = time.time()
    cursor.execute('SELECT * FROM test_table WHERE name = ?', ('user_50000',))
    result = cursor.fetchone()
    search_time_no_index = time.time() - start_time
    
    # インデックス作成
    cursor.execute('CREATE INDEX idx_name ON test_table(name)')
    
    # 検索パフォーマンス測定(インデックスあり)
    start_time = time.time()
    cursor.execute('SELECT * FROM test_table WHERE name = ?', ('user_50000',))
    result = cursor.fetchone()
    search_time_with_index = time.time() - start_time
    
    conn.close()
    
    return {
        'record_count': record_count,
        'insert_time': insert_time,
        'search_no_index': search_time_no_index,
        'search_with_index': search_time_with_index
    }

# 実際のテスト例
print("=== SQLiteパフォーマンステスト ===")
for count in [1000, 10000, 100000]:
    result = performance_test(count)
    print(f"レコード数: {result['record_count']:,}件")
    print(f"  挿入時間: {result['insert_time']:.3f}秒")
    print(f"  検索時間(インデックスなし): {result['search_no_index']:.6f}秒")
    print(f"  検索時間(インデックスあり): {result['search_with_index']:.6f}秒")
    print()

パフォーマンス低下の主な原因

1. インデックスの不足

-- 悪い例:インデックスなしでの検索
SELECT * FROM users WHERE email = 'user@example.com';
-- → 全件スキャンで非常に遅い

-- 良い例:インデックスありでの検索
CREATE INDEX idx_users_email ON users(email);
SELECT * FROM users WHERE email = 'user@example.com';
-- → 高速検索が可能

2. 不適切なクエリ設計

-- 悪い例:不要な全件取得
SELECT * FROM orders; -- 100万件すべて取得

-- 良い例:必要な分だけ取得
SELECT * FROM orders 
WHERE created_at >= '2024-06-01' 
ORDER BY created_at DESC 
LIMIT 100;

3. メモリ不足

import sqlite3

def memory_efficient_processing():
    """メモリ効率的な大量データ処理"""
    conn = sqlite3.connect('large_database.db')
    cursor = conn.cursor()
    
    # 悪い例:全件を一度にメモリに読み込み
    # cursor.execute('SELECT * FROM large_table')
    # all_records = cursor.fetchall()  # メモリ不足の原因
    
    # 良い例:バッチ処理
    batch_size = 1000
    offset = 0
    
    while True:
        cursor.execute(
            'SELECT * FROM large_table LIMIT ? OFFSET ?', 
            (batch_size, offset)
        )
        batch = cursor.fetchall()
        
        if not batch:
            break
            
        # バッチ単位で処理
        process_batch(batch)
        offset += batch_size
    
    conn.close()

def process_batch(batch):
    """バッチデータの処理"""
    for record in batch:
        # 実際の処理をここに記述
        pass

インデックス設計

効果的なインデックス設計

-- 1. 単一カラムインデックス
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_products_category ON products(category);

-- 2. 複合インデックス(重要:カラムの順序が影響)
-- よく使われる検索条件の組み合わせ
CREATE INDEX idx_orders_user_date ON orders(user_id, created_at);
CREATE INDEX idx_products_cat_price ON products(category, price);

-- 3. 部分インデックス(条件付きインデックス)
-- 特定の条件のレコードのみにインデックスを作成
CREATE INDEX idx_orders_pending ON orders(user_id) 
WHERE status = 'pending';

-- 4. ユニークインデックス
-- 重複を防ぎつつ検索を高速化
CREATE UNIQUE INDEX idx_users_email_unique ON users(email);

インデックスの効果確認

-- EXPLAINでクエリの実行計画を確認
EXPLAIN QUERY PLAN 
SELECT * FROM orders WHERE user_id = 123 AND created_at > '2024-01-01';

-- インデックスが使われているかチェック
-- "USING INDEX"が表示されれば正常に使用されている

インデックス管理のベストプラクティス

import sqlite3

def index_management_example():
    """インデックス管理の実例"""
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    
    # 現在のインデックス一覧を確認
    cursor.execute("""
        SELECT name, sql FROM sqlite_master 
        WHERE type = 'index' AND sql IS NOT NULL
    """)
    
    print("=== 現在のインデックス ===")
    for index_name, index_sql in cursor.fetchall():
        print(f"インデックス名: {index_name}")
        print(f"SQL: {index_sql}")
        print()
    
    # インデックスのサイズ確認
    cursor.execute("PRAGMA page_count")
    total_pages = cursor.fetchone()[0]
    
    cursor.execute("PRAGMA page_size")
    page_size = cursor.fetchone()[0]
    
    total_size = total_pages * page_size
    print(f"データベース総サイズ: {total_size:,} バイト ({total_size/1024/1024:.2f} MB)")
    
    # 統計情報の更新
    cursor.execute("ANALYZE")
    print("統計情報を更新しました")
    
    conn.close()

# index_management_example()

大量データ処理

テーブル設計の最適化

-- 効率的なテーブル設計例
CREATE TABLE user_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    action_type TEXT NOT NULL,
    created_at INTEGER NOT NULL,  -- UNIXタイムスタンプで高速
    details TEXT,
    
    -- 必要なインデックスを最初から定義
    INDEX idx_user_logs_user_id (user_id),
    INDEX idx_user_logs_created_at (created_at),
    INDEX idx_user_logs_user_date (user_id, created_at)
);

-- パーティション的な考え方(月ごとにテーブル分割)
CREATE TABLE user_logs_202406 (
    -- 同じ構造
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    action_type TEXT NOT NULL,
    created_at INTEGER NOT NULL,
    details TEXT
);

CREATE TABLE user_logs_202407 (
    -- 同じ構造
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    action_type TEXT NOT NULL,
    created_at INTEGER NOT NULL,
    details TEXT
);

バルクデータの効率的な処理

import sqlite3
import csv
from datetime import datetime

def bulk_data_processing():
    """大量データの効率的な処理例"""
    conn = sqlite3.connect('large_data.db')
    cursor = conn.cursor()
    
    # テーブル作成
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS sales_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id INTEGER,
            customer_id INTEGER,
            sale_date TEXT,
            amount DECIMAL(10,2),
            quantity INTEGER
        )
    ''')
    
    # 1. 大量データの高速挿入
    def fast_bulk_insert(data_file):
        """CSVファイルからの高速一括挿入"""
        
        # トランザクション開始
        cursor.execute('BEGIN TRANSACTION')
        
        try:
            with open(data_file, 'r') as file:
                csv_reader = csv.reader(file)
                next(csv_reader)  # ヘッダーをスキップ
                
                batch = []
                batch_size = 10000
                
                for row in csv_reader:
                    batch.append(tuple(row))
                    
                    if len(batch) >= batch_size:
                        cursor.executemany(
                            'INSERT INTO sales_data (product_id, customer_id, sale_date, amount, quantity) VALUES (?, ?, ?, ?, ?)',
                            batch
                        )
                        batch = []
                
                # 残りのデータを挿入
                if batch:
                    cursor.executemany(
                        'INSERT INTO sales_data (product_id, customer_id, sale_date, amount, quantity) VALUES (?, ?, ?, ?, ?)',
                        batch
                    )
            
            # トランザクションコミット
            cursor.execute('COMMIT')
            print("データの一括挿入が完了しました")
            
        except Exception as e:
            cursor.execute('ROLLBACK')
            print(f"エラーが発生しました: {e}")
    
    # 2. 効率的な集計処理
    def efficient_aggregation():
        """効率的な集計クエリ"""
        
        # インデックスを活用した集計
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_sales_date ON sales_data(sale_date);
            CREATE INDEX IF NOT EXISTS idx_sales_product ON sales_data(product_id);
        ''')
        
        # 月別売上集計
        cursor.execute('''
            SELECT 
                strftime('%Y-%m', sale_date) as month,
                COUNT(*) as transaction_count,
                SUM(amount) as total_amount,
                AVG(amount) as average_amount
            FROM sales_data 
            WHERE sale_date >= '2024-01-01'
            GROUP BY strftime('%Y-%m', sale_date)
            ORDER BY month
        ''')
        
        results = cursor.fetchall()
        print("=== 月別売上集計 ===")
        for month, count, total, avg in results:
            print(f"{month}: {count:,}件, 合計{total:,.2f}円, 平均{avg:.2f}円")
    
    # 3. データの効率的な削除
    def efficient_data_cleanup():
        """古いデータの効率的な削除"""
        
        # 古いデータの削除(バッチ処理)
        deleted_count = 0
        batch_size = 10000
        
        while True:
            cursor.execute('''
                DELETE FROM sales_data 
                WHERE id IN (
                    SELECT id FROM sales_data 
                    WHERE sale_date < '2023-01-01'
                    LIMIT ?
                )
            ''', (batch_size,))
            
            batch_deleted = cursor.rowcount
            deleted_count += batch_deleted
            
            if batch_deleted == 0:
                break
            
            print(f"削除済み: {deleted_count:,}件")
        
        # VACUUM実行(領域の解放)
        cursor.execute('VACUUM')
        print("データベースの最適化が完了しました")
    
    conn.close()

# bulk_data_processing()

メモリ使用量の最適化

import sqlite3

def memory_optimization():
    """メモリ使用量の最適化設定"""
    conn = sqlite3.connect('optimized.db')
    cursor = conn.cursor()
    
    # SQLiteの設定最適化
    optimizations = [
        # ページキャッシュサイズ(メモリ使用量を制御)
        'PRAGMA cache_size = 10000',  # 約40MBのキャッシュ
        
        # 一時ファイルの保存場所
        'PRAGMA temp_store = MEMORY',  # 一時データはメモリに保存
        
        # 同期モード(書き込み速度向上、ただし安全性とのトレードオフ)
        'PRAGMA synchronous = NORMAL',  # デフォルトはFULL
        
        # ジャーナルモード(WALモードで読み書き性能向上)
        'PRAGMA journal_mode = WAL',
        
        # 自動VACUUM(ファイルサイズの自動最適化)
        'PRAGMA auto_vacuum = INCREMENTAL',
        
        # mmapサイズ(大きなデータベースで有効)
        'PRAGMA mmap_size = 268435456',  # 256MB
    ]
    
    print("=== SQLite最適化設定の適用 ===")
    for pragma in optimizations:
        cursor.execute(pragma)
        print(f"適用: {pragma}")
    
    # 設定確認
    settings_to_check = [
        'cache_size', 'temp_store', 'synchronous', 
        'journal_mode', 'auto_vacuum', 'mmap_size'
    ]
    
    print("\n=== 現在の設定値 ===")
    for setting in settings_to_check:
        cursor.execute(f'PRAGMA {setting}')
        value = cursor.fetchone()[0]
        print(f"{setting}: {value}")
    
    conn.close()

# memory_optimization()

SQLiteの限界と代替案

SQLiteが不向きなケース

1. 高い同時書き込み要求

# SQLiteの制限例
import sqlite3
import threading
import time

def concurrent_write_test():
    """同時書き込みのテスト(SQLiteの制限を確認)"""
    
    def write_worker(worker_id):
        try:
            conn = sqlite3.connect('test_concurrent.db', timeout=10)
            cursor = conn.cursor()
            
            for i in range(100):
                cursor.execute(
                    'INSERT INTO test_table (worker_id, value) VALUES (?, ?)',
                    (worker_id, i)
                )
                conn.commit()
                time.sleep(0.01)  # 少し待機
            
            conn.close()
            print(f"Worker {worker_id} 完了")
            
        except sqlite3.OperationalError as e:
            print(f"Worker {worker_id} エラー: {e}")
    
    # テーブル作成
    conn = sqlite3.connect('test_concurrent.db')
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS test_table (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            worker_id INTEGER,
            value INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.close()
    
    # 複数のスレッドで同時書き込み
    threads = []
    for i in range(5):  # 5つのワーカー
        thread = threading.Thread(target=write_worker, args=(i,))
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()
    
    print("同時書き込みテスト完了")

# concurrent_write_test()

2. 非常に大きなデータセット

# 容量制限の確認
def storage_limitation_check():
    """ストレージ制限の確認"""
    import os
    
    # ファイルサイズの確認
    def get_file_size(filename):
        if os.path.exists(filename):
            size = os.path.getsize(filename)
            return size
        return 0
    
    # 大量データでのテスト
    conn = sqlite3.connect('large_test.db')
    cursor = conn.cursor()
    
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS large_table (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            data TEXT
        )
    ''')
    
    # 大きなデータを挿入
    large_text = 'x' * 10000  # 10KBのテキスト
    
    print("大量データの挿入開始...")
    for i in range(10000):  # 約100MBのデータ
        cursor.execute('INSERT INTO large_table (data) VALUES (?)', (large_text,))
        
        if i % 1000 == 0:
            conn.commit()
            size = get_file_size('large_test.db')
            print(f"レコード数: {i:,}, ファイルサイズ: {size/1024/1024:.2f}MB")
    
    conn.commit()
    final_size = get_file_size('large_test.db')
    print(f"最終ファイルサイズ: {final_size/1024/1024:.2f}MB")
    
    conn.close()

# storage_limitation_check()

代替データベースの選択指針

def database_selection_guide():
    """データベース選択の指針"""
    
    criteria = {
        "SQLite": {
            "適用場面": [
                "モバイルアプリのローカルストレージ",
                "デスクトップアプリの設定保存",
                "プロトタイプやテスト環境",
                "小〜中規模のWebアプリ",
                "組み込みシステム"
            ],
            "制限": [
                "同時書き込み接続は1つのみ",
                "ネットワーク経由でのアクセス不可",
                "ユーザー管理機能なし",
                "レプリケーション機能なし"
            ],
            "パフォーマンス目安": "〜数十GB、〜数百万件"
        },
        
        "PostgreSQL": {
            "適用場面": [
                "本格的なWebアプリケーション",
                "複雑なクエリが必要なシステム",
                "高い整合性が求められるシステム",
                "分析系ワークロード"
            ],
            "利点": [
                "高い同時接続性能",
                "豊富な機能とデータ型",
                "優秀なオプティマイザ",
                "レプリケーション対応"
            ],
            "パフォーマンス目安": "〜数TB、〜数億件"
        },
        
        "MySQL": {
            "適用場面": [
                "Webアプリケーション",
                "読み取り中心のワークロード",
                "レプリケーションが重要なシステム"
            ],
            "利点": [
                "高い読み取り性能",
                "優秀なレプリケーション機能",
                "豊富な運用実績"
            ],
            "パフォーマンス目安": "〜数TB、〜数億件"
        }
    }
    
    print("=== データベース選択ガイド ===")
    for db_name, info in criteria.items():
        print(f"\n{db_name}:")
        print("  適用場面:")
        for scene in info["適用場面"]:
            print(f"    • {scene}")
        
        if "制限" in info:
            print("  制限:")
            for limit in info["制限"]:
                print(f"    • {limit}")
        
        if "利点" in info:
            print("  利点:")
            for advantage in info["利点"]:
                print(f"    • {advantage}")
        
        print(f"  パフォーマンス目安: {info['パフォーマンス目安']}")

database_selection_guide()

トラブルシューティングガイド

よくある問題と解決方法

import sqlite3
import os

def troubleshooting_guide():
    """SQLiteトラブルシューティングガイド"""
    
    def diagnose_database_locked():
        """データベースロック問題の診断"""
        print("=== データベースロック問題の解決 ===")
        
        solutions = [
            {
                "問題": "database is locked エラー",
                "原因": ["接続の適切なクローズ不足", "長時間実行されるトランザクション"],
                "解決策": [
                    "try-finally文で確実にconnection.close()を実行",
                    "トランザクションのタイムアウト設定",
                    "WALモードの活用(読み書き並行性向上)"
                ],
                "コード例": """
try:
    conn = sqlite3.connect('database.db', timeout=20)
    # データベース操作
finally:
    conn.close()
"""
            },
            {
                "問題": "disk I/O error",
                "原因": ["ディスク容量不足", "ファイルシステムの問題"],
                "解決策": [
                    "ディスク容量の確認と確保",
                    "ファイルパーミッションの確認",
                    "異なるディスクへのデータベース移動"
                ]
            }
        ]
        
        for i, solution in enumerate(solutions, 1):
            print(f"{i}. {solution['問題']}")
            print("   原因:")
            for cause in solution["原因"]:
                print(f"     • {cause}")
            print("   解決策:")
            for fix in solution["解決策"]:
                print(f"     • {fix}")
            if "コード例" in solution:
                print("   コード例:")
                print(solution["コード例"])
            print()
    
    def diagnose_performance_issues():
        """パフォーマンス問題の診断"""
        print("=== パフォーマンス問題の診断 ===")
        
        def query_analyzer_example():
            """クエリ分析の例"""
            return """
-- 遅いクエリの分析
EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = 'user@example.com';

-- インデックスの効果確認
CREATE INDEX idx_users_email ON users(email);
EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = 'user@example.com';

-- 統計情報の更新
ANALYZE;

-- テーブルサイズの確認
SELECT 
    name,
    (SELECT COUNT(*) FROM pragma_table_info(name)) as columns,
    (SELECT sql FROM sqlite_master WHERE type='table' AND name=outer.name) as create_sql
FROM sqlite_master outer WHERE type='table';
"""
        
        print("パフォーマンス問題の一般的な対処法:")
        print("1. EXPLAIN QUERY PLANでクエリ実行計画を確認")
        print("2. 適切なインデックスの作成")
        print("3. ANALYZEで統計情報を更新")
        print("4. VACUUMでファイルサイズの最適化")
        print("\nクエリ分析の例:")
        print(query_analyzer_example())
    
    def diagnose_corruption():
        """データ破損問題の診断"""
        print("=== データ破損問題の診断と修復 ===")
        
        recovery_steps = [
            "1. 整合性チェックの実行: PRAGMA integrity_check",
            "2. 読み込み可能なデータのエクスポート: .dump",
            "3. 新しいデータベースファイルの作成",
            "4. エクスポートしたデータのインポート",
            "5. インデックスとトリガーの再作成"
        ]
        
        print("データ破損時の回復手順:")
        for step in recovery_steps:
            print(f"  {step}")
        
        print("\n実行例:")
        print("""
# コマンドライン版SQLiteでの回復例
sqlite3 corrupted.db
> PRAGMA integrity_check;
> .output backup.sql
> .dump
> .quit

# 新しいデータベースに復元
sqlite3 recovered.db < backup.sql
""")
    
    # 各診断の実行
    diagnose_database_locked()
    diagnose_performance_issues()
    diagnose_corruption()

troubleshooting_guide()

使用例

レコード数別パフォーマンステスト

import sqlite3
import time
import random
import string
from contextlib import contextmanager

def comprehensive_benchmark():
    """包括的なベンチマークテスト"""
    
    @contextmanager
    def timer(description):
        """実行時間測定のコンテキストマネージャー"""
        start = time.time()
        yield
        end = time.time()
        print(f"{description}: {end - start:.3f}秒")
    
    def generate_test_data(count):
        """テストデータの生成"""
        data = []
        for i in range(count):
            name = ''.join(random.choices(string.ascii_letters, k=10))
            email = f"user{i}@example.com"
            age = random.randint(18, 80)
            data.append((name, email, age))
        return data
    
    def benchmark_operations(record_counts):
        """各種操作のベンチマーク"""
        results = {}
        
        for count in record_counts:
            print(f"\n=== {count:,}件でのベンチマーク ===")
            
            # データベース作成
            conn = sqlite3.connect(':memory:')
            cursor = conn.cursor()
            
            # テーブル作成
            cursor.execute('''
                CREATE TABLE users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    age INTEGER NOT NULL
                )
            ''')
            
            # データ生成
            test_data = generate_test_data(count)
            
            # 挿入性能テスト
            with timer(f"  {count:,}件挿入(トランザクションあり)"):
                cursor.execute('BEGIN TRANSACTION')
                cursor.executemany(
                    'INSERT INTO users (name, email, age) VALUES (?, ?, ?)', 
                    test_data
                )
                cursor.execute('COMMIT')
            
            # インデックス作成
            with timer("  インデックス作成"):
                cursor.execute('CREATE INDEX idx_users_email ON users(email)')
                cursor.execute('CREATE INDEX idx_users_age ON users(age)')
            
            # 検索性能テスト
            search_email = test_data[count//2][1]  # 中間のデータを検索
            
            with timer("  主キー検索"):
                cursor.execute('SELECT * FROM users WHERE id = ?', (count//2,))
                result = cursor.fetchone()
            
            with timer("  インデックス使用検索(email)"):
                cursor.execute('SELECT * FROM users WHERE email = ?', (search_email,))
                result = cursor.fetchone()
            
            with timer("  範囲検索(age)"):
                cursor.execute('SELECT * FROM users WHERE age BETWEEN 25 AND 35 LIMIT 100')
                result = cursor.fetchall()
            
            with timer("  全件カウント"):
                cursor.execute('SELECT COUNT(*) FROM users')
                result = cursor.fetchone()
            
            # 更新性能テスト
            with timer("  バッチ更新(100件)"):
                cursor.execute('BEGIN TRANSACTION')
                for i in range(100):
                    user_id = random.randint(1, count)
                    new_age = random.randint(18, 80)
                    cursor.execute('UPDATE users SET age = ? WHERE id = ?', (new_age, user_id))
                cursor.execute('COMMIT')
            
            # 削除性能テスト
            with timer("  バッチ削除(100件)"):
                cursor.execute('BEGIN TRANSACTION')
                cursor.execute('DELETE FROM users WHERE age > 70')
                cursor.execute('COMMIT')
            
            # ファイルサイズ測定(メモリDBなので概算)
            cursor.execute('PRAGMA page_count')
            page_count = cursor.fetchone()[0]
            cursor.execute('PRAGMA page_size')
            page_size = cursor.fetchone()[0]
            estimated_size = page_count * page_size
            print(f"  推定データサイズ: {estimated_size/1024/1024:.2f}MB")
            
            conn.close()
            
            results[count] = {
                'estimated_size_mb': estimated_size/1024/1024,
                'completed': True
            }
        
        return results
    
    # ベンチマーク実行
    test_counts = [1000, 10000, 100000, 500000]
    print("SQLite包括的ベンチマーク開始")
    print("テスト環境: メモリDB、単一スレッド")
    
    results = benchmark_operations(test_counts)
    
    print("\n=== ベンチマーク結果サマリー ===")
    for count, result in results.items():
        if result['completed']:
            print(f"{count:,}件: 推定サイズ {result['estimated_size_mb']:.2f}MB")

# comprehensive_benchmark()

実運用でのパフォーマンス最適化事例

def real_world_optimization_examples():
    """実運用でのパフォーマンス最適化事例"""
    
    print("=== 実運用での最適化事例 ===")
    
    case_studies = [
        {
            "事例": "モバイルアプリのユーザーデータ管理",
            "課題": "起動時のデータ読み込みが遅い(100万件のメッセージデータ)",
            "解決策": [
                "日付ベースでのテーブル分割",
                "遅延読み込みの実装",
                "必要なカラムのみの取得",
                "適切なインデックス設計"
            ],
            "結果": "起動時間を3秒から0.5秒に短縮",
            "実装例": """
-- 月別テーブル分割
CREATE TABLE messages_202406 (
    id INTEGER PRIMARY KEY,
    user_id INTEGER,
    content TEXT,
    created_at INTEGER,
    INDEX idx_user_date (user_id, created_at)
);

-- 遅延読み込み用クエリ
SELECT id, preview, created_at 
FROM messages_202406 
WHERE user_id = ? 
ORDER BY created_at DESC 
LIMIT 50;
"""
        },
        {
            "事例": "IoTデータの収集・分析システム", 
            "課題": "センサーデータ(毎分1000件)の書き込み性能",
            "解決策": [
                "WALモードの活用",
                "バッチ挿入の実装",
                "古いデータの自動削除",
                "インデックスの最小化"
            ],
            "結果": "書き込み性能が5倍向上",
            "実装例": """
-- WALモード設定
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;

-- バッチ挿入
INSERT INTO sensor_data (device_id, value, timestamp) 
VALUES (?, ?, ?), (?, ?, ?), ... -- 1000件分

-- 自動削除(7日以上古いデータ)
DELETE FROM sensor_data 
WHERE timestamp < (strftime('%s', 'now') - 604800);
"""
        },
        {
            "事例": "ECサイトの商品検索システム",
            "課題": "複雑な条件での商品検索が遅い(50万商品)",
            "解決策": [
                "FTS(全文検索)の活用",
                "複合インデックスの最適化", 
                "キャッシュ戦略の実装",
                "検索結果のページネーション"
            ],
            "結果": "検索レスポンスが2秒から0.2秒に改善",
            "実装例": """
-- FTS仮想テーブル
CREATE VIRTUAL TABLE products_fts USING fts5(
    name, description, category, content='products'
);

-- 複合インデックス
CREATE INDEX idx_products_search 
ON products(category, price, in_stock, brand);

-- 効率的な検索クエリ
SELECT p.* FROM products p
JOIN products_fts fts ON p.id = fts.rowid
WHERE fts MATCH 'keyword'
  AND p.category = 'electronics'
  AND p.price BETWEEN 1000 AND 50000
  AND p.in_stock = 1
ORDER BY p.popularity DESC
LIMIT 20 OFFSET 0;
"""
        }
    ]
    
    for i, case in enumerate(case_studies, 1):
        print(f"\n{i}. {case['事例']}")
        print(f"課題: {case['課題']}")
        print("解決策:")
        for solution in case["解決策"]:
            print(f"  • {solution}")
        print(f"結果: {case['結果']}")
        print("実装例:")
        print(case["実装例"])

real_world_optimization_examples()

まとめ:SQLiteのレコード数制限を正しく理解して活用する

重要なポイントの再確認

def summary_key_points():
    """重要ポイントのまとめ"""
    
    key_points = {
        "理論的制限": {
            "レコード数": "実質無制限(ROWID上限:2^63-1)",
            "ファイルサイズ": "最大281TB",
            "実用性": "ハードウェアとパフォーマンス要件に依存"
        },
        
        "実運用での制限": {
            "推奨レコード数": "数百万〜数千万件程度",
            "推奨ファイルサイズ": "数GB〜数十GB程度",
            "注意点": "用途とパフォーマンス要件により大きく変動"
        },
        
        "パフォーマンス最適化": {
            "必須対策": ["適切なインデックス設計", "クエリの最適化", "トランザクションの活用"],
            "推奨設定": ["WALモード", "適切なキャッシュサイズ", "定期メンテナンス"],
            "監視項目": ["ファイルサイズ", "クエリ実行時間", "同時接続数"]
        },
        
        "移行の判断基準": {
            "SQLite継続": "単一ユーザー、軽〜中程度の負荷、組み込み用途",
            "他DB検討": "高同時性、大規模データ、複雑な分析処理",
            "移行タイミング": "パフォーマンス問題、スケーラビリティ限界、機能不足"
        }
    }
    
    print("=== SQLiteレコード数制限:重要ポイント ===")
    for category, points in key_points.items():
        print(f"\n{category}:")
        for key, value in points.items():
            if isinstance(value, list):
                print(f"  {key}: {', '.join(value)}")
            else:
                print(f"  {key}: {value}")
    
    print("\n=== 実践的な推奨事項 ===")
    recommendations = [
        "設計段階からインデックス戦略を検討する",
        "定期的なパフォーマンス監視を実装する", 
        "データ増加に備えた移行計画を準備する",
        "用途に応じたデータベース選択を行う",
        "適切なバックアップ・復旧手順を確立する"
    ]
    
    for i, rec in enumerate(recommendations, 1):
        print(f"{i}. {rec}")

summary_key_points()

チェックリスト:SQLite運用の成功のために

def sqlite_success_checklist():
    """SQLite運用成功のためのチェックリスト"""
    
    checklist = {
        "設計段階": [
            "□ データ量とアクセスパターンの想定",
            "□ 適切なテーブル構造の設計",
            "□ 必要なインデックスの計画",
            "□ パフォーマンス要件の明確化",
            "□ 将来的な拡張性の検討"
        ],
        
        "実装段階": [
            "□ 適切なSQLite設定(WAL、キャッシュサイズ等)",
            "□ エラーハンドリングの実装",
            "□ トランザクション管理の適切な実装",
            "□ 接続の適切なクローズ処理",
            "□ バッチ処理の実装"
        ],
        
        "運用段階": [
            "□ 定期的な統計情報更新(ANALYZE)",
            "□ 定期的なデータベース最適化(VACUUM)",
            "□ パフォーマンス監視の実装",
            "□ バックアップ戦略の実行",
            "□ 容量監視とアラート設定"
        ],
        
        "保守段階": [
            "□ 定期的なパフォーマンステスト",
            "□ インデックス効果の検証",
            "□ 古いデータの削除戦略",
            "□ データベース移行計画の見直し",
            "□ セキュリティ対策の確認"
        ]
    }
    
    print("=== SQLite運用成功チェックリスト ===")
    for phase, items in checklist.items():
        print(f"\n【{phase}】")
        for item in items:
            print(f"  {item}")
    
    print("\n=== 緊急時対応準備 ===")
    emergency_prep = [
        "□ データベース破損時の復旧手順書",
        "□ パフォーマンス問題時の対応フロー",
        "□ 容量不足時のデータ削除・移行手順",
        "□ バックアップからの復旧テスト実施",
        "□ 代替データベースへの移行手順書"
    ]
    
    for item in emergency_prep:
        print(f"  {item}")

sqlite_success_checklist()

コメント

タイトルとURLをコピーしました