diff --git a/files/backups/backup-all.py b/files/backups/backup-all.py index 6feeb60..fe95790 100644 --- a/files/backups/backup-all.py +++ b/files/backups/backup-all.py @@ -49,6 +49,8 @@ class Config: class Application: path: Path owner: str + backup_script: Optional[Path] + backup_targets: List[Path] class Storage(ABC): @@ -190,9 +192,10 @@ class AppriseNotifier(Notifier): class ApplicationFinder: def __init__(self, roots: List[Path]): self.roots = roots + self.warnings: List[str] = [] def find_applications(self) -> List[Application]: - """Get all application directories and their owners.""" + """Discover all applications with their backup scripts and targets.""" applications: List[Application] = [] source_dirs = itertools.chain(*(root.iterdir() for root in self.roots)) @@ -203,12 +206,80 @@ class ApplicationFinder: try: stat_info = app_dir.stat() owner = pwd.getpwuid(stat_info.st_uid).pw_name - applications.append(Application(path=app_dir, owner=owner)) + backup_script = self._find_backup_script(app_dir) + backup_targets = self._find_backup_targets(app_dir) + applications.append( + Application( + path=app_dir, + owner=owner, + backup_script=backup_script, + backup_targets=backup_targets, + ) + ) except (KeyError, OSError) as e: logger.warning(f"Could not get owner for {app_dir}: {e}") return applications + def _find_backup_script(self, app_dir: Path) -> Optional[Path]: + """Find executable backup script in application directory.""" + for name in ("backup.sh", "backup"): + script_path = app_dir / name + if script_path.exists(): + if os.access(script_path, os.X_OK): + return script_path + else: + logger.warning( + f"Backup script {script_path} exists but is not executable" + ) + return None + + def _find_backup_targets(self, app_dir: Path) -> List[Path]: + """Resolve backup target directories for an application.""" + targets_file = app_dir / BACKUP_TARGETS_FILE + resolved_targets: List[Path] = [] + + if targets_file.exists(): + for target_line in self._parse_targets_file(targets_file): + target_path = Path(target_line) + if not target_path.is_absolute(): + target_path = (app_dir / target_path).resolve() + else: + target_path = target_path.resolve() + if target_path.exists(): + resolved_targets.append(target_path) + else: + warning_msg = ( + f"Backup target does not exist for {app_dir}: {target_path}" + ) + logger.warning(warning_msg) + self.warnings.append(warning_msg) + else: + default_target = (app_dir / BACKUP_DEFAULT_DIR).resolve() + if default_target.exists(): + resolved_targets.append(default_target) + else: + warning_msg = f"Default backup path does not exist for {app_dir}: {default_target}" + logger.warning(warning_msg) + self.warnings.append(warning_msg) + + return resolved_targets + + def _parse_targets_file(self, targets_file: Path) -> List[str]: + """Parse backup-targets file, skipping comments and empty lines.""" + targets: List[str] = [] + try: + for raw_line in targets_file.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + targets.append(line) + except OSError as e: + warning_msg = f"Could not read backup targets file {targets_file}: {e}" + logger.warning(warning_msg) + self.warnings.append(warning_msg) + return targets + class BackupManager: def __init__( @@ -224,25 +295,6 @@ class BackupManager: self.storages = storages self.notifiers = notifiers - def find_backup_script(self, app_dir: str) -> Optional[str]: - """Find backup script in user's home directory""" - possible_scripts = [ - os.path.join(app_dir, "backup.sh"), - os.path.join(app_dir, "backup"), - ] - - for script_path in possible_scripts: - if os.path.exists(script_path): - # Check if file is executable - if os.access(script_path, os.X_OK): - return script_path - else: - logger.warning( - f"Backup script {script_path} exists but is not executable" - ) - - return None - def run_app_backup(self, script_path: str, app_dir: str, username: str) -> bool: """Run backup script as the specified user""" try: @@ -282,63 +334,6 @@ class BackupManager: self.errors.append(f"App {username}: {error_msg}") return False - def get_backup_directories(self, applications: List[Application]) -> List[str]: - """Collect backup targets according to backup-targets rules""" - backup_dirs: List[str] = [] - - def parse_targets_file(targets_file: Path) -> List[str]: - """Parse backup-targets file, skipping comments and empty lines.""" - targets: List[str] = [] - try: - for raw_line in targets_file.read_text(encoding="utf-8").splitlines(): - line = raw_line.strip() - if not line or line.startswith("#"): - continue - targets.append(line) - except OSError as e: - warning_msg = f"Could not read backup targets file {targets_file}: {e}" - logger.warning(warning_msg) - self.warnings.append(warning_msg) - return targets - - for app in applications: - app_dir = app.path - targets_file = app_dir / BACKUP_TARGETS_FILE - resolved_targets: List[Path] = [] - - if targets_file.exists(): - # Read custom targets defined by the application. - for target_line in parse_targets_file(targets_file): - target_path = Path(target_line) - if not target_path.is_absolute(): - target_path = (app_dir / target_path).resolve() - else: - target_path = target_path.resolve() - if target_path.exists(): - resolved_targets.append(target_path) - else: - warning_msg = ( - f"Backup target does not exist for {app_dir}: {target_path}" - ) - logger.warning(warning_msg) - self.warnings.append(warning_msg) - else: - # Fallback to default backups directory when no list is provided. - default_target = (app_dir / BACKUP_DEFAULT_DIR).resolve() - if default_target.exists(): - resolved_targets.append(default_target) - else: - warning_msg = f"Default backup path does not exist for {app_dir}: {default_target}" - logger.warning(warning_msg) - self.warnings.append(warning_msg) - - for target in resolved_targets: - target_str = str(target) - if target_str not in backup_dirs: - backup_dirs.append(target_str) - - return backup_dirs - def send_notification(self, success: bool) -> None: """Send notification to Notifiers""" @@ -381,10 +376,7 @@ class BackupManager: username = app.owner logger.info(f"Processing backup for app: {app_dir} (user {username})") - # Find backup script - backup_script = self.find_backup_script(app_dir) - - if backup_script is None: + if app.backup_script is None: warning_msg = ( f"No backup script found for app: {app_dir} (user {username})" ) @@ -392,10 +384,15 @@ class BackupManager: self.warnings.append(warning_msg) continue - self.run_app_backup(backup_script, app_dir, username) + self.run_app_backup(str(app.backup_script), app_dir, username) - # Get backup directories - backup_dirs = self.get_backup_directories(applications) + # Collect backup directories from applications + backup_dirs: List[str] = [] + for app in applications: + for target in app.backup_targets: + target_str = str(target) + if target_str not in backup_dirs: + backup_dirs.append(target_str) logger.info(f"Found backup directories: {backup_dirs}") overall_success = True @@ -476,6 +473,7 @@ def main() -> None: try: app_finder, backup_manager = initialize(CONFIG_PATH) applications = app_finder.find_applications() + backup_manager.warnings.extend(app_finder.warnings) success = backup_manager.run_backup_process(applications) if not success: sys.exit(1)