#!/usr/bin/env python3
"""
Backup script for all applications

Automatically discovers and runs backup scripts for all users,
then creates restic backups and sends notifications.
"""

import html
import itertools
import logging
import os
import pwd
import shlex
import subprocess
import sys
import tomllib
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests

# Default config path
CONFIG_PATH = Path("/etc/backup/config.toml")

# File name to store directories and files to back up
BACKUP_TARGETS_FILE = "backup-targets"

# Default directory for backups (relative to app dir).
# Used when the backup-targets file does not exist.
BACKUP_DEFAULT_DIR = "backups"

# Maximum notification length we send. Telegram rejects texts longer than
# 4096 characters; a small margin is kept below that limit.
TELEGRAM_MESSAGE_LIMIT = 4000

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("/var/log/backup-all.log"),
    ],
)
logger = logging.getLogger(__name__)


@dataclass
class TelegramConfig:
    """Settings of one notification backend as read from the config file."""

    type: str
    telegram_bot_token: str
    telegram_chat_id: str
    notifications_name: str


@dataclass
class Config:
    """Parsed configuration: application roots and notification backends."""

    roots: List[Path]
    notifications: Dict[str, TelegramConfig]


@dataclass
class Application:
    """An application directory together with the name of its owning user."""

    path: Path
    owner: str


class Storage(ABC):
    """Base class for backup storage backends."""

    def backup(self, backup_dirs: List[str]) -> bool:
        """Backup directories"""
        raise NotImplementedError()


class ResticStorage(Storage):
    """Storage backend that uploads the backup targets with restic."""

    TYPE_NAME = "restic"

    def __init__(self, name: str, params: Dict[str, Any]):
        """Read the restic/AWS settings from *params*.

        Raises:
            ValueError: if any of the required settings is missing or empty.
        """
        self.name = name
        self.restic_repository = str(params.get("restic_repository", ""))
        self.restic_password = str(params.get("restic_password", ""))
        self.aws_access_key_id = str(params.get("aws_access_key_id", ""))
        self.aws_secret_access_key = str(params.get("aws_secret_access_key", ""))
        self.aws_default_region = str(params.get("aws_default_region", ""))

        if not all(
            [
                self.restic_repository,
                self.restic_password,
                self.aws_access_key_id,
                self.aws_secret_access_key,
                self.aws_default_region,
            ]
        ):
            raise ValueError(
                f"Missing storage configuration values for backend ResticStorage: '{self.name}'"
            )

    def backup(self, backup_dirs: List[str]) -> bool:
        """Back up *backup_dirs*; return True on success (or nothing to do).

        Any exception raised while running restic is logged and reported as a
        failure (False) instead of propagating.
        """
        if not backup_dirs:
            logger.warning("No backup directories found")
            return True

        try:
            return self.__backup_internal(backup_dirs)
        except Exception as exc:  # noqa: BLE001
            logger.error("Restic backup process failed: %s", exc)
            return False

    def _run_step(self, label: str, cmd: List[str], env: Dict[str, str]) -> bool:
        """Run one restic command and log its outcome under *label*."""
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error("%s failed: %s", label, result.stderr)
            return False
        logger.info("%s completed successfully", label)
        return True

    def __backup_internal(self, backup_dirs: List[str]) -> bool:
        """Run backup, check, forget/prune and a final check, in that order.

        Stops at the first failing step and returns False.
        """
        logger.info("Starting restic backup")
        logger.info("Destination: %s", self.restic_repository)

        # restic reads the repository location and credentials from the
        # environment.
        env = os.environ.copy()
        env.update(
            {
                "RESTIC_REPOSITORY": self.restic_repository,
                "RESTIC_PASSWORD": self.restic_password,
                "AWS_ACCESS_KEY_ID": self.aws_access_key_id,
                "AWS_SECRET_ACCESS_KEY": self.aws_secret_access_key,
                "AWS_DEFAULT_REGION": self.aws_default_region,
            }
        )

        backup_cmd = ["restic", "backup", "--verbose"] + backup_dirs
        check_cmd = ["restic", "check"]
        forget_cmd = [
            "restic",
            "forget",
            "--compact",
            "--prune",
            "--keep-daily",
            "90",
            "--keep-monthly",
            "36",
        ]

        steps = [
            ("Restic backup", backup_cmd),
            ("Restic check", check_cmd),
            ("Restic forget/prune", forget_cmd),
            ("Final restic check", check_cmd),
        ]
        # all() over a generator short-circuits, so later steps are skipped
        # as soon as one fails.
        return all(self._run_step(label, cmd, env) for label, cmd in steps)


