From 6d9e14c262283937ad424cbbccf9df70848dfad5 Mon Sep 17 00:00:00 2001 From: Anton Vakhrushev Date: Mon, 21 Jul 2025 10:13:46 +0300 Subject: [PATCH] Filemover: initial commit --- .gitignore | 1 + go.mod | 7 +++ go.sum | 4 ++ main.go | 170 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 182 insertions(+) create mode 100644 .gitignore create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5cdfe1d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +filemover \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e846f97 --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module git.vakhrushev.me/av/filemover + +go 1.24.3 + +require github.com/fsnotify/fsnotify v1.9.0 + +require golang.org/x/sys v0.13.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c1e3272 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/main.go b/main.go new file mode 100644 index 0000000..bfab346 --- /dev/null +++ b/main.go @@ -0,0 +1,170 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "path/filepath" + "strconv" + "sync" + "syscall" + "time" + + "github.com/fsnotify/fsnotify" +) + +const fileQueueLen = 100 + +func main() { + watchDir := "/home/av/temp/inbox" + destDir := "/home/av/temp/dest" + os.MkdirAll(destDir, 0755) + + // Контекст для graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Канал для задач (буфер для имен файлов) + tasks := make(chan string, fileQueueLen) + + // WaitGroup для отслеживания завершения воркера + var wg sync.WaitGroup + wg.Add(1) + + // Запуск единственного обработчика + go func() { + defer wg.Done() + counter := loadCounter(destDir) + for { + select { + case file, ok := <-tasks: + if !ok { + // Канал закрыт, завершаем работу + saveCounter(destDir, counter) + log.Println("Worker stopped") + return + } + processFile(file, destDir, &counter) + case <-ctx.Done(): + // Получен сигнал завершения + saveCounter(destDir, counter) + log.Println("Worker stopped by context") + return + } + } + }() + + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Fatalf("Watcher creation failed: %v", err) + } + defer watcher.Close() + + if err := watcher.Add(watchDir); err != nil { + log.Fatalf("Failed to add watch directory: %v", err) + } + + // Канал для системных сигналов + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Главный цикл обработки событий + log.Println("Monitoring started. Press Ctrl+C to stop...") + +mainLoop: + for { + select { + case event, ok := <-watcher.Events: + if !ok { + break mainLoop + } + if event.Op&fsnotify.Create == fsnotify.Create { + select { + case tasks <- event.Name: + // Файл добавлен в очередь + case <-ctx.Done(): + break mainLoop + default: + log.Println("Task queue full, skipping file:", event.Name) + } + } + + case err, ok := <-watcher.Errors: + if !ok { + break mainLoop + } + log.Println("Watcher error:", err) + + case sig := <-sigChan: + log.Printf("Received signal: %v. Shutting down...", sig) + cancel() // Уведомляем воркер о завершении + break mainLoop + + case <-ctx.Done(): + break mainLoop + } + } + + // Процесс graceful shutdown: + // 1. Останавливаем прием новых файлов + // 2. Закрываем канал задач + // 3. Ждем завершения воркера + log.Println("Shutting down...") + close(tasks) // Закрываем канал для воркера + wg.Wait() // Ожидаем завершения обработки текущего файла + log.Println("Shutdown complete") +} + +func processFile(filePath, destDir string, counter *int) { + // Проверка что это файл + info, err := os.Stat(filePath) + if err != nil { + log.Printf("File no longer available: %s", filePath) + return + } + if info.IsDir() { + return + } + + // Ожидание завершения записи + time.Sleep(500 * time.Millisecond) + + // Копирование + newName := fmt.Sprintf("%03d", *counter) + destPath := filepath.Join(destDir, newName) + if err := os.Rename(filePath, destPath); err != nil { + log.Printf("Moving failed: %v", err) + return + } + + // Обновление счетчика + *counter++ + saveCounter(destDir, *counter) + log.Printf("Moved: %s -> %s", filePath, destPath) +} + +func loadCounter(dir string) int { + counterPath := filepath.Join(dir, "counter.txt") + data, err := os.ReadFile(counterPath) + if err != nil { + if !os.IsNotExist(err) { + log.Printf("Error reading counter: %v", err) + } + return 1 + } + count, err := strconv.Atoi(string(data)) + if err != nil { + log.Printf("Invalid counter value: %v", err) + return 1 + } + return count +} + +func saveCounter(dir string, count int) { + counterPath := filepath.Join(dir, "counter.txt") + if err := os.WriteFile(counterPath, []byte(strconv.Itoa(count)), 0644); err != nil { + log.Printf("Failed to save counter: %v", err) + } +}