package main import ( "context" "database/sql" "fmt" "log/slog" "net/http" "os" "os/signal" "sync" "syscall" "time" ffmpegconv "git.vakhrushev.me/av/transcriber/internal/adapter/converter/ffmpeg" ffmpegmv "git.vakhrushev.me/av/transcriber/internal/adapter/metaviewer/ffmpeg" "git.vakhrushev.me/av/transcriber/internal/adapter/recognizer/yandex" "git.vakhrushev.me/av/transcriber/internal/adapter/repo/sqlite" httpcontroller "git.vakhrushev.me/av/transcriber/internal/controller/http" "git.vakhrushev.me/av/transcriber/internal/controller/worker" "git.vakhrushev.me/av/transcriber/internal/service" "github.com/doug-martin/goqu/v9" _ "github.com/doug-martin/goqu/v9/dialect/sqlite3" "github.com/gin-gonic/gin" "github.com/joho/godotenv" _ "github.com/mattn/go-sqlite3" "github.com/pressly/goose/v3" "github.com/prometheus/client_golang/prometheus/promhttp" sloggin "github.com/samber/slog-gin" ) func main() { // Создаем структурированный логгер logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, })) slog.SetDefault(logger) // Загружаем переменные окружения из .env файла if err := godotenv.Load(); err != nil { logger.Warn("Warning: .env file not found, using system environment variables") } // Создаем директории если они не существуют if err := os.MkdirAll("data/files", 0755); err != nil { logger.Error("Failed to create data/files directory", "error", err) os.Exit(1) } db, err := sql.Open("sqlite3", "data/transcriber.db") if err != nil { logger.Error("failed to open database", "error", err) os.Exit(1) } defer db.Close() if err := db.Ping(); err != nil { logger.Error("failed to ping database", "error", err) os.Exit(1) } gq := goqu.New("sqlite3", db) // Запускаем миграции if err := RunMigrations(db, "migrations", logger); err != nil { logger.Error("Failed to run migrations", "error", err) os.Exit(1) } // Создаем репозитории fileRepo := sqlite.NewFileRepository(db, gq) jobRepo := sqlite.NewTranscriptJobRepository(db, gq) // Создаем адаптеры metaviewer := ffmpegmv.NewFfmpegMetaViewer() converter := ffmpegconv.NewFfmpegConverter() recognizer, err := yandex.NewYandexAudioRecognizerService(yandex.YandexAudioRecognizerConfig{ Region: os.Getenv("AWS_REGION"), AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"), SecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), BucketName: os.Getenv("S3_BUCKET_NAME"), Endpoint: os.Getenv("S3_ENDPOINT"), ApiKey: os.Getenv("YANDEX_CLOUD_API_KEY"), FolderID: os.Getenv("YANDEX_CLOUD_FOLDER_ID"), }) if err != nil { logger.Error("failed to create audio recognizer", "error", err) os.Exit(1) } defer recognizer.Close() // Создаем сервисы transcribeService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer, logger) // Создаем воркеры conversionWorker := worker.NewCallbackWorker("conversion_worker", transcribeService.FindAndRunConversionJob, logger) transcribeWorker := worker.NewCallbackWorker("transcribe_worker", transcribeService.FindAndRunTranscribeJob, logger) checkWorker := worker.NewCallbackWorker("check_worker", transcribeService.FindAndRunTranscribeCheckJob, logger) workers := []worker.Worker{ conversionWorker, transcribeWorker, checkWorker, } // Создаем контекст для graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Создаем WaitGroup для ожидания завершения всех воркеров var wg sync.WaitGroup // Запускаем воркеры в отдельных горутинах for _, w := range workers { wg.Add(1) go func(worker worker.Worker) { defer wg.Done() worker.Start(ctx) logger.Info("Worker stopped", "worker_name", worker.Name()) }(w) } // Создаем Gin middleware для логирования gin.SetMode(gin.DebugMode) router := gin.New() router.Use(sloggin.New(logger)) router.Use(gin.Recovery()) // Запускаем HTTP сервер для API (создание задач и проверка статуса) transcribeHandler := httpcontroller.NewTranscribeHandler(jobRepo, transcribeService) // Настраиваем роуты только для создания задач и проверки статуса api := router.Group("/api") { api.POST("/audio", transcribeHandler.CreateTranscribeJob) api.GET("/status/:id", transcribeHandler.GetTranscribeJobStatus) } // Добавляем middleware для обработки больших файлов router.MaxMultipartMemory = 32 << 20 // 32 MiB // Добавляем базовый роут для проверки работоспособности router.GET("/health", func(c *gin.Context) { c.JSON(200, gin.H{ "status": "ok", "message": "Transcriber service is running", }) }) // Добавляем эндпоинт для метрик Prometheus router.GET("/metrics", gin.WrapH(promhttp.Handler())) // Создаем HTTP сервер srv := &http.Server{ Addr: ":8080", Handler: router, } // Запускаем HTTP сервер в отдельной горутине wg.Add(1) go func() { defer wg.Done() logger.Info("Starting HTTP server", "port", 8080) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Error("HTTP server error", "error", err) } }() // Настраиваем обработку сигналов для graceful shutdown sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) logger.Info("Transcriber service started with background workers") logger.Info("Workers: ConversionWorker, TranscribeWorker, CheckWorker") logger.Info("Press Ctrl+C to stop...") // Ждем сигнал завершения <-sigChan logger.Info("Received shutdown signal, initiating graceful shutdown...") // Создаем контекст с таймаутом для graceful shutdown HTTP сервера shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) defer shutdownCancel() // Останавливаем HTTP сервер logger.Info("Shutting down HTTP server...") if err := srv.Shutdown(shutdownCtx); err != nil { logger.Error("HTTP server forced to shutdown", "error", err) } else { logger.Info("HTTP server stopped gracefully") } // Отменяем контекст для остановки воркеров cancel() // Создаем канал для уведомления о завершении всех воркеров done := make(chan struct{}) go func() { wg.Wait() close(done) }() // Ждем завершения всех воркеров или таймаута в 10 секунд select { case <-done: logger.Info("All workers stopped gracefully") case <-time.After(10 * time.Second): logger.Warn("Timeout reached, forcing shutdown") } logger.Info("Transcriber service stopped") } func RunMigrations(db *sql.DB, migrationsDir string, logger *slog.Logger) error { if err := goose.SetDialect("sqlite3"); err != nil { return fmt.Errorf("failed to set goose dialect: %w", err) } if err := goose.Up(db, migrationsDir); err != nil { return fmt.Errorf("failed to run migrations: %w", err) } logger.Info("Migrations completed successfully") return nil }