Files
pet-project-server/files/backups/backup-all.py
T

761 lines
28 KiB
Python

#!/usr/bin/env python3
"""
Backup script for all applications
Automatically discovers and runs backup scripts for all users,
then creates restic backups and sends notifications.
restic-операции разнесены на фазы с разной частотой (см. секцию [schedule] в config):
- backup, forget -- каждый прогон (forget БЕЗ --prune: только метаданные снапшотов);
- check -- структурная проверка, обычно еженедельно;
- prune -- репак/освобождение места, редко (квартально);
- verify -- check --read-data-subset, помесячно (полное покрытие за год).
Один прогон выполняет фазы строго последовательно, поэтому restic-локи между фазами
не конфликтуют. Наложение соседних прогонов предотвращается flock в cron-задаче.
"""
import argparse
import itertools
import logging
import os
import pwd
import subprocess
import sys
import time
import tomllib
from abc import ABC
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional
import requests
from croniter import croniter
# Default config path
CONFIG_PATH = Path("/etc/backup/config.toml")
# File name to store directories and files to back up
BACKUP_TARGETS_FILE = "backup-targets"
# Default directory fo backups (relative to app dir)
# Used when backup-targets file not exists
BACKUP_DEFAULT_DIR = "backups"
# Retention policy applied by the `forget` phase on every run.
KEEP_DAILY = "90"
KEEP_MONTHLY = "36"
# Фазы в порядке выполнения. backup и forget идут каждый прогон,
# остальные — по расписанию из config.
PHASE_BACKUP = "backup"
PHASE_FORGET = "forget"
PHASE_CHECK = "check"
PHASE_PRUNE = "prune"
PHASE_VERIFY = "verify"
ALWAYS_PHASES = [PHASE_BACKUP, PHASE_FORGET]
SCHEDULED_PHASES = [PHASE_CHECK, PHASE_PRUNE, PHASE_VERIFY]
PHASE_ORDER = ALWAYS_PHASES + SCHEDULED_PHASES
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("/var/log/backup-all.log"),
],
)
logger = logging.getLogger(__name__)
@dataclass
class Config:
host_name: str
@dataclass
class MaintenanceOptions:
"""Параметры обслуживающих фаз (см. секцию [maintenance] в config)."""
verify_subset: str = "1/12"
prune_max_unused: str = "20%"
prune_max_repack: str = "5G"
@dataclass
class Schedule:
"""Расписание обслуживающих фаз: фаза -> cron-выражение."""
cron: Dict[str, str] = field(default_factory=dict)
def due_phases(self, now: datetime) -> List[str]:
"""Фазы, которые нужно выполнить в этот прогон, в порядке PHASE_ORDER."""
phases = list(ALWAYS_PHASES)
for phase in SCHEDULED_PHASES:
expr = self.cron.get(phase)
if expr and self._due_today(expr, now):
phases.append(phase)
return phases
@staticmethod
def _due_today(expr: str, now: datetime) -> bool:
"""True, если cron-выражение срабатывает где-то в течение сегодняшних суток.
Мы не сравниваем с текущей минутой (триггер один на сутки в фиксированное
время), а проверяем, попадает ли ближайшее срабатывание выражения на сегодня.
"""
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
nxt = croniter(expr, start - timedelta(minutes=1)).get_next(datetime)
return nxt.date() == now.date()
@dataclass
class Application:
path: Path
owner: str
backup_script: Optional[Path]
backup_targets: List[Path]
@dataclass
class BackupResult:
success: bool
error: Optional[str] = None
@dataclass
class StorageRunResult:
name: str
success: bool
duration: float
phases: List[str]
def format_duration(seconds: float) -> str:
if seconds < 60:
return f"{seconds:.1f}s"
minutes = int(seconds // 60)
secs = int(seconds % 60)
if minutes < 60:
return f"{minutes}m{secs:02d}s"
hours = minutes // 60
minutes = minutes % 60
return f"{hours}h{minutes:02d}m{secs:02d}s"
class Storage(ABC):
name: str
def run(
self,
backup_dirs: List[str],
phases: List[str],
maintenance: MaintenanceOptions,
) -> BackupResult:
"""Run the requested phases against this storage."""
raise NotImplementedError()
class ResticStorage(Storage):
TYPE_NAME = "restic"
def __init__(self, name: str, params: Dict[str, Any]):
self.name = name
self.restic_repository = str(params.get("restic_repository", ""))
self.restic_password = str(params.get("restic_password", ""))
env_raw = params.get("env") or {}
if not isinstance(env_raw, dict):
raise ValueError(
f"'env' must be a table for storage backend ResticStorage: '{self.name}'"
)
self.env: Dict[str, str] = {str(k): str(v) for k, v in env_raw.items()}
if not self.restic_repository or not self.restic_password:
raise ValueError(
f"Missing storage configuration values for backend ResticStorage: '{self.name}'"
)
def run(
self,
backup_dirs: List[str],
phases: List[str],
maintenance: MaintenanceOptions,
) -> BackupResult:
try:
return self.__run_internal(backup_dirs, phases, maintenance)
except Exception as exc: # noqa: BLE001
logger.error("Restic process failed: %s", exc)
return BackupResult(success=False, error=str(exc))
def __build_steps(
self,
backup_dirs: List[str],
phases: List[str],
maintenance: MaintenanceOptions,
) -> List[tuple[str, List[str]]]:
"""Собрать restic-команды для запрошенных фаз в порядке PHASE_ORDER."""
steps: List[tuple[str, List[str]]] = []
for phase in PHASE_ORDER:
if phase not in phases:
continue
if phase == PHASE_BACKUP:
if not backup_dirs:
logger.warning(
"No backup directories found, skipping backup phase for '%s'",
self.name,
)
continue
steps.append(
("backup", ["restic", "backup", "--verbose"] + backup_dirs)
)
elif phase == PHASE_FORGET:
# forget БЕЗ --prune: удаляет только метаданные снапшотов, не репакует
# data-паки и не сбивает охлаждение в Intelligent Tiering.
steps.append(
(
"forget",
[
"restic",
"forget",
"--compact",
"--keep-daily",
KEEP_DAILY,
"--keep-monthly",
KEEP_MONTHLY,
],
)
)
elif phase == PHASE_CHECK:
steps.append(("check", ["restic", "check"]))
elif phase == PHASE_PRUNE:
steps.append(
(
"prune",
[
"restic",
"prune",
"--max-unused",
maintenance.prune_max_unused,
"--max-repack-size",
maintenance.prune_max_repack,
],
)
)
elif phase == PHASE_VERIFY:
steps.append(
(
"verify",
[
"restic",
"check",
f"--read-data-subset={maintenance.verify_subset}",
],
)
)
return steps
def __run_internal(
self,
backup_dirs: List[str],
phases: List[str],
maintenance: MaintenanceOptions,
) -> BackupResult:
logger.info("Starting restic run for storage '%s'", self.name)
logger.info("Destination: %s", self.restic_repository)
logger.info("Phases: %s", ", ".join(phases))
env = os.environ.copy()
env["RESTIC_REPOSITORY"] = self.restic_repository
env["RESTIC_PASSWORD"] = self.restic_password
env.update(self.env)
steps = self.__build_steps(backup_dirs, phases, maintenance)
for step, cmd in steps:
error = self.__run_step(step, cmd, env)
if error is not None:
return BackupResult(success=False, error=f"restic {step}: {error}")
return BackupResult(success=True)
def __run_step(
self, step: str, cmd: List[str], env: Dict[str, str]
) -> Optional[str]:
"""Run a single restic command. Return None on success or error text."""
result = subprocess.run(cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
error = result.stderr.strip() or result.stdout.strip() or "no output"
logger.error("Restic %s failed: %s", step, error)
return error
logger.info("Restic %s completed successfully", step)
return None
class Notifier(ABC):
def send(self, title: str, html_message: str) -> None:
raise NotImplementedError()
class AppriseNotifier(Notifier):
TYPE_NAME = "apprise"
def __init__(self, name: str, params: Dict[str, Any]):
self.name = name
self.api_url = str(params.get("api_url", "")).rstrip("/")
self.tag = str(params.get("tag", ""))
if not self.api_url or not self.tag:
raise ValueError(
f"Missing notification configuration values for backend {name}"
)
def send(self, title: str, html_message: str) -> None:
url = f"{self.api_url}/notify/{self.tag}/"
payload = {
"title": title,
"body": html_message,
"format": "html",
}
response = requests.post(url, json=payload, timeout=30)
if response.ok:
logger.info("Apprise notification sent successfully")
else:
logger.error(
f"Failed to send Apprise notification: {response.status_code} - {response.text}"
)
class ApplicationFinder:
def __init__(self, roots: List[Path]):
self.roots = roots
self.warnings: List[str] = []
def find_applications(self) -> List[Application]:
"""Discover all applications with their backup scripts and targets."""
applications: List[Application] = []
source_dirs = itertools.chain(*(root.iterdir() for root in self.roots))
for app_dir in source_dirs:
if "lost+found" in str(app_dir):
continue
if app_dir.is_dir():
try:
stat_info = app_dir.stat()
owner = pwd.getpwuid(stat_info.st_uid).pw_name
backup_script = self._find_backup_script(app_dir)
backup_targets = self._find_backup_targets(app_dir)
applications.append(
Application(
path=app_dir,
owner=owner,
backup_script=backup_script,
backup_targets=backup_targets,
)
)
except (KeyError, OSError) as e:
logger.warning(f"Could not get owner for {app_dir}: {e}")
applications.sort(key=lambda app: app.path.name)
return applications
def _find_backup_script(self, app_dir: Path) -> Optional[Path]:
"""Find executable backup script in application directory."""
for name in ("backup.sh", "backup"):
script_path = app_dir / name
if script_path.exists():
if os.access(script_path, os.X_OK):
return script_path
else:
logger.warning(
f"Backup script {script_path} exists but is not executable"
)
return None
def _find_backup_targets(self, app_dir: Path) -> List[Path]:
"""Resolve backup target directories for an application."""
targets_file = app_dir / BACKUP_TARGETS_FILE
resolved_targets: List[Path] = []
if targets_file.exists():
for target_line in self._parse_targets_file(targets_file):
target_path = Path(target_line)
if not target_path.is_absolute():
target_path = (app_dir / target_path).resolve()
else:
target_path = target_path.resolve()
if target_path.exists():
resolved_targets.append(target_path)
else:
warning_msg = (
f"Backup target does not exist for {app_dir}: {target_path}"
)
logger.warning(warning_msg)
self.warnings.append(warning_msg)
else:
default_target = (app_dir / BACKUP_DEFAULT_DIR).resolve()
if default_target.exists():
resolved_targets.append(default_target)
else:
warning_msg = f"Default backup path does not exist for {app_dir}: {default_target}"
logger.warning(warning_msg)
self.warnings.append(warning_msg)
return resolved_targets
def _parse_targets_file(self, targets_file: Path) -> List[str]:
"""Parse backup-targets file, skipping comments and empty lines."""
targets: List[str] = []
try:
for raw_line in targets_file.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
targets.append(line)
except OSError as e:
warning_msg = f"Could not read backup targets file {targets_file}: {e}"
logger.warning(warning_msg)
self.warnings.append(warning_msg)
return targets
class BackupManager:
def __init__(
self,
config: Config,
storages: List[Storage],
notifiers: List[Notifier],
schedule: Schedule,
maintenance: MaintenanceOptions,
forced_phases: Optional[List[str]] = None,
):
self.errors: List[str] = []
self.warnings: List[str] = []
self.successful_backups: List[str] = []
self.config = config
self.storages = storages
self.notifiers = notifiers
self.schedule = schedule
self.maintenance = maintenance
self.forced_phases = forced_phases
self.active_phases: List[str] = []
self.archive_duration: float = 0.0
self.storage_results: List[StorageRunResult] = []
def run_backup_process(self, applications: List[Application]) -> bool:
"""Main backup process"""
logger.info("Starting backup process")
logger.info(f"Found {len(applications)} application directories")
# Какие фазы выполняем в этот прогон: либо принудительно из CLI, либо по расписанию.
if self.forced_phases is not None:
self.active_phases = self.forced_phases
logger.info("Phases (forced): %s", ", ".join(self.active_phases))
else:
self.active_phases = self.schedule.due_phases(datetime.now())
logger.info("Phases (scheduled): %s", ", ".join(self.active_phases))
archive_start = time.monotonic()
# Archive phase (per-app backup scripts) нужна только если будем делать restic backup.
if PHASE_BACKUP in self.active_phases:
for app in applications:
app_dir = str(app.path)
username = app.owner
logger.info(f"Processing backup for app: {app_dir} (user {username})")
if app.backup_script is None:
warning_msg = (
f"No backup script found for app: {app_dir} (user {username})"
)
logger.warning(warning_msg)
self.warnings.append(warning_msg)
continue
self._run_app_backup(str(app.backup_script), app_dir, username)
else:
logger.info("Backup phase not active, skipping per-app archive scripts")
self.archive_duration = time.monotonic() - archive_start
logger.info(
"Archive phase finished in %s", format_duration(self.archive_duration)
)
# Collect backup directories from applications
backup_dirs: List[str] = []
for app in applications:
for target in app.backup_targets:
target_str = str(target)
if target_str not in backup_dirs:
backup_dirs.append(target_str)
logger.info(f"Found backup directories: {backup_dirs}")
overall_success = True
# Each storage is processed independently: a failure in one storage
# must not prevent the others from being attempted.
for storage in self.storages:
storage_start = time.monotonic()
try:
backup_result = storage.run(
backup_dirs, self.active_phases, self.maintenance
)
except Exception as exc: # noqa: BLE001
logger.error(
"Storage '%s' raised an unexpected error: %s", storage.name, exc
)
backup_result = BackupResult(success=False, error=str(exc))
storage_duration = time.monotonic() - storage_start
self.storage_results.append(
StorageRunResult(
name=storage.name,
success=backup_result.success,
duration=storage_duration,
phases=list(self.active_phases),
)
)
logger.info(
"Storage '%s' finished in %s (success=%s)",
storage.name,
format_duration(storage_duration),
backup_result.success,
)
if not backup_result.success:
error_msg = f"Storage '{storage.name}' backup failed"
if backup_result.error:
error_msg += f": {backup_result.error}"
self.errors.append(error_msg)
# Determine overall success
overall_success = overall_success and backup_result.success
# Send notification
self._send_notification(overall_success)
logger.info("Backup process completed")
if self.errors:
logger.error(f"Backup completed with {len(self.errors)} errors")
return False
elif self.warnings:
logger.warning(f"Backup completed with {len(self.warnings)} warnings")
return True
else:
logger.info("Backup completed successfully")
return True
def _run_app_backup(self, script_path: str, app_dir: str, username: str) -> bool:
"""Run backup script as the specified user"""
try:
logger.info(f"Running backup script {script_path} (user {username})")
# Use su to run the script as the user
cmd = ["su", "--login", username, "--command", script_path]
result = subprocess.run(
cmd,
cwd=app_dir,
capture_output=True,
text=True,
timeout=3600, # 1 hour timeout
)
if result.returncode == 0:
logger.info(f"Backup script for {username} completed successfully")
self.successful_backups.append(username)
return True
else:
error_msg = f"Backup script {script_path} failed with return code {result.returncode}"
if result.stderr:
error_msg += f": {result.stderr}"
logger.error(error_msg)
self.errors.append(f"App {username}: {error_msg}")
return False
except subprocess.TimeoutExpired:
error_msg = f"Backup script {script_path} timed out"
logger.error(error_msg)
self.errors.append(f"App {username}: {error_msg}")
return False
except Exception as e:
error_msg = f"Failed to run backup script {script_path}: {str(e)}"
logger.error(error_msg)
self.errors.append(f"App {username}: {error_msg}")
return False
def _send_notification(self, success: bool) -> None:
"""Send notification to Notifiers"""
host = self.config.host_name
phases_text = ", ".join(self.active_phases) if self.active_phases else "—"
if success and not self.errors:
title = f"{host}: бекап успешно завершен"
message = f"<p><b>{host}</b>: бекап успешно завершен!</p>"
if self.successful_backups:
items = "".join(f"<li>{b}</li>" for b in self.successful_backups)
message += f"<p>Успешные бекапы:</p><ul>{items}</ul>"
else:
title = f"{host}: бекап завершен с ошибками ({len(self.errors)})"
message = f"<p><b>{host}</b>: бекап завершен с ошибками!</p>"
if self.successful_backups:
items = "".join(f"<li>{b}</li>" for b in self.successful_backups)
message += f"<p>✅ Успешные бекапы:</p><ul>{items}</ul>"
if self.warnings:
items = "".join(f"<li>{w}</li>" for w in self.warnings)
message += f"<p>⚠️ Предупреждения:</p><ul>{items}</ul>"
if self.errors:
items = "".join(f"<li>{e}</li>" for e in self.errors)
message += f"<p>❌ Ошибки:</p><ul>{items}</ul>"
message += f"<p>🔧 Фазы restic: {phases_text}</p>"
message += f"<p>⏱ Время архивации: {format_duration(self.archive_duration)}</p>"
if self.storage_results:
items = "".join(
f"<li>{'✅' if r.success else '❌'} {r.name}: {format_duration(r.duration)}</li>"
for r in self.storage_results
)
message += f"<p>⏱ Время записи в хранилища:</p><ul>{items}</ul>"
for notificator in self.notifiers:
try:
notificator.send(title, message)
except Exception as e:
logger.error(f"Failed to send notification: {str(e)}")
def parse_phases(raw: str) -> List[str]:
"""Разобрать CLI-список фаз, вернуть их в порядке PHASE_ORDER."""
requested = {p.strip() for p in raw.split(",") if p.strip()}
unknown = requested - set(PHASE_ORDER)
if unknown:
raise ValueError(
f"Unknown phases: {', '.join(sorted(unknown))}. "
f"Allowed: {', '.join(PHASE_ORDER)}"
)
return [p for p in PHASE_ORDER if p in requested]
def initialize(
config_path: Path,
forced_phases: Optional[List[str]] = None,
) -> tuple[ApplicationFinder, BackupManager]:
try:
with config_path.open("rb") as config_file:
raw_config = tomllib.load(config_file)
except OSError as e:
logger.error(f"Failed to read config file {config_path}: {e}")
raise
host_name = str(raw_config.get("host_name", "unknown"))
roots_raw = raw_config.get("roots") or []
if not isinstance(roots_raw, list) or not roots_raw:
raise ValueError("roots must be a non-empty list of paths in config.toml")
roots = [Path(root) for root in roots_raw]
storage_raw = raw_config.get("storage") or {}
storages: List[Storage] = []
for name, params in storage_raw.items():
if not isinstance(params, dict):
raise ValueError(f"Storage config for {name} must be a table")
storage_type = params.get("type", "")
if storage_type == ResticStorage.TYPE_NAME:
storages.append(ResticStorage(name, params))
if not storages:
raise ValueError("At least one storage backend must be configured")
notifications_raw = raw_config.get("notifier") or {}
notifiers: List[Notifier] = []
for name, params in notifications_raw.items():
if not isinstance(params, dict):
raise ValueError(f"Notificator config for {name} must be a table")
notifier_type = params.get("type", "")
if notifier_type == AppriseNotifier.TYPE_NAME:
notifiers.append(AppriseNotifier(name, params))
if not notifiers:
raise ValueError("At least one notification backend must be configured")
schedule_raw = raw_config.get("schedule") or {}
if not isinstance(schedule_raw, dict):
raise ValueError("'schedule' must be a table in config.toml")
schedule = Schedule(
cron={
phase: str(schedule_raw[phase])
for phase in SCHEDULED_PHASES
if phase in schedule_raw
}
)
maintenance_raw = raw_config.get("maintenance") or {}
if not isinstance(maintenance_raw, dict):
raise ValueError("'maintenance' must be a table in config.toml")
defaults = MaintenanceOptions()
maintenance = MaintenanceOptions(
verify_subset=str(maintenance_raw.get("verify_subset", defaults.verify_subset)),
prune_max_unused=str(
maintenance_raw.get("prune_max_unused", defaults.prune_max_unused)
),
prune_max_repack=str(
maintenance_raw.get("prune_max_repack", defaults.prune_max_repack)
),
)
config = Config(host_name=host_name)
app_finder = ApplicationFinder(roots)
backup_manager = BackupManager(
config=config,
storages=storages,
notifiers=notifiers,
schedule=schedule,
maintenance=maintenance,
forced_phases=forced_phases,
)
return app_finder, backup_manager
def main() -> None:
parser = argparse.ArgumentParser(description="Run application backups via restic")
parser.add_argument(
"--config",
type=Path,
default=CONFIG_PATH,
help=f"Path to config.toml (default: {CONFIG_PATH})",
)
parser.add_argument(
"--phases",
help=(
"Comma-separated phases to run, overriding the schedule "
f"(allowed: {', '.join(PHASE_ORDER)}). Useful for manual maintenance runs."
),
)
args = parser.parse_args()
try:
forced_phases = parse_phases(args.phases) if args.phases else None
app_finder, backup_manager = initialize(args.config, forced_phases)
applications = app_finder.find_applications()
backup_manager.warnings.extend(app_finder.warnings)
success = backup_manager.run_backup_process(applications)
if not success:
sys.exit(1)
except KeyboardInterrupt:
logger.info("Backup process interrupted by user")
sys.exit(130)
except Exception as e:
logger.error(f"Unexpected error in backup process: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()