183 lines
5.5 KiB
Go
183 lines
5.5 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"fmt"
|
||
"log"
|
||
"net/http"
|
||
"os"
|
||
"os/signal"
|
||
"sync"
|
||
"syscall"
|
||
"time"
|
||
|
||
httpcontroller "git.vakhrushev.me/av/transcriber/internal/controller/http"
|
||
"git.vakhrushev.me/av/transcriber/internal/controller/worker"
|
||
"git.vakhrushev.me/av/transcriber/internal/repo/sqlite"
|
||
"git.vakhrushev.me/av/transcriber/internal/service/transcribe"
|
||
"github.com/doug-martin/goqu/v9"
|
||
_ "github.com/doug-martin/goqu/v9/dialect/sqlite3"
|
||
"github.com/gin-gonic/gin"
|
||
"github.com/joho/godotenv"
|
||
_ "github.com/mattn/go-sqlite3"
|
||
"github.com/pressly/goose/v3"
|
||
)
|
||
|
||
func main() {
|
||
// Загружаем переменные окружения из .env файла
|
||
if err := godotenv.Load(); err != nil {
|
||
log.Println("Warning: .env file not found, using system environment variables")
|
||
}
|
||
|
||
// Создаем директории если они не существуют
|
||
if err := os.MkdirAll("data/files", 0755); err != nil {
|
||
log.Fatal("Failed to create data/files directory:", err)
|
||
}
|
||
|
||
db, err := sql.Open("sqlite3", "data/transcriber.db")
|
||
if err != nil {
|
||
log.Fatalf("failed to open database: %v", err)
|
||
}
|
||
defer db.Close()
|
||
|
||
if err := db.Ping(); err != nil {
|
||
log.Fatalf("failed to ping database: %v", err)
|
||
}
|
||
|
||
gq := goqu.New("sqlite3", db)
|
||
|
||
// Запускаем миграции
|
||
if err := RunMigrations(db, "migrations"); err != nil {
|
||
log.Fatal("Failed to run migrations:", err)
|
||
}
|
||
|
||
fileRepo := sqlite.NewFileRepository(db, gq)
|
||
jobRepo := sqlite.NewTranscriptJobRepository(db, gq)
|
||
|
||
transcribeService := transcribe.NewTranscribeService(jobRepo, fileRepo)
|
||
|
||
// Создаем воркеры
|
||
conversionWorker := worker.NewConversionWorker(transcribeService)
|
||
transcribeWorker := worker.NewTranscribeWorker(transcribeService)
|
||
checkWorker := worker.NewCheckWorker(transcribeService)
|
||
|
||
workers := []worker.Worker{
|
||
conversionWorker,
|
||
transcribeWorker,
|
||
checkWorker,
|
||
}
|
||
|
||
// Создаем контекст для graceful shutdown
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
defer cancel()
|
||
|
||
// Создаем WaitGroup для ожидания завершения всех воркеров
|
||
var wg sync.WaitGroup
|
||
|
||
// Запускаем воркеры в отдельных горутинах
|
||
for _, w := range workers {
|
||
wg.Add(1)
|
||
go func(worker worker.Worker) {
|
||
defer wg.Done()
|
||
worker.Start(ctx)
|
||
log.Printf("%s stopped", worker.Name())
|
||
}(w)
|
||
}
|
||
|
||
// Запускаем HTTP сервер для API (создание задач и проверка статуса)
|
||
transcribeHandler := httpcontroller.NewTranscribeHandler(jobRepo, transcribeService)
|
||
router := gin.Default()
|
||
|
||
// Настраиваем роуты только для создания задач и проверки статуса
|
||
api := router.Group("/api")
|
||
{
|
||
api.POST("/audio", transcribeHandler.CreateTranscribeJob)
|
||
api.GET("/status/:id", transcribeHandler.GetTranscribeJobStatus)
|
||
}
|
||
|
||
// Добавляем middleware для обработки больших файлов
|
||
router.MaxMultipartMemory = 32 << 20 // 32 MiB
|
||
|
||
// Добавляем базовый роут для проверки работоспособности
|
||
router.GET("/health", func(c *gin.Context) {
|
||
c.JSON(200, gin.H{
|
||
"status": "ok",
|
||
"message": "Transcriber service is running",
|
||
})
|
||
})
|
||
|
||
// Создаем HTTP сервер
|
||
srv := &http.Server{
|
||
Addr: ":8080",
|
||
Handler: router,
|
||
}
|
||
|
||
// Запускаем HTTP сервер в отдельной горутине
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
log.Println("Starting HTTP server on :8080")
|
||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||
log.Printf("HTTP server error: %v", err)
|
||
}
|
||
}()
|
||
|
||
// Настраиваем обработку сигналов для graceful shutdown
|
||
sigChan := make(chan os.Signal, 1)
|
||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||
|
||
log.Println("Transcriber service started with background workers")
|
||
log.Println("Workers: ConversionWorker, TranscribeWorker, CheckWorker")
|
||
log.Println("Press Ctrl+C to stop...")
|
||
|
||
// Ждем сигнал завершения
|
||
<-sigChan
|
||
log.Println("Received shutdown signal, initiating graceful shutdown...")
|
||
|
||
// Создаем контекст с таймаутом для graceful shutdown HTTP сервера
|
||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||
defer shutdownCancel()
|
||
|
||
// Останавливаем HTTP сервер
|
||
log.Println("Shutting down HTTP server...")
|
||
if err := srv.Shutdown(shutdownCtx); err != nil {
|
||
log.Printf("HTTP server forced to shutdown: %v", err)
|
||
} else {
|
||
log.Println("HTTP server stopped gracefully")
|
||
}
|
||
|
||
// Отменяем контекст для остановки воркеров
|
||
cancel()
|
||
|
||
// Создаем канал для уведомления о завершении всех воркеров
|
||
done := make(chan struct{})
|
||
go func() {
|
||
wg.Wait()
|
||
close(done)
|
||
}()
|
||
|
||
// Ждем завершения всех воркеров или таймаута в 10 секунд
|
||
select {
|
||
case <-done:
|
||
log.Println("All workers stopped gracefully")
|
||
case <-time.After(10 * time.Second):
|
||
log.Println("Timeout reached, forcing shutdown")
|
||
}
|
||
|
||
log.Println("Transcriber service stopped")
|
||
}
|
||
|
||
func RunMigrations(db *sql.DB, migrationsDir string) error {
|
||
if err := goose.SetDialect("sqlite3"); err != nil {
|
||
return fmt.Errorf("failed to set goose dialect: %w", err)
|
||
}
|
||
|
||
if err := goose.Up(db, migrationsDir); err != nil {
|
||
return fmt.Errorf("failed to run migrations: %w", err)
|
||
}
|
||
|
||
log.Println("Migrations completed successfully")
|
||
return nil
|
||
}
|