Files
pet-project-server/files/backups/backup-all.py
T
av df3a37e610
Linting / YAML Lint (push) Successful in 41s
Linting / Ansible Lint (push) Failing after 1m4s
Backups: backup to home server storage
2026-05-01 10:49:27 +03:00

543 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Backup script for all applications
Automatically discovers and runs backup scripts for all users,
then creates restic backups and sends notifications.
"""
import itertools
import os
import sys
import subprocess
import logging
import pwd
import time
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Any
import requests
import tomllib
# Default config path
CONFIG_PATH = Path("/etc/backup/config.toml")
# File name to store directories and files to back up
BACKUP_TARGETS_FILE = "backup-targets"
# Default directory fo backups (relative to app dir)
# Used when backup-targets file not exists
BACKUP_DEFAULT_DIR = "backups"
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("/var/log/backup-all.log"),
],
)
logger = logging.getLogger(__name__)
@dataclass
class Config:
host_name: str
@dataclass
class Application:
path: Path
owner: str
backup_script: Optional[Path]
backup_targets: List[Path]
@dataclass
class StorageRunResult:
name: str
success: bool
duration: float
def format_duration(seconds: float) -> str:
if seconds < 60:
return f"{seconds:.1f}s"
minutes = int(seconds // 60)
secs = int(seconds % 60)
if minutes < 60:
return f"{minutes}m{secs:02d}s"
hours = minutes // 60
minutes = minutes % 60
return f"{hours}h{minutes:02d}m{secs:02d}s"
class Storage(ABC):
name: str
def backup(self, backup_dirs: List[str]) -> bool:
"""Backup directories"""
raise NotImplementedError()
class ResticStorage(Storage):
TYPE_NAME = "restic"
def __init__(self, name: str, params: Dict[str, Any]):
self.name = name
self.restic_repository = str(params.get("restic_repository", ""))
self.restic_password = str(params.get("restic_password", ""))
env_raw = params.get("env") or {}
if not isinstance(env_raw, dict):
raise ValueError(
f"'env' must be a table for storage backend ResticStorage: '{self.name}'"
)
self.env: Dict[str, str] = {str(k): str(v) for k, v in env_raw.items()}
if not self.restic_repository or not self.restic_password:
raise ValueError(
f"Missing storage configuration values for backend ResticStorage: '{self.name}'"
)
def backup(self, backup_dirs: List[str]) -> bool:
if not backup_dirs:
logger.warning("No backup directories found")
return True
try:
return self.__backup_internal(backup_dirs)
except Exception as exc: # noqa: BLE001
logger.error("Restic backup process failed: %s", exc)
return False
def __backup_internal(self, backup_dirs: List[str]) -> bool:
logger.info("Starting restic backup for storage '%s'", self.name)
logger.info("Destination: %s", self.restic_repository)
env = os.environ.copy()
env["RESTIC_REPOSITORY"] = self.restic_repository
env["RESTIC_PASSWORD"] = self.restic_password
env.update(self.env)
backup_cmd = ["restic", "backup", "--verbose"] + backup_dirs
result = subprocess.run(backup_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
logger.error("Restic backup failed: %s", result.stderr)
return False
logger.info("Restic backup completed successfully")
check_cmd = ["restic", "check"]
result = subprocess.run(check_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
logger.error("Restic check failed: %s", result.stderr)
return False
logger.info("Restic check completed successfully")
forget_cmd = [
"restic",
"forget",
"--compact",
"--prune",
"--keep-daily",
"90",
"--keep-monthly",
"36",
]
result = subprocess.run(forget_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
logger.error("Restic forget/prune failed: %s", result.stderr)
return False
logger.info("Restic forget/prune completed successfully")
result = subprocess.run(check_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
logger.error("Final restic check failed: %s", result.stderr)
return False
logger.info("Final restic check completed successfully")
return True
class Notifier(ABC):
def send(self, title: str, html_message: str) -> None:
raise NotImplementedError()
class AppriseNotifier(Notifier):
TYPE_NAME = "apprise"
def __init__(self, name: str, params: Dict[str, Any]):
self.name = name
self.api_url = str(params.get("api_url", "")).rstrip("/")
self.tag = str(params.get("tag", ""))
if not self.api_url or not self.tag:
raise ValueError(
f"Missing notification configuration values for backend {name}"
)
def send(self, title: str, html_message: str) -> None:
url = f"{self.api_url}/notify/{self.tag}/"
payload = {
"title": title,
"body": html_message,
"format": "html",
}
response = requests.post(url, json=payload, timeout=30)
if response.ok:
logger.info("Apprise notification sent successfully")
else:
logger.error(
f"Failed to send Apprise notification: {response.status_code} - {response.text}"
)
class ApplicationFinder:
def __init__(self, roots: List[Path]):
self.roots = roots
self.warnings: List[str] = []
def find_applications(self) -> List[Application]:
"""Discover all applications with their backup scripts and targets."""
applications: List[Application] = []
source_dirs = itertools.chain(*(root.iterdir() for root in self.roots))
for app_dir in source_dirs:
if "lost+found" in str(app_dir):
continue
if app_dir.is_dir():
try:
stat_info = app_dir.stat()
owner = pwd.getpwuid(stat_info.st_uid).pw_name
backup_script = self._find_backup_script(app_dir)
backup_targets = self._find_backup_targets(app_dir)
applications.append(
Application(
path=app_dir,
owner=owner,
backup_script=backup_script,
backup_targets=backup_targets,
)
)
except (KeyError, OSError) as e:
logger.warning(f"Could not get owner for {app_dir}: {e}")
applications.sort(key=lambda app: app.path.name)
return applications
def _find_backup_script(self, app_dir: Path) -> Optional[Path]:
"""Find executable backup script in application directory."""
for name in ("backup.sh", "backup"):
script_path = app_dir / name
if script_path.exists():
if os.access(script_path, os.X_OK):
return script_path
else:
logger.warning(
f"Backup script {script_path} exists but is not executable"
)
return None
def _find_backup_targets(self, app_dir: Path) -> List[Path]:
"""Resolve backup target directories for an application."""
targets_file = app_dir / BACKUP_TARGETS_FILE
resolved_targets: List[Path] = []
if targets_file.exists():
for target_line in self._parse_targets_file(targets_file):
target_path = Path(target_line)
if not target_path.is_absolute():
target_path = (app_dir / target_path).resolve()
else:
target_path = target_path.resolve()
if target_path.exists():
resolved_targets.append(target_path)
else:
warning_msg = (
f"Backup target does not exist for {app_dir}: {target_path}"
)
logger.warning(warning_msg)
self.warnings.append(warning_msg)
else:
default_target = (app_dir / BACKUP_DEFAULT_DIR).resolve()
if default_target.exists():
resolved_targets.append(default_target)
else:
warning_msg = f"Default backup path does not exist for {app_dir}: {default_target}"
logger.warning(warning_msg)
self.warnings.append(warning_msg)
return resolved_targets
def _parse_targets_file(self, targets_file: Path) -> List[str]:
"""Parse backup-targets file, skipping comments and empty lines."""
targets: List[str] = []
try:
for raw_line in targets_file.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
targets.append(line)
except OSError as e:
warning_msg = f"Could not read backup targets file {targets_file}: {e}"
logger.warning(warning_msg)
self.warnings.append(warning_msg)
return targets
class BackupManager:
def __init__(
self,
config: Config,
storages: List[Storage],
notifiers: List[Notifier],
):
self.errors: List[str] = []
self.warnings: List[str] = []
self.successful_backups: List[str] = []
self.config = config
self.storages = storages
self.notifiers = notifiers
self.archive_duration: float = 0.0
self.storage_results: List[StorageRunResult] = []
def run_backup_process(self, applications: List[Application]) -> bool:
"""Main backup process"""
logger.info("Starting backup process")
logger.info(f"Found {len(applications)} application directories")
archive_start = time.monotonic()
# Process each user's backup
for app in applications:
app_dir = str(app.path)
username = app.owner
logger.info(f"Processing backup for app: {app_dir} (user {username})")
if app.backup_script is None:
warning_msg = (
f"No backup script found for app: {app_dir} (user {username})"
)
logger.warning(warning_msg)
self.warnings.append(warning_msg)
continue
self._run_app_backup(str(app.backup_script), app_dir, username)
self.archive_duration = time.monotonic() - archive_start
logger.info(
"Archive phase finished in %s", format_duration(self.archive_duration)
)
# Collect backup directories from applications
backup_dirs: List[str] = []
for app in applications:
for target in app.backup_targets:
target_str = str(target)
if target_str not in backup_dirs:
backup_dirs.append(target_str)
logger.info(f"Found backup directories: {backup_dirs}")
overall_success = True
# Each storage is processed independently: a failure in one storage
# must not prevent the others from being attempted.
for storage in self.storages:
storage_start = time.monotonic()
try:
backup_result = storage.backup(backup_dirs)
except Exception as exc: # noqa: BLE001
logger.error(
"Storage '%s' raised an unexpected error: %s", storage.name, exc
)
backup_result = False
storage_duration = time.monotonic() - storage_start
self.storage_results.append(
StorageRunResult(
name=storage.name,
success=backup_result,
duration=storage_duration,
)
)
logger.info(
"Storage '%s' finished in %s (success=%s)",
storage.name,
format_duration(storage_duration),
backup_result,
)
if not backup_result:
self.errors.append(f"Storage '{storage.name}' backup failed")
# Determine overall success
overall_success = overall_success and backup_result
# Send notification
self._send_notification(overall_success)
logger.info("Backup process completed")
if self.errors:
logger.error(f"Backup completed with {len(self.errors)} errors")
return False
elif self.warnings:
logger.warning(f"Backup completed with {len(self.warnings)} warnings")
return True
else:
logger.info("Backup completed successfully")
return True
def _run_app_backup(self, script_path: str, app_dir: str, username: str) -> bool:
"""Run backup script as the specified user"""
try:
logger.info(f"Running backup script {script_path} (user {username})")
# Use su to run the script as the user
cmd = ["su", "--login", username, "--command", script_path]
result = subprocess.run(
cmd,
cwd=app_dir,
capture_output=True,
text=True,
timeout=3600, # 1 hour timeout
)
if result.returncode == 0:
logger.info(f"Backup script for {username} completed successfully")
self.successful_backups.append(username)
return True
else:
error_msg = f"Backup script {script_path} failed with return code {result.returncode}"
if result.stderr:
error_msg += f": {result.stderr}"
logger.error(error_msg)
self.errors.append(f"App {username}: {error_msg}")
return False
except subprocess.TimeoutExpired:
error_msg = f"Backup script {script_path} timed out"
logger.error(error_msg)
self.errors.append(f"App {username}: {error_msg}")
return False
except Exception as e:
error_msg = f"Failed to run backup script {script_path}: {str(e)}"
logger.error(error_msg)
self.errors.append(f"App {username}: {error_msg}")
return False
def _send_notification(self, success: bool) -> None:
"""Send notification to Notifiers"""
host = self.config.host_name
if success and not self.errors:
title = f"{host}: бекап успешно завершен"
message = f"<p><b>{host}</b>: бекап успешно завершен!</p>"
if self.successful_backups:
items = "".join(f"<li>{b}</li>" for b in self.successful_backups)
message += f"<p>Успешные бекапы:</p><ul>{items}</ul>"
else:
title = f"{host}: бекап завершен с ошибками ({len(self.errors)})"
message = f"<p><b>{host}</b>: бекап завершен с ошибками!</p>"
if self.successful_backups:
items = "".join(f"<li>{b}</li>" for b in self.successful_backups)
message += f"<p>✅ Успешные бекапы:</p><ul>{items}</ul>"
if self.warnings:
items = "".join(f"<li>{w}</li>" for w in self.warnings)
message += f"<p>⚠️ Предупреждения:</p><ul>{items}</ul>"
if self.errors:
items = "".join(f"<li>{e}</li>" for e in self.errors)
message += f"<p>❌ Ошибки:</p><ul>{items}</ul>"
message += f"<p>⏱ Время архивации: {format_duration(self.archive_duration)}</p>"
if self.storage_results:
items = "".join(
f"<li>{'' if r.success else ''} {r.name}: {format_duration(r.duration)}</li>"
for r in self.storage_results
)
message += f"<p>⏱ Время записи в хранилища:</p><ul>{items}</ul>"
for notificator in self.notifiers:
try:
notificator.send(title, message)
except Exception as e:
logger.error(f"Failed to send notification: {str(e)}")
def initialize(
config_path: Path,
) -> tuple[ApplicationFinder, BackupManager]:
try:
with config_path.open("rb") as config_file:
raw_config = tomllib.load(config_file)
except OSError as e:
logger.error(f"Failed to read config file {config_path}: {e}")
raise
host_name = str(raw_config.get("host_name", "unknown"))
roots_raw = raw_config.get("roots") or []
if not isinstance(roots_raw, list) or not roots_raw:
raise ValueError("roots must be a non-empty list of paths in config.toml")
roots = [Path(root) for root in roots_raw]
storage_raw = raw_config.get("storage") or {}
storages: List[Storage] = []
for name, params in storage_raw.items():
if not isinstance(params, dict):
raise ValueError(f"Storage config for {name} must be a table")
storage_type = params.get("type", "")
if storage_type == ResticStorage.TYPE_NAME:
storages.append(ResticStorage(name, params))
if not storages:
raise ValueError("At least one storage backend must be configured")
notifications_raw = raw_config.get("notifier") or {}
notifiers: List[Notifier] = []
for name, params in notifications_raw.items():
if not isinstance(params, dict):
raise ValueError(f"Notificator config for {name} must be a table")
notifier_type = params.get("type", "")
if notifier_type == AppriseNotifier.TYPE_NAME:
notifiers.append(AppriseNotifier(name, params))
if not notifiers:
raise ValueError("At least one notification backend must be configured")
config = Config(host_name=host_name)
app_finder = ApplicationFinder(roots)
backup_manager = BackupManager(
config=config, storages=storages, notifiers=notifiers
)
return app_finder, backup_manager
def main() -> None:
try:
app_finder, backup_manager = initialize(CONFIG_PATH)
applications = app_finder.find_applications()
backup_manager.warnings.extend(app_finder.warnings)
success = backup_manager.run_backup_process(applications)
if not success:
sys.exit(1)
except KeyboardInterrupt:
logger.info("Backup process interrupted by user")
sys.exit(130)
except Exception as e:
logger.error(f"Unexpected error in backup process: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()