【Python完全版】ファイルの存在確認を行う方法と実践的な活用ポイント

python

Pythonでスクリプトを書くとき、「あるファイルが存在するかどうか」を確認したい場面は意外と多くあります。

たとえば、ログファイルの追記前やデータファイルの読み込み前に、ファイルが実際に存在するかどうかを調べることは、エラーを未然に防ぐための基本です。

この記事では、Pythonでファイルの存在を確認する方法をわかりやすく紹介し、用途に応じた使い分けや注意点についても解説します。

スポンサーリンク

なぜファイル存在確認が重要なのか

プログラムの安定性向上

ファイル存在確認を行うことで、以下のようなトラブルを未然に防げます:

よくあるエラーの例

  • FileNotFoundErrorによる予期しないプログラム停止
  • データ処理の途中でファイルが見つからずに失敗
  • バッチ処理で大量のファイルを扱う際の部分的な失敗
  • ユーザーが指定したパスが間違っている場合の適切な対応

Pythonでファイル存在確認の基本方法

方法1:os.pathモジュールを使用

最も伝統的で広く使われている方法です:

import os

# 基本的な存在確認
if os.path.exists("example.txt"):
    print("ファイルは存在します")
else:
    print("ファイルは存在しません")

# 相対パスと絶対パスの例
relative_path = "data/sample.csv"
absolute_path = "/home/user/documents/report.pdf"

print(f"相対パス確認: {os.path.exists(relative_path)}")
print(f"絶対パス確認: {os.path.exists(absolute_path)}")

os.pathの主要な関数

関数用途戻り値
os.path.exists()ファイルまたはディレクトリの存在確認True/False
os.path.isfile()ファイルかどうかの確認True/False
os.path.isdir()ディレクトリかどうかの確認True/False
os.path.islink()シンボリックリンクかどうかの確認True/False

方法2:pathlibモジュールを使用(推奨)

Python 3.4以降で使える、よりモダンなアプローチです:

from pathlib import Path

# 基本的な使い方
file_path = Path("example.txt")

if file_path.exists():
    print("ファイルは見つかりました")
else:
    print("ファイルは見つかりません")

# パスの組み立ても簡単
data_dir = Path("data")
csv_file = data_dir / "sample.csv"
config_file = Path.home() / ".config" / "myapp" / "settings.json"

print(f"CSVファイル確認: {csv_file.exists()}")
print(f"設定ファイル確認: {config_file.exists()}")

pathlibの利点

  • オブジェクト指向的で直感的な操作
  • パス操作が簡潔で読みやすい
  • クロスプラットフォーム対応
  • 豊富なメソッドで様々な操作が可能

実践的な比較例

import os
from pathlib import Path

def check_file_old_way(filepath):
    """従来のos.pathを使った方法"""
    if os.path.exists(filepath):
        if os.path.isfile(filepath):
            size = os.path.getsize(filepath)
            return f"ファイル存在: サイズ {size} bytes"
        else:
            return "パスは存在しますが、ディレクトリです"
    else:
        return "ファイルが存在しません"

def check_file_new_way(filepath):
    """pathlibを使った方法"""
    path = Path(filepath)
    if path.exists():
        if path.is_file():
            size = path.stat().st_size
            return f"ファイル存在: サイズ {size} bytes"
        else:
            return "パスは存在しますが、ディレクトリです"
    else:
        return "ファイルが存在しません"

# 使用例
test_file = "sample.txt"
print("os.path方式:", check_file_old_way(test_file))
print("pathlib方式:", check_file_new_way(test_file))

os.pathとpathlibはどちらも有効ですが、新しいコードにはpathlibの使用が強く推奨されます。次に、より詳細な条件確認の方法を見てみましょう。

ファイルとディレクトリを区別する詳細方法

基本的な区別方法

存在確認だけでなく、それがファイルなのかディレクトリなのかも正確に判別することは、安全なプログラム作成に不可欠です。

os.pathを使った詳細確認:

import os

def analyze_path_os(path):
    """os.pathを使った詳細なパス解析"""
    results = {
        'exists': os.path.exists(path),
        'is_file': os.path.isfile(path),
        'is_dir': os.path.isdir(path),
        'is_link': os.path.islink(path),
        'is_mount': os.path.ismount(path),
        'size': None,
        'modified': None
    }
    
    if results['exists']:
        try:
            stat_info = os.stat(path)
            results['size'] = stat_info.st_size
            results['modified'] = stat_info.st_mtime
        except OSError as e:
            results['error'] = str(e)
    
    return results

