Files
transcriber/main.go

227 lines
7.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"database/sql"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
ffmpegconv "git.vakhrushev.me/av/transcriber/internal/adapter/converter/ffmpeg"
ffmpegmv "git.vakhrushev.me/av/transcriber/internal/adapter/metaviewer/ffmpeg"
"git.vakhrushev.me/av/transcriber/internal/adapter/recognizer/yandex"
"git.vakhrushev.me/av/transcriber/internal/adapter/repo/sqlite"
httpcontroller "git.vakhrushev.me/av/transcriber/internal/controller/http"
"git.vakhrushev.me/av/transcriber/internal/controller/worker"
"git.vakhrushev.me/av/transcriber/internal/service"
"github.com/doug-martin/goqu/v9"
_ "github.com/doug-martin/goqu/v9/dialect/sqlite3"
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
_ "github.com/mattn/go-sqlite3"
"github.com/pressly/goose/v3"
"github.com/prometheus/client_golang/prometheus/promhttp"
sloggin "github.com/samber/slog-gin"
)
func main() {
// Создаем структурированный логгер
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
slog.SetDefault(logger)
// Загружаем переменные окружения из .env файла
if err := godotenv.Load(); err != nil {
logger.Warn("Warning: .env file not found, using system environment variables")
}
// Создаем директории если они не существуют
if err := os.MkdirAll("data/files", 0755); err != nil {
logger.Error("Failed to create data/files directory", "error", err)
os.Exit(1)
}
db, err := sql.Open("sqlite3", "data/transcriber.db")
if err != nil {
logger.Error("failed to open database", "error", err)
os.Exit(1)
}
defer db.Close()
if err := db.Ping(); err != nil {
logger.Error("failed to ping database", "error", err)
os.Exit(1)
}
gq := goqu.New("sqlite3", db)
// Запускаем миграции
if err := RunMigrations(db, "migrations", logger); err != nil {
logger.Error("Failed to run migrations", "error", err)
os.Exit(1)
}
// Создаем репозитории
fileRepo := sqlite.NewFileRepository(db, gq)
jobRepo := sqlite.NewTranscriptJobRepository(db, gq)
// Создаем адаптеры
metaviewer := ffmpegmv.NewFfmpegMetaViewer()
converter := ffmpegconv.NewFfmpegConverter()
recognizer, err := yandex.NewYandexAudioRecognizerService(yandex.YandexAudioRecognizerConfig{
Region: os.Getenv("AWS_REGION"),
AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"),
SecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
BucketName: os.Getenv("S3_BUCKET_NAME"),
Endpoint: os.Getenv("S3_ENDPOINT"),
ApiKey: os.Getenv("YANDEX_CLOUD_API_KEY"),
FolderID: os.Getenv("YANDEX_CLOUD_FOLDER_ID"),
})
if err != nil {
logger.Error("failed to create audio recognizer", "error", err)
os.Exit(1)
}
defer recognizer.Close()
// Создаем сервисы
transcribeService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer, logger)
// Создаем воркеры
conversionWorker := worker.NewCallbackWorker("conversion_worker", transcribeService.FindAndRunConversionJob, logger)
transcribeWorker := worker.NewCallbackWorker("transcribe_worker", transcribeService.FindAndRunTranscribeJob, logger)
checkWorker := worker.NewCallbackWorker("check_worker", transcribeService.FindAndRunTranscribeCheckJob, logger)
workers := []worker.Worker{
conversionWorker,
transcribeWorker,
checkWorker,
}
// Создаем контекст для graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Создаем WaitGroup для ожидания завершения всех воркеров
var wg sync.WaitGroup
// Запускаем воркеры в отдельных горутинах
for _, w := range workers {
wg.Add(1)
go func(worker worker.Worker) {
defer wg.Done()
worker.Start(ctx)
logger.Info("Worker stopped", "worker_name", worker.Name())
}(w)
}
// Создаем Gin middleware для логирования
gin.SetMode(gin.DebugMode)
router := gin.New()
router.Use(sloggin.New(logger))
router.Use(gin.Recovery())
// Запускаем HTTP сервер для API (создание задач и проверка статуса)
transcribeHandler := httpcontroller.NewTranscribeHandler(jobRepo, transcribeService)
// Настраиваем роуты только для создания задач и проверки статуса
api := router.Group("/api")
{
api.POST("/audio", transcribeHandler.CreateTranscribeJob)
api.GET("/status/:id", transcribeHandler.GetTranscribeJobStatus)
}
// Добавляем middleware для обработки больших файлов
router.MaxMultipartMemory = 32 << 20 // 32 MiB
// Добавляем базовый роут для проверки работоспособности
router.GET("/health", func(c *gin.Context) {
c.JSON(200, gin.H{
"status": "ok",
"message": "Transcriber service is running",
})
})
// Добавляем эндпоинт для метрик Prometheus
router.GET("/metrics", gin.WrapH(promhttp.Handler()))
// Создаем HTTP сервер
srv := &http.Server{
Addr: ":8080",
Handler: router,
}
// Запускаем HTTP сервер в отдельной горутине
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("Starting HTTP server", "port", 8080)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("HTTP server error", "error", err)
}
}()
// Настраиваем обработку сигналов для graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
logger.Info("Transcriber service started with background workers")
logger.Info("Workers: ConversionWorker, TranscribeWorker, CheckWorker")
logger.Info("Press Ctrl+C to stop...")
// Ждем сигнал завершения
<-sigChan
logger.Info("Received shutdown signal, initiating graceful shutdown...")
// Создаем контекст с таймаутом для graceful shutdown HTTP сервера
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
// Останавливаем HTTP сервер
logger.Info("Shutting down HTTP server...")
if err := srv.Shutdown(shutdownCtx); err != nil {
logger.Error("HTTP server forced to shutdown", "error", err)
} else {
logger.Info("HTTP server stopped gracefully")
}
// Отменяем контекст для остановки воркеров
cancel()
// Создаем канал для уведомления о завершении всех воркеров
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
// Ждем завершения всех воркеров или таймаута в 10 секунд
select {
case <-done:
logger.Info("All workers stopped gracefully")
case <-time.After(10 * time.Second):
logger.Warn("Timeout reached, forcing shutdown")
}
logger.Info("Transcriber service stopped")
}
func RunMigrations(db *sql.DB, migrationsDir string, logger *slog.Logger) error {
if err := goose.SetDialect("sqlite3"); err != nil {
return fmt.Errorf("failed to set goose dialect: %w", err)
}
if err := goose.Up(db, migrationsDir); err != nil {
return fmt.Errorf("failed to run migrations: %w", err)
}
logger.Info("Migrations completed successfully")
return nil
}