Files
filemover/main.go

219 lines
5.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"errors"
"fmt"
"io/fs"
"log"
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"syscall"
"time"
"github.com/adrg/xdg"
"github.com/fsnotify/fsnotify"
)
const fileQueueLen = 100
const fileProcessingDelay = 500 * time.Millisecond
const moveAttempts = 100
func main() {
// Проверка аргументов командной строки
if len(os.Args) < 3 {
log.Fatalf("Usage: %s <watch_dir> <dest_dir>", os.Args[0])
}
watchDir := os.Args[1]
destDir := os.Args[2]
// Проверка существования watchDir
if _, err := os.Stat(watchDir); os.IsNotExist(err) {
log.Fatalf("Watch directory does not exist: %s", watchDir)
} else if err != nil {
log.Fatalf("Error accessing watch directory: %v", err)
}
// Создание destDir если не существует
if err := os.MkdirAll(destDir, 0755); err != nil {
log.Fatalf("Failed to create destination directory: %v", err)
}
counterFile, err := xdg.DataFile("filemover/counter")
if err != nil {
log.Fatalf("Application data dir not accessible, %v", err)
}
// Контекст для graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Канал для задач (буфер для имен файлов)
tasks := make(chan string, fileQueueLen)
// WaitGroup для отслеживания завершения воркера
var wg sync.WaitGroup
wg.Add(1)
// Запуск единственного обработчика
go func() {
defer wg.Done()
counter := loadCounter(counterFile)
for {
select {
case file, ok := <-tasks:
if !ok {
// Канал закрыт, завершаем работу
saveCounter(counterFile, counter)
log.Println("Worker stopped")
return
}
processFile(file, destDir, counterFile, &counter)
case <-ctx.Done():
// Получен сигнал завершения
saveCounter(counterFile, counter)
log.Println("Worker stopped by context")
return
}
}
}()
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatalf("Watcher creation failed: %v", err)
}
defer watcher.Close()
if err := watcher.Add(watchDir); err != nil {
log.Fatalf("Failed to add watch directory: %v", err)
}
// Канал для системных сигналов
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Главный цикл обработки событий
log.Println("Monitoring started. Press Ctrl+C to stop...")
mainLoop:
for {
select {
case event, ok := <-watcher.Events:
if !ok {
break mainLoop
}
if event.Op&fsnotify.Create == fsnotify.Create {
select {
case tasks <- event.Name:
// Файл добавлен в очередь
case <-ctx.Done():
break mainLoop
default:
log.Println("Task queue full, skipping file:", event.Name)
}
}
case err, ok := <-watcher.Errors:
if !ok {
break mainLoop
}
log.Println("Watcher error:", err)
case sig := <-sigChan:
log.Printf("Received signal: %v. Shutting down...", sig)
cancel() // Уведомляем воркер о завершении
break mainLoop
case <-ctx.Done():
break mainLoop
}
}
// Процесс graceful shutdown:
// 1. Останавливаем прием новых файлов
// 2. Закрываем канал задач
// 3. Ждем завершения воркера
log.Println("Shutting down...")
close(tasks) // Закрываем канал для воркера
wg.Wait() // Ожидаем завершения обработки текущего файла
log.Println("Shutdown complete")
}
func processFile(filePath, destDir, counterPath string, counter *int) {
// Проверка что это файл
info, err := os.Stat(filePath)
if err != nil {
log.Printf("File no longer available: %s", filePath)
return
}
if info.IsDir() {
return
}
// Ожидание завершения записи
time.Sleep(fileProcessingDelay)
attempt := 0
for {
if attempt >= moveAttempts {
log.Printf("Moving failed after %d attempts, see messages", attempt)
break
}
*counter++
saveCounter(counterPath, *counter)
newName := fmt.Sprintf("%05d%s", *counter, filepath.Ext(filePath))
destPath := filepath.Join(destDir, newName)
_, err := os.Stat(destPath)
if err == nil {
log.Printf("Moving failed, file already exists: %s", destPath)
attempt++
continue
}
if !errors.Is(err, fs.ErrNotExist) {
log.Printf("Moving failed: %v", err)
attempt++
continue
}
if err := os.Rename(filePath, destPath); err != nil {
log.Printf("Moving failed: %v", err)
attempt++
continue
}
log.Printf("Moved: %s -> %s", filePath, destPath)
break
}
}
func loadCounter(counterPath string) int {
data, err := os.ReadFile(counterPath)
if err != nil {
if !os.IsNotExist(err) {
log.Printf("Error reading counter: %v", err)
}
return 1
}
count, err := strconv.Atoi(string(data))
if err != nil {
log.Printf("Invalid counter value: %v", err)
return 1
}
return count
}
func saveCounter(counterPath string, count int) {
if err := os.WriteFile(counterPath, []byte(strconv.Itoa(count)), 0644); err != nil {
log.Printf("Failed to save counter: %v", err)
}
}