# 使用例
paths = ["example.txt", "data/", "/usr/bin", "nonexistent.file"]
for path in paths:
    result = analyze_path_os(path)
    print(f"{path}: {result}")

pathlibを使った詳細確認:

from pathlib import Path
import datetime

def analyze_path_pathlib(path):
    """pathlibを使った詳細なパス解析"""
    p = Path(path)
    results = {
        'exists': p.exists(),
        'is_file': p.is_file(),
        'is_dir': p.is_dir(),
        'is_symlink': p.is_symlink(),
        'is_mount': p.is_mount() if hasattr(p, 'is_mount') else None,
        'suffix': p.suffix,
        'stem': p.stem,
        'parent': str(p.parent),
    }
    
    if results['exists']:
        try:
            stat_info = p.stat()
            results['size'] = stat_info.st_size
            results['modified'] = datetime.datetime.fromtimestamp(stat_info.st_mtime)
            if results['is_file']:
                results['readable'] = os.access(p, os.R_OK)
                results['writable'] = os.access(p, os.W_OK)
        except OSError as e:
            results['error'] = str(e)
    
    return results

# 使用例と出力整形
def print_path_info(path):
    """パス情報を見やすく表示"""
    info = analyze_path_pathlib(path)
    print(f"\n--- {path} ---")
    for key, value in info.items():
        if value is not None:
            print(f"{key}: {value}")

# 実行例
test_paths = ["README.md", "src/", "config.json", "missing.txt"]
for path in test_paths:
    print_path_info(path)

特殊なケースの処理

権限がない場合の処理:

from pathlib import Path
import os

def safe_check_file(filepath):
    """権限エラーも考慮した安全なファイル確認"""
    try:
        path = Path(filepath)
        
        # 基本的な存在確認
        if not path.exists():
            return {"status": "not_found", "message": "ファイルが存在しません"}
        
        # ファイルタイプの確認
        if path.is_file():
            # 読み書き権限の確認
            readable = os.access(path, os.R_OK)
            writable = os.access(path, os.W_OK)
            
            return {
                "status": "file",
                "readable": readable,
                "writable": writable,
                "size": path.stat().st_size,
                "message": f"ファイル (読み: {readable}, 書き: {writable})"
            }
        elif path.is_dir():
            return {"status": "directory", "message": "ディレクトリです"}
        else:
            return {"status": "other", "message": "特殊ファイルです"}
            
    except PermissionError:
        return {"status": "permission_denied", "message": "アクセス権限がありません"}
    except OSError as e:
        return {"status": "error", "message": f"エラー: {e}"}

# 使用例
files_to_check = [
    "accessible_file.txt",
    "/etc/shadow",  # 通常はアクセス権限なし
    "directory/",
    "nonexistent.file"
]

for file_path in files_to_check:
    result = safe_check_file(file_path)
    print(f"{file_path}: {result['message']}")

ファイルとディレクトリを正しく区別すれば、意図しない操作を避けることができます。次に、実務での具体的な活用例を詳しく見ていきましょう。

実務で役立つファイル存在確認の活用例

データ処理での活用

CSVファイルの一括処理:

import pandas as pd
from pathlib import Path
import logging

