アプリやシステム開発で人気のSQLite(エスキューライト)。
「軽量でかんたん」という印象が強いですが、いざプロダクトで使うとなると「レコード数に上限はあるの?」「データが増えたときの動作は大丈夫?」といった疑問が浮かんできます。
この記事では、SQLiteの実際のレコード数制限とについて、実例を交えながらわかりやすく解説します。
SQLiteの理論上の制限値

基本的な制限値一覧
項目 | 制限値 | 備考 |
---|---|---|
最大レコード数 | 実質制限なし | ROWIDが2^63-1まで |
最大ファイルサイズ | 約281TB | 2^47バイト |
最大カラム数 | 2,000個(デフォルト) | コンパイル時変更可能 |
最大行サイズ | 約1GB | |
最大文字列長 | 約1GB | |
最大BLOB長 | 約1GB |
レコード数の実際の計算
ROWIDによる制限
-- SQLiteの内部行ID(ROWID)の最大値
SELECT 9223372036854775807 as 最大ROWID;
-- 結果:9,223,372,036,854,775,807 (約920京件)
これは理論値であり、現実的には以下の要因で制限されます:
- ストレージ容量
- メモリ容量
- パフォーマンス要件
- システム制限
実用的な容量計算
-- 例:1レコードが平均1KBの場合
-- 281TB ÷ 1KB = 約2,810億件
--
-- 例:1レコードが平均100バイトの場合
-- 281TB ÷ 100バイト = 約28兆件
重要なポイント:理論上は「ほぼ無制限」ですが、実用性を考えると全く別の話になります。
大量レコード時のパフォーマンス影響
レコード数とパフォーマンスの関係
import sqlite3
import time
def performance_test(record_count):
"""レコード数によるパフォーマンステスト"""
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
# テーブル作成
cursor.execute('''
CREATE TABLE test_table (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT,
created_at TEXT
)
''')
# データ挿入のパフォーマンス測定
start_time = time.time()
data = [(i, f'user_{i}', f'user_{i}@example.com', '2024-06-19')
for i in range(record_count)]
cursor.executemany(
'INSERT INTO test_table (id, name, email, created_at) VALUES (?, ?, ?, ?)',
data
)
insert_time = time.time() - start_time
# 検索パフォーマンス測定(インデックスなし)
start_time = time.time()
cursor.execute('SELECT * FROM test_table WHERE name = ?', ('user_50000',))
result = cursor.fetchone()
search_time_no_index = time.time() - start_time
# インデックス作成
cursor.execute('CREATE INDEX idx_name ON test_table(name)')
# 検索パフォーマンス測定(インデックスあり)
start_time = time.time()
cursor.execute('SELECT * FROM test_table WHERE name = ?', ('user_50000',))
result = cursor.fetchone()
search_time_with_index = time.time() - start_time
conn.close()
return {
'record_count': record_count,
'insert_time': insert_time,
'search_no_index': search_time_no_index,
'search_with_index': search_time_with_index
}
# 実際のテスト例
print("=== SQLiteパフォーマンステスト ===")
for count in [1000, 10000, 100000]:
result = performance_test(count)
print(f"レコード数: {result['record_count']:,}件")
print(f" 挿入時間: {result['insert_time']:.3f}秒")
print(f" 検索時間(インデックスなし): {result['search_no_index']:.6f}秒")
print(f" 検索時間(インデックスあり): {result['search_with_index']:.6f}秒")
print()
パフォーマンス低下の主な原因
1. インデックスの不足
-- 悪い例:インデックスなしでの検索
SELECT * FROM users WHERE email = 'user@example.com';
-- → 全件スキャンで非常に遅い
-- 良い例:インデックスありでの検索
CREATE INDEX idx_users_email ON users(email);
SELECT * FROM users WHERE email = 'user@example.com';
-- → 高速検索が可能
2. 不適切なクエリ設計
-- 悪い例:不要な全件取得
SELECT * FROM orders; -- 100万件すべて取得
-- 良い例:必要な分だけ取得
SELECT * FROM orders
WHERE created_at >= '2024-06-01'
ORDER BY created_at DESC
LIMIT 100;
3. メモリ不足
import sqlite3
def memory_efficient_processing():
"""メモリ効率的な大量データ処理"""
conn = sqlite3.connect('large_database.db')
cursor = conn.cursor()
# 悪い例:全件を一度にメモリに読み込み
# cursor.execute('SELECT * FROM large_table')
# all_records = cursor.fetchall() # メモリ不足の原因
# 良い例:バッチ処理
batch_size = 1000
offset = 0
while True:
cursor.execute(
'SELECT * FROM large_table LIMIT ? OFFSET ?',
(batch_size, offset)
)
batch = cursor.fetchall()
if not batch:
break
# バッチ単位で処理
process_batch(batch)
offset += batch_size
conn.close()
def process_batch(batch):
"""バッチデータの処理"""
for record in batch:
# 実際の処理をここに記述
pass
インデックス設計

