Backups: split restic operations into phases for Intelligent Tiering

This commit is contained in:
2026-06-22 17:50:01 +03:00
parent 2b22fde718
commit 0f80e66b66
8 changed files with 438 additions and 47 deletions
+258 -46
View File
@@ -3,8 +3,17 @@
Backup script for all applications
Automatically discovers and runs backup scripts for all users,
then creates restic backups and sends notifications.
restic operations are split into phases that run at different frequencies
(see the [schedule] section in config):
- backup, forget -- every run (forget WITHOUT --prune: snapshot metadata only);
- check -- structural check, usually weekly;
- prune -- repack / space reclamation, rarely (quarterly);
- verify -- check --read-data-subset, monthly (full coverage over a year).
A single run executes its phases strictly sequentially, so restic locks do not
conflict between phases. Overlap of adjacent runs is prevented by flock in the
cron job.
"""
import argparse
import itertools
import logging
import os
@@ -14,11 +23,13 @@ import sys
import time
import tomllib
from abc import ABC
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional
import requests
from croniter import croniter
# Default config path
CONFIG_PATH = Path("/etc/backup/config.toml")
@@ -30,6 +41,22 @@ BACKUP_TARGETS_FILE = "backup-targets"
# Used when backup-targets file not exists
BACKUP_DEFAULT_DIR = "backups"
# Retention policy applied by the `forget` phase on every run.
# Kept as strings because they are passed as restic command-line arguments.
KEEP_DAILY = "90"
KEEP_MONTHLY = "36"
# Phases in execution order. backup and forget run on every invocation,
# the remaining ones run according to the schedule from the config.
PHASE_BACKUP = "backup"
PHASE_FORGET = "forget"
PHASE_CHECK = "check"
PHASE_PRUNE = "prune"
PHASE_VERIFY = "verify"
ALWAYS_PHASES = [PHASE_BACKUP, PHASE_FORGET]
SCHEDULED_PHASES = [PHASE_CHECK, PHASE_PRUNE, PHASE_VERIFY]
PHASE_ORDER = ALWAYS_PHASES + SCHEDULED_PHASES
# Configure logging
logging.basicConfig(
level=logging.INFO,
@@ -47,6 +74,42 @@ class Config:
host_name: str
@dataclass
class MaintenanceOptions:
    """Tunables for the maintenance phases (the [maintenance] config table).

    Every value is kept as a string and forwarded unchanged to the restic
    command line.
    """

    # Argument of `restic check --read-data-subset=` used by the verify phase.
    verify_subset: str = "1/12"
    # Argument of `restic prune --max-unused`.
    prune_max_unused: str = "20%"
    # Argument of `restic prune --max-repack-size`.
    prune_max_repack: str = "5G"
@dataclass
class Schedule:
    """Schedule of the maintenance phases: phase name -> cron expression."""

    cron: Dict[str, str] = field(default_factory=dict)

    def due_phases(self, now: datetime) -> List[str]:
        """Return the phases to execute in this run.

        The always-on phases come first, followed by every scheduled phase
        that has a non-empty cron expression which fires today. Scheduled
        phases keep the order of SCHEDULED_PHASES.
        """
        scheduled_today = [
            name
            for name in SCHEDULED_PHASES
            if (cron_expr := self.cron.get(name))
            and self._due_today(cron_expr, now)
        ]
        return [*ALWAYS_PHASES, *scheduled_today]

    @staticmethod
    def _due_today(expr: str, now: datetime) -> bool:
        """Tell whether the cron expression fires at any point during today.

        The current minute is intentionally not compared: the job is
        triggered once a day at a fixed time, so it is enough to know that
        the expression has an occurrence on today's date.
        """
        midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
        # Start one minute before midnight so that an occurrence at exactly
        # 00:00 today is still reported as the next one.
        just_before_midnight = midnight - timedelta(minutes=1)
        first_occurrence = croniter(expr, just_before_midnight).get_next(datetime)
        return first_occurrence.date() == now.date()
@dataclass
class Application:
path: Path
@@ -66,6 +129,7 @@ class StorageRunResult:
name: str
success: bool
duration: float
phases: List[str]
def format_duration(seconds: float) -> str:
@@ -83,8 +147,13 @@ def format_duration(seconds: float) -> str:
class Storage(ABC):
name: str
def backup(self, backup_dirs: List[str]) -> BackupResult:
"""Backup directories"""
def run(
self,
backup_dirs: List[str],
phases: List[str],
maintenance: MaintenanceOptions,
) -> BackupResult:
"""Run the requested phases against this storage."""
raise NotImplementedError()
@@ -108,44 +177,104 @@ class ResticStorage(Storage):
f"Missing storage configuration values for backend ResticStorage: '{self.name}'"
)
def backup(self, backup_dirs: List[str]) -> BackupResult:
if not backup_dirs:
logger.warning("No backup directories found")
return BackupResult(success=True)
def run(
self,
backup_dirs: List[str],
phases: List[str],
maintenance: MaintenanceOptions,
) -> BackupResult:
try:
return self.__backup_internal(backup_dirs)
return self.__run_internal(backup_dirs, phases, maintenance)
except Exception as exc: # noqa: BLE001
logger.error("Restic backup process failed: %s", exc)
logger.error("Restic process failed: %s", exc)
return BackupResult(success=False, error=str(exc))
def __backup_internal(self, backup_dirs: List[str]) -> BackupResult:
logger.info("Starting restic backup for storage '%s'", self.name)
def __build_steps(
    self,
    backup_dirs: List[str],
    phases: List[str],
    maintenance: MaintenanceOptions,
) -> List[tuple[str, List[str]]]:
    """Assemble the restic commands for the requested phases.

    Steps are emitted in PHASE_ORDER regardless of the order of ``phases``.
    The backup phase is skipped (with a warning) when ``backup_dirs`` is
    empty.

    Returns:
        A list of ``(step label, argv)`` pairs.
    """
    # Each entry is (step label, factory producing argv). Factories are lazy
    # so that options are only read for phases that are actually requested.
    builders = {
        PHASE_BACKUP: (
            "backup",
            lambda: ["restic", "backup", "--verbose"] + backup_dirs,
        ),
        # forget WITHOUT --prune: only snapshot metadata is removed, data
        # packs are not repacked, so Intelligent Tiering cool-down is not
        # reset.
        PHASE_FORGET: (
            "forget",
            lambda: [
                "restic",
                "forget",
                "--compact",
                "--keep-daily",
                KEEP_DAILY,
                "--keep-monthly",
                KEEP_MONTHLY,
            ],
        ),
        PHASE_CHECK: ("check", lambda: ["restic", "check"]),
        PHASE_PRUNE: (
            "prune",
            lambda: [
                "restic",
                "prune",
                "--max-unused",
                maintenance.prune_max_unused,
                "--max-repack-size",
                maintenance.prune_max_repack,
            ],
        ),
        # NOTE(review): verify_subset is passed to restic verbatim; a fixed
        # "n/t" value would re-read the same subset on every run -- confirm
        # that the value is rotated elsewhere if full coverage is expected.
        PHASE_VERIFY: (
            "verify",
            lambda: [
                "restic",
                "check",
                f"--read-data-subset={maintenance.verify_subset}",
            ],
        ),
    }

    planned: List[tuple[str, List[str]]] = []
    for phase in (name for name in PHASE_ORDER if name in phases):
        if phase == PHASE_BACKUP and not backup_dirs:
            logger.warning(
                "No backup directories found, skipping backup phase for '%s'",
                self.name,
            )
            continue
        entry = builders.get(phase)
        if entry is None:
            # Phase without a known command: nothing to schedule.
            continue
        label, make_argv = entry
        planned.append((label, make_argv()))
    return planned
def __run_internal(
self,
backup_dirs: List[str],
phases: List[str],
maintenance: MaintenanceOptions,
) -> BackupResult:
logger.info("Starting restic run for storage '%s'", self.name)
logger.info("Destination: %s", self.restic_repository)
logger.info("Phases: %s", ", ".join(phases))
env = os.environ.copy()
env["RESTIC_REPOSITORY"] = self.restic_repository
env["RESTIC_PASSWORD"] = self.restic_password
env.update(self.env)
check_cmd = ["restic", "check"]
steps = [
("backup", ["restic", "backup", "--verbose"] + backup_dirs),
("check", check_cmd),
(
"forget/prune",
[
"restic",
"forget",
"--compact",
"--prune",
"--keep-daily",
"90",
"--keep-monthly",
"36",
],
),
("final check", check_cmd),
]
steps = self.__build_steps(backup_dirs, phases, maintenance)
for step, cmd in steps:
error = self.__run_step(step, cmd, env)
@@ -303,6 +432,9 @@ class BackupManager:
config: Config,
storages: List[Storage],
notifiers: List[Notifier],
schedule: Schedule,
maintenance: MaintenanceOptions,
forced_phases: Optional[List[str]] = None,
):
self.errors: List[str] = []
self.warnings: List[str] = []
@@ -310,6 +442,10 @@ class BackupManager:
self.config = config
self.storages = storages
self.notifiers = notifiers
self.schedule = schedule
self.maintenance = maintenance
self.forced_phases = forced_phases
self.active_phases: List[str] = []
self.archive_duration: float = 0.0
self.storage_results: List[StorageRunResult] = []
@@ -318,22 +454,33 @@ class BackupManager:
logger.info("Starting backup process")
logger.info(f"Found {len(applications)} application directories")
# Какие фазы выполняем в этот прогон: либо принудительно из CLI, либо по расписанию.
if self.forced_phases is not None:
self.active_phases = self.forced_phases
logger.info("Phases (forced): %s", ", ".join(self.active_phases))
else:
self.active_phases = self.schedule.due_phases(datetime.now())
logger.info("Phases (scheduled): %s", ", ".join(self.active_phases))
archive_start = time.monotonic()
# Process each user's backup
for app in applications:
app_dir = str(app.path)
username = app.owner
logger.info(f"Processing backup for app: {app_dir} (user {username})")
# Archive phase (per-app backup scripts) нужна только если будем делать restic backup.
if PHASE_BACKUP in self.active_phases:
for app in applications:
app_dir = str(app.path)
username = app.owner
logger.info(f"Processing backup for app: {app_dir} (user {username})")
if app.backup_script is None:
warning_msg = (
f"No backup script found for app: {app_dir} (user {username})"
)
logger.warning(warning_msg)
self.warnings.append(warning_msg)
continue
if app.backup_script is None:
warning_msg = (
f"No backup script found for app: {app_dir} (user {username})"
)
logger.warning(warning_msg)
self.warnings.append(warning_msg)
continue
self._run_app_backup(str(app.backup_script), app_dir, username)
self._run_app_backup(str(app.backup_script), app_dir, username)
else:
logger.info("Backup phase not active, skipping per-app archive scripts")
self.archive_duration = time.monotonic() - archive_start
logger.info(
"Archive phase finished in %s", format_duration(self.archive_duration)
@@ -355,7 +502,9 @@ class BackupManager:
for storage in self.storages:
storage_start = time.monotonic()
try:
backup_result = storage.backup(backup_dirs)
backup_result = storage.run(
backup_dirs, self.active_phases, self.maintenance
)
except Exception as exc: # noqa: BLE001
logger.error(
"Storage '%s' raised an unexpected error: %s", storage.name, exc
@@ -367,6 +516,7 @@ class BackupManager:
name=storage.name,
success=backup_result.success,
duration=storage_duration,
phases=list(self.active_phases),
)
)
logger.info(
@@ -442,6 +592,7 @@ class BackupManager:
"""Send notification to Notifiers"""
host = self.config.host_name
phases_text = ", ".join(self.active_phases) if self.active_phases else ""
if success and not self.errors:
title = f"{host}: бекап успешно завершен"
@@ -465,6 +616,7 @@ class BackupManager:
items = "".join(f"<li>{e}</li>" for e in self.errors)
message += f"<p>❌ Ошибки:</p><ul>{items}</ul>"
message += f"<p>🔧 Фазы restic: {phases_text}</p>"
message += f"<p>⏱ Время архивации: {format_duration(self.archive_duration)}</p>"
if self.storage_results:
items = "".join(
@@ -480,8 +632,21 @@ class BackupManager:
logger.error(f"Failed to send notification: {str(e)}")
def parse_phases(raw: str) -> List[str]:
    """Parse a comma-separated CLI phase list.

    Whitespace around names and empty items are ignored, duplicates are
    collapsed, and the result is returned in PHASE_ORDER regardless of the
    order given on the command line.

    Args:
        raw: Comma-separated phase names, e.g. ``"prune, check"``.

    Returns:
        The requested phases ordered as in PHASE_ORDER.

    Raises:
        ValueError: If no phase name is given at all (e.g. ``","``) or if
            any name is not one of PHASE_ORDER.
    """
    requested = {name.strip() for name in raw.split(",")} - {""}
    if not requested:
        # An empty selection would otherwise be treated as a forced run with
        # no phases, i.e. a run that silently does nothing.
        raise ValueError(
            f"No phases specified. Allowed: {', '.join(PHASE_ORDER)}"
        )
    unknown = requested - set(PHASE_ORDER)
    if unknown:
        raise ValueError(
            f"Unknown phases: {', '.join(sorted(unknown))}. "
            f"Allowed: {', '.join(PHASE_ORDER)}"
        )
    return [phase for phase in PHASE_ORDER if phase in requested]
def initialize(
config_path: Path,
forced_phases: Optional[List[str]] = None,
) -> tuple[ApplicationFinder, BackupManager]:
try:
with config_path.open("rb") as config_file:
@@ -519,18 +684,65 @@ def initialize(
if not notifiers:
raise ValueError("At least one notification backend must be configured")
schedule_raw = raw_config.get("schedule") or {}
if not isinstance(schedule_raw, dict):
raise ValueError("'schedule' must be a table in config.toml")
schedule = Schedule(
cron={
phase: str(schedule_raw[phase])
for phase in SCHEDULED_PHASES
if phase in schedule_raw
}
)
maintenance_raw = raw_config.get("maintenance") or {}
if not isinstance(maintenance_raw, dict):
raise ValueError("'maintenance' must be a table in config.toml")
defaults = MaintenanceOptions()
maintenance = MaintenanceOptions(
verify_subset=str(maintenance_raw.get("verify_subset", defaults.verify_subset)),
prune_max_unused=str(
maintenance_raw.get("prune_max_unused", defaults.prune_max_unused)
),
prune_max_repack=str(
maintenance_raw.get("prune_max_repack", defaults.prune_max_repack)
),
)
config = Config(host_name=host_name)
app_finder = ApplicationFinder(roots)
backup_manager = BackupManager(
config=config, storages=storages, notifiers=notifiers
config=config,
storages=storages,
notifiers=notifiers,
schedule=schedule,
maintenance=maintenance,
forced_phases=forced_phases,
)
return app_finder, backup_manager
def main() -> None:
parser = argparse.ArgumentParser(description="Run application backups via restic")
parser.add_argument(
"--config",
type=Path,
default=CONFIG_PATH,
help=f"Path to config.toml (default: {CONFIG_PATH})",
)
parser.add_argument(
"--phases",
help=(
"Comma-separated phases to run, overriding the schedule "
f"(allowed: {', '.join(PHASE_ORDER)}). Useful for manual maintenance runs."
),
)
args = parser.parse_args()
try:
app_finder, backup_manager = initialize(CONFIG_PATH)
forced_phases = parse_phases(args.phases) if args.phases else None
app_finder, backup_manager = initialize(args.config, forced_phases)
applications = app_finder.find_applications()
backup_manager.warnings.extend(app_finder.warnings)
success = backup_manager.run_backup_process(applications)