# ログ設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class CSVProcessor:
    def __init__(self, input_dir, output_dir):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.ensure_output_dir()
    
    def ensure_output_dir(self):
        """出力ディレクトリの存在確認と作成"""
        if not self.output_dir.exists():
            self.output_dir.mkdir(parents=True, exist_ok=True)
            logging.info(f"出力ディレクトリを作成: {self.output_dir}")
    
    def process_csv_files(self, pattern="*.csv"):
        """指定パターンのCSVファイルを一括処理"""
        if not self.input_dir.exists():
            logging.error(f"入力ディレクトリが存在しません: {self.input_dir}")
            return
        
        csv_files = list(self.input_dir.glob(pattern))
        if not csv_files:
            logging.warning(f"処理対象のファイルが見つかりません: {pattern}")
            return
        
        logging.info(f"{len(csv_files)}個のCSVファイルを処理開始")
        
        processed_count = 0
        error_count = 0
        
        for csv_file in csv_files:
            try:
                if csv_file.is_file() and csv_file.stat().st_size > 0:
                    self.process_single_csv(csv_file)
                    processed_count += 1
                else:
                    logging.warning(f"スキップ: {csv_file} (空ファイルまたは無効)")
                    
            except Exception as e:
                logging.error(f"処理エラー {csv_file}: {e}")
                error_count += 1
        
        logging.info(f"処理完了: 成功 {processed_count}, エラー {error_count}")
    
    def process_single_csv(self, csv_file):
        """単一のCSVファイルを処理"""
        try:
            # ファイル存在確認
            if not csv_file.exists():
                raise FileNotFoundError(f"ファイルが見つかりません: {csv_file}")
            
            # データ読み込み
            df = pd.read_csv(csv_file)
            
            # データ処理(例:重複削除、列の整理など)
            df_cleaned = df.drop_duplicates().reset_index(drop=True)
            
            # 出力ファイル名の生成
            output_file = self.output_dir / f"cleaned_{csv_file.name}"
            
            # 結果保存
            df_cleaned.to_csv(output_file, index=False)
            logging.info(f"処理完了: {csv_file.name} -> {output_file.name}")
            
        except pd.errors.EmptyDataError:
            logging.warning(f"空のCSVファイル: {csv_file}")
        except pd.errors.ParserError as e:
            logging.error(f"CSVパースエラー {csv_file}: {e}")
        except Exception as e:
            logging.error(f"予期しないエラー {csv_file}: {e}")

# 使用例
processor = CSVProcessor("input_data", "output_data")
processor.process_csv_files("sales_*.csv")

設定ファイルの管理:

import json
import configparser
from pathlib import Path
from typing import Dict, Any, Optional

class ConfigManager:
    def __init__(self, config_dir: str = "config"):
        self.config_dir = Path(config_dir)
        self.config_dir.mkdir(exist_ok=True)
    
    def load_json_config(self, config_name: str, default_config: Optional[Dict] = None) -> Dict[str, Any]:
        """JSONタイプの設定ファイル読み込み"""
        config_file = self.config_dir / f"{config_name}.json"
        
        if config_file.exists() and config_file.is_file():
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                logging.info(f"設定ファイル読み込み成功: {config_file}")
                return config
            except json.JSONDecodeError as e:
                logging.error(f"JSON設定ファイルの形式エラー {config_file}: {e}")
            except Exception as e:
                logging.error(f"設定ファイル読み込みエラー {config_file}: {e}")
        
        # デフォルト設定の使用または作成
        if default_config:
            self.save_json_config(config_name, default_config)
            logging.info(f"デフォルト設定ファイルを作成: {config_file}")
            return default_config
        
        return {}
    
    def save_json_config(self, config_name: str, config_data: Dict[str, Any]) -> bool:
        """JSON設定ファイルの保存"""
        config_file = self.config_dir / f"{config_name}.json"
        
        try:
            with open(config_file, 'w', encoding='utf-8') as f:
                json.dump(config_data, f, indent=2, ensure_ascii=False)
            logging.info(f"設定ファイル保存成功: {config_file}")
            return True
        except Exception as e:
            logging.error(f"設定ファイル保存エラー {config_file}: {e}")
            return False
    
    def load_ini_config(self, config_name: str) -> configparser.ConfigParser:
        """INIタイプの設定ファイル読み込み"""
        config_file = self.config_dir / f"{config_name}.ini"
        config = configparser.ConfigParser()
        
        if config_file.exists():
            try:
                config.read(config_file, encoding='utf-8')
                logging.info(f"INI設定ファイル読み込み成功: {config_file}")
            except Exception as e:
                logging.error(f"INI設定ファイル読み込みエラー {config_file}: {e}")
        else:
            logging.warning(f"INI設定ファイルが存在しません: {config_file}")
        
        return config

# 使用例
config_manager = ConfigManager()

# アプリケーション設定のデフォルト値
default_app_config = {
    "database": {
        "host": "localhost",
        "port": 5432,
        "name": "myapp"
    },
    "logging": {
        "level": "INFO",
        "file": "app.log"
    }
}

# 設定の読み込み
app_config = config_manager.load_json_config("application", default_app_config)
print(f"データベースホスト: {app_config['database']['host']}")

ログ処理での活用

ログファイルのローテーション管理:

import gzip
import shutil
from datetime import datetime, timedelta
from pathlib import Path
import logging