効果的なインデックス設計
-- 1. 単一カラムインデックス
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_products_category ON products(category);
-- 2. 複合インデックス(重要:カラムの順序が影響)
-- よく使われる検索条件の組み合わせ
CREATE INDEX idx_orders_user_date ON orders(user_id, created_at);
CREATE INDEX idx_products_cat_price ON products(category, price);
-- 3. 部分インデックス(条件付きインデックス)
-- 特定の条件のレコードのみにインデックスを作成
CREATE INDEX idx_orders_pending ON orders(user_id)
WHERE status = 'pending';
-- 4. ユニークインデックス
-- 重複を防ぎつつ検索を高速化
CREATE UNIQUE INDEX idx_users_email_unique ON users(email);
インデックスの効果確認
-- EXPLAINでクエリの実行計画を確認
EXPLAIN QUERY PLAN
SELECT * FROM orders WHERE user_id = 123 AND created_at > '2024-01-01';
-- インデックスが使われているかチェック
-- "USING INDEX"が表示されれば正常に使用されている
インデックス管理のベストプラクティス
import sqlite3
def index_management_example():
"""インデックス管理の実例"""
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 現在のインデックス一覧を確認
cursor.execute("""
SELECT name, sql FROM sqlite_master
WHERE type = 'index' AND sql IS NOT NULL
""")
print("=== 現在のインデックス ===")
for index_name, index_sql in cursor.fetchall():
print(f"インデックス名: {index_name}")
print(f"SQL: {index_sql}")
print()
# インデックスのサイズ確認
cursor.execute("PRAGMA page_count")
total_pages = cursor.fetchone()[0]
cursor.execute("PRAGMA page_size")
page_size = cursor.fetchone()[0]
total_size = total_pages * page_size
print(f"データベース総サイズ: {total_size:,} バイト ({total_size/1024/1024:.2f} MB)")
# 統計情報の更新
cursor.execute("ANALYZE")
print("統計情報を更新しました")
conn.close()
# index_management_example()
大量データ処理

