feat-backup-classes #2

Merged
av merged 7 commits from feat-backup-classes into master 2025-12-21 07:14:21 +00:00
3 changed files with 245 additions and 270 deletions

View File

@@ -10,13 +10,22 @@ import sys
import subprocess
import logging
import pwd
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Any
import requests
import tomllib
from collections.abc import Iterable
# Default config path
CONFIG_PATH = Path("/etc/backup/config.toml")
# File name to store directories and files to back up
BACKUP_TARGETS_FILE = "backup-targets"
# Default directory fo backups (relative to app dir)
# Used when backup-targets file not exists
BACKUP_DEFAULT_DIR = "backups"
# Configure logging
logging.basicConfig(
@@ -30,113 +39,10 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
@dataclass
class StorageConfig:
type: str
restic_repository: str
restic_password: str
aws_access_key_id: str
aws_secret_access_key: str
aws_default_region: str
@dataclass
class TelegramConfig:
type: str
telegram_bot_token: str
telegram_chat_id: str
notifications_name: str
@dataclass
class Config:
host_name: str
roots: List[Path]
storage: Dict[str, StorageConfig]
notifications: Dict[str, TelegramConfig]
def read_config(config_path: Path) -> Config:
try:
with config_path.open("rb") as config_file:
raw_config = tomllib.load(config_file)
except OSError as e:
logger.error(f"Failed to read config file {config_path}: {e}")
raise
roots_raw = raw_config.get("roots") or []
if not isinstance(roots_raw, list) or not roots_raw:
raise ValueError("roots must be a non-empty list of paths in config.toml")
roots = [Path(root) for root in roots_raw]
storage_raw = raw_config.get("storage") or {}
storage: Dict[str, StorageConfig] = {}
for name, cfg in storage_raw.items():
if not isinstance(cfg, dict):
raise ValueError(f"Storage config for {name} must be a table")
storage[name] = StorageConfig(
type=cfg.get("type", ""),
restic_repository=cfg.get("restic_repository", ""),
restic_password=cfg.get("restic_password", ""),
aws_access_key_id=cfg.get("aws_access_key_id", ""),
aws_secret_access_key=cfg.get("aws_secret_access_key", ""),
aws_default_region=cfg.get("aws_default_region", ""),
)
if not storage:
raise ValueError("At least one storage backend must be configured")
notifications_raw = raw_config.get("notifications") or {}
notifications: Dict[str, TelegramConfig] = {}
for name, cfg in notifications_raw.items():
if not isinstance(cfg, dict):
raise ValueError(f"Notification config for {name} must be a table")
notifications[name] = TelegramConfig(
type=cfg.get("type", ""),
telegram_bot_token=cfg.get("telegram_bot_token", ""),
telegram_chat_id=cfg.get("telegram_chat_id", ""),
notifications_name=cfg.get("notifications_name", ""),
)
if not notifications:
raise ValueError("At least one notification backend must be configured")
for name, cfg in storage.items():
if not all(
[
cfg.type,
cfg.restic_repository,
cfg.restic_password,
cfg.aws_access_key_id,
cfg.aws_secret_access_key,
cfg.aws_default_region,
]
):
raise ValueError(f"Missing storage configuration values for backend {name}")
for name, cfg in notifications.items():
if not all(
[
cfg.type,
cfg.telegram_bot_token,
cfg.telegram_chat_id,
cfg.notifications_name,
]
):
raise ValueError(
f"Missing notification configuration values for backend {name}"
)
return Config(roots=roots, storage=storage, notifications=notifications)
CONFIG_PATH = Path("/etc/backup/config.toml")
# File name to store directories and files to back up
BACKUP_TARGETS_FILE = "backup-targets"
# Default directory fo backups (relative to app dir)
# Used when backup-targets file not exists
BACKUP_DEFAULT_DIR = "backups"
@dataclass
@@ -145,27 +51,167 @@ class Application:
owner: str
class Storage(ABC):
def backup(self, backup_dirs: List[str]) -> bool:
"""Backup directories"""
raise NotImplementedError()
class ResticStorage(Storage):
TYPE_NAME = "restic"
def __init__(self, name: str, params: Dict[str, Any]):
self.name = name
self.restic_repository = str(params.get("restic_repository", ""))
self.restic_password = str(params.get("restic_password", ""))
self.aws_access_key_id = str(params.get("aws_access_key_id", ""))
self.aws_secret_access_key = str(params.get("aws_secret_access_key", ""))
self.aws_default_region = str(params.get("aws_default_region", ""))
if not all(
[
self.restic_repository,
self.restic_password,
self.aws_access_key_id,
self.aws_secret_access_key,
self.aws_default_region,
]
):
raise ValueError(
f"Missing storage configuration values for backend ResticStorage: '{self.name}'"
)
def backup(self, backup_dirs: List[str]) -> bool:
if not backup_dirs:
logger.warning("No backup directories found")
return True
try:
return self.__backup_internal(backup_dirs)
except Exception as exc: # noqa: BLE001
logger.error("Restic backup process failed: %s", exc)
return False
def __backup_internal(self, backup_dirs: List[str]) -> bool:
logger.info("Starting restic backup")
logger.info("Destination: %s", self.restic_repository)
env = os.environ.copy()
env.update(
{
"RESTIC_REPOSITORY": self.restic_repository,
"RESTIC_PASSWORD": self.restic_password,
"AWS_ACCESS_KEY_ID": self.aws_access_key_id,
"AWS_SECRET_ACCESS_KEY": self.aws_secret_access_key,
"AWS_DEFAULT_REGION": self.aws_default_region,
}
)
backup_cmd = ["restic", "backup", "--verbose"] + backup_dirs
result = subprocess.run(backup_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
logger.error("Restic backup failed: %s", result.stderr)
return False
logger.info("Restic backup completed successfully")
check_cmd = ["restic", "check"]
result = subprocess.run(check_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
logger.error("Restic check failed: %s", result.stderr)
return False
logger.info("Restic check completed successfully")
forget_cmd = [
"restic",
"forget",
"--compact",
"--prune",
"--keep-daily",
"90",
"--keep-monthly",
"36",
]
result = subprocess.run(forget_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
logger.error("Restic forget/prune failed: %s", result.stderr)
return False
logger.info("Restic forget/prune completed successfully")
result = subprocess.run(check_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
logger.error("Final restic check failed: %s", result.stderr)
return False
logger.info("Final restic check completed successfully")
return True
class Notifier(ABC):
def send(self, html_message: str):
raise NotImplementedError()
class TelegramNotifier(Notifier):
TYPE_NAME = "telegram"
def __init__(self, name: str, params: Dict[str, Any]):
self.name = name
self.telegram_bot_token = str(params.get("telegram_bot_token", ""))
self.telegram_chat_id = str(params.get("telegram_chat_id", ""))
if not all(
[
self.telegram_bot_token,
self.telegram_chat_id,
]
):
raise ValueError(
f"Missing notification configuration values for backend {name}"
)
def send(self, html_message: str):
url = f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage"
data = {
"chat_id": self.telegram_chat_id,
"parse_mode": "HTML",
"text": html_message,
}
response = requests.post(url, data=data, timeout=30)
if response.status_code == 200:
logger.info("Telegram notification sent successfully")
else:
logger.error(
f"Failed to send Telegram notification: {response.status_code} - {response.text}"
)
class BackupManager:
def __init__(self):
def __init__(
self,
config: Config,
roots: List[Path],
storages: List[Storage],
notifiers: List[Notifier],
):
self.errors: List[str] = []
self.warnings: List[str] = []
self.successful_backups: List[str] = []
self.config = read_config(CONFIG_PATH)
def _select_storage(self) -> StorageConfig:
if "yandex" in self.config.storage:
return self.config.storage["yandex"]
return next(iter(self.config.storage.values()))
def _select_telegram(self) -> Optional[TelegramConfig]:
if "telegram" in self.config.notifications:
return self.config.notifications["telegram"]
return next(iter(self.config.notifications.values()), None)
self.config = config
self.roots: List[Path] = roots
self.storages = storages
self.notifiers = notifiers
def find_applications(self) -> List[Application]:
"""Get all application directories and their owners."""
applications: List[Application] = []
source_dirs = itertools.chain(*(root.iterdir() for root in self.config.roots))
source_dirs = itertools.chain(*(root.iterdir() for root in self.roots))
for app_dir in source_dirs:
if "lost+found" in str(app_dir):
@@ -296,111 +342,15 @@ class BackupManager:
return backup_dirs
def run_restic_backup(self, backup_dirs: List[str]) -> bool:
"""Run restic backup for all backup directories"""
if not backup_dirs:
logger.warning("No backup directories found")
return True
def send_notification(self, success: bool) -> None:
"""Send notification to Notifiers"""
storage_cfg = self._select_storage()
try:
logger.info("Starting restic backup")
logger.info("Destination: %s", storage_cfg.restic_repository)
# Set environment variables for restic
env = os.environ.copy()
env.update(
{
"RESTIC_REPOSITORY": storage_cfg.restic_repository,
"RESTIC_PASSWORD": storage_cfg.restic_password,
"AWS_ACCESS_KEY_ID": storage_cfg.aws_access_key_id,
"AWS_SECRET_ACCESS_KEY": storage_cfg.aws_secret_access_key,
"AWS_DEFAULT_REGION": storage_cfg.aws_default_region,
}
)
# Run backup
backup_cmd = ["restic", "backup", "--verbose"] + backup_dirs
result = subprocess.run(backup_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
error_msg = f"Restic backup failed: {result.stderr}"
logger.error(error_msg)
self.errors.append(f"Restic backup: {error_msg}")
return False
logger.info("Restic backup completed successfully")
# Run check
check_cmd = ["restic", "check"]
result = subprocess.run(check_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
error_msg = f"Restic check failed: {result.stderr}"
logger.error(error_msg)
self.errors.append(f"Restic check: {error_msg}")
return False
logger.info("Restic check completed successfully")
# Run forget and prune
forget_cmd = [
"restic",
"forget",
"--compact",
"--prune",
"--keep-daily",
"90",
"--keep-monthly",
"36",
]
result = subprocess.run(forget_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
error_msg = f"Restic forget/prune failed: {result.stderr}"
logger.error(error_msg)
self.errors.append(f"Restic forget/prune: {error_msg}")
return False
logger.info("Restic forget/prune completed successfully")
# Final check
result = subprocess.run(check_cmd, env=env, capture_output=True, text=True)
if result.returncode != 0:
error_msg = f"Final restic check failed: {result.stderr}"
logger.error(error_msg)
self.errors.append(f"Final restic check: {error_msg}")
return False
logger.info("Final restic check completed successfully")
return True
except Exception as e:
error_msg = f"Restic backup process failed: {str(e)}"
logger.error(error_msg)
self.errors.append(f"Restic: {error_msg}")
return False
def send_telegram_notification(self, success: bool) -> None:
"""Send notification to Telegram"""
telegram_cfg = self._select_telegram()
if telegram_cfg is None:
logger.warning("No telegram notification backend configured")
return
try:
if success and not self.errors:
message = (
f"<b>{telegram_cfg.notifications_name}</b>: бекап успешно завершен!"
)
message = f"<b>{self.config.host_name}</b>: бекап успешно завершен!"
if self.successful_backups:
message += (
f"\n\nУспешные бекапы: {', '.join(self.successful_backups)}"
)
message += f"\n\nУспешные бекапы: {', '.join(self.successful_backups)}"
else:
message = f"<b>{telegram_cfg.notifications_name}</b>: бекап завершен с ошибками!"
message = f"<b>{self.config.host_name}</b>: бекап завершен с ошибками!"
if self.successful_backups:
message += (
@@ -413,24 +363,11 @@ class BackupManager:
if self.errors:
message += f"\n\n❌ Ошибки:\n" + "\n".join(self.errors)
url = f"https://api.telegram.org/bot{telegram_cfg.telegram_bot_token}/sendMessage"
data = {
"chat_id": telegram_cfg.telegram_chat_id,
"parse_mode": "HTML",
"text": message,
}
response = requests.post(url, data=data, timeout=30)
if response.status_code == 200:
logger.info("Telegram notification sent successfully")
else:
logger.error(
f"Failed to send Telegram notification: {response.status_code} - {response.text}"
)
for notificator in self.notifiers:
try:
notificator.send(message)
except Exception as e:
logger.error(f"Failed to send Telegram notification: {str(e)}")
logger.error(f"Failed to send notification: {str(e)}")
def run_backup_process(self) -> bool:
"""Main backup process"""
@@ -463,14 +400,18 @@ class BackupManager:
backup_dirs = self.get_backup_directories()
logger.info(f"Found backup directories: {backup_dirs}")
# Run restic backup
restic_success = self.run_restic_backup(backup_dirs)
overall_success = True
for storage in self.storages:
backup_result = storage.backup(backup_dirs)
if not backup_result:
self.errors.append("Restic backup failed")
# Determine overall success
overall_success = restic_success and len(self.errors) == 0
overall_success = overall_success and backup_result
# Send notification
self.send_telegram_notification(overall_success)
self.send_notification(overall_success)
logger.info("Backup process completed")
@@ -485,9 +426,53 @@ class BackupManager:
return True
def initialize(config_path: Path) -> BackupManager:
try:
with config_path.open("rb") as config_file:
raw_config = tomllib.load(config_file)
except OSError as e:
logger.error(f"Failed to read config file {config_path}: {e}")
raise
host_name = str(raw_config.get("host_name", "unknown"))
roots_raw = raw_config.get("roots") or []
if not isinstance(roots_raw, list) or not roots_raw:
raise ValueError("roots must be a non-empty list of paths in config.toml")
roots = [Path(root) for root in roots_raw]
storage_raw = raw_config.get("storage") or {}
storages: List[Storage] = []
for name, params in storage_raw.items():
if not isinstance(params, dict):
raise ValueError(f"Storage config for {name} must be a table")
storage_type = params.get("type", "")
if storage_type == ResticStorage.TYPE_NAME:
storages.append(ResticStorage(name, params))
if not storages:
raise ValueError("At least one storage backend must be configured")
notifications_raw = raw_config.get("notifier") or {}
notifiers: List[Notifier] = []
for name, params in notifications_raw.items():
if not isinstance(params, dict):
raise ValueError(f"Notificator config for {name} must be a table")
notifier_type = params.get("type", "")
if notifier_type == TelegramNotifier.TYPE_NAME:
notifiers.append(TelegramNotifier(name, params))
if not notifiers:
raise ValueError("At least one notification backend must be configured")
config = Config(host_name=host_name, roots=roots)
return BackupManager(
config=config, roots=roots, storages=storages, notifiers=notifiers
)
def main():
try:
backup_manager = BackupManager()
backup_manager = initialize(CONFIG_PATH)
success = backup_manager.run_backup_process()
if not success:
sys.exit(1)

View File

@@ -1,11 +0,0 @@
[restic]
RESTIC_REPOSITORY={{ restic_repository }}
RESTIC_PASSWORD={{ restic_password }}
AWS_ACCESS_KEY_ID={{ restic_s3_access_key }}
AWS_SECRET_ACCESS_KEY={{ restic_s3_access_secret }}
AWS_DEFAULT_REGION={{ restic_s3_region }}
[telegram]
TELEGRAM_BOT_TOKEN={{ notifications_tg_bot_token }}
TELEGRAM_CHAT_ID={{ notifications_tg_chat_id }}
NOTIFICATIONS_NAME={{ notifications_name }}

View File

@@ -1,8 +1,10 @@
host_name = "{{ notifications_name }}"
roots = [
"{{ application_dir }}"
]
[storage.yandex]
[storage.yandex_cloud_s3]
type = "restic"
restic_repository = "{{ restic_repository }}"
restic_password = "{{ restic_password }}"
@@ -10,8 +12,7 @@ aws_access_key_id = "{{ restic_s3_access_key }}"
aws_secret_access_key = "{{ restic_s3_access_secret }}"
aws_default_region = "{{ restic_s3_region }}"
[notifications.telegram]
[notifier.server_notifications_channel]
type = "telegram"
telegram_bot_token = "{{ notifications_tg_bot_token }}"
telegram_chat_id = "{{ notifications_tg_chat_id }}"
notifications_name = "{{ notifications_name }}"