class LogManager:
    def __init__(self, log_dir: str = "logs", max_size_mb: int = 10, keep_days: int = 30):
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(exist_ok=True)
        self.max_size_bytes = max_size_mb * 1024 * 1024
        self.keep_days = keep_days
    
    def check_and_rotate_logs(self):
        """ログファイルサイズチェックとローテーション"""
        log_files = list(self.log_dir.glob("*.log"))
        
        for log_file in log_files:
            if not log_file.is_file():
                continue
            
            try:
                file_size = log_file.stat().st_size
                if file_size > self.max_size_bytes:
                    self.rotate_log_file(log_file)
                    
            except OSError as e:
                logging.error(f"ログファイルサイズ確認エラー {log_file}: {e}")
    
    def rotate_log_file(self, log_file: Path):
        """ログファイルのローテーション"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        rotated_name = f"{log_file.stem}_{timestamp}.log.gz"
        rotated_path = self.log_dir / rotated_name
        
        try:
            # ログファイルを圧縮して保存
            with open(log_file, 'rb') as f_in:
                with gzip.open(rotated_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            
            # 元のログファイルをクリア
            log_file.write_text('')
            
            logging.info(f"ログローテーション完了: {log_file} -> {rotated_path}")
            
        except Exception as e:
            logging.error(f"ログローテーションエラー {log_file}: {e}")
    
    def cleanup_old_logs(self):
        """古いログファイルの削除"""
        cutoff_date = datetime.now() - timedelta(days=self.keep_days)
        cutoff_timestamp = cutoff_date.timestamp()
        
        # .gz ファイル(ローテーション済み)をチェック
        rotated_logs = list(self.log_dir.glob("*.log.gz"))
        
        removed_count = 0
        for log_file in rotated_logs:
            try:
                if log_file.stat().st_mtime < cutoff_timestamp:
                    log_file.unlink()
                    removed_count += 1
                    logging.info(f"古いログファイル削除: {log_file}")
                    
            except OSError as e:
                logging.error(f"ログファイル削除エラー {log_file}: {e}")
        
        if removed_count > 0:
            logging.info(f"合計 {removed_count} 個の古いログファイルを削除")
    
    def get_log_statistics(self):
        """ログディレクトリの統計情報"""
        if not self.log_dir.exists():
            return {"error": "ログディレクトリが存在しません"}
        
        active_logs = list(self.log_dir.glob("*.log"))
        rotated_logs = list(self.log_dir.glob("*.log.gz"))
        
        total_size = 0
        for log_file in active_logs + rotated_logs:
            try:
                total_size += log_file.stat().st_size
            except OSError:
                pass
        
        return {
            "active_logs": len(active_logs),
            "rotated_logs": len(rotated_logs),
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "log_directory": str(self.log_dir)
        }

# 使用例
log_manager = LogManager(max_size_mb=5, keep_days=7)

# ログの状態確認
stats = log_manager.get_log_statistics()
print(f"ログ統計: {stats}")

# ローテーションと古いファイルの削除
log_manager.check_and_rotate_logs()
log_manager.cleanup_old_logs()

この章で紹介した活用例により、エラー発生を予防し、安定したコード運用が可能になります。次に、より安全なファイル操作のための注意点を詳しく見ていきましょう。

注意点とベストプラクティス

タイミングの問題と競合状態

ファイル存在確認で最も注意すべきは、チェックと実際の操作の間にファイル状態が変わる可能性です:

import time
from pathlib import Path

# 危険な例:チェックと操作の間にファイルが削除される可能性
def unsafe_file_operation(filepath):
    """非推奨:タイミングの問題がある例"""
    path = Path(filepath)
    
    if path.exists():  # この時点では存在
        time.sleep(0.1)  # 何らかの処理時間
        # ここで他のプロセスがファイルを削除する可能性
        with open(path, 'r') as f:  # FileNotFoundError の可能性
            return f.read()
    else:
        return None

# 安全な例:例外処理を組み合わせる
def safe_file_operation(filepath):
    """推奨:例外処理と組み合わせた安全な方法"""
    path = Path(filepath)
    
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        logging.warning(f"ファイルが見つかりません: {filepath}")
        return None
    except PermissionError:
        logging.error(f"ファイルアクセス権限がありません: {filepath}")
        return None
    except UnicodeDecodeError:
        logging.error(f"ファイルの文字エンコーディングエラー: {filepath}")
        return None
    except Exception as e:
        logging.error(f"予期しないエラー {filepath}: {e}")
        return None

包括的なエラーハンドリング

from pathlib import Path
import logging
from typing import Optional, Union, Dict, Any

class SafeFileHandler:
    """安全なファイル操作のためのクラス"""
    
    @staticmethod
    def read_text_file(filepath: Union[str, Path], encoding: str = 'utf-8') -> Optional[str]:
        """テキストファイルの安全な読み込み"""
        path = Path(filepath)
        
        # 事前チェック
        if not path.exists():
            logging.warning(f"ファイルが存在しません: {path}")
            return None
        
        if not path.is_file():
            logging.error(f"ファイルではありません: {path}")
            return None
        
        try:
            return path.read_text(encoding=encoding)
        except PermissionError:
            logging.error(f"読み取り権限がありません: {path}")
        except UnicodeDecodeError as e:
            logging.error(f"エンコーディングエラー {path}: {e}")
        except Exception as e:
            logging.error(f"読み込みエラー {path}: {e}")
        
        return None
    
    @staticmethod
    def write_text_file(filepath: Union[str, Path], content: str, 
                       encoding: str = 'utf-8', backup: bool = True) -> bool:
        """テキストファイルの安全な書き込み"""
        path = Path(filepath)
        
        # バックアップの作成
        if backup and path.exists():
            backup_path = path.with_suffix(path.suffix + '.backup')
            try:
                shutil.copy2(path, backup_path)
                logging.info(f"バックアップ作成: {backup_path}")
            except Exception as e:
                logging.warning(f"バックアップ作成失敗 {path}: {e}")
        
        # ディレクトリの確認・作成
        path.parent.mkdir(parents=True, exist_ok=True)
        
        try:
            path.write_text(content, encoding=encoding)
            logging.info(f"ファイル書き込み成功: {path}")
            return True
        except PermissionError:
            logging.error(f"書き込み権限がありません: {path}")
        except Exception as e:
            logging.error(f"書き込みエラー {path}: {e}")
        
        return False
    
    @staticmethod
    def get_file_info(filepath: Union[str, Path]) -> Dict[str, Any]:
        """ファイル情報の安全な取得"""
        path = Path(filepath)
        
        info = {
            'path': str(path),
            'exists': False,
            'is_file': False,
            'is_dir': False,
            'size': None,
            'modified': None,
            'readable': False,
            'writable': False,
            'error': None
        }
        
        try:
            info['exists'] = path.exists()
            if info['exists']:
                info['is_file'] = path.is_file()
                info['is_dir'] = path.is_dir()
                
                if info['is_file']:
                    stat_info = path.stat()
                    info['size'] = stat_info.st_size
                    info['modified'] = datetime.fromtimestamp(stat_info.st_mtime)
                    info['readable'] = os.access(path, os.R_OK)
                    info['writable'] = os.access(path, os.W_OK)
        
        except Exception as e:
            info['error'] = str(e)
            logging.error(f"ファイル情報取得エラー {path}: {e}")
        
        return info

# 使用例
handler = SafeFileHandler()

# 安全なファイル読み込み
content = handler.read_text_file("important_data.txt")
if content:
    print("ファイル読み込み成功")
    print(f"内容: {content[:100]}...")  # 最初の100文字を表示

# 安全なファイル書き込み
success = handler.write_text_file("output.txt", "Hello, World!", backup=True)
print(f"書き込み結果: {success}")

# ファイル情報の取得
info = handler.get_file_info("config.json")
print(f"ファイル情報: {info}")

パフォーマンスを考慮した大量ファイル処理

import concurrent.futures
from pathlib import Path
import time
from typing import List, Callable, Any

class BulkFileProcessor:
    """大量ファイル処理のための最適化クラス"""
    
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
    
    def process_files_parallel(self, file_paths: List[Path], 
                             processor_func: Callable[[Path], Any]) -> List[Any]:
        """並列処理でファイルを効率的に処理"""
        
        # 事前フィルタリング(存在するファイルのみ)
        existing_files = [p for p in file_paths if p.exists() and p.is_file()]
        
        if not existing_files:
            logging.warning("処理対象のファイルが見つかりません")
            return []
        
        logging.info(f"{len(existing_files)}個のファイルを並列処理開始")
        start_time = time.time()
        
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # すべてのタスクを投入
            future_to_file = {
                executor.submit(processor_func, file_path): file_path 
                for file_path in existing_files
            }
            
            # 結果を収集
            for future in concurrent.futures.as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    logging.error(f"処理エラー {file_path}: {e}")
                    results.append(None)
        
        end_time = time.time()
        logging.info(f"並列処理完了: {end_time - start_time:.2f}秒")
        
        return results
    
    def batch_file_check(self, directory: Path, pattern: str = "*") -> Dict[str, int]:
        """ディレクトリ内のファイルを効率的にバッチチェック"""
        if not directory.exists() or not directory.is_dir():
            return {"error": "ディレクトリが存在しません"}
        
        try:
            files = list(directory.glob(pattern))
            
            stats = {
                "total_items": len(files),
                "files": 0,
                "directories": 0,
                "symlinks": 0,
                "other": 0,
                "total_size": 0
            }
            
            for item in files:
                if item.is_file():
                    stats["files"] += 1
                    stats["total_size"] += item.stat().st_size
                elif item.is_dir():
                    stats["directories"] += 1
                elif item.is_symlink():
                    stats["symlinks"] += 1
                else:
                    stats["other"] += 1
            
            return stats
            
        except Exception as e:
            logging.error(f"バッチチェックエラー {directory}: {e}")
            return {"error": str(e)}

# 使用例
def process_single_file(file_path: Path) -> Dict[str, Any]:
    """単一ファイルの処理関数例"""
    try:
        size = file_path.stat().st_size
        lines = 0
        if file_path.suffix in ['.txt', '.py', '.md']:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                lines = sum(1 for _ in f)
        
        return {
            "file": str(file_path),
            "size": size,
            "lines": lines,
            "processed": True
        }
    except Exception as e:
        return {
            "file": str(file_path),
            "error": str(e),
            "processed": False
        }

# 大量ファイル処理の実行
processor = BulkFileProcessor(max_workers=8)
target_dir = Path("large_dataset")

if target_dir.exists():
    # ディレクトリの統計取得
    stats = processor.batch_file_check(target_dir, "*.txt")
    print(f"ディレクトリ統計: {stats}")
    
    # 全てのテキストファイルを並列処理
    text_files = list(target_dir.glob("*.txt"))
    results = processor.process_files_parallel(text_files, process_single_file)
    
    # 結果の集計
    successful = [r for r in results if r and r.get('processed')]
    print(f"処理成功: {len(successful)}/{len(results)}")

ネットワークファイルシステムでの考慮事項

import time
from pathlib import Path
import platform

class NetworkFileChecker:
    """ネットワークファイルシステム対応のファイルチェッカー"""
    
    def __init__(self, timeout: float = 10.0, retry_count: int = 3):
        self.timeout = timeout
        self.retry_count = retry_count
    
    def robust_file_check(self, filepath: Union[str, Path]) -> Dict[str, Any]:
        """ネットワーク遅延を考慮した堅牢なファイルチェック"""
        path = Path(filepath)
        
        result = {
            "path": str(path),
            "exists": False,
            "accessible": False,
            "network_path": self.is_network_path(path),
            "check_duration": 0,
            "retries_used": 0,
            "error": None
        }
        
        start_time = time.time()
        
        for attempt in range(self.retry_count):
            try:
                result["retries_used"] = attempt
                
                # タイムアウト付きの存在チェック
                exists = self.check_with_timeout(path.exists, self.timeout)
                result["exists"] = exists
                
                if exists:
                    # アクセス可能性の確認
                    accessible = self.check_with_timeout(
                        lambda: path.is_file() or path.is_dir(), 
                        self.timeout
                    )
                    result["accessible"] = accessible
                
                break  # 成功したらループを抜ける
                
            except TimeoutError:
                result["error"] = f"タイムアウト (試行 {attempt + 1})"
                if attempt == self.retry_count - 1:
                    logging.error(f"ファイルチェックタイムアウト: {path}")
                else:
                    time.sleep(0.5)  # リトライ前の待機
                    
            except Exception as e:
                result["error"] = str(e)
                logging.error(f"ファイルチェックエラー {path}: {e}")
                break
        
        result["check_duration"] = time.time() - start_time
        return result
    
    def check_with_timeout(self, func: Callable, timeout: float) -> Any:
        """タイムアウト付きの関数実行"""
        import signal
        
        def timeout_handler(signum, frame):
            raise TimeoutError("操作がタイムアウトしました")
        
        # Windowsではsignalが制限されているため、別の方法を使用
        if platform.system() == "Windows":
            return self.check_with_timeout_windows(func, timeout)
        
        old_handler = signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(int(timeout))
        
        try:
            result = func()
            signal.alarm(0)  # タイマーをキャンセル
            return result
        finally:
            signal.signal(signal.SIGALRM, old_handler)
    
    def check_with_timeout_windows(self, func: Callable, timeout: float) -> Any:
        """Windows用のタイムアウト実装"""
        import concurrent.futures
        
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(func)
            try:
                return future.result(timeout=timeout)
            except concurrent.futures.TimeoutError:
                raise TimeoutError("操作がタイムアウトしました")
    
    @staticmethod
    def is_network_path(path: Path) -> bool:
        """ネットワークパスかどうかの判定"""
        path_str = str(path.resolve())
        
        # UNCパス (Windows)
        if path_str.startswith(r'\\'):
            return True
        
        # NFSマウント等の確認 (Linux/macOS)
        if platform.system() != "Windows":
            try:
                # /proc/mounts から NFS等のネットワークファイルシステムを確認
                with open('/proc/mounts', 'r') as f:
                    mounts = f.read()
                    for line in mounts.split('\n'):
                        if any(fs_type in line for fs_type in ['nfs', 'cifs', 'smb']):
                            mount_point = line.split()[1]
                            if path_str.startswith(mount_point):
                                return True
            except:
                pass
        
        return False

# 使用例
network_checker = NetworkFileChecker(timeout=5.0, retry_count=2)

# ネットワークファイルのチェック
network_files = [
    r"\\server\share\important.xlsx",  # Windows UNC
    "/mnt/nfs/data.csv",               # NFS マウント
    "local_file.txt"                   # ローカルファイル
]

for file_path in network_files:
    result = network_checker.robust_file_check(file_path)
    print(f"\n{file_path}:")
    print(f"  存在: {result['exists']}")
    print(f"  アクセス可能: {result['accessible']}")
    print(f"  ネットワークパス: {result['network_path']}")
    print(f"  チェック時間: {result['check_duration']:.2f}秒")
    if result['error']:
        print(f"  エラー: {result['error']}")

実践的なファイル監視システム

import time
import threading
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Set, Dict, Callable

class FileMonitor:
    """ファイルシステムの変更を監視するクラス"""
    
    def __init__(self, watch_directory: Union[str, Path], check_interval: float = 1.0):
        self.watch_directory = Path(watch_directory)
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
        
        # ファイル状態の追跡
        self.file_states: Dict[Path, Dict] = {}
        self.callbacks: Dict[str, List[Callable]] = defaultdict(list)
        
        # 監視対象パターン
        self.watch_patterns = ["*"]
        self.ignore_patterns = ["*.tmp", "*.swp", "*~"]
    
    def add_callback(self, event_type: str, callback: Callable):
        """コールバック関数の登録"""
        valid_events = ['created', 'modified', 'deleted', 'size_changed']
        if event_type not in valid_events:
            raise ValueError(f"無効なイベントタイプ: {event_type}")
        
        self.callbacks[event_type].append(callback)
    
    def should_monitor(self, file_path: Path) -> bool:
        """ファイルが監視対象かどうかの判定"""
        # 無視パターンのチェック
        for pattern in self.ignore_patterns:
            if file_path.match(pattern):
                return False
        
        # 監視パターンのチェック
        for pattern in self.watch_patterns:
            if file_path.match(pattern):
                return True
        
        return False
    
    def get_file_state(self, file_path: Path) -> Dict:
        """ファイルの現在状態を取得"""
        try:
            if file_path.exists() and file_path.is_file():
                stat_info = file_path.stat()
                return {
                    'exists': True,
                    'size': stat_info.st_size,
                    'modified': stat_info.st_mtime,
                    'last_check': time.time()
                }
        except OSError:
            pass
        
        return {
            'exists': False,
            'size': 0,
            'modified': 0,
            'last_check': time.time()
        }
    
    def scan_directory(self) -> Set[Path]:
        """ディレクトリのスキャン"""
        files = set()
        try:
            for pattern in self.watch_patterns:
                files.update(self.watch_directory.rglob(pattern))
            
            # ファイルのみをフィルタ
            files = {f for f in files if f.is_file() and self.should_monitor(f)}
            
        except Exception as e:
            logging.error(f"ディレクトリスキャンエラー: {e}")
        
        return files
    
    def check_changes(self):
        """ファイル変更の検出"""
        current_files = self.scan_directory()
        current_time = time.time()
        
        # 新しいファイルまたは変更されたファイルの検出
        for file_path in current_files:
            current_state = self.get_file_state(file_path)
            previous_state = self.file_states.get(file_path)
            
            if previous_state is None:
                # 新しいファイル
                self.trigger_event('created', file_path, current_state)
            else:
                # 変更の検出
                if current_state['modified'] > previous_state['modified']:
                    self.trigger_event('modified', file_path, current_state)
                
                if current_state['size'] != previous_state['size']:
                    self.trigger_event('size_changed', file_path, current_state)
            
            self.file_states[file_path] = current_state
        
        # 削除されたファイルの検出
        previous_files = set(self.file_states.keys())
        deleted_files = previous_files - current_files
        
        for file_path in deleted_files:
            self.trigger_event('deleted', file_path, self.file_states[file_path])
            del self.file_states[file_path]
    
    def trigger_event(self, event_type: str, file_path: Path, file_state: Dict):
        """イベントコールバックの実行"""
        for callback in self.callbacks[event_type]:
            try:
                callback(file_path, file_state, event_type)
            except Exception as e:
                logging.error(f"コールバックエラー {event_type}/{file_path}: {e}")
    
    def start_monitoring(self):
        """監視開始"""
        if self.running:
            return
        
        if not self.watch_directory.exists():
            raise FileNotFoundError(f"監視ディレクトリが存在しません: {self.watch_directory}")
        
        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        
        logging.info(f"ファイル監視開始: {self.watch_directory}")
    
    def stop_monitoring(self):
        """監視停止"""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()
        
        logging.info("ファイル監視停止")
    
    def _monitor_loop(self):
        """監視ループ(別スレッドで実行)"""
        while self.running:
            try:
                self.check_changes()
                time.sleep(self.check_interval)
            except Exception as e:
                logging.error(f"監視ループエラー: {e}")
                time.sleep(self.check_interval)

# ファイル監視の使用例
def on_file_created(file_path: Path, file_state: Dict, event_type: str):
    print(f"📄 新しいファイル: {file_path}")

def on_file_modified(file_path: Path, file_state: Dict, event_type: str):
    print(f"✏️ ファイル変更: {file_path} (サイズ: {file_state['size']} bytes)")

def on_file_deleted(file_path: Path, file_state: Dict, event_type: str):
    print(f"🗑️ ファイル削除: {file_path}")

# 監視システムのセットアップ
monitor = FileMonitor("./watch_folder", check_interval=2.0)
monitor.watch_patterns = ["*.txt", "*.log", "*.py"]
monitor.ignore_patterns = ["*.tmp", "*.swp", "__pycache__/*"]

# コールバック登録
monitor.add_callback('created', on_file_created)
monitor.add_callback('modified', on_file_modified)
monitor.add_callback('deleted', on_file_deleted)

# 監視開始
try:
    monitor.start_monitoring()
    print("ファイル監視中... (Ctrl+C で停止)")
    
    # メインスレッドは他の処理を実行可能
    while True:
        time.sleep(10)
        stats = {
            '監視中ファイル数': len(monitor.file_states),
            '監視ディレクトリ': str(monitor.watch_directory)
        }
        print(f"📊 監視統計: {stats}")
        
except KeyboardInterrupt:
    print("\n監視を停止中...")
finally:
    monitor.stop_monitoring()

まとめ

Pythonでファイルの存在を確認する方法には、os.pathpathlibの2つの主要なアプローチがあります:

基本的な選択指針

  • 新規プロジェクトpathlibを使用(推奨)
  • 既存コードの保守os.pathとの互換性を考慮
  • 可読性重視pathlibのオブジェクト指向アプローチ
  • レガシー対応os.pathの使用も有効

実践的なベストプラクティス

  1. 単純な存在確認より例外処理を優先try-exceptでより安全な実装
  2. 事前チェックと実際の操作を組み合わせる:競合状態を避ける
  3. 大量ファイル処理では並列化を検討:パフォーマンス向上
  4. ネットワークファイルシステムではタイムアウトを設定:無応答状態の回避
  5. ログ出力で操作履歴を残す:トラブルシューティングを容易に

セキュリティと安全性

  • ファイルパスの検証とサニタイゼーション
  • アクセス権限の適切な確認
  • 一時ファイルやバックアップの適切な管理
  • ログファイルでの機密情報の除外

コメント

タイトルとURLをコピーしました