diff --git a/internal/repo/contracts.go b/internal/repo/contracts.go index e0c6839..c60b6f8 100644 --- a/internal/repo/contracts.go +++ b/internal/repo/contracts.go @@ -1,11 +1,21 @@ package repo import ( + "fmt" "time" "git.vakhrushev.me/av/transcriber/internal/entity" ) +type JobNotFoundError struct { + State string + Message string +} + +func (e *JobNotFoundError) Error() string { + return fmt.Sprintf("%s - %s", e.State, e.Message) +} + type FileRepository interface { Create(file *entity.File) error GetByID(id string) (*entity.File, error) diff --git a/internal/repo/sqlite/transcript_job_repo.go b/internal/repo/sqlite/transcript_job_repo.go index 560f246..ad16146 100644 --- a/internal/repo/sqlite/transcript_job_repo.go +++ b/internal/repo/sqlite/transcript_job_repo.go @@ -6,6 +6,7 @@ import ( "time" "git.vakhrushev.me/av/transcriber/internal/entity" + contracts "git.vakhrushev.me/av/transcriber/internal/repo" "github.com/doug-martin/goqu/v9" ) @@ -160,6 +161,10 @@ func (repo *TranscriptJobRepository) FindAndAcquire(state, acquisitionId string, if err != nil { return nil, fmt.Errorf("failed check affected rows: %w", err) } + if rowsAffected == 0 { + e := contracts.JobNotFoundError{State: state, Message: "appropriate job not found"} + return nil, &e + } if rowsAffected != 1 { return nil, fmt.Errorf("unexpected affected rows count: %d", rowsAffected) } diff --git a/internal/service/speechkit/speechkit.go b/internal/service/speechkit/speechkit.go index 495fee9..beccc03 100644 --- a/internal/service/speechkit/speechkit.go +++ b/internal/service/speechkit/speechkit.go @@ -3,7 +3,6 @@ package speechkit import ( "context" "fmt" - "log" "os" "strings" @@ -150,10 +149,12 @@ func (s *SpeechKitService) GetRecognitionText(operationID string) (string, error } return "", fmt.Errorf("failed to receive recognition response: %w", err) } - if final := resp.GetFinal(); final != nil { - for _, alt := range final.Alternatives { - sb.WriteString(alt.Text) - sb.WriteString(" ") + if refinement := resp.GetFinalRefinement(); refinement != nil { + if text := refinement.GetNormalizedText(); text != nil { + for _, alt := range text.Alternatives { + sb.WriteString(alt.Text) + sb.WriteString(" ") + } } } } @@ -168,8 +169,6 @@ func (s *SpeechKitService) CheckOperationStatus(operationID string) (*operation. ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey) ctx = metadata.AppendToOutgoingContext(ctx, "x-folder-id", s.folderID) - log.Printf("Check operation status: id %s\n", operationID) - op, err := s.opClient.Get(ctx, &operation.GetOperationRequest{ OperationId: operationID, }) diff --git a/internal/service/transcribe/transcribe.go b/internal/service/transcribe/transcribe.go index f6553f4..730fb3e 100644 --- a/internal/service/transcribe/transcribe.go +++ b/internal/service/transcribe/transcribe.go @@ -3,6 +3,7 @@ package transcribe import ( "fmt" "io" + "log" "os" "path/filepath" "time" @@ -94,6 +95,9 @@ func (s *TranscribeService) FindAndRunConversionJob() error { job, err := s.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime) if err != nil { + if _, ok := err.(*repo.JobNotFoundError); ok { + return nil + } return err } @@ -150,6 +154,9 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error { jobRecord, err := s.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime) if err != nil { + if _, ok := err.(*repo.JobNotFoundError); ok { + return nil + } return err } @@ -193,7 +200,7 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error { // Обновляем задачу с ID операции распознавания jobRecord.FileID = &destFileId jobRecord.RecognitionOpID = &operationID - delayTime := time.Now().Add(time.Minute) + delayTime := time.Now().Add(10 * time.Second) jobRecord.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) err = s.fileRepo.Create(destFileRecord) @@ -215,6 +222,9 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { job, err := s.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime) if err != nil { + if _, ok := err.(*repo.JobNotFoundError); ok { + return nil + } return err } @@ -229,14 +239,18 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { } defer speechKitService.Close() + opId := *job.RecognitionOpID + // Проверяем статус операции - operation, err := speechKitService.CheckOperationStatus(*job.RecognitionOpID) + log.Printf("Check operation status: id %s\n", opId) + operation, err := speechKitService.CheckOperationStatus(opId) if err != nil { return err } if !operation.Done { // Операция еще не завершена, оставляем в статусе обработки + log.Printf("Operation in progress: id %s\n", opId) delayTime := time.Now().Add(10 * time.Second) job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) err := s.jobRepo.Save(job) @@ -247,7 +261,8 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { } if opErr := operation.GetError(); opErr != nil { - errorText := fmt.Sprintf("Operation failed: code %d, message: %s", opErr.Code, opErr.Message) + errorText := fmt.Sprintf("operation failed: code %d, message: %s", opErr.Code, opErr.Message) + log.Printf("Operation failed: id %s, message %s\n", opId, errorText) job.Fail(errorText) err := s.jobRepo.Save(job) if err != nil { @@ -262,6 +277,8 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { return err } + log.Printf("Operation done: id %s\n", opId) + // Обновляем задачу с результатом job.Done(transcriptionText)