class BackupManager:
    """Runs per-application backup scripts, then uploads and notifies."""

    def __init__(self, config: Config, storages: List[Storage]):
        self.errors: List[str] = []
        self.warnings: List[str] = []
        self.successful_backups: List[str] = []
        self.config = config
        self.storages = storages

    def _select_telegram(self) -> Optional[TelegramConfig]:
        """Return the backend named "telegram", else the first one, else None."""
        if "telegram" in self.config.notifications:
            return self.config.notifications["telegram"]
        return next(iter(self.config.notifications.values()), None)

    def find_applications(self) -> List[Application]:
        """Get all application directories and their owners.

        A root that cannot be listed is recorded in ``self.errors`` and
        skipped, so the remaining roots are still processed.
        """
        applications: List[Application] = []

        for root in self.config.roots:
            try:
                # Materialize here so listing errors surface inside the try;
                # sorting makes the processing order deterministic.
                entries = sorted(root.iterdir())
            except OSError as e:
                error_msg = f"Could not list applications root {root}: {e}"
                logger.error(error_msg)
                self.errors.append(error_msg)
                continue

            for app_dir in entries:
                # Compare the entry name exactly: a substring test on the
                # full path would also skip unrelated directories.
                if app_dir.name == "lost+found":
                    continue
                try:
                    if not app_dir.is_dir():
                        continue
                    owner = pwd.getpwuid(app_dir.stat().st_uid).pw_name
                except (KeyError, OSError) as e:
                    logger.warning("Could not get owner for %s: %s", app_dir, e)
                    continue
                applications.append(Application(path=app_dir, owner=owner))

        return applications

    def find_backup_script(self, app_dir: str) -> Optional[str]:
        """Find an executable backup script in the application directory.

        Looks for ``backup.sh`` and then ``backup``. A candidate that exists
        but is not executable is logged and skipped. Returns None when no
        usable script is found.
        """
        for script_name in ("backup.sh", "backup"):
            script_path = os.path.join(app_dir, script_name)
            if not os.path.exists(script_path):
                continue
            if os.access(script_path, os.X_OK):
                return script_path
            logger.warning(
                "Backup script %s exists but is not executable", script_path
            )
        return None

    def run_app_backup(self, script_path: str, app_dir: str, username: str) -> bool:
        """Run backup script as the specified user.

        On success the user name is added to ``self.successful_backups``; on
        failure, timeout or any other exception the problem is appended to
        ``self.errors`` and False is returned.
        """
        try:
            logger.info(f"Running backup script {script_path} (user {username})")

            # su hands the --command value to a shell, so the path must be
            # quoted to survive spaces and shell metacharacters.
            cmd = ["su", "--login", username, "--command", shlex.quote(script_path)]
            result = subprocess.run(
                cmd,
                cwd=app_dir,
                capture_output=True,
                text=True,
                timeout=3600,  # 1 hour timeout
            )

            if result.returncode == 0:
                logger.info(f"Backup script for {username} completed successfully")
                self.successful_backups.append(username)
                return True

            error_msg = f"Backup script {script_path} failed with return code {result.returncode}"
            if result.stderr:
                error_msg += f": {result.stderr}"
            logger.error(error_msg)
            self.errors.append(f"App {username}: {error_msg}")
            return False

        except subprocess.TimeoutExpired:
            error_msg = f"Backup script {script_path} timed out"
            logger.error(error_msg)
            self.errors.append(f"App {username}: {error_msg}")
            return False
        except Exception as e:  # noqa: BLE001
            error_msg = f"Failed to run backup script {script_path}: {str(e)}"
            logger.error(error_msg)
            self.errors.append(f"App {username}: {error_msg}")
            return False

    def _warn(self, warning_msg: str) -> None:
        """Log a warning and remember it for the notification."""
        logger.warning(warning_msg)
        self.warnings.append(warning_msg)

    def _parse_targets_file(self, targets_file: Path) -> List[str]:
        """Parse backup-targets file, skipping comments and empty lines."""
        try:
            content = targets_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            # An unreadable or non-UTF-8 file must not abort the whole run.
            self._warn(f"Could not read backup targets file {targets_file}: {e}")
            return []

        stripped_lines = (raw_line.strip() for raw_line in content.splitlines())
        return [line for line in stripped_lines if line and not line.startswith("#")]

    def _resolve_app_targets(self, app_dir: Path) -> List[Path]:
        """Return the existing backup targets of one application."""
        targets_file = app_dir / BACKUP_TARGETS_FILE

        if not targets_file.exists():
            # Fallback to default backups directory when no list is provided.
            default_target = (app_dir / BACKUP_DEFAULT_DIR).resolve()
            if default_target.exists():
                return [default_target]
            self._warn(
                f"Default backup path does not exist for {app_dir}: {default_target}"
            )
            return []

        resolved_targets: List[Path] = []
        # Read custom targets defined by the application.
        for target_line in self._parse_targets_file(targets_file):
            # Joining with an absolute path yields that absolute path, so
            # this handles both relative and absolute targets.
            target_path = (app_dir / target_line).resolve()
            if target_path.exists():
                resolved_targets.append(target_path)
            else:
                self._warn(f"Backup target does not exist for {app_dir}: {target_path}")
        return resolved_targets

    def get_backup_directories(
        self, applications: Optional[List[Application]] = None
    ) -> List[str]:
        """Collect backup targets according to backup-targets rules.

        Args:
            applications: applications to inspect; discovered with
                ``find_applications()`` when omitted.

        Returns:
            Unique target paths as strings, in first-seen order.
        """
        if applications is None:
            applications = self.find_applications()

        backup_dirs: List[str] = []
        seen = set()
        for app in applications:
            for target in self._resolve_app_targets(app.path):
                target_str = str(target)
                if target_str not in seen:
                    seen.add(target_str)
                    backup_dirs.append(target_str)
        return backup_dirs

    def _build_notification_text(self, name: str, success: bool) -> str:
        """Compose the plain-text notification for the current run state."""
        if success and not self.errors:
            message = f"{name}: бекап успешно завершен!"
            if self.successful_backups:
                message += f"\n\nУспешные бекапы: {', '.join(self.successful_backups)}"
            return message

        message = f"{name}: бекап завершен с ошибками!"
        if self.successful_backups:
            message += f"\n\n✅ Успешные бекапы: {', '.join(self.successful_backups)}"
        if self.warnings:
            message += "\n\n⚠️ Предупреждения:\n" + "\n".join(self.warnings)
        if self.errors:
            message += "\n\n❌ Ошибки:\n" + "\n".join(self.errors)
        return message

    def send_telegram_notification(self, success: bool) -> None:
        """Send notification to Telegram.

        Best effort: every failure is logged and never raised to the caller.
        """
        telegram_cfg = self._select_telegram()
        if telegram_cfg is None:
            logger.warning("No telegram notification backend configured")
            return

        try:
            message = self._build_notification_text(
                telegram_cfg.notifications_name, success
            )
            # Truncate before escaping so an escaped entity is never cut in
            # half.
            if len(message) > TELEGRAM_MESSAGE_LIMIT:
                message = message[: TELEGRAM_MESSAGE_LIMIT - 1] + "…"

            url = f"https://api.telegram.org/bot{telegram_cfg.telegram_bot_token}/sendMessage"
            data = {
                "chat_id": telegram_cfg.telegram_chat_id,
                "parse_mode": "HTML",
                # The text embeds raw script output; unescaped <, > or &
                # would make Telegram reject the message in HTML mode.
                "text": html.escape(message, quote=False),
            }

            response = requests.post(url, data=data, timeout=30)
            if response.status_code == 200:
                logger.info("Telegram notification sent successfully")
            else:
                logger.error(
                    "Failed to send Telegram notification: %s - %s",
                    response.status_code,
                    response.text,
                )

        except Exception as e:  # noqa: BLE001
            details = str(e)
            # requests exceptions include the request URL, which contains
            # the bot token; keep the secret out of the log file.
            if telegram_cfg.telegram_bot_token:
                details = details.replace(telegram_cfg.telegram_bot_token, "***")
            logger.error("Failed to send Telegram notification: %s", details)

    def run_backup_process(self) -> bool:
        """Main backup process.

        Runs every application's backup script, uploads the collected targets
        to each storage, and sends a notification. Returns False when any
        error was recorded, True otherwise (warnings do not fail the run).
        """
        logger.info("Starting backup process")

        # Get all application directories
        applications = self.find_applications()
        logger.info(f"Found {len(applications)} application directories")

        # Process each application's backup
        for app in applications:
            app_dir = str(app.path)
            username = app.owner
            logger.info(f"Processing backup for app: {app_dir} (user {username})")

            # Find backup script
            backup_script = self.find_backup_script(app_dir)
            if backup_script is None:
                self._warn(f"No backup script found for app: {app_dir} (user {username})")
                continue

            self.run_app_backup(backup_script, app_dir, username)

        # Get backup directories, reusing the scan done above.
        backup_dirs = self.get_backup_directories(applications)
        logger.info(f"Found backup directories: {backup_dirs}")

        overall_success = True
        for storage in self.storages:
            backup_result = storage.backup(backup_dirs)
            if not backup_result:
                # Name the storage so a failure can be traced when several
                # backends are configured.
                storage_name = getattr(storage, "name", type(storage).__name__)
                self.errors.append(f"Restic backup failed: {storage_name}")
            # Determine overall success
            overall_success = overall_success and backup_result

        # Send notification
        self.send_telegram_notification(overall_success)

        logger.info("Backup process completed")

        if self.errors:
            logger.error(f"Backup completed with {len(self.errors)} errors")
            return False
        if self.warnings:
            logger.warning(f"Backup completed with {len(self.warnings)} warnings")
            return True
        logger.info("Backup completed successfully")
        return True


