package main import ( "context" "errors" "fmt" "io/fs" "log" "os" "os/signal" "path/filepath" "strconv" "sync" "syscall" "time" "github.com/adrg/xdg" "github.com/fsnotify/fsnotify" ) const fileQueueLen = 100 const fileProcessingDelay = 500 * time.Millisecond const moveAttempts = 100 func main() { // Проверка аргументов командной строки if len(os.Args) < 3 { log.Fatalf("Usage: %s ", os.Args[0]) } watchDir := os.Args[1] destDir := os.Args[2] // Проверка существования watchDir if _, err := os.Stat(watchDir); os.IsNotExist(err) { log.Fatalf("Watch directory does not exist: %s", watchDir) } else if err != nil { log.Fatalf("Error accessing watch directory: %v", err) } // Создание destDir если не существует if err := os.MkdirAll(destDir, 0755); err != nil { log.Fatalf("Failed to create destination directory: %v", err) } counterFile, err := xdg.DataFile("filemover/counter") if err != nil { log.Fatalf("Application data dir not accessible, %v", err) } // Контекст для graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Канал для задач (буфер для имен файлов) tasks := make(chan string, fileQueueLen) // WaitGroup для отслеживания завершения воркера var wg sync.WaitGroup wg.Add(1) // Запуск единственного обработчика go func() { defer wg.Done() counter := loadCounter(counterFile) for { select { case file, ok := <-tasks: if !ok { // Канал закрыт, завершаем работу saveCounter(counterFile, counter) log.Println("Worker stopped") return } processFile(file, destDir, counterFile, &counter) case <-ctx.Done(): // Получен сигнал завершения saveCounter(counterFile, counter) log.Println("Worker stopped by context") return } } }() watcher, err := fsnotify.NewWatcher() if err != nil { log.Fatalf("Watcher creation failed: %v", err) } defer watcher.Close() if err := watcher.Add(watchDir); err != nil { log.Fatalf("Failed to add watch directory: %v", err) } // Канал для системных сигналов sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) // Главный цикл обработки событий log.Println("Monitoring started. Press Ctrl+C to stop...") mainLoop: for { select { case event, ok := <-watcher.Events: if !ok { break mainLoop } if event.Op&fsnotify.Create == fsnotify.Create { select { case tasks <- event.Name: // Файл добавлен в очередь case <-ctx.Done(): break mainLoop default: log.Println("Task queue full, skipping file:", event.Name) } } case err, ok := <-watcher.Errors: if !ok { break mainLoop } log.Println("Watcher error:", err) case sig := <-sigChan: log.Printf("Received signal: %v. Shutting down...", sig) cancel() // Уведомляем воркер о завершении break mainLoop case <-ctx.Done(): break mainLoop } } // Процесс graceful shutdown: // 1. Останавливаем прием новых файлов // 2. Закрываем канал задач // 3. Ждем завершения воркера log.Println("Shutting down...") close(tasks) // Закрываем канал для воркера wg.Wait() // Ожидаем завершения обработки текущего файла log.Println("Shutdown complete") } func processFile(filePath, destDir, counterPath string, counter *int) { // Проверка что это файл info, err := os.Stat(filePath) if err != nil { log.Printf("File no longer available: %s", filePath) return } if info.IsDir() { return } // Ожидание завершения записи time.Sleep(fileProcessingDelay) attempt := 0 for { if attempt >= moveAttempts { log.Printf("Moving failed after %d attempts, see messages", attempt) break } *counter++ saveCounter(counterPath, *counter) newName := fmt.Sprintf("%05d%s", *counter, filepath.Ext(filePath)) destPath := filepath.Join(destDir, newName) _, err := os.Stat(destPath) if err == nil { log.Printf("Moving failed, file already exists: %s", destPath) attempt++ continue } if !errors.Is(err, fs.ErrNotExist) { log.Printf("Moving failed: %v", err) attempt++ continue } if err := os.Rename(filePath, destPath); err != nil { log.Printf("Moving failed: %v", err) attempt++ continue } log.Printf("Moved: %s -> %s", filePath, destPath) break } } func loadCounter(counterPath string) int { data, err := os.ReadFile(counterPath) if err != nil { if !os.IsNotExist(err) { log.Printf("Error reading counter: %v", err) } return 1 } count, err := strconv.Atoi(string(data)) if err != nil { log.Printf("Invalid counter value: %v", err) return 1 } return count } func saveCounter(counterPath string, count int) { if err := os.WriteFile(counterPath, []byte(strconv.Itoa(count)), 0644); err != nil { log.Printf("Failed to save counter: %v", err) } }