More granular error handling
+ task queue refactoring
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
@@ -26,6 +27,7 @@ type TranscribeService struct {
|
||||
metaviewer contract.AudioMetaViewer
|
||||
converter contract.AudioFileConverter
|
||||
recognizer contract.AudioRecognizer
|
||||
tgSender contract.TelegramMessageSender
|
||||
storagePath string
|
||||
logger *slog.Logger
|
||||
}
|
||||
@@ -36,6 +38,7 @@ func NewTranscribeService(
|
||||
metaviewer contract.AudioMetaViewer,
|
||||
converter contract.AudioFileConverter,
|
||||
recognizer contract.AudioRecognizer,
|
||||
tgSender contract.TelegramMessageSender,
|
||||
storagePath string,
|
||||
logger *slog.Logger,
|
||||
) *TranscribeService {
|
||||
@@ -45,12 +48,47 @@ func NewTranscribeService(
|
||||
metaviewer: metaviewer,
|
||||
converter: converter,
|
||||
recognizer: recognizer,
|
||||
tgSender: tgSender,
|
||||
storagePath: storagePath,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) (*entity.TranscribeJob, error) {
|
||||
func (s *TranscribeService) CreateJobFromTelegram(file io.Reader, fileName string, chatId int64, replyMsgId int) (*entity.TranscribeJob, error) {
|
||||
jobId := uuid.NewString()
|
||||
now := time.Now()
|
||||
|
||||
job := &entity.TranscribeJob{
|
||||
Id: jobId,
|
||||
State: entity.StateCreated,
|
||||
Source: entity.SourceTelegram,
|
||||
TgChatId: &chatId,
|
||||
TgReplyMessageId: &replyMsgId,
|
||||
IsError: false,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
return s.createTranscribeJob(job, file, fileName)
|
||||
}
|
||||
|
||||
func (s *TranscribeService) CreateJobFromApi(file io.Reader, fileName string) (*entity.TranscribeJob, error) {
|
||||
jobId := uuid.NewString()
|
||||
now := time.Now()
|
||||
|
||||
job := &entity.TranscribeJob{
|
||||
Id: jobId,
|
||||
State: entity.StateCreated,
|
||||
Source: entity.SourceApi,
|
||||
IsError: false,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
return s.createTranscribeJob(job, file, fileName)
|
||||
}
|
||||
|
||||
func (s *TranscribeService) createTranscribeJob(job *entity.TranscribeJob, file io.Reader, fileName string) (*entity.TranscribeJob, error) {
|
||||
// Генерируем UUID для файла
|
||||
fileId := uuid.NewString()
|
||||
|
||||
@@ -119,42 +157,25 @@ func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
jobId := uuid.NewString()
|
||||
now := time.Now()
|
||||
|
||||
// Создаем запись в таблице transcribe_jobs
|
||||
job := &entity.TranscribeJob{
|
||||
Id: jobId,
|
||||
State: entity.StateCreated,
|
||||
FileID: &fileId,
|
||||
IsError: false,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
job.FileID = &fileId
|
||||
|
||||
if err := s.jobRepo.Create(job); err != nil {
|
||||
s.logger.Error("Failed to create job record", "error", err, "job_id", jobId)
|
||||
s.logger.Error("Failed to create job record", "error", err, "job_id", job.Id)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.logger.Info("Transcribe job created successfully", "job_id", jobId, "file_id", fileId)
|
||||
s.logger.Info("Transcribe job created successfully", "job_id", job.Id, "file_id", fileId)
|
||||
|
||||
return job, nil
|
||||
}
|
||||
|
||||
func (s *TranscribeService) FindAndRunConversionJob() error {
|
||||
acquisitionId := uuid.NewString()
|
||||
rottingTime := time.Now().Add(-1 * time.Hour)
|
||||
|
||||
job, err := s.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime)
|
||||
job, err := s.findJob(entity.StateCreated, time.Hour)
|
||||
if err != nil {
|
||||
if _, ok := err.(*contract.JobNotFoundError); ok {
|
||||
return &contract.NoopJobError{State: entity.StateCreated}
|
||||
}
|
||||
s.logger.Error("Failed to find and acquire conversion job", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
s.logger.Info("Starting conversion job", "job_id", job.Id, "acquisition_id", acquisitionId)
|
||||
s.logger.Info("Starting conversion job", "job_id", job.Id)
|
||||
|
||||
srcFile, err := s.fileRepo.GetByID(*job.FileID)
|
||||
if err != nil {
|
||||
@@ -195,7 +216,7 @@ func (s *TranscribeService) FindAndRunConversionJob() error {
|
||||
"error", err,
|
||||
"job_id", job.Id,
|
||||
"duration", conversionDuration)
|
||||
return err
|
||||
return s.failJob(job, err, "сбой конвертации файла")
|
||||
}
|
||||
|
||||
stat, err := os.Stat(destFilePath)
|
||||
@@ -241,23 +262,16 @@ func (s *TranscribeService) FindAndRunConversionJob() error {
|
||||
}
|
||||
|
||||
func (s *TranscribeService) FindAndRunTranscribeJob() error {
|
||||
acquisitionId := uuid.NewString()
|
||||
rottingTime := time.Now().Add(-1 * time.Hour)
|
||||
|
||||
jobRecord, err := s.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime)
|
||||
job, err := s.findJob(entity.StateConverted, time.Hour)
|
||||
if err != nil {
|
||||
if _, ok := err.(*contract.JobNotFoundError); ok {
|
||||
return &contract.NoopJobError{State: entity.StateConverted}
|
||||
}
|
||||
s.logger.Error("Failed to find and acquire transcribe job", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
s.logger.Info("Starting transcribe job", "job_id", jobRecord.Id, "acquisition_id", acquisitionId)
|
||||
s.logger.Info("Starting transcribe job", "job_id", job.Id)
|
||||
|
||||
fileRecord, err := s.fileRepo.GetByID(*jobRecord.FileID)
|
||||
fileRecord, err := s.fileRepo.GetByID(*job.FileID)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to get file record", "error", err, "file_id", *jobRecord.FileID)
|
||||
s.logger.Error("Failed to get file record", "error", err, "file_id", *job.FileID)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -273,24 +287,24 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error {
|
||||
destFileId := uuid.NewString()
|
||||
destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3)
|
||||
|
||||
s.logger.Info("Starting recognition", "job_id", jobRecord.Id, "file_path", filePath)
|
||||
s.logger.Info("Starting recognition", "job_id", job.Id, "file_path", filePath)
|
||||
|
||||
// Запускаем асинхронное распознавание
|
||||
operationID, err := s.recognizer.Recognize(file, destFileRecord.FileName)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to start recognition", "error", err, "job_id", jobRecord.Id)
|
||||
s.logger.Error("Failed to start recognition", "error", err, "job_id", job.Id)
|
||||
return err
|
||||
}
|
||||
|
||||
s.logger.Info("Recognition started",
|
||||
"job_id", jobRecord.Id,
|
||||
"job_id", job.Id,
|
||||
"operation_id", operationID)
|
||||
|
||||
// Обновляем задачу с ID операции распознавания
|
||||
jobRecord.FileID = &destFileId
|
||||
jobRecord.RecognitionOpID = &operationID
|
||||
job.FileID = &destFileId
|
||||
job.RecognitionOpID = &operationID
|
||||
delayTime := time.Now().Add(10 * time.Second)
|
||||
jobRecord.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
|
||||
job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
|
||||
|
||||
err = s.fileRepo.Create(destFileRecord)
|
||||
if err != nil {
|
||||
@@ -298,27 +312,20 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.jobRepo.Save(jobRecord)
|
||||
err = s.jobRepo.Save(job)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to save job", "error", err, "job_id", jobRecord.Id)
|
||||
s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
|
||||
return err
|
||||
}
|
||||
|
||||
s.logger.Info("Transcribe job updated successfully", "job_id", jobRecord.Id)
|
||||
s.logger.Info("Transcribe job updated successfully", "job_id", job.Id)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
|
||||
acquisitionId := uuid.NewString()
|
||||
rottingTime := time.Now().Add(-24 * time.Hour)
|
||||
|
||||
job, err := s.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime)
|
||||
job, err := s.findJob(entity.StateTranscribe, 24*time.Hour)
|
||||
if err != nil {
|
||||
if _, ok := err.(*contract.JobNotFoundError); ok {
|
||||
return &contract.NoopJobError{State: entity.StateTranscribe}
|
||||
}
|
||||
s.logger.Error("Failed to find and acquire transcribe check job", "error", err)
|
||||
return fmt.Errorf("failed find and acquire job: %s, %w", entity.StateTranscribe, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if job.RecognitionOpID == nil {
|
||||
@@ -339,7 +346,7 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
|
||||
if recResult.IsInProgress() {
|
||||
// Операция еще не завершена, оставляем в статусе обработки
|
||||
s.logger.Info("Operation in progress", "job_id", job.Id, "operation_id", opId)
|
||||
delayTime := time.Now().Add(10 * time.Second)
|
||||
delayTime := time.Now().Add(5 * time.Second)
|
||||
job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
|
||||
err := s.jobRepo.Save(job)
|
||||
if err != nil {
|
||||
@@ -355,13 +362,7 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
|
||||
"job_id", job.Id,
|
||||
"operation_id", opId,
|
||||
"error_message", errorText)
|
||||
job.Fail(errorText)
|
||||
err := s.jobRepo.Save(job)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to save failed job", "error", err, "job_id", job.Id)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return s.failJob(job, errors.New(errorText), "сбой при распознавании файла")
|
||||
}
|
||||
|
||||
// Операция завершена, получаем результат
|
||||
@@ -371,20 +372,85 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
|
||||
return err
|
||||
}
|
||||
|
||||
s.logger.Info("Operation completed successfully",
|
||||
s.logger.Info("Transcribe operation completed successfully",
|
||||
"job_id", job.Id,
|
||||
"operation_id", opId,
|
||||
"text_length", len(transcriptionText))
|
||||
|
||||
// Завершаем задачу
|
||||
return s.completeJob(job, transcriptionText)
|
||||
}
|
||||
|
||||
func (s *TranscribeService) findJob(state string, expiration time.Duration) (job *entity.TranscribeJob, err error) {
|
||||
acquisitionId := uuid.NewString()
|
||||
rottingTime := time.Now().Add(-1 * expiration)
|
||||
|
||||
job, err = s.jobRepo.FindAndAcquire(state, acquisitionId, rottingTime)
|
||||
if err != nil {
|
||||
if _, ok := err.(*contract.JobNotFoundError); ok {
|
||||
return nil, &contract.NoopJobError{State: state}
|
||||
}
|
||||
s.logger.Error("Failed to find and acquire job", "state", state, "error", err)
|
||||
return nil, fmt.Errorf("failed find and acquire job: %s, %w", state, err)
|
||||
}
|
||||
|
||||
return job, nil
|
||||
}
|
||||
|
||||
func (s *TranscribeService) completeJob(job *entity.TranscribeJob, transcriptionText string) error {
|
||||
// Обновляем задачу с результатом
|
||||
job.Done(transcriptionText)
|
||||
|
||||
err = s.jobRepo.Save(job)
|
||||
// Сохраняем задачу в базу
|
||||
err := s.jobRepo.Save(job)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to save completed job", "error", err, "job_id", job.Id)
|
||||
return err
|
||||
s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
|
||||
return fmt.Errorf("failed to save job: %w", err)
|
||||
}
|
||||
|
||||
// Отправляем распознанный текст обратно пользователю
|
||||
switch job.Source {
|
||||
case entity.SourceTelegram:
|
||||
if job.TgChatId == nil {
|
||||
s.logger.Error("Telegram chat not specified", "job_id", job.Id)
|
||||
return fmt.Errorf("tg chat id not specified, job id: %s", job.Id)
|
||||
}
|
||||
err := s.tgSender.Send(transcriptionText, *job.TgChatId, job.TgReplyMessageId)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to sent transcription text to client", "job_id", job.Id)
|
||||
return fmt.Errorf("failed to sent message to client, job id: %s, err: %w", job.Id, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *TranscribeService) failJob(job *entity.TranscribeJob, jobErr error, humanErrorText string) error {
|
||||
// Обновляем задачу с результатом
|
||||
job.Fail(jobErr.Error())
|
||||
|
||||
// Сохраняем задачу в базу
|
||||
err := s.jobRepo.Save(job)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
|
||||
return fmt.Errorf("failed to save job: %w", err)
|
||||
}
|
||||
|
||||
// Отправляем распознанный текст обратно пользователю
|
||||
switch job.Source {
|
||||
case entity.SourceTelegram:
|
||||
if job.TgChatId == nil {
|
||||
s.logger.Error("Telegram chat not specified", "job_id", job.Id)
|
||||
return fmt.Errorf("tg chat id not specified, job id: %s", job.Id)
|
||||
}
|
||||
|
||||
errorMessage := fmt.Sprintf("При обработке задачи произошла ошибка: %s.\nПожалуйста, попробуйте еще раз.", humanErrorText)
|
||||
err := s.tgSender.Send(errorMessage, *job.TgChatId, job.TgReplyMessageId)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to sent message to client", "job_id", job.Id)
|
||||
return fmt.Errorf("failed to sent message to client, job id: %s, err: %w", job.Id, err)
|
||||
}
|
||||
}
|
||||
|
||||
s.logger.Info("Transcribe check job completed successfully", "job_id", job.Id)
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user