#!/usr/bin/env python3
"""
Backup script for all applications

Automatically discovers and runs backup scripts for all users,
then creates restic backups and sends notifications.
"""

import configparser
import html
import itertools
import logging
import os
import pwd
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("/var/log/backup-all.log"),
    ],
)
logger = logging.getLogger(__name__)

config = configparser.ConfigParser()
config.read("/etc/backup/config.ini")

RESTIC_REPOSITORY = config.get("restic", "RESTIC_REPOSITORY")
RESTIC_PASSWORD = config.get("restic", "RESTIC_PASSWORD")
AWS_ACCESS_KEY_ID = config.get("restic", "AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = config.get("restic", "AWS_SECRET_ACCESS_KEY")
AWS_DEFAULT_REGION = config.get("restic", "AWS_DEFAULT_REGION")

TELEGRAM_BOT_TOKEN = config.get("telegram", "TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = config.get("telegram", "TELEGRAM_CHAT_ID")
NOTIFICATIONS_NAME = config.get("telegram", "NOTIFICATIONS_NAME")

# Upper bound on the notification text we send. The Bot API documents a
# 4096-character limit for sendMessage; we stay a bit below it as headroom.
TELEGRAM_TEXT_LIMIT = 4000


class BackupManager:
    """Run per-application backup scripts, then snapshot the results with restic.

    Outcomes are accumulated on the instance:
      * ``errors`` - human-readable failure descriptions,
      * ``warnings`` - non-fatal findings (e.g. an app without a backup script),
      * ``successful_backups`` - owners whose backup script exited with 0.
    """

    def __init__(self):
        self.errors = []
        self.warnings = []
        self.successful_backups = []

    @staticmethod
    def _list_children(root: Path) -> List[Path]:
        """Return the entries of *root* in sorted order, or [] if it is unreadable."""
        try:
            return sorted(root.iterdir())
        except OSError as e:
            # A missing or unreadable root must not abort the whole backup run.
            logger.warning(f"Could not list {root}: {e}")
            return []

    def get_application_directories(self) -> List[Tuple[str, str]]:
        """Return ``(directory, owner_name)`` for each application directory.

        Directories are the immediate sub-directories of ``/mnt/applications``
        and ``/home``. ``lost+found`` is skipped, as are entries whose owner
        cannot be resolved (a warning is logged for those).
        """
        app_dirs = []
        roots = (Path("/mnt/applications"), Path("/home"))
        source_dirs = itertools.chain.from_iterable(
            self._list_children(root) for root in roots
        )

        for app_dir in source_dirs:
            # Compare the final path component: a Path never equals a str.
            if app_dir.name == "lost+found":
                continue
            try:
                if not app_dir.is_dir():
                    continue
                # Get the owner of the directory
                stat_info = app_dir.stat()
                owner = pwd.getpwuid(stat_info.st_uid).pw_name
            except (KeyError, OSError) as e:
                logger.warning(f"Could not get owner for {app_dir}: {e}")
                continue
            app_dirs.append((str(app_dir), owner))

        return app_dirs

    def find_backup_script(self, app_dir: str) -> Optional[str]:
        """Return the first executable backup script in *app_dir*, else None.

        Candidates are ``backup.sh`` and ``backup``, checked in that order.
        A candidate that exists but is not executable is logged and skipped.
        """
        possible_scripts = [
            os.path.join(app_dir, "backup.sh"),
            os.path.join(app_dir, "backup"),
        ]

        for script_path in possible_scripts:
            if not os.path.exists(script_path):
                continue
            # Check if file is executable
            if os.access(script_path, os.X_OK):
                return script_path
            logger.warning(
                f"Backup script {script_path} exists but is not executable"
            )

        return None

    def run_app_backup(self, script_path: str, app_dir: str, username: str) -> bool:
        """Run *script_path* as *username* via ``su`` and record the outcome.

        Returns True when the script exits with status 0. Any failure
        (non-zero exit, one-hour timeout, or an error launching the process)
        is logged, appended to ``self.errors`` and reported as False.
        """
        try:
            logger.info(f"Running backup script {script_path} (user {username})")

            # Use su to run the script as the user
            cmd = ["su", "--login", username, "--command", script_path]

            result = subprocess.run(
                cmd,
                cwd=app_dir,
                capture_output=True,
                text=True,
                timeout=3600,  # 1 hour timeout
            )

            if result.returncode == 0:
                logger.info(f"Backup script for {username} completed successfully")
                self.successful_backups.append(username)
                return True

            error_msg = f"Backup script {script_path} failed with return code {result.returncode}"
            if result.stderr:
                error_msg += f": {result.stderr}"
            logger.error(error_msg)
            self.errors.append(f"App {username}: {error_msg}")
            return False

        except subprocess.TimeoutExpired:
            error_msg = f"Backup script {script_path} timed out"
            logger.error(error_msg)
            self.errors.append(f"App {username}: {error_msg}")
            return False
        except Exception as e:
            # Boundary handler: one broken app must not stop the other backups.
            error_msg = f"Failed to run backup script {script_path}: {str(e)}"
            logger.error(error_msg)
            self.errors.append(f"App {username}: {error_msg}")
            return False

    def get_backup_directories(self) -> List[str]:
        """Return the existing ``<app_dir>/backups`` directories."""
        backup_dirs = []
        for app_dir, _ in self.get_application_directories():
            backup_path = os.path.join(app_dir, "backups")
            # isdir() is already False for paths that do not exist.
            if os.path.isdir(backup_path):
                backup_dirs.append(backup_path)
        return backup_dirs

    def _run_restic_step(self, label: str, cmd: List[str], env: Dict[str, str]) -> bool:
        """Run one restic command; log and record a failure under *label*."""
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        if result.returncode != 0:
            error_msg = f"{label} failed: {result.stderr}"
            logger.error(error_msg)
            self.errors.append(f"{label}: {error_msg}")
            return False
        logger.info(f"{label} completed successfully")
        return True

    def run_restic_backup(self, backup_dirs: List[str]) -> bool:
        """Back up *backup_dirs* with restic, then check, forget/prune and re-check.

        Returns True when every step succeeds, or when *backup_dirs* is empty
        (nothing to do). Processing stops at the first failing step; the
        failure is appended to ``self.errors`` and False is returned.
        """
        if not backup_dirs:
            logger.warning("No backup directories found")
            return True

        try:
            logger.info("Starting restic backup")
            logger.info("Destination: %s", RESTIC_REPOSITORY)

            # Set environment variables for restic
            env = os.environ.copy()
            env.update(
                {
                    "RESTIC_REPOSITORY": RESTIC_REPOSITORY,
                    "RESTIC_PASSWORD": RESTIC_PASSWORD,
                    "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
                    "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
                    "AWS_DEFAULT_REGION": AWS_DEFAULT_REGION,
                }
            )

            check_cmd = ["restic", "check"]
            steps = [
                ("Restic backup", ["restic", "backup", "--verbose"] + backup_dirs),
                ("Restic check", check_cmd),
                (
                    "Restic forget/prune",
                    [
                        "restic",
                        "forget",
                        "--compact",
                        "--prune",
                        "--keep-daily",
                        "90",
                        "--keep-monthly",
                        "36",
                    ],
                ),
                # Verify the repository again after pruning.
                ("Final restic check", check_cmd),
            ]

            for label, cmd in steps:
                if not self._run_restic_step(label, cmd, env):
                    return False
            return True

        except Exception as e:
            error_msg = f"Restic backup process failed: {str(e)}"
            logger.error(error_msg)
            self.errors.append(f"Restic: {error_msg}")
            return False

    def _build_notification_text(self, success: bool) -> str:
        """Compose the HTML-safe Telegram message for the current run state."""
        details = ""
        if success and not self.errors:
            header = f"{NOTIFICATIONS_NAME}: бекап успешно завершен!"
            if self.successful_backups:
                details += (
                    f"\n\nУспешные бекапы: {', '.join(self.successful_backups)}"
                )
        else:
            header = f"{NOTIFICATIONS_NAME}: бекап завершен с ошибками!"
            if self.successful_backups:
                details += (
                    f"\n\n✅ Успешные бекапы: {', '.join(self.successful_backups)}"
                )
            if self.warnings:
                details += "\n\n⚠️ Предупреждения:\n" + "\n".join(self.warnings)
            if self.errors:
                details += "\n\n❌ Ошибки:\n" + "\n".join(self.errors)

        # Truncate before escaping so an HTML entity is never cut in half.
        budget = max(TELEGRAM_TEXT_LIMIT - len(header), 0)
        if len(details) > budget:
            details = details[: budget - 1] + "…" if budget > 0 else ""

        # The message is sent with parse_mode=HTML, so stderr output, paths and
        # user names must be escaped or the API rejects the whole message.
        # NOTIFICATIONS_NAME is operator-configured and is left as written.
        return header + html.escape(details, quote=False)

    def send_telegram_notification(self, success: bool) -> None:
        """Send the run summary to the configured Telegram chat.

        Best effort: delivery problems are logged and never raised.
        """
        try:
            message = self._build_notification_text(success)

            url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
            data = {"chat_id": TELEGRAM_CHAT_ID, "parse_mode": "HTML", "text": message}

            response = requests.post(url, data=data, timeout=30)
            if response.status_code == 200:
                logger.info("Telegram notification sent successfully")
            else:
                logger.error(
                    f"Failed to send Telegram notification: {response.status_code} - {response.text}"
                )

        except Exception as e:
            logger.error(f"Failed to send Telegram notification: {str(e)}")

    def run_backup_process(self) -> bool:
        """Run the whole pipeline and return True when no errors were recorded.

        Steps: run each application's backup script, back up the resulting
        ``backups`` directories with restic, send the Telegram notification.
        Warnings alone do not make the run fail.
        """
        logger.info("Starting backup process")

        # Get all application directories
        app_dirs = self.get_application_directories()
        logger.info(f"Found {len(app_dirs)} application directories")

        # Process each application's backup
        for app_dir, username in app_dirs:
            logger.info(f"Processing backup for app: {app_dir} (user {username})")

            # Find backup script
            backup_script = self.find_backup_script(app_dir)

            if backup_script is None:
                warning_msg = (
                    f"No backup script found for app: {app_dir} (user {username})"
                )
                logger.warning(warning_msg)
                self.warnings.append(warning_msg)
                continue

            self.run_app_backup(backup_script, app_dir, username)

        # Get backup directories
        backup_dirs = self.get_backup_directories()
        logger.info(f"Found backup directories: {backup_dirs}")

        # Run restic backup
        restic_success = self.run_restic_backup(backup_dirs)

        # Determine overall success
        overall_success = restic_success and not self.errors

        # Send notification
        self.send_telegram_notification(overall_success)

        logger.info("Backup process completed")

        if self.errors:
            logger.error(f"Backup completed with {len(self.errors)} errors")
            return False
        if self.warnings:
            logger.warning(f"Backup completed with {len(self.warnings)} warnings")
            return True
        logger.info("Backup completed successfully")
        return True


def main():
    """Script entry point: exit 0 on success, 1 on failure, 130 on Ctrl-C."""
    try:
        backup_manager = BackupManager()
        success = backup_manager.run_backup_process()

        if not success:
            sys.exit(1)

    except KeyboardInterrupt:
        logger.info("Backup process interrupted by user")
        sys.exit(130)
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error() drops.
        logger.exception(f"Unexpected error in backup process: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()