Files
transcriber/main.go

277 lines
9.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"database/sql"
"flag"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
ffmpegconv "git.vakhrushev.me/av/transcriber/internal/adapter/converter/ffmpeg"
ffmpegmv "git.vakhrushev.me/av/transcriber/internal/adapter/metaviewer/ffmpeg"
"git.vakhrushev.me/av/transcriber/internal/adapter/recognizer/yandex"
"git.vakhrushev.me/av/transcriber/internal/adapter/repo/sqlite"
"git.vakhrushev.me/av/transcriber/internal/config"
httpcontroller "git.vakhrushev.me/av/transcriber/internal/controller/http"
tgcontroller "git.vakhrushev.me/av/transcriber/internal/controller/tg"
"git.vakhrushev.me/av/transcriber/internal/controller/worker"
"git.vakhrushev.me/av/transcriber/internal/service"
"github.com/doug-martin/goqu/v9"
_ "github.com/doug-martin/goqu/v9/dialect/sqlite3"
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
_ "github.com/mattn/go-sqlite3"
"github.com/pressly/goose/v3"
"github.com/prometheus/client_golang/prometheus/promhttp"
sloggin "github.com/samber/slog-gin"
)
const (
ServerShutdownTimeout = 5
ForceShutdownTimeout = 20
)
func main() {
// Создаем структурированный логгер
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
slog.SetDefault(logger)
// Parse command line flags
configPath := flag.String("c", "config.toml", "Path to config file")
flag.StringVar(configPath, "config", "config.toml", "Path to config file (alias for -c)")
flag.Parse()
// Load configuration
cfg, err := config.LoadConfig(*configPath)
if err != nil {
// If config file doesn't exist, use defaults
cfg = config.DefaultConfig()
// Log that we're using default config
logger.Info("Using default configuration", "config_path", *configPath, "error", err)
} else {
logger.Info("Configuration loaded successfully", "config_path", *configPath)
}
// Загружаем переменные окружения из .env файла
if err := godotenv.Load(); err != nil {
logger.Warn("Warning: .env file not found, using system environment variables")
}
// Создаем директории если они не существуют
if err := os.MkdirAll("data/files", 0755); err != nil {
logger.Error("Failed to create data/files directory", "error", err)
os.Exit(1)
}
db, err := sql.Open("sqlite3", cfg.Database.Path)
if err != nil {
logger.Error("failed to open database", "error", err)
os.Exit(1)
}
defer db.Close()
if err := db.Ping(); err != nil {
logger.Error("failed to ping database", "error", err)
os.Exit(1)
}
gq := goqu.New("sqlite3", db)
// Запускаем миграции
if err := RunMigrations(db, "migrations", logger); err != nil {
logger.Error("Failed to run migrations", "error", err)
os.Exit(1)
}
// Создаем репозитории
fileRepo := sqlite.NewFileRepository(db, gq)
jobRepo := sqlite.NewTranscriptJobRepository(db, gq)
// Создаем адаптеры
metaviewer := ffmpegmv.NewFfmpegMetaViewer()
converter := ffmpegconv.NewFfmpegConverter()
recognizer, err := yandex.NewYandexAudioRecognizerService(yandex.YandexAudioRecognizerConfig{
Region: cfg.AWS.Region,
AccessKey: cfg.AWS.AccessKey,
SecretKey: cfg.AWS.SecretKey,
BucketName: cfg.AWS.BucketName,
Endpoint: cfg.AWS.Endpoint,
ApiKey: cfg.Yandex.APIKey,
FolderID: cfg.Yandex.FolderID,
})
if err != nil {
logger.Error("failed to create audio recognizer", "error", err)
os.Exit(1)
}
defer recognizer.Close()
// Создаем сервисы
transcribeService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer, logger)
// Создаем контекст для graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Создаем WaitGroup для ожидания завершения всех воркеров
var wg sync.WaitGroup
tgConfig := tgcontroller.TelegramConfig{
BotToken: cfg.Telegram.BotToken,
UpdateTimeout: cfg.Telegram.UpdateTimeout,
}
// Создаем Telegram бот
tgController, err := tgcontroller.NewTelegramController(tgConfig, transcribeService, jobRepo, logger)
if err != nil {
logger.Error("Failed to create Telegram controller", "error", err)
// Не останавливаем приложение, если Telegram бот не создан
} else {
// Запускаем Telegram бот в отдельной горутине
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("Starting Telegram bot")
tgController.Start()
logger.Info("Telegram bot stopped gracefully")
}()
}
// Создаем воркеры
conversionWorker := worker.NewCallbackWorker("conversion_worker", transcribeService.FindAndRunConversionJob, logger)
transcribeWorker := worker.NewCallbackWorker("transcribe_worker", transcribeService.FindAndRunTranscribeJob, logger)
checkWorker := worker.NewCallbackWorker("check_worker", transcribeService.FindAndRunTranscribeCheckJob, logger)
workers := []worker.Worker{
conversionWorker,
transcribeWorker,
checkWorker,
}
// Запускаем воркеры в отдельных горутинах
for _, w := range workers {
wg.Add(1)
go func(worker worker.Worker) {
defer wg.Done()
worker.Start(ctx)
logger.Info("Worker stopped gracefully", "worker", worker.Name())
}(w)
}
// Создаем Gin middleware для логирования
gin.SetMode(gin.DebugMode)
router := gin.New()
router.Use(sloggin.New(logger))
router.Use(gin.Recovery())
// Запускаем HTTP сервер для API (создание задач и проверка статуса)
transcribeHandler := httpcontroller.NewTranscribeHandler(jobRepo, transcribeService)
// Настраиваем роуты только для создания задач и проверки статуса
api := router.Group("/api")
{
api.POST("/audio", transcribeHandler.CreateTranscribeJob)
api.GET("/status/:id", transcribeHandler.GetTranscribeJobStatus)
}
// Добавляем middleware для обработки больших файлов
router.MaxMultipartMemory = 32 << 20 // 32 MiB
// Добавляем базовый роут для проверки работоспособности
router.GET("/health", func(c *gin.Context) {
c.JSON(200, gin.H{
"status": "ok",
"message": "Transcriber service is running",
})
})
// Добавляем эндпоинт для метрик Prometheus
router.GET("/metrics", gin.WrapH(promhttp.Handler()))
// Создаем HTTP сервер
srv := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.Server.Port),
Handler: router,
}
// Запускаем HTTP сервер в отдельной горутине
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("Starting HTTP server", "port", cfg.Server.Port)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("HTTP server error", "error", err)
}
}()
// Настраиваем обработку сигналов для graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
logger.Info("Transcriber service started with background workers")
logger.Info("Workers: ConversionWorker, TranscribeWorker, CheckWorker")
logger.Info("Press Ctrl+C to stop...")
// Ждем сигнал завершения
<-sigChan
logger.Info("Received shutdown signal, initiating graceful shutdown...")
if tgController != nil {
logger.Info("Shutting down Telegram bot...")
tgController.Stop()
}
// Создаем контекст с таймаутом для graceful shutdown HTTP сервера
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), time.Duration(cfg.Server.ShutdownTimeout)*time.Second)
defer shutdownCancel()
// Останавливаем HTTP сервер
logger.Info("Shutting down HTTP server...")
if err := srv.Shutdown(shutdownCtx); err != nil {
logger.Error("HTTP server forced to shutdown", "error", err)
} else {
logger.Info("HTTP server stopped gracefully")
}
// Отменяем контекст для остановки воркеров
cancel()
// Создаем канал для уведомления о завершении всех воркеров
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
// Ждем завершения всех воркеров или таймаута
select {
case <-done:
logger.Info("All workers stopped gracefully")
case <-time.After(time.Duration(cfg.Server.ForceShutdownTimeout) * time.Second):
logger.Warn("Timeout reached, forcing shutdown")
}
logger.Info("Transcriber service stopped")
}
func RunMigrations(db *sql.DB, migrationsDir string, logger *slog.Logger) error {
if err := goose.SetDialect("sqlite3"); err != nil {
return fmt.Errorf("failed to set goose dialect: %w", err)
}
if err := goose.Up(db, migrationsDir); err != nil {
return fmt.Errorf("failed to run migrations: %w", err)
}
logger.Info("Migrations completed successfully")
return nil
}