Simplify workers and fix metrics

This commit is contained in:
2025-08-13 11:37:05 +03:00
parent eb0dea6113
commit 900c7ecb51
4 changed files with 29 additions and 104 deletions

View File

@@ -10,3 +10,11 @@ type JobNotFoundError struct {
func (e *JobNotFoundError) Error() string { func (e *JobNotFoundError) Error() string {
return fmt.Sprintf("%s - %s", e.State, e.Message) return fmt.Sprintf("%s - %s", e.State, e.Message)
} }
type NoopJobError struct {
State string
}
func (e *NoopJobError) Error() string {
return fmt.Sprintf("%s: no op job occur", e.State)
}

View File

@@ -6,8 +6,8 @@ import (
"strconv" "strconv"
"time" "time"
"git.vakhrushev.me/av/transcriber/internal/contract"
"git.vakhrushev.me/av/transcriber/internal/metrics" "git.vakhrushev.me/av/transcriber/internal/metrics"
"git.vakhrushev.me/av/transcriber/internal/service"
) )
// Worker представляет базовый интерфейс для всех воркеров // Worker представляет базовый интерфейс для всех воркеров
@@ -16,22 +16,20 @@ type Worker interface {
Name() string Name() string
} }
// ConversionWorker обрабатывает задачи конвертации type CallbackWorker struct {
type ConversionWorker struct { name string
transcribeService *service.TranscribeService f func() error
} }
func NewConversionWorker(transcribeService *service.TranscribeService) *ConversionWorker { func NewCallbackWorker(name string, f func() error) *CallbackWorker {
return &ConversionWorker{ return &CallbackWorker{name, f}
transcribeService: transcribeService,
}
} }
func (w *ConversionWorker) Name() string { func (w *CallbackWorker) Name() string {
return "ConversionWorker" return w.name
} }
func (w *ConversionWorker) Start(ctx context.Context) { func (w *CallbackWorker) Start(ctx context.Context) {
log.Printf("%s started", w.Name()) log.Printf("%s started", w.Name())
for { for {
@@ -40,93 +38,12 @@ func (w *ConversionWorker) Start(ctx context.Context) {
log.Printf("%s received shutdown signal", w.Name()) log.Printf("%s received shutdown signal", w.Name())
return return
default: default:
err := w.transcribeService.FindAndRunConversionJob() err := w.f()
metrics.WorkerJobCounter.WithLabelValues(w.Name(), strconv.FormatBool(err != nil)).Inc() _, isNoop := err.(*contract.NoopJobError)
if err != nil { if !isNoop {
log.Printf("%s error: %v", w.Name(), err) metrics.WorkerJobCounter.WithLabelValues(w.Name(), strconv.FormatBool(err != nil)).Inc()
} }
if err != nil && !isNoop {
// Ждем 1 секунду перед следующей итерацией
select {
case <-ctx.Done():
log.Printf("%s received shutdown signal during sleep", w.Name())
return
case <-time.After(1 * time.Second):
// Продолжаем работу
}
}
}
}
// TranscribeWorker обрабатывает задачи транскрипции
type TranscribeWorker struct {
transcribeService *service.TranscribeService
}
func NewTranscribeWorker(transcribeService *service.TranscribeService) *TranscribeWorker {
return &TranscribeWorker{
transcribeService: transcribeService,
}
}
func (w *TranscribeWorker) Name() string {
return "TranscribeWorker"
}
func (w *TranscribeWorker) Start(ctx context.Context) {
log.Printf("%s started", w.Name())
for {
select {
case <-ctx.Done():
log.Printf("%s received shutdown signal", w.Name())
return
default:
err := w.transcribeService.FindAndRunTranscribeJob()
metrics.WorkerJobCounter.WithLabelValues(w.Name(), strconv.FormatBool(err != nil)).Inc()
if err != nil {
log.Printf("%s error: %v", w.Name(), err)
}
// Ждем 1 секунду перед следующей итерацией
select {
case <-ctx.Done():
log.Printf("%s received shutdown signal during sleep", w.Name())
return
case <-time.After(1 * time.Second):
// Продолжаем работу
}
}
}
}
// CheckWorker обрабатывает задачи проверки статуса распознавания
type CheckWorker struct {
transcribeService *service.TranscribeService
}
func NewCheckWorker(transcribeService *service.TranscribeService) *CheckWorker {
return &CheckWorker{
transcribeService: transcribeService,
}
}
func (w *CheckWorker) Name() string {
return "CheckWorker"
}
func (w *CheckWorker) Start(ctx context.Context) {
log.Printf("%s started", w.Name())
for {
select {
case <-ctx.Done():
log.Printf("%s received shutdown signal", w.Name())
return
default:
err := w.transcribeService.FindAndRunTranscribeCheckJob()
metrics.WorkerJobCounter.WithLabelValues(w.Name(), strconv.FormatBool(err != nil)).Inc()
if err != nil {
log.Printf("%s error: %v", w.Name(), err) log.Printf("%s error: %v", w.Name(), err)
} }

View File

@@ -118,7 +118,7 @@ func (s *TranscribeService) FindAndRunConversionJob() error {
job, err := s.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime) job, err := s.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime)
if err != nil { if err != nil {
if _, ok := err.(*contract.JobNotFoundError); ok { if _, ok := err.(*contract.JobNotFoundError); ok {
return nil return &contract.NoopJobError{State: entity.StateCreated}
} }
return err return err
} }
@@ -194,7 +194,7 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error {
jobRecord, err := s.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime) jobRecord, err := s.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime)
if err != nil { if err != nil {
if _, ok := err.(*contract.JobNotFoundError); ok { if _, ok := err.(*contract.JobNotFoundError); ok {
return nil return &contract.NoopJobError{State: entity.StateConverted}
} }
return err return err
} }
@@ -247,7 +247,7 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
job, err := s.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime) job, err := s.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime)
if err != nil { if err != nil {
if _, ok := err.(*contract.JobNotFoundError); ok { if _, ok := err.(*contract.JobNotFoundError); ok {
return nil return &contract.NoopJobError{State: entity.StateTranscribe}
} }
return fmt.Errorf("failed find and acquire job: %s, %w", entity.StateTranscribe, err) return fmt.Errorf("failed find and acquire job: %s, %w", entity.StateTranscribe, err)
} }

View File

@@ -84,9 +84,9 @@ func main() {
// Создаем воркеры // Создаем воркеры
conversionWorker := worker.NewConversionWorker(transcribeService) conversionWorker := worker.NewCallbackWorker("conversion_worker", transcribeService.FindAndRunConversionJob)
transcribeWorker := worker.NewTranscribeWorker(transcribeService) transcribeWorker := worker.NewCallbackWorker("transcribe_worker", transcribeService.FindAndRunTranscribeJob)
checkWorker := worker.NewCheckWorker(transcribeService) checkWorker := worker.NewCallbackWorker("check_worker", transcribeService.FindAndRunTranscribeCheckJob)
workers := []worker.Worker{ workers := []worker.Worker{
conversionWorker, conversionWorker,