Files
filemover/main.go

171 lines
4.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"syscall"
"time"
"github.com/fsnotify/fsnotify"
)
const fileQueueLen = 100
func main() {
watchDir := "/home/av/temp/inbox"
destDir := "/home/av/temp/dest"
os.MkdirAll(destDir, 0755)
// Контекст для graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Канал для задач (буфер для имен файлов)
tasks := make(chan string, fileQueueLen)
// WaitGroup для отслеживания завершения воркера
var wg sync.WaitGroup
wg.Add(1)
// Запуск единственного обработчика
go func() {
defer wg.Done()
counter := loadCounter(destDir)
for {
select {
case file, ok := <-tasks:
if !ok {
// Канал закрыт, завершаем работу
saveCounter(destDir, counter)
log.Println("Worker stopped")
return
}
processFile(file, destDir, &counter)
case <-ctx.Done():
// Получен сигнал завершения
saveCounter(destDir, counter)
log.Println("Worker stopped by context")
return
}
}
}()
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatalf("Watcher creation failed: %v", err)
}
defer watcher.Close()
if err := watcher.Add(watchDir); err != nil {
log.Fatalf("Failed to add watch directory: %v", err)
}
// Канал для системных сигналов
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Главный цикл обработки событий
log.Println("Monitoring started. Press Ctrl+C to stop...")
mainLoop:
for {
select {
case event, ok := <-watcher.Events:
if !ok {
break mainLoop
}
if event.Op&fsnotify.Create == fsnotify.Create {
select {
case tasks <- event.Name:
// Файл добавлен в очередь
case <-ctx.Done():
break mainLoop
default:
log.Println("Task queue full, skipping file:", event.Name)
}
}
case err, ok := <-watcher.Errors:
if !ok {
break mainLoop
}
log.Println("Watcher error:", err)
case sig := <-sigChan:
log.Printf("Received signal: %v. Shutting down...", sig)
cancel() // Уведомляем воркер о завершении
break mainLoop
case <-ctx.Done():
break mainLoop
}
}
// Процесс graceful shutdown:
// 1. Останавливаем прием новых файлов
// 2. Закрываем канал задач
// 3. Ждем завершения воркера
log.Println("Shutting down...")
close(tasks) // Закрываем канал для воркера
wg.Wait() // Ожидаем завершения обработки текущего файла
log.Println("Shutdown complete")
}
func processFile(filePath, destDir string, counter *int) {
// Проверка что это файл
info, err := os.Stat(filePath)
if err != nil {
log.Printf("File no longer available: %s", filePath)
return
}
if info.IsDir() {
return
}
// Ожидание завершения записи
time.Sleep(500 * time.Millisecond)
// Копирование
newName := fmt.Sprintf("%03d", *counter)
destPath := filepath.Join(destDir, newName)
if err := os.Rename(filePath, destPath); err != nil {
log.Printf("Moving failed: %v", err)
return
}
// Обновление счетчика
*counter++
saveCounter(destDir, *counter)
log.Printf("Moved: %s -> %s", filePath, destPath)
}
func loadCounter(dir string) int {
counterPath := filepath.Join(dir, "counter.txt")
data, err := os.ReadFile(counterPath)
if err != nil {
if !os.IsNotExist(err) {
log.Printf("Error reading counter: %v", err)
}
return 1
}
count, err := strconv.Atoi(string(data))
if err != nil {
log.Printf("Invalid counter value: %v", err)
return 1
}
return count
}
func saveCounter(dir string, count int) {
counterPath := filepath.Join(dir, "counter.txt")
if err := os.WriteFile(counterPath, []byte(strconv.Itoa(count)), 0644); err != nil {
log.Printf("Failed to save counter: %v", err)
}
}