backup: extend application with scripts and backup paths
This commit is contained in:
+83
-85
@@ -49,6 +49,8 @@ class Config:
|
||||
class Application:
|
||||
path: Path
|
||||
owner: str
|
||||
backup_script: Optional[Path]
|
||||
backup_targets: List[Path]
|
||||
|
||||
|
||||
class Storage(ABC):
|
||||
@@ -190,9 +192,10 @@ class AppriseNotifier(Notifier):
|
||||
class ApplicationFinder:
|
||||
def __init__(self, roots: List[Path]):
|
||||
self.roots = roots
|
||||
self.warnings: List[str] = []
|
||||
|
||||
def find_applications(self) -> List[Application]:
|
||||
"""Get all application directories and their owners."""
|
||||
"""Discover all applications with their backup scripts and targets."""
|
||||
applications: List[Application] = []
|
||||
source_dirs = itertools.chain(*(root.iterdir() for root in self.roots))
|
||||
|
||||
@@ -203,12 +206,80 @@ class ApplicationFinder:
|
||||
try:
|
||||
stat_info = app_dir.stat()
|
||||
owner = pwd.getpwuid(stat_info.st_uid).pw_name
|
||||
applications.append(Application(path=app_dir, owner=owner))
|
||||
backup_script = self._find_backup_script(app_dir)
|
||||
backup_targets = self._find_backup_targets(app_dir)
|
||||
applications.append(
|
||||
Application(
|
||||
path=app_dir,
|
||||
owner=owner,
|
||||
backup_script=backup_script,
|
||||
backup_targets=backup_targets,
|
||||
)
|
||||
)
|
||||
except (KeyError, OSError) as e:
|
||||
logger.warning(f"Could not get owner for {app_dir}: {e}")
|
||||
|
||||
return applications
|
||||
|
||||
def _find_backup_script(self, app_dir: Path) -> Optional[Path]:
|
||||
"""Find executable backup script in application directory."""
|
||||
for name in ("backup.sh", "backup"):
|
||||
script_path = app_dir / name
|
||||
if script_path.exists():
|
||||
if os.access(script_path, os.X_OK):
|
||||
return script_path
|
||||
else:
|
||||
logger.warning(
|
||||
f"Backup script {script_path} exists but is not executable"
|
||||
)
|
||||
return None
|
||||
|
||||
def _find_backup_targets(self, app_dir: Path) -> List[Path]:
|
||||
"""Resolve backup target directories for an application."""
|
||||
targets_file = app_dir / BACKUP_TARGETS_FILE
|
||||
resolved_targets: List[Path] = []
|
||||
|
||||
if targets_file.exists():
|
||||
for target_line in self._parse_targets_file(targets_file):
|
||||
target_path = Path(target_line)
|
||||
if not target_path.is_absolute():
|
||||
target_path = (app_dir / target_path).resolve()
|
||||
else:
|
||||
target_path = target_path.resolve()
|
||||
if target_path.exists():
|
||||
resolved_targets.append(target_path)
|
||||
else:
|
||||
warning_msg = (
|
||||
f"Backup target does not exist for {app_dir}: {target_path}"
|
||||
)
|
||||
logger.warning(warning_msg)
|
||||
self.warnings.append(warning_msg)
|
||||
else:
|
||||
default_target = (app_dir / BACKUP_DEFAULT_DIR).resolve()
|
||||
if default_target.exists():
|
||||
resolved_targets.append(default_target)
|
||||
else:
|
||||
warning_msg = f"Default backup path does not exist for {app_dir}: {default_target}"
|
||||
logger.warning(warning_msg)
|
||||
self.warnings.append(warning_msg)
|
||||
|
||||
return resolved_targets
|
||||
|
||||
def _parse_targets_file(self, targets_file: Path) -> List[str]:
|
||||
"""Parse backup-targets file, skipping comments and empty lines."""
|
||||
targets: List[str] = []
|
||||
try:
|
||||
for raw_line in targets_file.read_text(encoding="utf-8").splitlines():
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
targets.append(line)
|
||||
except OSError as e:
|
||||
warning_msg = f"Could not read backup targets file {targets_file}: {e}"
|
||||
logger.warning(warning_msg)
|
||||
self.warnings.append(warning_msg)
|
||||
return targets
|
||||
|
||||
|
||||
class BackupManager:
|
||||
def __init__(
|
||||
@@ -224,25 +295,6 @@ class BackupManager:
|
||||
self.storages = storages
|
||||
self.notifiers = notifiers
|
||||
|
||||
def find_backup_script(self, app_dir: str) -> Optional[str]:
|
||||
"""Find backup script in user's home directory"""
|
||||
possible_scripts = [
|
||||
os.path.join(app_dir, "backup.sh"),
|
||||
os.path.join(app_dir, "backup"),
|
||||
]
|
||||
|
||||
for script_path in possible_scripts:
|
||||
if os.path.exists(script_path):
|
||||
# Check if file is executable
|
||||
if os.access(script_path, os.X_OK):
|
||||
return script_path
|
||||
else:
|
||||
logger.warning(
|
||||
f"Backup script {script_path} exists but is not executable"
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def run_app_backup(self, script_path: str, app_dir: str, username: str) -> bool:
|
||||
"""Run backup script as the specified user"""
|
||||
try:
|
||||
@@ -282,63 +334,6 @@ class BackupManager:
|
||||
self.errors.append(f"App {username}: {error_msg}")
|
||||
return False
|
||||
|
||||
def get_backup_directories(self, applications: List[Application]) -> List[str]:
|
||||
"""Collect backup targets according to backup-targets rules"""
|
||||
backup_dirs: List[str] = []
|
||||
|
||||
def parse_targets_file(targets_file: Path) -> List[str]:
|
||||
"""Parse backup-targets file, skipping comments and empty lines."""
|
||||
targets: List[str] = []
|
||||
try:
|
||||
for raw_line in targets_file.read_text(encoding="utf-8").splitlines():
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
targets.append(line)
|
||||
except OSError as e:
|
||||
warning_msg = f"Could not read backup targets file {targets_file}: {e}"
|
||||
logger.warning(warning_msg)
|
||||
self.warnings.append(warning_msg)
|
||||
return targets
|
||||
|
||||
for app in applications:
|
||||
app_dir = app.path
|
||||
targets_file = app_dir / BACKUP_TARGETS_FILE
|
||||
resolved_targets: List[Path] = []
|
||||
|
||||
if targets_file.exists():
|
||||
# Read custom targets defined by the application.
|
||||
for target_line in parse_targets_file(targets_file):
|
||||
target_path = Path(target_line)
|
||||
if not target_path.is_absolute():
|
||||
target_path = (app_dir / target_path).resolve()
|
||||
else:
|
||||
target_path = target_path.resolve()
|
||||
if target_path.exists():
|
||||
resolved_targets.append(target_path)
|
||||
else:
|
||||
warning_msg = (
|
||||
f"Backup target does not exist for {app_dir}: {target_path}"
|
||||
)
|
||||
logger.warning(warning_msg)
|
||||
self.warnings.append(warning_msg)
|
||||
else:
|
||||
# Fallback to default backups directory when no list is provided.
|
||||
default_target = (app_dir / BACKUP_DEFAULT_DIR).resolve()
|
||||
if default_target.exists():
|
||||
resolved_targets.append(default_target)
|
||||
else:
|
||||
warning_msg = f"Default backup path does not exist for {app_dir}: {default_target}"
|
||||
logger.warning(warning_msg)
|
||||
self.warnings.append(warning_msg)
|
||||
|
||||
for target in resolved_targets:
|
||||
target_str = str(target)
|
||||
if target_str not in backup_dirs:
|
||||
backup_dirs.append(target_str)
|
||||
|
||||
return backup_dirs
|
||||
|
||||
def send_notification(self, success: bool) -> None:
|
||||
"""Send notification to Notifiers"""
|
||||
|
||||
@@ -381,10 +376,7 @@ class BackupManager:
|
||||
username = app.owner
|
||||
logger.info(f"Processing backup for app: {app_dir} (user {username})")
|
||||
|
||||
# Find backup script
|
||||
backup_script = self.find_backup_script(app_dir)
|
||||
|
||||
if backup_script is None:
|
||||
if app.backup_script is None:
|
||||
warning_msg = (
|
||||
f"No backup script found for app: {app_dir} (user {username})"
|
||||
)
|
||||
@@ -392,10 +384,15 @@ class BackupManager:
|
||||
self.warnings.append(warning_msg)
|
||||
continue
|
||||
|
||||
self.run_app_backup(backup_script, app_dir, username)
|
||||
self.run_app_backup(str(app.backup_script), app_dir, username)
|
||||
|
||||
# Get backup directories
|
||||
backup_dirs = self.get_backup_directories(applications)
|
||||
# Collect backup directories from applications
|
||||
backup_dirs: List[str] = []
|
||||
for app in applications:
|
||||
for target in app.backup_targets:
|
||||
target_str = str(target)
|
||||
if target_str not in backup_dirs:
|
||||
backup_dirs.append(target_str)
|
||||
logger.info(f"Found backup directories: {backup_dirs}")
|
||||
|
||||
overall_success = True
|
||||
@@ -476,6 +473,7 @@ def main() -> None:
|
||||
try:
|
||||
app_finder, backup_manager = initialize(CONFIG_PATH)
|
||||
applications = app_finder.find_applications()
|
||||
backup_manager.warnings.extend(app_finder.warnings)
|
||||
success = backup_manager.run_backup_process(applications)
|
||||
if not success:
|
||||
sys.exit(1)
|
||||
|
||||
Reference in New Issue
Block a user