@@ -10,13 +10,22 @@ import sys
import subprocess
import logging
import pwd
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
from typing import Dict , List , Optional
from typing import Dict , List , Optional , Any
import requests
import tomllib
from collections . abc import Iterable
# Default config path
CONFIG_PATH = Path ( " /etc/backup/config.toml " )
# File name to store directories and files to back up
BACKUP_TARGETS_FILE = " backup-targets "
# Default directory fo backups (relative to app dir)
# Used when backup-targets file not exists
BACKUP_DEFAULT_DIR = " backups "
# Configure logging
logging . basicConfig (
@@ -30,113 +39,10 @@ logging.basicConfig(
logger = logging . getLogger ( __name__ )
@dataclass
class StorageConfig :
type : str
restic_repository : str
restic_password : str
aws_access_key_id : str
aws_secret_access_key : str
aws_default_region : str
@dataclass
class TelegramConfig :
type : str
telegram_bot_token : str
telegram_chat_id : str
notifications_name : str
@dataclass
class Config :
host_name : str
roots : List [ Path ]
storage : Dict [ str , StorageConfig ]
notifications : Dict [ str , TelegramConfig ]
def read_config ( config_path : Path ) - > Config :
try :
with config_path . open ( " rb " ) as config_file :
raw_config = tomllib . load ( config_file )
except OSError as e :
logger . error ( f " Failed to read config file { config_path } : { e } " )
raise
roots_raw = raw_config . get ( " roots " ) or [ ]
if not isinstance ( roots_raw , list ) or not roots_raw :
raise ValueError ( " roots must be a non-empty list of paths in config.toml " )
roots = [ Path ( root ) for root in roots_raw ]
storage_raw = raw_config . get ( " storage " ) or { }
storage : Dict [ str , StorageConfig ] = { }
for name , cfg in storage_raw . items ( ) :
if not isinstance ( cfg , dict ) :
raise ValueError ( f " Storage config for { name } must be a table " )
storage [ name ] = StorageConfig (
type = cfg . get ( " type " , " " ) ,
restic_repository = cfg . get ( " restic_repository " , " " ) ,
restic_password = cfg . get ( " restic_password " , " " ) ,
aws_access_key_id = cfg . get ( " aws_access_key_id " , " " ) ,
aws_secret_access_key = cfg . get ( " aws_secret_access_key " , " " ) ,
aws_default_region = cfg . get ( " aws_default_region " , " " ) ,
)
if not storage :
raise ValueError ( " At least one storage backend must be configured " )
notifications_raw = raw_config . get ( " notifications " ) or { }
notifications : Dict [ str , TelegramConfig ] = { }
for name , cfg in notifications_raw . items ( ) :
if not isinstance ( cfg , dict ) :
raise ValueError ( f " Notification config for { name } must be a table " )
notifications [ name ] = TelegramConfig (
type = cfg . get ( " type " , " " ) ,
telegram_bot_token = cfg . get ( " telegram_bot_token " , " " ) ,
telegram_chat_id = cfg . get ( " telegram_chat_id " , " " ) ,
notifications_name = cfg . get ( " notifications_name " , " " ) ,
)
if not notifications :
raise ValueError ( " At least one notification backend must be configured " )
for name , cfg in storage . items ( ) :
if not all (
[
cfg . type ,
cfg . restic_repository ,
cfg . restic_password ,
cfg . aws_access_key_id ,
cfg . aws_secret_access_key ,
cfg . aws_default_region ,
]
) :
raise ValueError ( f " Missing storage configuration values for backend { name } " )
for name , cfg in notifications . items ( ) :
if not all (
[
cfg . type ,
cfg . telegram_bot_token ,
cfg . telegram_chat_id ,
cfg . notifications_name ,
]
) :
raise ValueError (
f " Missing notification configuration values for backend { name } "
)
return Config ( roots = roots , storage = storage , notifications = notifications )
CONFIG_PATH = Path ( " /etc/backup/config.toml " )
# File name to store directories and files to back up
BACKUP_TARGETS_FILE = " backup-targets "
# Default directory fo backups (relative to app dir)
# Used when backup-targets file not exists
BACKUP_DEFAULT_DIR = " backups "
@dataclass
@@ -145,27 +51,167 @@ class Application:
owner : str
class Storage ( ABC ) :
def backup ( self , backup_dirs : List [ str ] ) - > bool :
""" Backup directories """
raise NotImplementedError ( )
class ResticStorage ( Storage ) :
TYPE_NAME = " restic "
def __init__ ( self , name : str , params : Dict [ str , Any ] ) :
self . name = name
self . restic_repository = str ( params . get ( " restic_repository " , " " ) )
self . restic_password = str ( params . get ( " restic_password " , " " ) )
self . aws_access_key_id = str ( params . get ( " aws_access_key_id " , " " ) )
self . aws_secret_access_key = str ( params . get ( " aws_secret_access_key " , " " ) )
self . aws_default_region = str ( params . get ( " aws_default_region " , " " ) )
if not all (
[
self . restic_repository ,
self . restic_password ,
self . aws_access_key_id ,
self . aws_secret_access_key ,
self . aws_default_region ,
]
) :
raise ValueError (
f " Missing storage configuration values for backend ResticStorage: ' { self . name } ' "
)
def backup ( self , backup_dirs : List [ str ] ) - > bool :
if not backup_dirs :
logger . warning ( " No backup directories found " )
return True
try :
return self . __backup_internal ( backup_dirs )
except Exception as exc : # noqa: BLE001
logger . error ( " Restic backup process failed: %s " , exc )
return False
def __backup_internal ( self , backup_dirs : List [ str ] ) - > bool :
logger . info ( " Starting restic backup " )
logger . info ( " Destination: %s " , self . restic_repository )
env = os . environ . copy ( )
env . update (
{
" RESTIC_REPOSITORY " : self . restic_repository ,
" RESTIC_PASSWORD " : self . restic_password ,
" AWS_ACCESS_KEY_ID " : self . aws_access_key_id ,
" AWS_SECRET_ACCESS_KEY " : self . aws_secret_access_key ,
" AWS_DEFAULT_REGION " : self . aws_default_region ,
}
)
backup_cmd = [ " restic " , " backup " , " --verbose " ] + backup_dirs
result = subprocess . run ( backup_cmd , env = env , capture_output = True , text = True )
if result . returncode != 0 :
logger . error ( " Restic backup failed: %s " , result . stderr )
return False
logger . info ( " Restic backup completed successfully " )
check_cmd = [ " restic " , " check " ]
result = subprocess . run ( check_cmd , env = env , capture_output = True , text = True )
if result . returncode != 0 :
logger . error ( " Restic check failed: %s " , result . stderr )
return False
logger . info ( " Restic check completed successfully " )
forget_cmd = [
" restic " ,
" forget " ,
" --compact " ,
" --prune " ,
" --keep-daily " ,
" 90 " ,
" --keep-monthly " ,
" 36 " ,
]
result = subprocess . run ( forget_cmd , env = env , capture_output = True , text = True )
if result . returncode != 0 :
logger . error ( " Restic forget/prune failed: %s " , result . stderr )
return False
logger . info ( " Restic forget/prune completed successfully " )
result = subprocess . run ( check_cmd , env = env , capture_output = True , text = True )
if result . returncode != 0 :
logger . error ( " Final restic check failed: %s " , result . stderr )
return False
logger . info ( " Final restic check completed successfully " )
return True
class Notifier ( ABC ) :
def send ( self , html_message : str ) :
raise NotImplementedError ( )
class TelegramNotifier ( Notifier ) :
TYPE_NAME = " telegram "
def __init__ ( self , name : str , params : Dict [ str , Any ] ) :
self . name = name
self . telegram_bot_token = str ( params . get ( " telegram_bot_token " , " " ) )
self . telegram_chat_id = str ( params . get ( " telegram_chat_id " , " " ) )
if not all (
[
self . telegram_bot_token ,
self . telegram_chat_id ,
]
) :
raise ValueError (
f " Missing notification configuration values for backend { name } "
)
def send ( self , html_message : str ) :
url = f " https://api.telegram.org/bot { self . telegram_bot_token } /sendMessage "
data = {
" chat_id " : self . telegram_chat_id ,
" parse_mode " : " HTML " ,
" text " : html_message ,
}
response = requests . post ( url , data = data , timeout = 30 )
if response . status_code == 200 :
logger . info ( " Telegram notification sent successfully " )
else :
logger . error (
f " Failed to send Telegram notification: { response . status_code } - { response . text } "
)
class BackupManager :
def __init__ ( self ) :
def __init__ (
self ,
config : Config ,
roots : List [ Path ] ,
storages : List [ Storage ] ,
notifiers : List [ Notifier ] ,
) :
self . errors : List [ str ] = [ ]
self . warnings : List [ str ] = [ ]
self . successful_backups : List [ str ] = [ ]
self . config = read_ config( CONFIG_PATH )
def _select_storage ( self ) - > S torageConfig :
if " yandex " in self . config . storage :
return self . config . storage [ " yandex " ]
return next ( iter ( self . config . storage . values ( ) ) )
def _select_telegram ( self ) - > Optional [ TelegramConfig ] :
if " telegram " in self . config . notifications :
return self . config . notifications [ " telegram " ]
return next ( iter ( self . config . notifications . values ( ) ) , None )
self . config = config
self . roots : List [ Path ] = roots
self . storages = s torages
self . notifiers = notifiers
def find_applications ( self ) - > List [ Application ] :
""" Get all application directories and their owners. """
applications : List [ Application ] = [ ]
source_dirs = itertools . chain ( * ( root . iterdir ( ) for root in self . config . roots) )
source_dirs = itertools . chain ( * ( root . iterdir ( ) for root in self . roots ) )
for app_dir in source_dirs :
if " lost+found " in str ( app_dir ) :
@@ -296,141 +342,32 @@ class BackupManager:
return backup_dirs
def run_restic_backup ( self , backup_dirs : List [ str ] ) - > bool :
""" Run restic backup for all backup director ies"""
if not backup_dirs :
logger . warning ( " No backup directories found " )
return True
def send_notification ( self , success : bool ) - > None :
""" Send notification to Notif ier s"""
storage_cfg = self . _select_storage ( )
if success and not self . errors :
message = f " <b> { self . config . host_name } </b>: бекап успешно завершен! "
if self . successful_backups :
message + = f " \n \n Успешные бекапы: { ' , ' . join ( self . successful_backups ) } "
else :
message = f " <b> { self . config . host_name } </b>: бекап завершен с ошибками! "
try :
logger . info ( " Starting restic backup " )
logger . info ( " Destination: %s " , storage_cfg . restic_repository )
# Set environment variables for restic
env = os . environ . copy ( )
env . update (
{
" RESTIC_REPOSITORY " : storage_cfg . restic_repository ,
" RESTIC_PASSWORD " : storage_cfg . restic_password ,
" AWS_ACCESS_KEY_ID " : storage_cfg . aws_access_key_id ,
" AWS_SECRET_ACCESS_KEY " : storage_cfg . aws_secret_access_key ,
" AWS_DEFAULT_REGION " : storage_cfg . aws_default_region ,
}
)
# Run backup
backup_cmd = [ " restic " , " backup " , " --verbose " ] + backup_dirs
result = subprocess . run ( backup_cmd , env = env , capture_output = True , text = True )
if result . returncode != 0 :
error_msg = f " Restic backup failed: { result . stderr } "
logger . error ( error_msg )
self . errors . append ( f " Restic backup: { error_msg } " )
return False
logger . info ( " Restic backup completed successfully " )
# Run check
check_cmd = [ " restic " , " check " ]
result = subprocess . run ( check_cmd , env = env , capture_output = True , text = True )
if result . returncode != 0 :
error_msg = f " Restic check failed: { result . stderr } "
logger . error ( error_msg )
self . errors . append ( f " Restic check: { error_msg } " )
return False
logger . info ( " Restic check completed successfully " )
# Run forget and prune
forget_cmd = [
" restic " ,
" forget " ,
" --compact " ,
" --prune " ,
" --keep-daily " ,
" 90 " ,
" --keep-monthly " ,
" 36 " ,
]
result = subprocess . run ( forget_cmd , env = env , capture_output = True , text = True )
if result . returncode != 0 :
error_msg = f " Restic forget/prune failed: { result . stderr } "
logger . error ( error_msg )
self . errors . append ( f " Restic forget/prune: { error_msg } " )
return False
logger . info ( " Restic forget/prune completed successfully " )
# Final check
result = subprocess . run ( check_cmd , env = env , capture_output = True , text = True )
if result . returncode != 0 :
error_msg = f " Final restic check failed: { result . stderr } "
logger . error ( error_msg )
self . errors . append ( f " Final restic check: { error_msg } " )
return False
logger . info ( " Final restic check completed successfully " )
return True
except Exception as e :
error_msg = f " Restic backup process failed: { str ( e ) } "
logger . error ( error_msg )
self . errors . append ( f " Restic: { error_msg } " )
return False
def send_telegram_notification ( self , success : bool ) - > None :
""" Send notification to Telegram """
telegram_cfg = self . _select_telegram ( )
if telegram_cfg is None :
logger . warning ( " No telegram notification backend configured " )
return
try :
if success and not self . errors :
message = (
f " <b> { telegram_cfg . notifications_name } </b>: бекап успешно завершен! "
)
if self . successful_backups :
message + = (
f " \n \n Успешные бекапы: { ' , ' . join ( self . successful_backups ) } "
)
else :
message = f " <b> { telegram_cfg . notifications_name } </b>: бекап завершен с ошибками! "
if self . successful_backups :
message + = (
f " \n \n ✅ Успешные бекапы: { ' , ' . join ( self . successful_backups ) } "
)
if self . warnings :
message + = f " \n \n ⚠️ Предупреждения: \n " + " \n " . join ( self . warnings )
if self . errors :
message + = f " \n \n ❌ Ошибки: \n " + " \n " . join ( self . errors )
url = f " https://api.telegram.org/bot { telegram_cfg . telegram_bot_token } /sendMessage "
data = {
" chat_id " : telegram_cfg . telegram_chat_id ,
" parse_mode " : " HTML " ,
" text " : message ,
}
response = requests . post ( url , data = data , timeout = 30 )
if response . status_code == 200 :
logger . info ( " Telegram notification sent successfully " )
else :
logger . error (
f " Failed to send Telegram notification: { response . status_code } - { response . text } "
if self . successful_backups :
message + = (
f " \n \n ✅ Успешные бекапы: { ' , ' . join ( self . successful_backups ) } "
)
except Exception as e :
logger . error ( f " Failed to send Telegram notification: { str ( e ) } " )
if self . warnings :
message + = f " \n \n ⚠️ Предупреждения: \n " + " \n " . join ( self . warnings )
if self . errors :
message + = f " \n \n ❌ Ошибки: \n " + " \n " . join ( self . errors )
for notificator in self . notifiers :
try :
notificator . send ( message )
except Exception as e :
logger . error ( f " Failed to send notification: { str ( e ) } " )
def run_backup_process ( self ) - > bool :
""" Main backup process """
@@ -463,14 +400,18 @@ class BackupManager:
backup_dirs = self . get_backup_directories ( )
logger . info ( f " Found backup directories: { backup_dirs } " )
# Run restic backup
restic_success = self . run_restic_backup ( backup_dirs )
overall_success = True
# Determine overall success
overall_success = restic_success and len ( self . errors ) == 0
for storage in self . storages :
backup_result = storage . backup ( backup_dirs )
if not backup_result :
self . errors . append ( " Restic backup failed " )
# Determine overall success
overall_success = overall_success and backup_result
# Send notification
self . send_telegram_ notification ( overall_success )
self . send_notification ( overall_success )
logger . info ( " Backup process completed " )
@@ -485,9 +426,53 @@ class BackupManager:
return True
def initialize ( config_path : Path ) - > BackupManager :
try :
with config_path . open ( " rb " ) as config_file :
raw_config = tomllib . load ( config_file )
except OSError as e :
logger . error ( f " Failed to read config file { config_path } : { e } " )
raise
host_name = str ( raw_config . get ( " host_name " , " unknown " ) )
roots_raw = raw_config . get ( " roots " ) or [ ]
if not isinstance ( roots_raw , list ) or not roots_raw :
raise ValueError ( " roots must be a non-empty list of paths in config.toml " )
roots = [ Path ( root ) for root in roots_raw ]
storage_raw = raw_config . get ( " storage " ) or { }
storages : List [ Storage ] = [ ]
for name , params in storage_raw . items ( ) :
if not isinstance ( params , dict ) :
raise ValueError ( f " Storage config for { name } must be a table " )
storage_type = params . get ( " type " , " " )
if storage_type == ResticStorage . TYPE_NAME :
storages . append ( ResticStorage ( name , params ) )
if not storages :
raise ValueError ( " At least one storage backend must be configured " )
notifications_raw = raw_config . get ( " notifier " ) or { }
notifiers : List [ Notifier ] = [ ]
for name , params in notifications_raw . items ( ) :
if not isinstance ( params , dict ) :
raise ValueError ( f " Notificator config for { name } must be a table " )
notifier_type = params . get ( " type " , " " )
if notifier_type == TelegramNotifier . TYPE_NAME :
notifiers . append ( TelegramNotifier ( name , params ) )
if not notifiers :
raise ValueError ( " At least one notification backend must be configured " )
config = Config ( host_name = host_name , roots = roots )
return BackupManager (
config = config , roots = roots , storages = storages , notifiers = notifiers
)
def main ( ) :
try :
backup_manager = BackupManager ( )
backup_manager = initialize ( CONFIG_PATH )
success = backup_manager . run_backup_process ( )
if not success :
sys . exit ( 1 )