テーブル設計の最適化
-- 効率的なテーブル設計例
CREATE TABLE user_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
action_type TEXT NOT NULL,
created_at INTEGER NOT NULL, -- UNIXタイムスタンプで高速
details TEXT,
-- 必要なインデックスを最初から定義
INDEX idx_user_logs_user_id (user_id),
INDEX idx_user_logs_created_at (created_at),
INDEX idx_user_logs_user_date (user_id, created_at)
);
-- パーティション的な考え方(月ごとにテーブル分割)
CREATE TABLE user_logs_202406 (
-- 同じ構造
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
action_type TEXT NOT NULL,
created_at INTEGER NOT NULL,
details TEXT
);
CREATE TABLE user_logs_202407 (
-- 同じ構造
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
action_type TEXT NOT NULL,
created_at INTEGER NOT NULL,
details TEXT
);
バルクデータの効率的な処理
import sqlite3
import csv
from datetime import datetime
def bulk_data_processing():
"""大量データの効率的な処理例"""
conn = sqlite3.connect('large_data.db')
cursor = conn.cursor()
# テーブル作成
cursor.execute('''
CREATE TABLE IF NOT EXISTS sales_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER,
customer_id INTEGER,
sale_date TEXT,
amount DECIMAL(10,2),
quantity INTEGER
)
''')
# 1. 大量データの高速挿入
def fast_bulk_insert(data_file):
"""CSVファイルからの高速一括挿入"""
# トランザクション開始
cursor.execute('BEGIN TRANSACTION')
try:
with open(data_file, 'r') as file:
csv_reader = csv.reader(file)
next(csv_reader) # ヘッダーをスキップ
batch = []
batch_size = 10000
for row in csv_reader:
batch.append(tuple(row))
if len(batch) >= batch_size:
cursor.executemany(
'INSERT INTO sales_data (product_id, customer_id, sale_date, amount, quantity) VALUES (?, ?, ?, ?, ?)',
batch
)
batch = []
# 残りのデータを挿入
if batch:
cursor.executemany(
'INSERT INTO sales_data (product_id, customer_id, sale_date, amount, quantity) VALUES (?, ?, ?, ?, ?)',
batch
)
# トランザクションコミット
cursor.execute('COMMIT')
print("データの一括挿入が完了しました")
except Exception as e:
cursor.execute('ROLLBACK')
print(f"エラーが発生しました: {e}")
# 2. 効率的な集計処理
def efficient_aggregation():
"""効率的な集計クエリ"""
# インデックスを活用した集計
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_sales_date ON sales_data(sale_date);
CREATE INDEX IF NOT EXISTS idx_sales_product ON sales_data(product_id);
''')
# 月別売上集計
cursor.execute('''
SELECT
strftime('%Y-%m', sale_date) as month,
COUNT(*) as transaction_count,
SUM(amount) as total_amount,
AVG(amount) as average_amount
FROM sales_data
WHERE sale_date >= '2024-01-01'
GROUP BY strftime('%Y-%m', sale_date)
ORDER BY month
''')
results = cursor.fetchall()
print("=== 月別売上集計 ===")
for month, count, total, avg in results:
print(f"{month}: {count:,}件, 合計{total:,.2f}円, 平均{avg:.2f}円")
# 3. データの効率的な削除
def efficient_data_cleanup():
"""古いデータの効率的な削除"""
# 古いデータの削除(バッチ処理)
deleted_count = 0
batch_size = 10000
while True:
cursor.execute('''
DELETE FROM sales_data
WHERE id IN (
SELECT id FROM sales_data
WHERE sale_date < '2023-01-01'
LIMIT ?
)
''', (batch_size,))
batch_deleted = cursor.rowcount
deleted_count += batch_deleted
if batch_deleted == 0:
break
print(f"削除済み: {deleted_count:,}件")
# VACUUM実行(領域の解放)
cursor.execute('VACUUM')
print("データベースの最適化が完了しました")
conn.close()
# bulk_data_processing()
メモリ使用量の最適化
import sqlite3
def memory_optimization():
"""メモリ使用量の最適化設定"""
conn = sqlite3.connect('optimized.db')
cursor = conn.cursor()
# SQLiteの設定最適化
optimizations = [
# ページキャッシュサイズ(メモリ使用量を制御)
'PRAGMA cache_size = 10000', # 約40MBのキャッシュ
# 一時ファイルの保存場所
'PRAGMA temp_store = MEMORY', # 一時データはメモリに保存
# 同期モード(書き込み速度向上、ただし安全性とのトレードオフ)
'PRAGMA synchronous = NORMAL', # デフォルトはFULL
# ジャーナルモード(WALモードで読み書き性能向上)
'PRAGMA journal_mode = WAL',
# 自動VACUUM(ファイルサイズの自動最適化)
'PRAGMA auto_vacuum = INCREMENTAL',
# mmapサイズ(大きなデータベースで有効)
'PRAGMA mmap_size = 268435456', # 256MB
]
print("=== SQLite最適化設定の適用 ===")
for pragma in optimizations:
cursor.execute(pragma)
print(f"適用: {pragma}")
# 設定確認
settings_to_check = [
'cache_size', 'temp_store', 'synchronous',
'journal_mode', 'auto_vacuum', 'mmap_size'
]
print("\n=== 現在の設定値 ===")
for setting in settings_to_check:
cursor.execute(f'PRAGMA {setting}')
value = cursor.fetchone()[0]
print(f"{setting}: {value}")
conn.close()
# memory_optimization()
SQLiteの限界と代替案
SQLiteが不向きなケース
1. 高い同時書き込み要求
# SQLiteの制限例
import sqlite3
import threading
import time
def concurrent_write_test():
"""同時書き込みのテスト(SQLiteの制限を確認)"""
def write_worker(worker_id):
try:
conn = sqlite3.connect('test_concurrent.db', timeout=10)
cursor = conn.cursor()
for i in range(100):
cursor.execute(
'INSERT INTO test_table (worker_id, value) VALUES (?, ?)',
(worker_id, i)
)
conn.commit()
time.sleep(0.01) # 少し待機
conn.close()
print(f"Worker {worker_id} 完了")
except sqlite3.OperationalError as e:
print(f"Worker {worker_id} エラー: {e}")
# テーブル作成
conn = sqlite3.connect('test_concurrent.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS test_table (
id INTEGER PRIMARY KEY AUTOINCREMENT,
worker_id INTEGER,
value INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.close()
# 複数のスレッドで同時書き込み
threads = []
for i in range(5): # 5つのワーカー
thread = threading.Thread(target=write_worker, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print("同時書き込みテスト完了")
# concurrent_write_test()
2. 非常に大きなデータセット
# 容量制限の確認
def storage_limitation_check():
"""ストレージ制限の確認"""
import os
# ファイルサイズの確認
def get_file_size(filename):
if os.path.exists(filename):
size = os.path.getsize(filename)
return size
return 0
# 大量データでのテスト
conn = sqlite3.connect('large_test.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS large_table (
id INTEGER PRIMARY KEY AUTOINCREMENT,
data TEXT
)
''')
# 大きなデータを挿入
large_text = 'x' * 10000 # 10KBのテキスト
print("大量データの挿入開始...")
for i in range(10000): # 約100MBのデータ
cursor.execute('INSERT INTO large_table (data) VALUES (?)', (large_text,))
if i % 1000 == 0:
conn.commit()
size = get_file_size('large_test.db')
print(f"レコード数: {i:,}, ファイルサイズ: {size/1024/1024:.2f}MB")
conn.commit()
final_size = get_file_size('large_test.db')
print(f"最終ファイルサイズ: {final_size/1024/1024:.2f}MB")
conn.close()
# storage_limitation_check()
代替データベースの選択指針
def database_selection_guide():
"""データベース選択の指針"""
criteria = {
"SQLite": {
"適用場面": [
"モバイルアプリのローカルストレージ",
"デスクトップアプリの設定保存",
"プロトタイプやテスト環境",
"小〜中規模のWebアプリ",
"組み込みシステム"
],
"制限": [
"同時書き込み接続は1つのみ",
"ネットワーク経由でのアクセス不可",
"ユーザー管理機能なし",
"レプリケーション機能なし"
],
"パフォーマンス目安": "〜数十GB、〜数百万件"
},
"PostgreSQL": {
"適用場面": [
"本格的なWebアプリケーション",
"複雑なクエリが必要なシステム",
"高い整合性が求められるシステム",
"分析系ワークロード"
],
"利点": [
"高い同時接続性能",
"豊富な機能とデータ型",
"優秀なオプティマイザ",
"レプリケーション対応"
],
"パフォーマンス目安": "〜数TB、〜数億件"
},
"MySQL": {
"適用場面": [
"Webアプリケーション",
"読み取り中心のワークロード",
"レプリケーションが重要なシステム"
],
"利点": [
"高い読み取り性能",
"優秀なレプリケーション機能",
"豊富な運用実績"
],
"パフォーマンス目安": "〜数TB、〜数億件"
}
}
print("=== データベース選択ガイド ===")
for db_name, info in criteria.items():
print(f"\n{db_name}:")
print(" 適用場面:")
for scene in info["適用場面"]:
print(f" • {scene}")
if "制限" in info:
print(" 制限:")
for limit in info["制限"]:
print(f" • {limit}")
if "利点" in info:
print(" 利点:")
for advantage in info["利点"]:
print(f" • {advantage}")
print(f" パフォーマンス目安: {info['パフォーマンス目安']}")
database_selection_guide()
トラブルシューティングガイド

