diff --git a/internal/contract/error.go b/internal/contract/error.go index 3254a64..12d0b01 100644 --- a/internal/contract/error.go +++ b/internal/contract/error.go @@ -10,3 +10,11 @@ type JobNotFoundError struct { func (e *JobNotFoundError) Error() string { return fmt.Sprintf("%s - %s", e.State, e.Message) } + +type NoopJobError struct { + State string +} + +func (e *NoopJobError) Error() string { + return fmt.Sprintf("%s: no op job occur", e.State) +} diff --git a/internal/controller/worker/worker.go b/internal/controller/worker/worker.go index 46a13e6..7f1c485 100644 --- a/internal/controller/worker/worker.go +++ b/internal/controller/worker/worker.go @@ -6,8 +6,8 @@ import ( "strconv" "time" + "git.vakhrushev.me/av/transcriber/internal/contract" "git.vakhrushev.me/av/transcriber/internal/metrics" - "git.vakhrushev.me/av/transcriber/internal/service" ) // Worker представляет базовый интерфейс для всех воркеров @@ -16,22 +16,20 @@ type Worker interface { Name() string } -// ConversionWorker обрабатывает задачи конвертации -type ConversionWorker struct { - transcribeService *service.TranscribeService +type CallbackWorker struct { + name string + f func() error } -func NewConversionWorker(transcribeService *service.TranscribeService) *ConversionWorker { - return &ConversionWorker{ - transcribeService: transcribeService, - } +func NewCallbackWorker(name string, f func() error) *CallbackWorker { + return &CallbackWorker{name, f} } -func (w *ConversionWorker) Name() string { - return "ConversionWorker" +func (w *CallbackWorker) Name() string { + return w.name } -func (w *ConversionWorker) Start(ctx context.Context) { +func (w *CallbackWorker) Start(ctx context.Context) { log.Printf("%s started", w.Name()) for { @@ -40,93 +38,12 @@ func (w *ConversionWorker) Start(ctx context.Context) { log.Printf("%s received shutdown signal", w.Name()) return default: - err := w.transcribeService.FindAndRunConversionJob() - metrics.WorkerJobCounter.WithLabelValues(w.Name(), strconv.FormatBool(err != nil)).Inc() - if err != nil { - log.Printf("%s error: %v", w.Name(), err) - } - - // Ждем 1 секунду перед следующей итерацией - select { - case <-ctx.Done(): - log.Printf("%s received shutdown signal during sleep", w.Name()) - return - case <-time.After(1 * time.Second): - // Продолжаем работу - } - } - } -} - -// TranscribeWorker обрабатывает задачи транскрипции -type TranscribeWorker struct { - transcribeService *service.TranscribeService -} - -func NewTranscribeWorker(transcribeService *service.TranscribeService) *TranscribeWorker { - return &TranscribeWorker{ - transcribeService: transcribeService, - } -} - -func (w *TranscribeWorker) Name() string { - return "TranscribeWorker" -} - -func (w *TranscribeWorker) Start(ctx context.Context) { - log.Printf("%s started", w.Name()) - - for { - select { - case <-ctx.Done(): - log.Printf("%s received shutdown signal", w.Name()) - return - default: - err := w.transcribeService.FindAndRunTranscribeJob() - metrics.WorkerJobCounter.WithLabelValues(w.Name(), strconv.FormatBool(err != nil)).Inc() - if err != nil { - log.Printf("%s error: %v", w.Name(), err) - } - - // Ждем 1 секунду перед следующей итерацией - select { - case <-ctx.Done(): - log.Printf("%s received shutdown signal during sleep", w.Name()) - return - case <-time.After(1 * time.Second): - // Продолжаем работу - } - } - } -} - -// CheckWorker обрабатывает задачи проверки статуса распознавания -type CheckWorker struct { - transcribeService *service.TranscribeService -} - -func NewCheckWorker(transcribeService *service.TranscribeService) *CheckWorker { - return &CheckWorker{ - transcribeService: transcribeService, - } -} - -func (w *CheckWorker) Name() string { - return "CheckWorker" -} - -func (w *CheckWorker) Start(ctx context.Context) { - log.Printf("%s started", w.Name()) - - for { - select { - case <-ctx.Done(): - log.Printf("%s received shutdown signal", w.Name()) - return - default: - err := w.transcribeService.FindAndRunTranscribeCheckJob() - metrics.WorkerJobCounter.WithLabelValues(w.Name(), strconv.FormatBool(err != nil)).Inc() - if err != nil { + err := w.f() + _, isNoop := err.(*contract.NoopJobError) + if !isNoop { + metrics.WorkerJobCounter.WithLabelValues(w.Name(), strconv.FormatBool(err != nil)).Inc() + } + if err != nil && !isNoop { log.Printf("%s error: %v", w.Name(), err) } diff --git a/internal/service/transcribe.go b/internal/service/transcribe.go index 905085c..3ca90a2 100644 --- a/internal/service/transcribe.go +++ b/internal/service/transcribe.go @@ -118,7 +118,7 @@ func (s *TranscribeService) FindAndRunConversionJob() error { job, err := s.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime) if err != nil { if _, ok := err.(*contract.JobNotFoundError); ok { - return nil + return &contract.NoopJobError{State: entity.StateCreated} } return err } @@ -194,7 +194,7 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error { jobRecord, err := s.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime) if err != nil { if _, ok := err.(*contract.JobNotFoundError); ok { - return nil + return &contract.NoopJobError{State: entity.StateConverted} } return err } @@ -247,7 +247,7 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { job, err := s.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime) if err != nil { if _, ok := err.(*contract.JobNotFoundError); ok { - return nil + return &contract.NoopJobError{State: entity.StateTranscribe} } return fmt.Errorf("failed find and acquire job: %s, %w", entity.StateTranscribe, err) } diff --git a/main.go b/main.go index 6cbc5a7..918d7e4 100644 --- a/main.go +++ b/main.go @@ -84,9 +84,9 @@ func main() { // Создаем воркеры - conversionWorker := worker.NewConversionWorker(transcribeService) - transcribeWorker := worker.NewTranscribeWorker(transcribeService) - checkWorker := worker.NewCheckWorker(transcribeService) + conversionWorker := worker.NewCallbackWorker("conversion_worker", transcribeService.FindAndRunConversionJob) + transcribeWorker := worker.NewCallbackWorker("transcribe_worker", transcribeService.FindAndRunTranscribeJob) + checkWorker := worker.NewCallbackWorker("check_worker", transcribeService.FindAndRunTranscribeCheckJob) workers := []worker.Worker{ conversionWorker,