From 6783b7621eebddb31a09eff9a3ed5d98c27be248 Mon Sep 17 00:00:00 2001 From: Anton Vakhrushev Date: Tue, 12 Aug 2025 11:49:18 +0300 Subject: [PATCH] Add workers and graceful shutdown --- internal/repo/sqlite/transcript_job_repo.go | 3 +- internal/worker/worker.go | 149 ++++++++++++++++---- main.go | 116 ++++++++++++--- 3 files changed, 224 insertions(+), 44 deletions(-) diff --git a/internal/repo/sqlite/transcript_job_repo.go b/internal/repo/sqlite/transcript_job_repo.go index d041efe..560f246 100644 --- a/internal/repo/sqlite/transcript_job_repo.go +++ b/internal/repo/sqlite/transcript_job_repo.go @@ -3,7 +3,6 @@ package sqlite import ( "database/sql" "fmt" - "log" "time" "git.vakhrushev.me/av/transcriber/internal/entity" @@ -151,7 +150,7 @@ func (repo *TranscriptJobRepository) FindAndAcquire(state, acquisitionId string, return nil, fmt.Errorf("failed to build query: %w", err) } - log.Printf("aquire sql: %s", sql) + // log.Printf("aquire sql: %s", sql) result, err := repo.db.Exec(sql, args...) if err != nil { diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 7c7a354..2c63c16 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -1,41 +1,138 @@ package worker import ( - "git.vakhrushev.me/av/transcriber/internal/repo" + "context" + "log" + "time" + + "git.vakhrushev.me/av/transcriber/internal/service/transcribe" ) +// Worker представляет базовый интерфейс для всех воркеров +type Worker interface { + Start(ctx context.Context) + Name() string +} + +// ConversionWorker обрабатывает задачи конвертации type ConversionWorker struct { - jobRepo repo.TranscriptJobRepository - fileRepo repo.FileRepository - worker string + transcribeService *transcribe.TranscribeService } -func NewConversionWorker(jobRepo repo.TranscriptJobRepository, fileRepo repo.FileRepository, worker string) *ConversionWorker { - return &ConversionWorker{jobRepo: jobRepo, fileRepo: fileRepo} +func NewConversionWorker(transcribeService *transcribe.TranscribeService) *ConversionWorker { + return &ConversionWorker{ + transcribeService: transcribeService, + } } -func (w *ConversionWorker) Run() { - w.handleJob() - // for { - // err := w.handleJob() - // if err != nil { - // continue - // } - // } +func (w *ConversionWorker) Name() string { + return "ConversionWorker" } -func (w *ConversionWorker) handleJob() error { - // job, err := w.jobRepo.FindAndAcquire(entity.StateCreated, w.worker) - // if err != nil { - // return err - // } +func (w *ConversionWorker) Start(ctx context.Context) { + log.Printf("%s started", w.Name()) - // job.MoveToState(entity.StateConverted) + for { + select { + case <-ctx.Done(): + log.Printf("%s received shutdown signal", w.Name()) + return + default: + err := w.transcribeService.FindAndRunConversionJob() + if err != nil { + log.Printf("%s error: %v", w.Name(), err) + } - // err = w.jobRepo.Save(job) - // if err != nil { - // return err - // } - - return nil + // Ждем 1 секунду перед следующей итерацией + select { + case <-ctx.Done(): + log.Printf("%s received shutdown signal during sleep", w.Name()) + return + case <-time.After(1 * time.Second): + // Продолжаем работу + } + } + } +} + +// TranscribeWorker обрабатывает задачи транскрипции +type TranscribeWorker struct { + transcribeService *transcribe.TranscribeService +} + +func NewTranscribeWorker(transcribeService *transcribe.TranscribeService) *TranscribeWorker { + return &TranscribeWorker{ + transcribeService: transcribeService, + } +} + +func (w *TranscribeWorker) Name() string { + return "TranscribeWorker" +} + +func (w *TranscribeWorker) Start(ctx context.Context) { + log.Printf("%s started", w.Name()) + + for { + select { + case <-ctx.Done(): + log.Printf("%s received shutdown signal", w.Name()) + return + default: + err := w.transcribeService.FindAndRunTranscribeJob() + if err != nil { + log.Printf("%s error: %v", w.Name(), err) + } + + // Ждем 1 секунду перед следующей итерацией + select { + case <-ctx.Done(): + log.Printf("%s received shutdown signal during sleep", w.Name()) + return + case <-time.After(1 * time.Second): + // Продолжаем работу + } + } + } +} + +// CheckWorker обрабатывает задачи проверки статуса распознавания +type CheckWorker struct { + transcribeService *transcribe.TranscribeService +} + +func NewCheckWorker(transcribeService *transcribe.TranscribeService) *CheckWorker { + return &CheckWorker{ + transcribeService: transcribeService, + } +} + +func (w *CheckWorker) Name() string { + return "CheckWorker" +} + +func (w *CheckWorker) Start(ctx context.Context) { + log.Printf("%s started", w.Name()) + + for { + select { + case <-ctx.Done(): + log.Printf("%s received shutdown signal", w.Name()) + return + default: + err := w.transcribeService.FindAndRunTranscribeCheckJob() + if err != nil { + log.Printf("%s error: %v", w.Name(), err) + } + + // Ждем 1 секунду перед следующей итерацией + select { + case <-ctx.Done(): + log.Printf("%s received shutdown signal during sleep", w.Name()) + return + case <-time.After(1 * time.Second): + // Продолжаем работу + } + } + } } diff --git a/main.go b/main.go index 2950313..02b6d0f 100644 --- a/main.go +++ b/main.go @@ -1,14 +1,21 @@ package main import ( + "context" "database/sql" "fmt" "log" + "net/http" "os" + "os/signal" + "sync" + "syscall" + "time" - "git.vakhrushev.me/av/transcriber/internal/controller/http" + httpcontroller "git.vakhrushev.me/av/transcriber/internal/controller/http" "git.vakhrushev.me/av/transcriber/internal/repo/sqlite" "git.vakhrushev.me/av/transcriber/internal/service/transcribe" + "git.vakhrushev.me/av/transcriber/internal/worker" "github.com/doug-martin/goqu/v9" _ "github.com/doug-martin/goqu/v9/dialect/sqlite3" "github.com/gin-gonic/gin" @@ -50,38 +57,115 @@ func main() { transcribeService := transcribe.NewTranscribeService(jobRepo, fileRepo) - // Инициализируем обработчики - transcribeHandler := http.NewTranscribeHandler(jobRepo, transcribeService) + // Создаем воркеры + conversionWorker := worker.NewConversionWorker(transcribeService) + transcribeWorker := worker.NewTranscribeWorker(transcribeService) + checkWorker := worker.NewCheckWorker(transcribeService) - // Создаем Gin роутер - r := gin.Default() + workers := []worker.Worker{ + conversionWorker, + transcribeWorker, + checkWorker, + } - // Настраиваем роуты - api := r.Group("/api") + // Создаем контекст для graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Создаем WaitGroup для ожидания завершения всех воркеров + var wg sync.WaitGroup + + // Запускаем воркеры в отдельных горутинах + for _, w := range workers { + wg.Add(1) + go func(worker worker.Worker) { + defer wg.Done() + worker.Start(ctx) + log.Printf("%s stopped", worker.Name()) + }(w) + } + + // Запускаем HTTP сервер для API (создание задач и проверка статуса) + transcribeHandler := httpcontroller.NewTranscribeHandler(jobRepo, transcribeService) + router := gin.Default() + + // Настраиваем роуты только для создания задач и проверки статуса + api := router.Group("/api") { api.POST("/audio", transcribeHandler.CreateTranscribeJob) api.GET("/status/:id", transcribeHandler.GetTranscribeJobStatus) - - api.POST("/convert", transcribeHandler.RunConversionJob) - api.POST("/transcribe", transcribeHandler.RunTranscribeJob) - api.POST("/check", transcribeHandler.RunRecognitionCheckJob) } // Добавляем middleware для обработки больших файлов - r.MaxMultipartMemory = 32 << 20 // 32 MiB + router.MaxMultipartMemory = 32 << 20 // 32 MiB // Добавляем базовый роут для проверки работоспособности - r.GET("/health", func(c *gin.Context) { + router.GET("/health", func(c *gin.Context) { c.JSON(200, gin.H{ "status": "ok", "message": "Transcriber service is running", }) }) - log.Println("Starting server on :8080") - if err := r.Run(":8080"); err != nil { - log.Fatal("Failed to start server:", err) + // Создаем HTTP сервер + srv := &http.Server{ + Addr: ":8080", + Handler: router, } + + // Запускаем HTTP сервер в отдельной горутине + wg.Add(1) + go func() { + defer wg.Done() + log.Println("Starting HTTP server on :8080") + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Printf("HTTP server error: %v", err) + } + }() + + // Настраиваем обработку сигналов для graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + log.Println("Transcriber service started with background workers") + log.Println("Workers: ConversionWorker, TranscribeWorker, CheckWorker") + log.Println("Press Ctrl+C to stop...") + + // Ждем сигнал завершения + <-sigChan + log.Println("Received shutdown signal, initiating graceful shutdown...") + + // Создаем контекст с таймаутом для graceful shutdown HTTP сервера + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() + + // Останавливаем HTTP сервер + log.Println("Shutting down HTTP server...") + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Printf("HTTP server forced to shutdown: %v", err) + } else { + log.Println("HTTP server stopped gracefully") + } + + // Отменяем контекст для остановки воркеров + cancel() + + // Создаем канал для уведомления о завершении всех воркеров + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + // Ждем завершения всех воркеров или таймаута в 10 секунд + select { + case <-done: + log.Println("All workers stopped gracefully") + case <-time.After(10 * time.Second): + log.Println("Timeout reached, forcing shutdown") + } + + log.Println("Transcriber service stopped") } func RunMigrations(db *sql.DB, migrationsDir string) error {