Compare commits

...

10 Commits

Author SHA1 Message Date
av 303aefb75f Gitea: update to 1.26.0
Linting / YAML Lint (push) Successful in 12s
Linting / Ansible Lint (push) Failing after 35s
2026-04-19 13:54:55 +03:00
av 22307d81c9 Memos: update to 0.27.1 2026-04-19 13:54:41 +03:00
av cc811f954d Dozzle: update to 10.4.1 2026-04-19 13:54:25 +03:00
av f17c4ac227 backup: error count in title 2026-04-12 18:05:49 +03:00
av 25d20df5a9 backup: notifications as html 2026-04-12 18:03:25 +03:00
av 7e1a8e2e99 backup: method ordering 2026-04-12 18:00:32 +03:00
av b90b87caa1 backup: sort apps 2026-04-12 17:58:03 +03:00
av 75ce60d8a0 backup: extend application with scripts and backup paths 2026-04-12 17:53:17 +03:00
av 0aa34efd00 backup: add application finder class 2026-04-12 17:42:43 +03:00
av b7a18f1296 apps: updates 2026-04-12 17:33:59 +03:00
8 changed files with 171 additions and 167 deletions
+1 -1
View File
@@ -2,7 +2,7 @@ services:
authelia_app: authelia_app:
container_name: 'authelia_app' container_name: 'authelia_app'
image: 'docker.io/authelia/authelia:4.39.16' image: 'docker.io/authelia/authelia:4.39.19'
user: '{{ owner_create_result.uid }}:{{ owner_create_result.group }}' user: '{{ owner_create_result.uid }}:{{ owner_create_result.group }}'
restart: 'unless-stopped' restart: 'unless-stopped'
networks: networks:
+164 -160
View File
@@ -43,13 +43,14 @@ logger = logging.getLogger(__name__)
@dataclass @dataclass
class Config: class Config:
host_name: str host_name: str
roots: List[Path]
@dataclass @dataclass
class Application: class Application:
path: Path path: Path
owner: str owner: str
backup_script: Optional[Path]
backup_targets: List[Path]
class Storage(ABC): class Storage(ABC):
@@ -188,24 +189,13 @@ class AppriseNotifier(Notifier):
) )
class BackupManager: class ApplicationFinder:
def __init__( def __init__(self, roots: List[Path]):
self, self.roots = roots
config: Config,
roots: List[Path],
storages: List[Storage],
notifiers: List[Notifier],
):
self.errors: List[str] = []
self.warnings: List[str] = [] self.warnings: List[str] = []
self.successful_backups: List[str] = []
self.config = config
self.roots: List[Path] = roots
self.storages = storages
self.notifiers = notifiers
def find_applications(self) -> List[Application]: def find_applications(self) -> List[Application]:
"""Get all application directories and their owners.""" """Discover all applications with their backup scripts and targets."""
applications: List[Application] = [] applications: List[Application] = []
source_dirs = itertools.chain(*(root.iterdir() for root in self.roots)) source_dirs = itertools.chain(*(root.iterdir() for root in self.roots))
@@ -216,32 +206,152 @@ class BackupManager:
try: try:
stat_info = app_dir.stat() stat_info = app_dir.stat()
owner = pwd.getpwuid(stat_info.st_uid).pw_name owner = pwd.getpwuid(stat_info.st_uid).pw_name
applications.append(Application(path=app_dir, owner=owner)) backup_script = self._find_backup_script(app_dir)
backup_targets = self._find_backup_targets(app_dir)
applications.append(
Application(
path=app_dir,
owner=owner,
backup_script=backup_script,
backup_targets=backup_targets,
)
)
except (KeyError, OSError) as e: except (KeyError, OSError) as e:
logger.warning(f"Could not get owner for {app_dir}: {e}") logger.warning(f"Could not get owner for {app_dir}: {e}")
applications.sort(key=lambda app: app.path.name)
return applications return applications
def find_backup_script(self, app_dir: str) -> Optional[str]: def _find_backup_script(self, app_dir: Path) -> Optional[Path]:
"""Find backup script in user's home directory""" """Find executable backup script in application directory."""
possible_scripts = [ for name in ("backup.sh", "backup"):
os.path.join(app_dir, "backup.sh"), script_path = app_dir / name
os.path.join(app_dir, "backup"), if script_path.exists():
]
for script_path in possible_scripts:
if os.path.exists(script_path):
# Check if file is executable
if os.access(script_path, os.X_OK): if os.access(script_path, os.X_OK):
return script_path return script_path
else: else:
logger.warning( logger.warning(
f"Backup script {script_path} exists but is not executable" f"Backup script {script_path} exists but is not executable"
) )
return None return None
def run_app_backup(self, script_path: str, app_dir: str, username: str) -> bool: def _find_backup_targets(self, app_dir: Path) -> List[Path]:
"""Resolve backup target directories for an application."""
targets_file = app_dir / BACKUP_TARGETS_FILE
resolved_targets: List[Path] = []
if targets_file.exists():
for target_line in self._parse_targets_file(targets_file):
target_path = Path(target_line)
if not target_path.is_absolute():
target_path = (app_dir / target_path).resolve()
else:
target_path = target_path.resolve()
if target_path.exists():
resolved_targets.append(target_path)
else:
warning_msg = (
f"Backup target does not exist for {app_dir}: {target_path}"
)
logger.warning(warning_msg)
self.warnings.append(warning_msg)
else:
default_target = (app_dir / BACKUP_DEFAULT_DIR).resolve()
if default_target.exists():
resolved_targets.append(default_target)
else:
warning_msg = f"Default backup path does not exist for {app_dir}: {default_target}"
logger.warning(warning_msg)
self.warnings.append(warning_msg)
return resolved_targets
def _parse_targets_file(self, targets_file: Path) -> List[str]:
"""Parse backup-targets file, skipping comments and empty lines."""
targets: List[str] = []
try:
for raw_line in targets_file.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
targets.append(line)
except OSError as e:
warning_msg = f"Could not read backup targets file {targets_file}: {e}"
logger.warning(warning_msg)
self.warnings.append(warning_msg)
return targets
class BackupManager:
def __init__(
self,
config: Config,
storages: List[Storage],
notifiers: List[Notifier],
):
self.errors: List[str] = []
self.warnings: List[str] = []
self.successful_backups: List[str] = []
self.config = config
self.storages = storages
self.notifiers = notifiers
def run_backup_process(self, applications: List[Application]) -> bool:
"""Main backup process"""
logger.info("Starting backup process")
logger.info(f"Found {len(applications)} application directories")
# Process each user's backup
for app in applications:
app_dir = str(app.path)
username = app.owner
logger.info(f"Processing backup for app: {app_dir} (user {username})")
if app.backup_script is None:
warning_msg = (
f"No backup script found for app: {app_dir} (user {username})"
)
logger.warning(warning_msg)
self.warnings.append(warning_msg)
continue
self._run_app_backup(str(app.backup_script), app_dir, username)
# Collect backup directories from applications
backup_dirs: List[str] = []
for app in applications:
for target in app.backup_targets:
target_str = str(target)
if target_str not in backup_dirs:
backup_dirs.append(target_str)
logger.info(f"Found backup directories: {backup_dirs}")
overall_success = True
for storage in self.storages:
backup_result = storage.backup(backup_dirs)
if not backup_result:
self.errors.append("Restic backup failed")
# Determine overall success
overall_success = overall_success and backup_result
# Send notification
self._send_notification(overall_success)
logger.info("Backup process completed")
if self.errors:
logger.error(f"Backup completed with {len(self.errors)} errors")
return False
elif self.warnings:
logger.warning(f"Backup completed with {len(self.warnings)} warnings")
return True
else:
logger.info("Backup completed successfully")
return True
def _run_app_backup(self, script_path: str, app_dir: str, username: str) -> bool:
"""Run backup script as the specified user""" """Run backup script as the specified user"""
try: try:
logger.info(f"Running backup script {script_path} (user {username})") logger.info(f"Running backup script {script_path} (user {username})")
@@ -280,88 +390,32 @@ class BackupManager:
self.errors.append(f"App {username}: {error_msg}") self.errors.append(f"App {username}: {error_msg}")
return False return False
def get_backup_directories(self) -> List[str]: def _send_notification(self, success: bool) -> None:
"""Collect backup targets according to backup-targets rules"""
backup_dirs: List[str] = []
applications = self.find_applications()
def parse_targets_file(targets_file: Path) -> List[str]:
"""Parse backup-targets file, skipping comments and empty lines."""
targets: List[str] = []
try:
for raw_line in targets_file.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
targets.append(line)
except OSError as e:
warning_msg = f"Could not read backup targets file {targets_file}: {e}"
logger.warning(warning_msg)
self.warnings.append(warning_msg)
return targets
for app in applications:
app_dir = app.path
targets_file = app_dir / BACKUP_TARGETS_FILE
resolved_targets: List[Path] = []
if targets_file.exists():
# Read custom targets defined by the application.
for target_line in parse_targets_file(targets_file):
target_path = Path(target_line)
if not target_path.is_absolute():
target_path = (app_dir / target_path).resolve()
else:
target_path = target_path.resolve()
if target_path.exists():
resolved_targets.append(target_path)
else:
warning_msg = (
f"Backup target does not exist for {app_dir}: {target_path}"
)
logger.warning(warning_msg)
self.warnings.append(warning_msg)
else:
# Fallback to default backups directory when no list is provided.
default_target = (app_dir / BACKUP_DEFAULT_DIR).resolve()
if default_target.exists():
resolved_targets.append(default_target)
else:
warning_msg = f"Default backup path does not exist for {app_dir}: {default_target}"
logger.warning(warning_msg)
self.warnings.append(warning_msg)
for target in resolved_targets:
target_str = str(target)
if target_str not in backup_dirs:
backup_dirs.append(target_str)
return backup_dirs
def send_notification(self, success: bool) -> None:
"""Send notification to Notifiers""" """Send notification to Notifiers"""
host = self.config.host_name host = self.config.host_name
if success and not self.errors: if success and not self.errors:
title = f"{host}: бекап успешно завершен" title = f"{host}: бекап успешно завершен"
message = f"<b>{host}</b>: бекап успешно завершен!" message = f"<p><b>{host}</b>: бекап успешно завершен!</p>"
if self.successful_backups: if self.successful_backups:
message += f"\n\nУспешные бекапы: {', '.join(self.successful_backups)}" items = "".join(f"<li>{b}</li>" for b in self.successful_backups)
message += f"<p>Успешные бекапы:</p><ul>{items}</ul>"
else: else:
title = f"{host}: бекап завершен с ошибками" title = f"{host}: бекап завершен с ошибками ({len(self.errors)})"
message = f"<b>{host}</b>: бекап завершен с ошибками!" message = f"<p><b>{host}</b>: бекап завершен с ошибками!</p>"
if self.successful_backups: if self.successful_backups:
message += ( items = "".join(f"<li>{b}</li>" for b in self.successful_backups)
f"\n\n✅ Успешные бекапы: {', '.join(self.successful_backups)}" message += f"<p>✅ Успешные бекапы:</p><ul>{items}</ul>"
)
if self.warnings: if self.warnings:
message += "\n\n⚠️ Предупреждения:\n" + "\n".join(self.warnings) items = "".join(f"<li>{w}</li>" for w in self.warnings)
message += f"<p>⚠️ Предупреждения:</p><ul>{items}</ul>"
if self.errors: if self.errors:
message += "\n\n❌ Ошибки:\n" + "\n".join(self.errors) items = "".join(f"<li>{e}</li>" for e in self.errors)
message += f"<p>❌ Ошибки:</p><ul>{items}</ul>"
for notificator in self.notifiers: for notificator in self.notifiers:
try: try:
@@ -369,64 +423,10 @@ class BackupManager:
except Exception as e: except Exception as e:
logger.error(f"Failed to send notification: {str(e)}") logger.error(f"Failed to send notification: {str(e)}")
def run_backup_process(self) -> bool:
"""Main backup process"""
logger.info("Starting backup process")
# Get all home directories def initialize(
applications = self.find_applications() config_path: Path,
logger.info(f"Found {len(applications)} application directories") ) -> tuple[ApplicationFinder, BackupManager]:
# Process each user's backup
for app in applications:
app_dir = str(app.path)
username = app.owner
logger.info(f"Processing backup for app: {app_dir} (user {username})")
# Find backup script
backup_script = self.find_backup_script(app_dir)
if backup_script is None:
warning_msg = (
f"No backup script found for app: {app_dir} (user {username})"
)
logger.warning(warning_msg)
self.warnings.append(warning_msg)
continue
self.run_app_backup(backup_script, app_dir, username)
# Get backup directories
backup_dirs = self.get_backup_directories()
logger.info(f"Found backup directories: {backup_dirs}")
overall_success = True
for storage in self.storages:
backup_result = storage.backup(backup_dirs)
if not backup_result:
self.errors.append("Restic backup failed")
# Determine overall success
overall_success = overall_success and backup_result
# Send notification
self.send_notification(overall_success)
logger.info("Backup process completed")
if self.errors:
logger.error(f"Backup completed with {len(self.errors)} errors")
return False
elif self.warnings:
logger.warning(f"Backup completed with {len(self.warnings)} warnings")
return True
else:
logger.info("Backup completed successfully")
return True
def initialize(config_path: Path) -> BackupManager:
try: try:
with config_path.open("rb") as config_file: with config_path.open("rb") as config_file:
raw_config = tomllib.load(config_file) raw_config = tomllib.load(config_file)
@@ -463,17 +463,21 @@ def initialize(config_path: Path) -> BackupManager:
if not notifiers: if not notifiers:
raise ValueError("At least one notification backend must be configured") raise ValueError("At least one notification backend must be configured")
config = Config(host_name=host_name, roots=roots) config = Config(host_name=host_name)
app_finder = ApplicationFinder(roots)
return BackupManager( backup_manager = BackupManager(
config=config, roots=roots, storages=storages, notifiers=notifiers config=config, storages=storages, notifiers=notifiers
) )
return app_finder, backup_manager
def main() -> None: def main() -> None:
try: try:
backup_manager = initialize(CONFIG_PATH) app_finder, backup_manager = initialize(CONFIG_PATH)
success = backup_manager.run_backup_process() applications = app_finder.find_applications()
backup_manager.warnings.extend(app_finder.warnings)
success = backup_manager.run_backup_process(applications)
if not success: if not success:
sys.exit(1) sys.exit(1)
except KeyboardInterrupt: except KeyboardInterrupt:
+1 -1
View File
@@ -1,7 +1,7 @@
services: services:
dozzle_app: dozzle_app:
image: amir20/dozzle:v10.2.1 image: amir20/dozzle:v10.4.1
container_name: dozzle_app container_name: dozzle_app
restart: unless-stopped restart: unless-stopped
volumes: volumes:
+1 -1
View File
@@ -1,7 +1,7 @@
services: services:
gitea_app: gitea_app:
image: gitea/gitea:1.25.5 image: gitea/gitea:1.26.0
restart: unless-stopped restart: unless-stopped
container_name: gitea_app container_name: gitea_app
ports: ports:
+1 -1
View File
@@ -3,7 +3,7 @@
services: services:
gramps_app: &gramps_app gramps_app: &gramps_app
image: ghcr.io/gramps-project/grampsweb:26.4.0 image: ghcr.io/gramps-project/grampsweb:26.4.1
container_name: gramps_app container_name: gramps_app
depends_on: depends_on:
- gramps_redis - gramps_redis
+1 -1
View File
@@ -3,7 +3,7 @@
services: services:
memos_app: memos_app:
image: neosmemo/memos:0.26.2 image: neosmemo/memos:0.27.1
container_name: memos_app container_name: memos_app
restart: unless-stopped restart: unless-stopped
user: "{{ owner_create_result.uid }}:{{ owner_create_result.group }}" user: "{{ owner_create_result.uid }}:{{ owner_create_result.group }}"
+1 -1
View File
@@ -1,7 +1,7 @@
services: services:
netdata: netdata:
image: netdata/netdata:v2.9.0 image: netdata/netdata:v2.10.1
container_name: netdata container_name: netdata
restart: unless-stopped restart: unless-stopped
cap_add: cap_add:
+1 -1
View File
@@ -23,7 +23,7 @@
ansible.builtin.command: ansible.builtin.command:
cmd: > cmd: >
{{ eget_bin_path }} rclone/rclone --quiet --upgrade-only --to {{ eget_install_dir }} --asset zip {{ eget_bin_path }} rclone/rclone --quiet --upgrade-only --to {{ eget_install_dir }} --asset zip
--tag v1.73.2 --tag v1.73.4
changed_when: false changed_when: false
- name: "Install restic" - name: "Install restic"