504 lines
18 KiB
Python
504 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Backup script for all applications.

Automatically discovers and runs backup scripts for all users,
then creates restic backups and sends notifications.
|
||
"""
|
||
import html
import itertools
import logging
import os
import pwd
import shlex
import subprocess
import sys
import tomllib
from collections.abc import Iterable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional

import requests
|
||
|
||
|
||
# Configure logging: every record is sent both to stdout and to a log file.
# NOTE(review): the FileHandler is constructed at import time, so the process
# presumably needs write access to /var/log/backup-all.log — confirm the
# script always runs with sufficient privileges.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("/var/log/backup-all.log"),
    ],
)
# Module-level logger shared by every function and class in this file.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
class StorageConfig:
    """Settings for one restic storage backend.

    The secret fields are excluded from the generated ``repr`` so that
    logging or printing a config object cannot leak credentials. Field
    order, constructor signature and equality semantics are unchanged.
    """

    # Backend type label as written in the configuration file.
    type: str
    # Exported as RESTIC_REPOSITORY when restic is invoked.
    restic_repository: str
    # Exported as RESTIC_PASSWORD; hidden from repr().
    restic_password: str = field(repr=False)
    # Exported as AWS_ACCESS_KEY_ID.
    aws_access_key_id: str
    # Exported as AWS_SECRET_ACCESS_KEY; hidden from repr().
    aws_secret_access_key: str = field(repr=False)
    # Exported as AWS_DEFAULT_REGION.
    aws_default_region: str
|
||
|
||
|
||
@dataclass
class TelegramConfig:
    """Settings for one Telegram notification backend.

    The bot token is excluded from the generated ``repr`` so that logging
    or printing a config object cannot leak it. Field order, constructor
    signature and equality semantics are unchanged.
    """

    # Backend type label as written in the configuration file.
    type: str
    # Bot token embedded in the Bot API URL; hidden from repr().
    telegram_bot_token: str = field(repr=False)
    # Chat that receives the notification.
    telegram_chat_id: str
    # Display name shown in bold at the start of every notification.
    notifications_name: str
|
||
|
||
|
||
@dataclass
class Config:
    """Parsed contents of the backup configuration file."""

    # Directories whose immediate children are scanned for applications.
    roots: List[Path]
    # Storage backends keyed by the name used in the configuration file.
    storage: Dict[str, StorageConfig]
    # Notification backends keyed by the name used in the configuration file.
    notifications: Dict[str, TelegramConfig]
|
||
|
||
|
||
# Keys read from each storage backend table; every one must be non-empty.
_STORAGE_KEYS = (
    "type",
    "restic_repository",
    "restic_password",
    "aws_access_key_id",
    "aws_secret_access_key",
    "aws_default_region",
)

# Keys read from each notification backend table; every one must be non-empty.
_NOTIFICATION_KEYS = (
    "type",
    "telegram_bot_token",
    "telegram_chat_id",
    "notifications_name",
)


def _read_section(raw_config: dict, section: str) -> dict:
    """Return the named top-level section, insisting that it is a table."""
    value = raw_config.get(section) or {}
    if not isinstance(value, dict):
        # Without this check a scalar or array here would surface later as an
        # AttributeError on .items() instead of a configuration error.
        raise ValueError(f"{section} must be a table in config.toml")
    return value


def _build_backends(section: dict, factory, keys, label: str) -> dict:
    """Build one ``factory`` instance per backend table in ``section``.

    Missing keys default to an empty string so that they are reported by the
    later completeness check rather than by a ``TypeError`` here.
    """
    backends = {}
    for name, cfg in section.items():
        if not isinstance(cfg, dict):
            raise ValueError(f"{label} config for {name} must be a table")
        backends[name] = factory(**{key: cfg.get(key, "") for key in keys})
    return backends


def _has_all_values(backend, keys) -> bool:
    """Return True when every listed attribute of ``backend`` is truthy."""
    return all(getattr(backend, key) for key in keys)


def read_config(config_path: Path) -> Config:
    """Load and validate the backup configuration from a TOML file.

    Args:
        config_path: Path of the TOML file to read.

    Returns:
        A ``Config`` with at least one root, one storage backend and one
        notification backend, each backend having all of its values set.

    Raises:
        OSError: The file could not be opened or read.
        tomllib.TOMLDecodeError: The file is not valid TOML (a ``ValueError``
            subclass).
        ValueError: The configuration is structurally invalid or incomplete.
    """
    try:
        with config_path.open("rb") as config_file:
            raw_config = tomllib.load(config_file)
    except (OSError, tomllib.TOMLDecodeError) as e:
        # Log with the path for context; the caller decides how to abort.
        logger.error(f"Failed to read config file {config_path}: {e}")
        raise

    roots_raw = raw_config.get("roots") or []
    roots_valid = (
        isinstance(roots_raw, list)
        and bool(roots_raw)
        # Path() raises TypeError on non-strings; report those as config errors.
        and all(isinstance(root, str) and root for root in roots_raw)
    )
    if not roots_valid:
        raise ValueError("roots must be a non-empty list of paths in config.toml")
    roots = [Path(root) for root in roots_raw]

    storage: Dict[str, StorageConfig] = _build_backends(
        _read_section(raw_config, "storage"), StorageConfig, _STORAGE_KEYS, "Storage"
    )
    if not storage:
        raise ValueError("At least one storage backend must be configured")

    notifications: Dict[str, TelegramConfig] = _build_backends(
        _read_section(raw_config, "notifications"),
        TelegramConfig,
        _NOTIFICATION_KEYS,
        "Notification",
    )
    if not notifications:
        raise ValueError("At least one notification backend must be configured")

    for name, backend in storage.items():
        if not _has_all_values(backend, _STORAGE_KEYS):
            raise ValueError(f"Missing storage configuration values for backend {name}")

    for name, backend in notifications.items():
        if not _has_all_values(backend, _NOTIFICATION_KEYS):
            raise ValueError(
                f"Missing notification configuration values for backend {name}"
            )

    return Config(roots=roots, storage=storage, notifications=notifications)
|
||
|
||
|
||
# Location of the TOML configuration file loaded by BackupManager.
CONFIG_PATH = Path("/etc/backup/config.toml")

# Name of the per-application file listing the directories and files to back up.
BACKUP_TARGETS_FILE = "backup-targets"

# Default directory for backups (relative to the application directory).
# Used when the backup-targets file does not exist.
BACKUP_DEFAULT_DIR = "backups"
|
||
|
||
|
||
@dataclass
class Application:
    """An application directory together with the user that owns it."""

    # Directory of the application (an immediate child of a configured root).
    path: Path
    # Login name of the directory owner, resolved from the owning uid.
    owner: str
|
||
|
||
|
||
class BackupManager:
    """Run per-application backup scripts, a restic backup and a notification.

    Errors and warnings met along the way are accumulated on the instance
    (``errors``, ``warnings``) and reported in the final Telegram message.
    """

    # Telegram rejects sendMessage texts longer than 4096 characters.
    TELEGRAM_MESSAGE_LIMIT = 4096

    def __init__(self):
        """Initialise empty result lists and load the configuration file."""
        self.errors: List[str] = []
        self.warnings: List[str] = []
        self.successful_backups: List[str] = []
        self.config = read_config(CONFIG_PATH)

    def _warn(self, message: str) -> None:
        """Log a warning and remember it for the final notification."""
        logger.warning(message)
        self.warnings.append(message)

    def _select_storage(self) -> "StorageConfig":
        """Return the "yandex" backend if configured, else the first one."""
        if "yandex" in self.config.storage:
            return self.config.storage["yandex"]
        return next(iter(self.config.storage.values()))

    def _select_telegram(self) -> Optional["TelegramConfig"]:
        """Return the "telegram" backend if configured, else the first one.

        Returns None when no notification backend exists at all.
        """
        if "telegram" in self.config.notifications:
            return self.config.notifications["telegram"]
        return next(iter(self.config.notifications.values()), None)

    def find_applications(self) -> List["Application"]:
        """Get all application directories and their owners.

        Every immediate sub-directory of each configured root is treated as
        an application, except ``lost+found``. A root that cannot be listed
        is recorded as an error and skipped, so one bad root no longer aborts
        the whole run. Entries are visited in sorted order.
        """
        applications: List["Application"] = []

        for root in self.config.roots:
            try:
                candidates = sorted(root.iterdir())
            except OSError as e:
                error_msg = f"Could not list applications root {root}: {e}"
                logger.error(error_msg)
                self.errors.append(error_msg)
                continue

            for app_dir in candidates:
                # Compare the entry name only: a substring test on the whole
                # path would also skip unrelated applications.
                if app_dir.name == "lost+found" or not app_dir.is_dir():
                    continue
                try:
                    owner = pwd.getpwuid(app_dir.stat().st_uid).pw_name
                except (KeyError, OSError) as e:
                    logger.warning(f"Could not get owner for {app_dir}: {e}")
                    continue
                applications.append(Application(path=app_dir, owner=owner))

        return applications

    def find_backup_script(self, app_dir: str) -> Optional[str]:
        """Return the path of the application's executable backup script.

        Looks for ``backup.sh`` and then ``backup`` inside ``app_dir``. Only
        regular files qualify: a directory named ``backup`` also passes an
        executable-bit check but cannot be run. Returns None if no usable
        script exists.
        """
        for script_name in ("backup.sh", "backup"):
            script_path = os.path.join(app_dir, script_name)
            if not os.path.isfile(script_path):
                continue
            if os.access(script_path, os.X_OK):
                return script_path
            logger.warning(
                f"Backup script {script_path} exists but is not executable"
            )

        return None

    def run_app_backup(self, script_path: str, app_dir: str, username: str) -> bool:
        """Run backup script as the specified user.

        Returns True when the script exits with status 0. Any failure
        (non-zero exit, timeout, inability to start) is logged, appended to
        ``self.errors`` and reported as False.
        """
        try:
            logger.info(f"Running backup script {script_path} (user {username})")

            # su hands the --command value to the user's shell, so the path is
            # quoted to survive spaces and shell metacharacters.
            # NOTE(review): with --login, su presumably starts in the user's
            # home directory, so cwd below may not reach the script — confirm.
            cmd = ["su", "--login", username, "--command", shlex.quote(script_path)]

            result = subprocess.run(
                cmd,
                cwd=app_dir,
                capture_output=True,
                text=True,
                timeout=3600,  # 1 hour timeout
            )
        except subprocess.TimeoutExpired:
            error_msg = f"Backup script {script_path} timed out"
        except Exception as e:
            error_msg = f"Failed to run backup script {script_path}: {str(e)}"
        else:
            if result.returncode == 0:
                logger.info(f"Backup script for {username} completed successfully")
                self.successful_backups.append(username)
                return True
            error_msg = (
                f"Backup script {script_path} failed with return code "
                f"{result.returncode}"
            )
            if result.stderr:
                error_msg += f": {result.stderr}"

        logger.error(error_msg)
        self.errors.append(f"App {username}: {error_msg}")
        return False

    def _parse_targets_file(self, targets_file: Path) -> List[str]:
        """Parse backup-targets file, skipping comments and empty lines.

        An unreadable or non-UTF-8 file is recorded as a warning and yields
        no targets.
        """
        try:
            content = targets_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            self._warn(f"Could not read backup targets file {targets_file}: {e}")
            return []

        stripped_lines = (raw_line.strip() for raw_line in content.splitlines())
        return [line for line in stripped_lines if line and not line.startswith("#")]

    def _resolve_app_targets(self, app_dir: Path) -> List[Path]:
        """Return the existing backup targets of one application.

        Uses the application's backup-targets file when present, otherwise
        the default backups directory. Missing paths produce warnings.
        """
        targets_file = app_dir / BACKUP_TARGETS_FILE

        if not targets_file.exists():
            # Fallback to default backups directory when no list is provided.
            default_target = (app_dir / BACKUP_DEFAULT_DIR).resolve()
            if default_target.exists():
                return [default_target]
            self._warn(
                f"Default backup path does not exist for {app_dir}: {default_target}"
            )
            return []

        resolved_targets: List[Path] = []
        for target_line in self._parse_targets_file(targets_file):
            # Joining with an absolute path yields that absolute path, so this
            # covers both relative and absolute entries.
            target_path = (app_dir / target_line).resolve()
            if target_path.exists():
                resolved_targets.append(target_path)
            else:
                self._warn(f"Backup target does not exist for {app_dir}: {target_path}")
        return resolved_targets

    def get_backup_directories(
        self, applications: Optional[List["Application"]] = None
    ) -> List[str]:
        """Collect backup targets according to backup-targets rules.

        Args:
            applications: Applications to inspect. When omitted, the
                configured roots are scanned with ``find_applications``.

        Returns:
            Unique resolved target paths, in first-seen order.
        """
        if applications is None:
            applications = self.find_applications()

        # A dict keeps insertion order and gives O(1) duplicate detection.
        unique_targets: Dict[str, None] = {}
        for app in applications:
            for target in self._resolve_app_targets(app.path):
                unique_targets.setdefault(str(target), None)

        return list(unique_targets)

    def _run_restic_step(self, label: str, cmd: List[str], env: Dict[str, str]) -> bool:
        """Run one restic command; log and record the failure if it fails."""
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        if result.returncode != 0:
            error_msg = f"{label} failed: {result.stderr}"
            logger.error(error_msg)
            self.errors.append(f"{label}: {error_msg}")
            return False

        logger.info(f"{label} completed successfully")
        return True

    def run_restic_backup(self, backup_dirs: List[str]) -> bool:
        """Run restic backup for all backup directories.

        Runs backup, check, forget/prune and a final check in that order and
        stops at the first failing step. Returns True when every step
        succeeded, or when there was nothing to back up.
        """
        if not backup_dirs:
            self._warn("No backup directories found")
            return True

        storage_cfg = self._select_storage()

        try:
            logger.info("Starting restic backup")
            logger.info("Destination: %s", storage_cfg.restic_repository)

            # Set environment variables for restic
            env = os.environ.copy()
            env.update(
                {
                    "RESTIC_REPOSITORY": storage_cfg.restic_repository,
                    "RESTIC_PASSWORD": storage_cfg.restic_password,
                    "AWS_ACCESS_KEY_ID": storage_cfg.aws_access_key_id,
                    "AWS_SECRET_ACCESS_KEY": storage_cfg.aws_secret_access_key,
                    "AWS_DEFAULT_REGION": storage_cfg.aws_default_region,
                }
            )

            check_cmd = ["restic", "check"]
            steps = [
                ("Restic backup", ["restic", "backup", "--verbose"] + backup_dirs),
                ("Restic check", check_cmd),
                (
                    "Restic forget/prune",
                    [
                        "restic",
                        "forget",
                        "--compact",
                        "--prune",
                        "--keep-daily",
                        "90",
                        "--keep-monthly",
                        "36",
                    ],
                ),
                ("Final restic check", check_cmd),
            ]

            # all() stops at the first step that returns False.
            return all(self._run_restic_step(label, cmd, env) for label, cmd in steps)

        except Exception as e:
            error_msg = f"Restic backup process failed: {str(e)}"
            logger.error(error_msg)
            self.errors.append(f"Restic: {error_msg}")
            return False

    @staticmethod
    def _truncate_html(text: str, limit: int) -> str:
        """Shorten ``text`` to ``limit`` characters without splitting an entity."""
        if len(text) <= limit:
            return text

        suffix = "\n…"
        cut = text[: limit - len(suffix)]
        # Drop a trailing partial entity such as "&am" left by the cut.
        last_amp = cut.rfind("&")
        if last_amp != -1 and ";" not in cut[last_amp:]:
            cut = cut[:last_amp]
        return cut + suffix

    def _build_notification_message(self, success: bool, telegram_cfg) -> str:
        """Compose the HTML notification text for the given outcome.

        All dynamic text is HTML-escaped because the message is sent with
        ``parse_mode=HTML``; raw ``<``, ``>`` or ``&`` in a path or in stderr
        would otherwise make Telegram reject the message.
        """
        name = html.escape(telegram_cfg.notifications_name, quote=False)
        succeeded = html.escape(", ".join(self.successful_backups), quote=False)

        if success and not self.errors:
            message = f"<b>{name}</b>: бекап успешно завершен!"
            if self.successful_backups:
                message += f"\n\nУспешные бекапы: {succeeded}"
        else:
            message = f"<b>{name}</b>: бекап завершен с ошибками!"

            if self.successful_backups:
                message += f"\n\n✅ Успешные бекапы: {succeeded}"

            if self.warnings:
                message += "\n\n⚠️ Предупреждения:\n" + html.escape(
                    "\n".join(self.warnings), quote=False
                )

            if self.errors:
                message += "\n\n❌ Ошибки:\n" + html.escape(
                    "\n".join(self.errors), quote=False
                )

        return self._truncate_html(message, self.TELEGRAM_MESSAGE_LIMIT)

    def send_telegram_notification(self, success: bool) -> None:
        """Send notification to Telegram.

        Best effort: delivery problems are logged and never raised.
        """
        telegram_cfg = self._select_telegram()
        if telegram_cfg is None:
            logger.warning("No telegram notification backend configured")
            return

        try:
            message = self._build_notification_message(success, telegram_cfg)

            url = f"https://api.telegram.org/bot{telegram_cfg.telegram_bot_token}/sendMessage"
            data = {
                "chat_id": telegram_cfg.telegram_chat_id,
                "parse_mode": "HTML",
                "text": message,
            }

            response = requests.post(url, data=data, timeout=30)

            if response.status_code == 200:
                logger.info("Telegram notification sent successfully")
            else:
                logger.error(
                    f"Failed to send Telegram notification: {response.status_code} - {response.text}"
                )

        except Exception as e:
            logger.error(f"Failed to send Telegram notification: {str(e)}")

    def run_backup_process(self) -> bool:
        """Main backup process.

        Runs every application's backup script, backs the collected targets
        up with restic and sends the notification. Returns False when any
        error was recorded, True otherwise (warnings do not fail the run).
        """
        logger.info("Starting backup process")

        # Discover the application directories once and reuse the list below.
        applications = self.find_applications()
        logger.info(f"Found {len(applications)} application directories")

        # Run each application's own backup script, if it has one.
        for app in applications:
            app_dir = str(app.path)
            username = app.owner
            logger.info(f"Processing backup for app: {app_dir} (user {username})")

            backup_script = self.find_backup_script(app_dir)
            if backup_script is None:
                self._warn(f"No backup script found for app: {app_dir} (user {username})")
                continue

            self.run_app_backup(backup_script, app_dir, username)

        # Targets are resolved after the scripts ran, so freshly created
        # backup directories are picked up.
        backup_dirs = self.get_backup_directories(applications)
        logger.info(f"Found backup directories: {backup_dirs}")

        restic_success = self.run_restic_backup(backup_dirs)

        overall_success = restic_success and not self.errors

        self.send_telegram_notification(overall_success)

        logger.info("Backup process completed")

        if self.errors:
            logger.error(f"Backup completed with {len(self.errors)} errors")
            return False

        if self.warnings:
            logger.warning(f"Backup completed with {len(self.warnings)} warnings")
        else:
            logger.info("Backup completed successfully")
        return True
|
||
|
||
|
||
def main():
    """Run the backup process and translate its outcome into an exit status.

    Exits with status 1 when the backup reported errors or crashed
    unexpectedly, and with 130 when interrupted from the keyboard. Returns
    normally on success.
    """
    try:
        backup_manager = BackupManager()
        success = backup_manager.run_backup_process()
    except KeyboardInterrupt:
        logger.info("Backup process interrupted by user")
        sys.exit(130)
    except Exception as e:
        # logger.exception keeps the traceback, which is the only clue left
        # when an unexpected failure ends an unattended run.
        logger.exception(f"Unexpected error in backup process: {str(e)}")
        sys.exit(1)

    if not success:
        sys.exit(1)
|
||
|
||
|
||
# Run the backup only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|