Files
pet-project-server/files/backups/backup-all.py

416 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Backup script for all applications
Automatically discovers and runs backup scripts for all users,
then creates restic backups and sends notifications.
"""
import html
import itertools
import logging
import os
import pwd
import shlex
import subprocess
import sys
import tomllib
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional

import requests
# Logging setup: INFO and above, timestamped, mirrored to stdout and to the
# log file under /var/log.
logging.basicConfig(
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("/var/log/backup-all.log"),
    ],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Load the configuration. Nothing below can work without it, so a read or
# parse failure is logged (so it reaches the log file) and then re-raised.
try:
    with open("/etc/backup/config.toml", "rb") as config_file:
        config = tomllib.load(config_file)
except (OSError, tomllib.TOMLDecodeError) as e:
    logger.error(f"Failed to read config file: {e}")
    raise

# Directories whose immediate sub-directories are the applications to back up.
ROOTS = config.get("roots") or []

# restic repository and object-storage credentials.
storage_cfg = config.get("storage", {}).get("yandex", {})
RESTIC_REPOSITORY = storage_cfg.get("restic_repository")
RESTIC_PASSWORD = storage_cfg.get("restic_password")
AWS_ACCESS_KEY_ID = storage_cfg.get("aws_access_key_id")
AWS_SECRET_ACCESS_KEY = storage_cfg.get("aws_secret_access_key")
AWS_DEFAULT_REGION = storage_cfg.get("aws_default_region")

# Telegram notification settings.
notifications_cfg = config.get("notifications", {}).get("telegram", {})
TELEGRAM_BOT_TOKEN = notifications_cfg.get("telegram_bot_token")
TELEGRAM_CHAT_ID = notifications_cfg.get("telegram_chat_id")
NOTIFICATIONS_NAME = notifications_cfg.get("notifications_name")

# tomllib turns a TOML array into a Python list. Accepting any iterable would
# also let a bare string through (iterated character by character) or a table
# (iterated by key), so require a real list of non-empty strings.
if not (
    isinstance(ROOTS, list)
    and ROOTS
    and all(isinstance(root, str) and root for root in ROOTS)
):
    raise ValueError("roots must be a non-empty list of paths in config.toml")

# Every setting below is mandatory; report the missing ones by key name only
# (never by value, several of them are secrets).
_required_settings = {
    "storage.yandex.restic_repository": RESTIC_REPOSITORY,
    "storage.yandex.restic_password": RESTIC_PASSWORD,
    "storage.yandex.aws_access_key_id": AWS_ACCESS_KEY_ID,
    "storage.yandex.aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
    "storage.yandex.aws_default_region": AWS_DEFAULT_REGION,
    "notifications.telegram.telegram_bot_token": TELEGRAM_BOT_TOKEN,
    "notifications.telegram.telegram_chat_id": TELEGRAM_CHAT_ID,
    "notifications.telegram.notifications_name": NOTIFICATIONS_NAME,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]
if _missing_settings:
    raise ValueError(
        "Missing required configuration values in config.toml: "
        + ", ".join(_missing_settings)
    )
# Name of the per-application file that lists the directories and files to back up
BACKUP_TARGETS_FILE = "backup-targets"
# Default directory for backups (relative to the app dir)
# Used when the backup-targets file does not exist
BACKUP_DEFAULT_DIR = "backups"
@dataclass
class Application:
    """An application directory paired with the account that owns it."""

    # Directory of the application.
    path: Path
    # Login name of the directory's owner.
    owner: str
class BackupManager:
    """Run application backup scripts, ship the results with restic and report.

    State collected during a run:
        errors: failures; any entry makes the run unsuccessful.
        warnings: non-fatal problems such as a missing script or target.
        successful_backups: owner names whose backup script exited with 0.
    """

    # The Telegram Bot API caps message text at 4096 characters.
    TELEGRAM_MESSAGE_LIMIT = 4096

    def __init__(self):
        self.errors: List[str] = []
        self.warnings: List[str] = []
        self.successful_backups: List[str] = []

    def find_applications(self) -> List["Application"]:
        """Return every application directory under ROOTS with its owner.

        An application is any immediate sub-directory of a configured root,
        except ``lost+found``. A root that cannot be listed is recorded as an
        error (once) and the remaining roots are still processed. A directory
        whose owner cannot be resolved is logged and skipped.
        """
        applications = []
        for root in ROOTS:
            try:
                # Materialise the listing so a missing or unreadable root
                # fails here, inside the try, and not later while iterating.
                entries = list(Path(root).iterdir())
            except OSError as e:
                error_msg = f"Could not list backup root {root}: {e}"
                logger.error(error_msg)
                if error_msg not in self.errors:
                    self.errors.append(error_msg)
                continue

            for app_dir in entries:
                # Compare the directory name only; a substring test on the
                # full path would also skip unrelated paths containing it.
                if app_dir.name == "lost+found" or not app_dir.is_dir():
                    continue
                try:
                    owner = pwd.getpwuid(app_dir.stat().st_uid).pw_name
                except (KeyError, OSError) as e:
                    logger.warning(f"Could not get owner for {app_dir}: {e}")
                    continue
                applications.append(Application(path=app_dir, owner=owner))
        return applications

    def find_backup_script(self, app_dir: str) -> Optional[str]:
        """Return the executable backup script of an application, if any.

        Looks for ``backup.sh`` and then ``backup`` directly inside *app_dir*.
        Only regular files qualify: a directory named ``backup`` usually has
        the execute bit set and must not be mistaken for a script. A candidate
        that exists but is not executable is logged and skipped.

        Returns:
            Path of the first executable candidate, or None.
        """
        for script_name in ("backup.sh", "backup"):
            script_path = os.path.join(app_dir, script_name)
            if not os.path.isfile(script_path):
                continue
            if os.access(script_path, os.X_OK):
                return script_path
            logger.warning(
                f"Backup script {script_path} exists but is not executable"
            )
        return None

    def run_app_backup(self, script_path: str, app_dir: str, username: str) -> bool:
        """Run an application's backup script as its owner.

        The script is started through ``su --login`` with a one hour timeout.
        On success *username* is added to ``successful_backups``; any failure
        (non-zero exit, timeout, failure to start) is logged and recorded in
        ``errors``.

        Returns:
            True when the script exited with status 0, False otherwise.
        """
        logger.info(f"Running backup script {script_path} (user {username})")
        # su hands the --command string to the user's shell, so the path is
        # quoted to survive spaces and shell metacharacters unchanged.
        cmd = ["su", "--login", username, "--command", shlex.quote(script_path)]
        try:
            result = subprocess.run(
                cmd,
                cwd=app_dir,
                capture_output=True,
                text=True,
                timeout=3600,  # 1 hour timeout
            )
        except subprocess.TimeoutExpired:
            error_msg = f"Backup script {script_path} timed out"
        except Exception as e:
            error_msg = f"Failed to run backup script {script_path}: {str(e)}"
        else:
            if result.returncode == 0:
                logger.info(f"Backup script for {username} completed successfully")
                self.successful_backups.append(username)
                return True
            error_msg = f"Backup script {script_path} failed with return code {result.returncode}"
            if result.stderr:
                error_msg += f": {result.stderr}"

        logger.error(error_msg)
        self.errors.append(f"App {username}: {error_msg}")
        return False

    def _parse_targets_file(self, targets_file: Path) -> List[str]:
        """Return the entries of a backup-targets file, skipping comments and blank lines.

        An unreadable or non-UTF-8 file is recorded as a warning and yields
        no entries.
        """
        try:
            content = targets_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            warning_msg = f"Could not read backup targets file {targets_file}: {e}"
            logger.warning(warning_msg)
            self.warnings.append(warning_msg)
            return []
        stripped_lines = (raw_line.strip() for raw_line in content.splitlines())
        return [line for line in stripped_lines if line and not line.startswith("#")]

    def _resolve_app_targets(self, app_dir: Path) -> List[Path]:
        """Return the existing backup targets of one application; warn about missing ones."""
        targets_file = app_dir / BACKUP_TARGETS_FILE
        if targets_file.exists():
            # Joining with an absolute entry yields that entry unchanged, so
            # one expression covers relative and absolute targets alike.
            # NOTE(review): absolute entries are accepted as-is, so a targets
            # file can name any path this process is able to read.
            candidates = [
                (app_dir / target_line).resolve()
                for target_line in self._parse_targets_file(targets_file)
            ]
            missing_prefix = "Backup target does not exist for"
        else:
            # Fallback to the default backups directory when no list is provided.
            candidates = [(app_dir / BACKUP_DEFAULT_DIR).resolve()]
            missing_prefix = "Default backup path does not exist for"

        existing = []
        for target_path in candidates:
            if target_path.exists():
                existing.append(target_path)
                continue
            warning_msg = f"{missing_prefix} {app_dir}: {target_path}"
            logger.warning(warning_msg)
            self.warnings.append(warning_msg)
        return existing

    def get_backup_directories(
        self, applications: Optional[List["Application"]] = None
    ) -> List[str]:
        """Collect backup targets according to backup-targets rules.

        For each application: when a ``backup-targets`` file exists, every
        path it lists is used (relative entries are resolved against the
        application directory); otherwise the default ``backups`` directory
        is used. Paths that do not exist are recorded as warnings.

        Args:
            applications: Applications to inspect. When omitted they are
                discovered with find_applications().

        Returns:
            Unique resolved paths as strings, in first-seen order.
        """
        if applications is None:
            applications = self.find_applications()

        # Dict keys give order-preserving de-duplication.
        unique_targets = {}
        for app in applications:
            for target in self._resolve_app_targets(app.path):
                unique_targets.setdefault(str(target), None)
        return list(unique_targets)

    def _run_restic_step(self, label: str, cmd: List[str], env: dict) -> bool:
        """Run one restic command; log the outcome and record a failure in ``errors``."""
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        if result.returncode != 0:
            error_msg = f"{label} failed: {result.stderr}"
            logger.error(error_msg)
            self.errors.append(f"{label}: {error_msg}")
            return False
        logger.info(f"{label} completed successfully")
        return True

    def run_restic_backup(self, backup_dirs: List[str]) -> bool:
        """Run restic backup for all backup directories.

        Executes, in order: backup, check, forget with prune, and a final
        check. The sequence stops at the first failing step, which is
        recorded in ``errors``.

        Returns:
            True when every step succeeded or there was nothing to back up,
            False otherwise.
        """
        if not backup_dirs:
            logger.warning("No backup directories found")
            return True

        try:
            logger.info("Starting restic backup")
            logger.info("Destination: %s", RESTIC_REPOSITORY)

            # restic reads the repository location and credentials from the
            # environment.
            env = os.environ.copy()
            env.update(
                {
                    "RESTIC_REPOSITORY": RESTIC_REPOSITORY,
                    "RESTIC_PASSWORD": RESTIC_PASSWORD,
                    "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
                    "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
                    "AWS_DEFAULT_REGION": AWS_DEFAULT_REGION,
                }
            )

            check_cmd = ["restic", "check"]
            forget_cmd = [
                "restic",
                "forget",
                "--compact",
                "--prune",
                "--keep-daily",
                "90",
                "--keep-monthly",
                "36",
            ]
            steps = [
                ("Restic backup", ["restic", "backup", "--verbose"] + backup_dirs),
                ("Restic check", check_cmd),
                ("Restic forget/prune", forget_cmd),
                ("Final restic check", check_cmd),
            ]
            # all() stops at the first step that returns False.
            return all(self._run_restic_step(label, cmd, env) for label, cmd in steps)
        except Exception as e:
            error_msg = f"Restic backup process failed: {str(e)}"
            logger.error(error_msg)
            self.errors.append(f"Restic: {error_msg}")
            return False

    @staticmethod
    def _escape_and_fit(text: str, limit: int) -> str:
        """HTML-escape *text* and trim it so the result has at most *limit* characters.

        A trimmed result ends with an ellipsis and never ends in the middle
        of an HTML entity.
        """
        if limit <= 0:
            return ""
        escaped = html.escape(text, quote=False)
        if len(escaped) <= limit:
            return escaped

        marker = "…"
        cut = escaped[: limit - len(marker)]
        # After escaping, every "&" starts an entity terminated by ";". A
        # trailing "&" without its ";" is an entity sliced by the cut.
        last_amp = cut.rfind("&")
        if last_amp != -1 and ";" not in cut[last_amp:]:
            cut = cut[:last_amp]
        return cut + marker

    def _build_notification_message(self, success: bool) -> str:
        """Compose the HTML notification text for the outcome of the run."""
        header = f"<b>{html.escape(str(NOTIFICATIONS_NAME), quote=False)}</b>"
        if success and not self.errors:
            body = ": бекап успешно завершен!"
            if self.successful_backups:
                body += f"\n\nУспешные бекапы: {', '.join(self.successful_backups)}"
        else:
            body = ": бекап завершен с ошибками!"
            if self.successful_backups:
                body += f"\n\n✅ Успешные бекапы: {', '.join(self.successful_backups)}"
            if self.warnings:
                body += "\n\n⚠️ Предупреждения:\n" + "\n".join(self.warnings)
            if self.errors:
                body += "\n\n❌ Ошибки:\n" + "\n".join(self.errors)

        # The body carries command output and paths; unescaped "<" or "&" or
        # an over-long text would make Telegram reject the whole message.
        budget = self.TELEGRAM_MESSAGE_LIMIT - len(header)
        return header + self._escape_and_fit(body, budget)

    def send_telegram_notification(self, success: bool) -> None:
        """Send notification to Telegram.

        Args:
            success: Whether the run is considered successful. The success
                text is used only when this is true and ``errors`` is empty.

        Sending is best effort: a failure is logged and never raised.
        """
        try:
            message = self._build_notification_message(success)
            url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
            data = {"chat_id": TELEGRAM_CHAT_ID, "parse_mode": "HTML", "text": message}
            response = requests.post(url, data=data, timeout=30)
            if response.status_code == 200:
                logger.info("Telegram notification sent successfully")
            else:
                logger.error(
                    f"Failed to send Telegram notification: {response.status_code} - {response.text}"
                )
        except Exception as e:
            # Exception text from the HTTP client may include the request
            # URL, which contains the bot token; keep it out of the log.
            details = str(e).replace(str(TELEGRAM_BOT_TOKEN), "***")
            logger.error(f"Failed to send Telegram notification: {details}")

    def run_backup_process(self) -> bool:
        """Main backup process.

        Runs every application's backup script, backs the collected targets
        up with restic and sends the Telegram notification.

        Returns:
            True when no errors were recorded (warnings are allowed),
            False otherwise.
        """
        logger.info("Starting backup process")

        applications = self.find_applications()
        logger.info(f"Found {len(applications)} application directories")

        for app in applications:
            app_dir = str(app.path)
            username = app.owner
            logger.info(f"Processing backup for app: {app_dir} (user {username})")

            backup_script = self.find_backup_script(app_dir)
            if backup_script is None:
                warning_msg = (
                    f"No backup script found for app: {app_dir} (user {username})"
                )
                logger.warning(warning_msg)
                self.warnings.append(warning_msg)
                continue

            self.run_app_backup(backup_script, app_dir, username)

        # Reuse the list found above instead of scanning the roots again.
        backup_dirs = self.get_backup_directories(applications)
        logger.info(f"Found backup directories: {backup_dirs}")

        restic_success = self.run_restic_backup(backup_dirs)
        overall_success = restic_success and not self.errors
        self.send_telegram_notification(overall_success)

        logger.info("Backup process completed")
        if self.errors:
            logger.error(f"Backup completed with {len(self.errors)} errors")
            return False
        if self.warnings:
            logger.warning(f"Backup completed with {len(self.warnings)} warnings")
            return True
        logger.info("Backup completed successfully")
        return True
def main():
    """Run the backup and turn its outcome into a process exit status.

    Exits with status 1 when the run reported errors or failed unexpectedly
    and with 130 when interrupted from the keyboard; returns normally on
    success.
    """
    try:
        success = BackupManager().run_backup_process()
    except KeyboardInterrupt:
        logger.info("Backup process interrupted by user")
        sys.exit(130)
    except Exception as e:
        # logger.exception keeps the traceback; the message alone is rarely
        # enough to diagnose an unexpected failure of an unattended job.
        logger.exception(f"Unexpected error in backup process: {str(e)}")
        sys.exit(1)

    if not success:
        sys.exit(1)


if __name__ == "__main__":
    main()