よくある問題と解決方法
import sqlite3
import os
def troubleshooting_guide():
"""SQLiteトラブルシューティングガイド"""
def diagnose_database_locked():
"""データベースロック問題の診断"""
print("=== データベースロック問題の解決 ===")
solutions = [
{
"問題": "database is locked エラー",
"原因": ["接続の適切なクローズ不足", "長時間実行されるトランザクション"],
"解決策": [
"try-finally文で確実にconnection.close()を実行",
"トランザクションのタイムアウト設定",
"WALモードの活用(読み書き並行性向上)"
],
"コード例": """
try:
conn = sqlite3.connect('database.db', timeout=20)
# データベース操作
finally:
conn.close()
"""
},
{
"問題": "disk I/O error",
"原因": ["ディスク容量不足", "ファイルシステムの問題"],
"解決策": [
"ディスク容量の確認と確保",
"ファイルパーミッションの確認",
"異なるディスクへのデータベース移動"
]
}
]
for i, solution in enumerate(solutions, 1):
print(f"{i}. {solution['問題']}")
print(" 原因:")
for cause in solution["原因"]:
print(f" • {cause}")
print(" 解決策:")
for fix in solution["解決策"]:
print(f" • {fix}")
if "コード例" in solution:
print(" コード例:")
print(solution["コード例"])
print()
def diagnose_performance_issues():
"""パフォーマンス問題の診断"""
print("=== パフォーマンス問題の診断 ===")
def query_analyzer_example():
"""クエリ分析の例"""
return """
-- 遅いクエリの分析
EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = 'user@example.com';
-- インデックスの効果確認
CREATE INDEX idx_users_email ON users(email);
EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = 'user@example.com';
-- 統計情報の更新
ANALYZE;
-- テーブルサイズの確認
SELECT
name,
(SELECT COUNT(*) FROM pragma_table_info(name)) as columns,
(SELECT sql FROM sqlite_master WHERE type='table' AND name=outer.name) as create_sql
FROM sqlite_master outer WHERE type='table';
"""
print("パフォーマンス問題の一般的な対処法:")
print("1. EXPLAIN QUERY PLANでクエリ実行計画を確認")
print("2. 適切なインデックスの作成")
print("3. ANALYZEで統計情報を更新")
print("4. VACUUMでファイルサイズの最適化")
print("\nクエリ分析の例:")
print(query_analyzer_example())
def diagnose_corruption():
"""データ破損問題の診断"""
print("=== データ破損問題の診断と修復 ===")
recovery_steps = [
"1. 整合性チェックの実行: PRAGMA integrity_check",
"2. 読み込み可能なデータのエクスポート: .dump",
"3. 新しいデータベースファイルの作成",
"4. エクスポートしたデータのインポート",
"5. インデックスとトリガーの再作成"
]
print("データ破損時の回復手順:")
for step in recovery_steps:
print(f" {step}")
print("\n実行例:")
print("""
# コマンドライン版SQLiteでの回復例
sqlite3 corrupted.db
> PRAGMA integrity_check;
> .output backup.sql
> .dump
> .quit
# 新しいデータベースに復元
sqlite3 recovered.db < backup.sql
""")
# 各診断の実行
diagnose_database_locked()
diagnose_performance_issues()
diagnose_corruption()
troubleshooting_guide()
使用例
レコード数別パフォーマンステスト
import sqlite3
import time
import random
import string
from contextlib import contextmanager
def comprehensive_benchmark():
"""包括的なベンチマークテスト"""
@contextmanager
def timer(description):
"""実行時間測定のコンテキストマネージャー"""
start = time.time()
yield
end = time.time()
print(f"{description}: {end - start:.3f}秒")
def generate_test_data(count):
"""テストデータの生成"""
data = []
for i in range(count):
name = ''.join(random.choices(string.ascii_letters, k=10))
email = f"user{i}@example.com"
age = random.randint(18, 80)
data.append((name, email, age))
return data
def benchmark_operations(record_counts):
"""各種操作のベンチマーク"""
results = {}
for count in record_counts:
print(f"\n=== {count:,}件でのベンチマーク ===")
# データベース作成
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
# テーブル作成
cursor.execute('''
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
age INTEGER NOT NULL
)
''')
# データ生成
test_data = generate_test_data(count)
# 挿入性能テスト
with timer(f" {count:,}件挿入(トランザクションあり)"):
cursor.execute('BEGIN TRANSACTION')
cursor.executemany(
'INSERT INTO users (name, email, age) VALUES (?, ?, ?)',
test_data
)
cursor.execute('COMMIT')
# インデックス作成
with timer(" インデックス作成"):
cursor.execute('CREATE INDEX idx_users_email ON users(email)')
cursor.execute('CREATE INDEX idx_users_age ON users(age)')
# 検索性能テスト
search_email = test_data[count//2][1] # 中間のデータを検索
with timer(" 主キー検索"):
cursor.execute('SELECT * FROM users WHERE id = ?', (count//2,))
result = cursor.fetchone()
with timer(" インデックス使用検索(email)"):
cursor.execute('SELECT * FROM users WHERE email = ?', (search_email,))
result = cursor.fetchone()
with timer(" 範囲検索(age)"):
cursor.execute('SELECT * FROM users WHERE age BETWEEN 25 AND 35 LIMIT 100')
result = cursor.fetchall()
with timer(" 全件カウント"):
cursor.execute('SELECT COUNT(*) FROM users')
result = cursor.fetchone()
# 更新性能テスト
with timer(" バッチ更新(100件)"):
cursor.execute('BEGIN TRANSACTION')
for i in range(100):
user_id = random.randint(1, count)
new_age = random.randint(18, 80)
cursor.execute('UPDATE users SET age = ? WHERE id = ?', (new_age, user_id))
cursor.execute('COMMIT')
# 削除性能テスト
with timer(" バッチ削除(100件)"):
cursor.execute('BEGIN TRANSACTION')
cursor.execute('DELETE FROM users WHERE age > 70')
cursor.execute('COMMIT')
# ファイルサイズ測定(メモリDBなので概算)
cursor.execute('PRAGMA page_count')
page_count = cursor.fetchone()[0]
cursor.execute('PRAGMA page_size')
page_size = cursor.fetchone()[0]
estimated_size = page_count * page_size
print(f" 推定データサイズ: {estimated_size/1024/1024:.2f}MB")
conn.close()
results[count] = {
'estimated_size_mb': estimated_size/1024/1024,
'completed': True
}
return results
# ベンチマーク実行
test_counts = [1000, 10000, 100000, 500000]
print("SQLite包括的ベンチマーク開始")
print("テスト環境: メモリDB、単一スレッド")
results = benchmark_operations(test_counts)
print("\n=== ベンチマーク結果サマリー ===")
for count, result in results.items():
if result['completed']:
print(f"{count:,}件: 推定サイズ {result['estimated_size_mb']:.2f}MB")
# comprehensive_benchmark()
実運用でのパフォーマンス最適化事例
def real_world_optimization_examples():
"""実運用でのパフォーマンス最適化事例"""
print("=== 実運用での最適化事例 ===")
case_studies = [
{
"事例": "モバイルアプリのユーザーデータ管理",
"課題": "起動時のデータ読み込みが遅い(100万件のメッセージデータ)",
"解決策": [
"日付ベースでのテーブル分割",
"遅延読み込みの実装",
"必要なカラムのみの取得",
"適切なインデックス設計"
],
"結果": "起動時間を3秒から0.5秒に短縮",
"実装例": """
-- 月別テーブル分割
CREATE TABLE messages_202406 (
id INTEGER PRIMARY KEY,
user_id INTEGER,
content TEXT,
created_at INTEGER,
INDEX idx_user_date (user_id, created_at)
);
-- 遅延読み込み用クエリ
SELECT id, preview, created_at
FROM messages_202406
WHERE user_id = ?
ORDER BY created_at DESC
LIMIT 50;
"""
},
{
"事例": "IoTデータの収集・分析システム",
"課題": "センサーデータ(毎分1000件)の書き込み性能",
"解決策": [
"WALモードの活用",
"バッチ挿入の実装",
"古いデータの自動削除",
"インデックスの最小化"
],
"結果": "書き込み性能が5倍向上",
"実装例": """
-- WALモード設定
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
-- バッチ挿入
INSERT INTO sensor_data (device_id, value, timestamp)
VALUES (?, ?, ?), (?, ?, ?), ... -- 1000件分
-- 自動削除(7日以上古いデータ)
DELETE FROM sensor_data
WHERE timestamp < (strftime('%s', 'now') - 604800);
"""
},
{
"事例": "ECサイトの商品検索システム",
"課題": "複雑な条件での商品検索が遅い(50万商品)",
"解決策": [
"FTS(全文検索)の活用",
"複合インデックスの最適化",
"キャッシュ戦略の実装",
"検索結果のページネーション"
],
"結果": "検索レスポンスが2秒から0.2秒に改善",
"実装例": """
-- FTS仮想テーブル
CREATE VIRTUAL TABLE products_fts USING fts5(
name, description, category, content='products'
);
-- 複合インデックス
CREATE INDEX idx_products_search
ON products(category, price, in_stock, brand);
-- 効率的な検索クエリ
SELECT p.* FROM products p
JOIN products_fts fts ON p.id = fts.rowid
WHERE fts MATCH 'keyword'
AND p.category = 'electronics'
AND p.price BETWEEN 1000 AND 50000
AND p.in_stock = 1
ORDER BY p.popularity DESC
LIMIT 20 OFFSET 0;
"""
}
]
for i, case in enumerate(case_studies, 1):
print(f"\n{i}. {case['事例']}")
print(f"課題: {case['課題']}")
print("解決策:")
for solution in case["解決策"]:
print(f" • {solution}")
print(f"結果: {case['結果']}")
print("実装例:")
print(case["実装例"])
real_world_optimization_examples()
まとめ:SQLiteのレコード数制限を正しく理解して活用する
重要なポイントの再確認
def summary_key_points():
"""重要ポイントのまとめ"""
key_points = {
"理論的制限": {
"レコード数": "実質無制限(ROWID上限:2^63-1)",
"ファイルサイズ": "最大281TB",
"実用性": "ハードウェアとパフォーマンス要件に依存"
},
"実運用での制限": {
"推奨レコード数": "数百万〜数千万件程度",
"推奨ファイルサイズ": "数GB〜数十GB程度",
"注意点": "用途とパフォーマンス要件により大きく変動"
},
"パフォーマンス最適化": {
"必須対策": ["適切なインデックス設計", "クエリの最適化", "トランザクションの活用"],
"推奨設定": ["WALモード", "適切なキャッシュサイズ", "定期メンテナンス"],
"監視項目": ["ファイルサイズ", "クエリ実行時間", "同時接続数"]
},
"移行の判断基準": {
"SQLite継続": "単一ユーザー、軽〜中程度の負荷、組み込み用途",
"他DB検討": "高同時性、大規模データ、複雑な分析処理",
"移行タイミング": "パフォーマンス問題、スケーラビリティ限界、機能不足"
}
}
print("=== SQLiteレコード数制限:重要ポイント ===")
for category, points in key_points.items():
print(f"\n{category}:")
for key, value in points.items():
if isinstance(value, list):
print(f" {key}: {', '.join(value)}")
else:
print(f" {key}: {value}")
print("\n=== 実践的な推奨事項 ===")
recommendations = [
"設計段階からインデックス戦略を検討する",
"定期的なパフォーマンス監視を実装する",
"データ増加に備えた移行計画を準備する",
"用途に応じたデータベース選択を行う",
"適切なバックアップ・復旧手順を確立する"
]
for i, rec in enumerate(recommendations, 1):
print(f"{i}. {rec}")
summary_key_points()
チェックリスト:SQLite運用の成功のために
def sqlite_success_checklist():
"""SQLite運用成功のためのチェックリスト"""
checklist = {
"設計段階": [
"□ データ量とアクセスパターンの想定",
"□ 適切なテーブル構造の設計",
"□ 必要なインデックスの計画",
"□ パフォーマンス要件の明確化",
"□ 将来的な拡張性の検討"
],
"実装段階": [
"□ 適切なSQLite設定(WAL、キャッシュサイズ等)",
"□ エラーハンドリングの実装",
"□ トランザクション管理の適切な実装",
"□ 接続の適切なクローズ処理",
"□ バッチ処理の実装"
],
"運用段階": [
"□ 定期的な統計情報更新(ANALYZE)",
"□ 定期的なデータベース最適化(VACUUM)",
"□ パフォーマンス監視の実装",
"□ バックアップ戦略の実行",
"□ 容量監視とアラート設定"
],
"保守段階": [
"□ 定期的なパフォーマンステスト",
"□ インデックス効果の検証",
"□ 古いデータの削除戦略",
"□ データベース移行計画の見直し",
"□ セキュリティ対策の確認"
]
}
print("=== SQLite運用成功チェックリスト ===")
for phase, items in checklist.items():
print(f"\n【{phase}】")
for item in items:
print(f" {item}")
print("\n=== 緊急時対応準備 ===")
emergency_prep = [
"□ データベース破損時の復旧手順書",
"□ パフォーマンス問題時の対応フロー",
"□ 容量不足時のデータ削除・移行手順",
"□ バックアップからの復旧テスト実施",
"□ 代替データベースへの移行手順書"
]
for item in emergency_prep:
print(f" {item}")
sqlite_success_checklist()
コメント