Add workers and graceful shutdown

This commit is contained in:
2025-08-12 11:49:18 +03:00
parent 394970e400
commit 6783b7621e
3 changed files with 224 additions and 44 deletions

View File

@@ -3,7 +3,6 @@ package sqlite
import (
"database/sql"
"fmt"
"log"
"time"
"git.vakhrushev.me/av/transcriber/internal/entity"
@@ -151,7 +150,7 @@ func (repo *TranscriptJobRepository) FindAndAcquire(state, acquisitionId string,
return nil, fmt.Errorf("failed to build query: %w", err)
}
log.Printf("aquire sql: %s", sql)
// log.Printf("aquire sql: %s", sql)
result, err := repo.db.Exec(sql, args...)
if err != nil {

View File

@@ -1,41 +1,138 @@
package worker
import (
"git.vakhrushev.me/av/transcriber/internal/repo"
"context"
"log"
"time"
"git.vakhrushev.me/av/transcriber/internal/service/transcribe"
)
// Worker представляет базовый интерфейс для всех воркеров
type Worker interface {
Start(ctx context.Context)
Name() string
}
// ConversionWorker обрабатывает задачи конвертации
type ConversionWorker struct {
jobRepo repo.TranscriptJobRepository
fileRepo repo.FileRepository
worker string
transcribeService *transcribe.TranscribeService
}
func NewConversionWorker(jobRepo repo.TranscriptJobRepository, fileRepo repo.FileRepository, worker string) *ConversionWorker {
return &ConversionWorker{jobRepo: jobRepo, fileRepo: fileRepo}
func NewConversionWorker(transcribeService *transcribe.TranscribeService) *ConversionWorker {
return &ConversionWorker{
transcribeService: transcribeService,
}
}
func (w *ConversionWorker) Run() {
w.handleJob()
// for {
// err := w.handleJob()
// if err != nil {
// continue
// }
// }
func (w *ConversionWorker) Name() string {
return "ConversionWorker"
}
func (w *ConversionWorker) handleJob() error {
// job, err := w.jobRepo.FindAndAcquire(entity.StateCreated, w.worker)
// if err != nil {
// return err
// }
func (w *ConversionWorker) Start(ctx context.Context) {
log.Printf("%s started", w.Name())
// job.MoveToState(entity.StateConverted)
for {
select {
case <-ctx.Done():
log.Printf("%s received shutdown signal", w.Name())
return
default:
err := w.transcribeService.FindAndRunConversionJob()
if err != nil {
log.Printf("%s error: %v", w.Name(), err)
}
// err = w.jobRepo.Save(job)
// if err != nil {
// return err
// }
return nil
// Ждем 1 секунду перед следующей итерацией
select {
case <-ctx.Done():
log.Printf("%s received shutdown signal during sleep", w.Name())
return
case <-time.After(1 * time.Second):
// Продолжаем работу
}
}
}
}
// TranscribeWorker обрабатывает задачи транскрипции
type TranscribeWorker struct {
transcribeService *transcribe.TranscribeService
}
func NewTranscribeWorker(transcribeService *transcribe.TranscribeService) *TranscribeWorker {
return &TranscribeWorker{
transcribeService: transcribeService,
}
}
func (w *TranscribeWorker) Name() string {
return "TranscribeWorker"
}
func (w *TranscribeWorker) Start(ctx context.Context) {
log.Printf("%s started", w.Name())
for {
select {
case <-ctx.Done():
log.Printf("%s received shutdown signal", w.Name())
return
default:
err := w.transcribeService.FindAndRunTranscribeJob()
if err != nil {
log.Printf("%s error: %v", w.Name(), err)
}
// Ждем 1 секунду перед следующей итерацией
select {
case <-ctx.Done():
log.Printf("%s received shutdown signal during sleep", w.Name())
return
case <-time.After(1 * time.Second):
// Продолжаем работу
}
}
}
}
// CheckWorker обрабатывает задачи проверки статуса распознавания
type CheckWorker struct {
transcribeService *transcribe.TranscribeService
}
func NewCheckWorker(transcribeService *transcribe.TranscribeService) *CheckWorker {
return &CheckWorker{
transcribeService: transcribeService,
}
}
func (w *CheckWorker) Name() string {
return "CheckWorker"
}
func (w *CheckWorker) Start(ctx context.Context) {
log.Printf("%s started", w.Name())
for {
select {
case <-ctx.Done():
log.Printf("%s received shutdown signal", w.Name())
return
default:
err := w.transcribeService.FindAndRunTranscribeCheckJob()
if err != nil {
log.Printf("%s error: %v", w.Name(), err)
}
// Ждем 1 секунду перед следующей итерацией
select {
case <-ctx.Done():
log.Printf("%s received shutdown signal during sleep", w.Name())
return
case <-time.After(1 * time.Second):
// Продолжаем работу
}
}
}
}

116
main.go
View File

@@ -1,14 +1,21 @@
package main
import (
"context"
"database/sql"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"git.vakhrushev.me/av/transcriber/internal/controller/http"
httpcontroller "git.vakhrushev.me/av/transcriber/internal/controller/http"
"git.vakhrushev.me/av/transcriber/internal/repo/sqlite"
"git.vakhrushev.me/av/transcriber/internal/service/transcribe"
"git.vakhrushev.me/av/transcriber/internal/worker"
"github.com/doug-martin/goqu/v9"
_ "github.com/doug-martin/goqu/v9/dialect/sqlite3"
"github.com/gin-gonic/gin"
@@ -50,38 +57,115 @@ func main() {
transcribeService := transcribe.NewTranscribeService(jobRepo, fileRepo)
// Инициализируем обработчики
transcribeHandler := http.NewTranscribeHandler(jobRepo, transcribeService)
// Создаем воркеры
conversionWorker := worker.NewConversionWorker(transcribeService)
transcribeWorker := worker.NewTranscribeWorker(transcribeService)
checkWorker := worker.NewCheckWorker(transcribeService)
// Создаем Gin роутер
r := gin.Default()
workers := []worker.Worker{
conversionWorker,
transcribeWorker,
checkWorker,
}
// Настраиваем роуты
api := r.Group("/api")
// Создаем контекст для graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Создаем WaitGroup для ожидания завершения всех воркеров
var wg sync.WaitGroup
// Запускаем воркеры в отдельных горутинах
for _, w := range workers {
wg.Add(1)
go func(worker worker.Worker) {
defer wg.Done()
worker.Start(ctx)
log.Printf("%s stopped", worker.Name())
}(w)
}
// Запускаем HTTP сервер для API (создание задач и проверка статуса)
transcribeHandler := httpcontroller.NewTranscribeHandler(jobRepo, transcribeService)
router := gin.Default()
// Настраиваем роуты только для создания задач и проверки статуса
api := router.Group("/api")
{
api.POST("/audio", transcribeHandler.CreateTranscribeJob)
api.GET("/status/:id", transcribeHandler.GetTranscribeJobStatus)
api.POST("/convert", transcribeHandler.RunConversionJob)
api.POST("/transcribe", transcribeHandler.RunTranscribeJob)
api.POST("/check", transcribeHandler.RunRecognitionCheckJob)
}
// Добавляем middleware для обработки больших файлов
r.MaxMultipartMemory = 32 << 20 // 32 MiB
router.MaxMultipartMemory = 32 << 20 // 32 MiB
// Добавляем базовый роут для проверки работоспособности
r.GET("/health", func(c *gin.Context) {
router.GET("/health", func(c *gin.Context) {
c.JSON(200, gin.H{
"status": "ok",
"message": "Transcriber service is running",
})
})
log.Println("Starting server on :8080")
if err := r.Run(":8080"); err != nil {
log.Fatal("Failed to start server:", err)
// Создаем HTTP сервер
srv := &http.Server{
Addr: ":8080",
Handler: router,
}
// Запускаем HTTP сервер в отдельной горутине
wg.Add(1)
go func() {
defer wg.Done()
log.Println("Starting HTTP server on :8080")
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Printf("HTTP server error: %v", err)
}
}()
// Настраиваем обработку сигналов для graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
log.Println("Transcriber service started with background workers")
log.Println("Workers: ConversionWorker, TranscribeWorker, CheckWorker")
log.Println("Press Ctrl+C to stop...")
// Ждем сигнал завершения
<-sigChan
log.Println("Received shutdown signal, initiating graceful shutdown...")
// Создаем контекст с таймаутом для graceful shutdown HTTP сервера
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
// Останавливаем HTTP сервер
log.Println("Shutting down HTTP server...")
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Printf("HTTP server forced to shutdown: %v", err)
} else {
log.Println("HTTP server stopped gracefully")
}
// Отменяем контекст для остановки воркеров
cancel()
// Создаем канал для уведомления о завершении всех воркеров
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
// Ждем завершения всех воркеров или таймаута в 10 секунд
select {
case <-done:
log.Println("All workers stopped gracefully")
case <-time.After(10 * time.Second):
log.Println("Timeout reached, forcing shutdown")
}
log.Println("Transcriber service stopped")
}
func RunMigrations(db *sql.DB, migrationsDir string) error {