def initialize(config_path: Path) -> BackupManager:
    """Load the TOML config and build a ready-to-run BackupManager.

    Raises:
        OSError: if the config file cannot be read.
        ValueError: if roots, storage or notification settings are missing or
            malformed.
    """
    try:
        with config_path.open("rb") as config_file:
            raw_config = tomllib.load(config_file)
    except OSError as e:
        logger.error(f"Failed to read config file {config_path}: {e}")
        raise

    roots_raw = raw_config.get("roots") or []
    if not isinstance(roots_raw, list) or not roots_raw:
        raise ValueError("roots must be a non-empty list of paths in config.toml")
    roots = [Path(root) for root in roots_raw]

    storage_raw = raw_config.get("storage") or {}
    if not isinstance(storage_raw, dict):
        raise ValueError("storage must be a table in config.toml")
    storages: List[Storage] = []
    for name, params in storage_raw.items():
        if not isinstance(params, dict):
            raise ValueError(f"Storage config for {name} must be a table")

        storage_type = params.get("type", "")
        if storage_type == ResticStorage.TYPE_NAME:
            storages.append(ResticStorage(name, params))
        else:
            # Surface typos in "type" instead of silently dropping the entry.
            logger.warning(
                "Ignoring storage %s: unsupported type %r", name, storage_type
            )

    if not storages:
        raise ValueError("At least one storage backend must be configured")

    notifications_raw = raw_config.get("notifications") or {}
    if not isinstance(notifications_raw, dict):
        raise ValueError("notifications must be a table in config.toml")
    notifications: Dict[str, TelegramConfig] = {}
    for name, cfg in notifications_raw.items():
        if not isinstance(cfg, dict):
            raise ValueError(f"Notification config for {name} must be a table")
        # str() keeps the declared field types even when TOML provides a
        # number, e.g. an integer chat id.
        notifications[name] = TelegramConfig(
            type=str(cfg.get("type", "")),
            telegram_bot_token=str(cfg.get("telegram_bot_token", "")),
            telegram_chat_id=str(cfg.get("telegram_chat_id", "")),
            notifications_name=str(cfg.get("notifications_name", "")),
        )

    if not notifications:
        raise ValueError("At least one notification backend must be configured")

    for name, cfg in notifications.items():
        if not all(
            [
                cfg.type,
                cfg.telegram_bot_token,
                cfg.telegram_chat_id,
                cfg.notifications_name,
            ]
        ):
            raise ValueError(
                f"Missing notification configuration values for backend {name}"
            )

    config = Config(roots=roots, notifications=notifications)
    return BackupManager(config=config, storages=storages)


def main():
    """Script entry point: exit 0 on success, 1 on failure, 130 on Ctrl-C."""
    try:
        backup_manager = initialize(CONFIG_PATH)
        success = backup_manager.run_backup_process()

        if not success:
            sys.exit(1)

    except KeyboardInterrupt:
        logger.info("Backup process interrupted by user")
        sys.exit(130)
    except Exception as e:  # noqa: BLE001
        # Top-level boundary: keep the traceback for post-mortem analysis.
        logger.exception("Unexpected error in backup process: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()