diff --git a/internal/adapter/repo/sqlite/transcript_job_repo.go b/internal/adapter/repo/sqlite/transcript_job_repo.go index 679a28f..c7b170a 100644 --- a/internal/adapter/repo/sqlite/transcript_job_repo.go +++ b/internal/adapter/repo/sqlite/transcript_job_repo.go @@ -21,18 +21,21 @@ func NewTranscriptJobRepository(db *sql.DB, gq *goqu.Database) *TranscriptJobRep func (repo *TranscriptJobRepository) Create(job *entity.TranscribeJob) error { record := goqu.Record{ - "id": job.Id, - "state": job.State, - "file_id": job.FileID, - "is_error": job.IsError, - "error_text": job.ErrorText, - "acquisition_id": job.AcquisitionID, - "acquire_time": job.AcquireTime, - "delay_time": job.DelayTime, - "recognition_op_id": job.RecognitionOpID, - "transcription_text": job.TranscriptionText, - "created_at": job.CreatedAt, - "updated_at": job.UpdatedAt, + "id": job.Id, + "state": job.State, + "source": job.Source, + "file_id": job.FileID, + "is_error": job.IsError, + "error_text": job.ErrorText, + "acquisition_id": job.AcquisitionID, + "acquire_time": job.AcquireTime, + "delay_time": job.DelayTime, + "recognition_op_id": job.RecognitionOpID, + "transcription_text": job.TranscriptionText, + "tg_chat_id": job.TgChatId, + "tg_reply_message_id": job.TgReplyMessageId, + "created_at": job.CreatedAt, + "updated_at": job.UpdatedAt, } query := repo.gq.Insert("transcribe_jobs").Rows(record) sql, args, err := query.ToSQL() @@ -50,16 +53,19 @@ func (repo *TranscriptJobRepository) Create(job *entity.TranscribeJob) error { func (repo *TranscriptJobRepository) Save(job *entity.TranscribeJob) error { record := goqu.Record{ - "state": job.State, - "file_id": job.FileID, - "is_error": job.IsError, - "error_text": job.ErrorText, - "acquisition_id": job.AcquisitionID, - "acquire_time": job.AcquireTime, - "delay_time": job.DelayTime, - "recognition_op_id": job.RecognitionOpID, - "transcription_text": job.TranscriptionText, - "updated_at": job.UpdatedAt, + "state": job.State, + "source": job.Source, + "file_id": job.FileID, + "is_error": job.IsError, + "error_text": job.ErrorText, + "acquisition_id": job.AcquisitionID, + "acquire_time": job.AcquireTime, + "delay_time": job.DelayTime, + "recognition_op_id": job.RecognitionOpID, + "transcription_text": job.TranscriptionText, + "tg_chat_id": job.TgChatId, + "tg_reply_message_id": job.TgReplyMessageId, + "updated_at": job.UpdatedAt, } query := repo.gq.Update("transcribe_jobs").Set(record).Where(goqu.C("id").Eq(job.Id)) sql, args, err := query.ToSQL() @@ -79,6 +85,7 @@ func (repo *TranscriptJobRepository) GetByID(id string) (*entity.TranscribeJob, query := repo.gq.From("transcribe_jobs").Select( "id", "state", + "source", "file_id", "is_error", "error_text", @@ -87,6 +94,8 @@ func (repo *TranscriptJobRepository) GetByID(id string) (*entity.TranscribeJob, "delay_time", "recognition_op_id", "transcription_text", + "tg_chat_id", + "tg_reply_message_id", "created_at", "updated_at", ).Where(goqu.C("id").Eq(id)) @@ -99,6 +108,7 @@ func (repo *TranscriptJobRepository) GetByID(id string) (*entity.TranscribeJob, err = repo.db.QueryRow(sql, args...).Scan( &job.Id, &job.State, + &job.Source, &job.FileID, &job.IsError, &job.ErrorText, @@ -107,6 +117,8 @@ func (repo *TranscriptJobRepository) GetByID(id string) (*entity.TranscribeJob, &job.DelayTime, &job.RecognitionOpID, &job.TranscriptionText, + &job.TgChatId, + &job.TgReplyMessageId, &job.CreatedAt, &job.UpdatedAt, ) @@ -172,6 +184,7 @@ func (repo *TranscriptJobRepository) FindAndAcquire(state, acquisitionId string, selectQuery := repo.gq.From("transcribe_jobs").Select( "id", "state", + "source", "file_id", "is_error", "error_text", @@ -180,6 +193,8 @@ func (repo *TranscriptJobRepository) FindAndAcquire(state, acquisitionId string, "delay_time", "recognition_op_id", "transcription_text", + "tg_chat_id", + "tg_reply_message_id", "created_at", "updated_at", ).Where(goqu.C("acquisition_id").Eq(acquisitionId)) @@ -193,6 +208,7 @@ func (repo *TranscriptJobRepository) FindAndAcquire(state, acquisitionId string, err = repo.db.QueryRow(sql, args...).Scan( &job.Id, &job.State, + &job.Source, &job.FileID, &job.IsError, &job.ErrorText, @@ -201,6 +217,8 @@ func (repo *TranscriptJobRepository) FindAndAcquire(state, acquisitionId string, &job.DelayTime, &job.RecognitionOpID, &job.TranscriptionText, + &job.TgChatId, + &job.TgReplyMessageId, &job.CreatedAt, &job.UpdatedAt, ) diff --git a/internal/adapter/telegram/sender.go b/internal/adapter/telegram/sender.go new file mode 100644 index 0000000..1d64d57 --- /dev/null +++ b/internal/adapter/telegram/sender.go @@ -0,0 +1,37 @@ +package telegram + +import ( + "log/slog" + + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" +) + +type TelegramMessageSender struct { + bot *tgbotapi.BotAPI + logger *slog.Logger +} + +func NewTelegramMessageSender(botToken string, logger *slog.Logger) (*TelegramMessageSender, error) { + bot, err := tgbotapi.NewBotAPI(botToken) + if err != nil { + return nil, err + } + + return &TelegramMessageSender{ + bot: bot, + logger: logger, + }, nil +} + +func (s *TelegramMessageSender) Send(text string, chatId int64, replyToMessageId *int) error { + resultMsg := tgbotapi.NewMessage(chatId, text) + if replyToMessageId != nil { + resultMsg.ReplyToMessageID = *replyToMessageId + } + _, err := s.bot.Send(resultMsg) + if err != nil { + s.logger.Error("Failed to send message to tg bot", "error", err) + return err + } + return nil +} diff --git a/internal/contract/contract.go b/internal/contract/contract.go index eabf0ba..894e110 100644 --- a/internal/contract/contract.go +++ b/internal/contract/contract.go @@ -23,3 +23,7 @@ type AudioRecognizer interface { GetRecognitionText(operationID string) (string, error) CheckRecognitionStatus(operationID string) (*entity.RecognitionResult, error) } + +type TelegramMessageSender interface { + Send(text string, chatId int64, replyToMessageId *int) error +} diff --git a/internal/controller/http/transcribe.go b/internal/controller/http/transcribe.go index 4cf73f8..08394b2 100644 --- a/internal/controller/http/transcribe.go +++ b/internal/controller/http/transcribe.go @@ -40,7 +40,7 @@ func (h *TranscribeHandler) CreateTranscribeJob(c *gin.Context) { } defer file.Close() - job, err := h.trsService.CreateTranscribeJob(file, header.Filename) + job, err := h.trsService.CreateJobFromApi(file, header.Filename) if err != nil { log.Printf("Err: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create transcibe job"}) diff --git a/internal/controller/http/transcribe_test.go b/internal/controller/http/transcribe_test.go index 8014522..7dbe689 100644 --- a/internal/controller/http/transcribe_test.go +++ b/internal/controller/http/transcribe_test.go @@ -69,7 +69,16 @@ func setupTestRouter(t *testing.T) (*gin.Engine, *TranscribeHandler) { Level: slog.LevelError, // Только ошибки в тестах })) - trsService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer, "data/files", logger) + trsService := service.NewTranscribeService( + jobRepo, + fileRepo, + metaviewer, + converter, + recognizer, + &TestTgSender{}, + "data/files", + logger, + ) handler := NewTranscribeHandler(jobRepo, trsService) @@ -374,3 +383,9 @@ func TestGetTranscribeJobStatus_NotFound(t *testing.T) { assert.Equal(t, "Job not found", response["error"]) } + +type TestTgSender struct{} + +func (s *TestTgSender) Send(msg string, chatId int64, replyMsgId *int) error { + return nil +} diff --git a/internal/controller/tg/tg.go b/internal/controller/tg/tg.go index 887610e..59ebbbd 100644 --- a/internal/controller/tg/tg.go +++ b/internal/controller/tg/tg.go @@ -7,7 +7,6 @@ import ( "net/http" "slices" "strings" - "time" "git.vakhrushev.me/av/transcriber/internal/contract" "git.vakhrushev.me/av/transcriber/internal/service" @@ -170,7 +169,7 @@ func (c *TelegramController) handleAudioMessage(message *tgbotapi.Message) { defer fileReader.Close() // Обрабатываем файл - job, err := c.transcribeService.CreateTranscribeJob(fileReader, fileName) + job, err := c.transcribeService.CreateJobFromTelegram(fileReader, fileName, message.Chat.ID, sentProgressMsg.MessageID) if err != nil { c.logger.Error("Failed to create transcribe job", "error", err) errorMsg := tgbotapi.NewMessage(message.Chat.ID, "Ошибка при создании задачи на расшифровку. Попробуйте еще раз.") @@ -182,9 +181,6 @@ func (c *TelegramController) handleAudioMessage(message *tgbotapi.Message) { successMsg := tgbotapi.NewMessage(message.Chat.ID, fmt.Sprintf("Задача на расшифровку создана. ID задачи: %s", job.Id)) successMsg.ReplyToMessageID = message.MessageID c.send(successMsg) - - // Отправляем результат расшифровки (асинхронно) - go c.sendTranscriptionResult(job.Id, message.Chat.ID, sentProgressMsg.MessageID) } func (c *TelegramController) handleVoiceMessage(message *tgbotapi.Message) { @@ -208,7 +204,7 @@ func (c *TelegramController) handleVoiceMessage(message *tgbotapi.Message) { defer fileReader.Close() // Обрабатываем файл - job, err := c.transcribeService.CreateTranscribeJob(fileReader, fileName) + job, err := c.transcribeService.CreateJobFromTelegram(fileReader, fileName, message.Chat.ID, sentProgressMsg.MessageID) if err != nil { c.logger.Error("Failed to create transcribe job", "error", err) errorMsg := tgbotapi.NewMessage(message.Chat.ID, "Ошибка при создании задачи на расшифровку. Попробуйте еще раз.") @@ -220,9 +216,6 @@ func (c *TelegramController) handleVoiceMessage(message *tgbotapi.Message) { successMsg := tgbotapi.NewMessage(message.Chat.ID, fmt.Sprintf("Задача на расшифровку создана. ID задачи: %s", job.Id)) successMsg.ReplyToMessageID = message.MessageID c.send(successMsg) - - // Отправляем результат расшифровки (асинхронно) - go c.sendTranscriptionResult(job.Id, message.Chat.ID, sentProgressMsg.MessageID) } func (c *TelegramController) handleDocumentMessage(message *tgbotapi.Message) { @@ -251,7 +244,7 @@ func (c *TelegramController) handleDocumentMessage(message *tgbotapi.Message) { defer fileReader.Close() // Обрабатываем файл - job, err := c.transcribeService.CreateTranscribeJob(fileReader, fileName) + job, err := c.transcribeService.CreateJobFromTelegram(fileReader, fileName, message.Chat.ID, sentProgressMsg.MessageID) if err != nil { c.logger.Error("Failed to create transcribe job", "error", err) errorMsg := tgbotapi.NewMessage(message.Chat.ID, "Ошибка при создании задачи на расшифровку. Попробуйте еще раз.") @@ -263,9 +256,6 @@ func (c *TelegramController) handleDocumentMessage(message *tgbotapi.Message) { successMsg := tgbotapi.NewMessage(message.Chat.ID, fmt.Sprintf("Задача на расшифровку создана. ID задачи: %s", job.Id)) successMsg.ReplyToMessageID = message.MessageID c.send(successMsg) - - // Отправляем результат расшифровки (асинхронно) - go c.sendTranscriptionResult(job.Id, message.Chat.ID, sentProgressMsg.MessageID) } func (c *TelegramController) downloadAudioFile(fileID string) (io.ReadCloser, string, error) { @@ -291,59 +281,6 @@ func (c *TelegramController) downloadAudioFile(fileID string) (io.ReadCloser, st return resp.Body, fileName, nil } -func (c *TelegramController) sendTranscriptionResult(jobID string, chatID int64, progressMessageID int) { - // Периодически проверяем статус задачи - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - timeout := time.After(10 * time.Minute) // Максимальное время ожидания 10 минут - - for { - select { - case <-ticker.C: - // Проверяем статус задачи - job, err := c.jobRepo.GetByID(jobID) - if err != nil { - c.logger.Error("Failed to get job", "job_id", jobID, "error", err) - continue - } - - switch job.State { - case "done": - // Отправляем результат - if job.TranscriptionText != nil { - resultMsg := tgbotapi.NewMessage(chatID, *job.TranscriptionText) - resultMsg.ReplyToMessageID = progressMessageID - c.send(resultMsg) - } else { - resultMsg := tgbotapi.NewMessage(chatID, "Расшифровка завершена, но текст пуст.") - resultMsg.ReplyToMessageID = progressMessageID - c.send(resultMsg) - } - return - case "failed": - // Отправляем сообщение об ошибке - var errorMsg string - if job.ErrorText != nil { - errorMsg = fmt.Sprintf("Ошибка при расшифровке: %s", *job.ErrorText) - } else { - errorMsg = "Ошибка при расшифровке аудиофайла." - } - resultMsg := tgbotapi.NewMessage(chatID, errorMsg) - resultMsg.ReplyToMessageID = progressMessageID - c.send(resultMsg) - return - } - case <-timeout: - // Время ожидания истекло - resultMsg := tgbotapi.NewMessage(chatID, "Время ожидания результата расшифровки истекло. Попробуйте позже проверить статус задачи.") - resultMsg.ReplyToMessageID = progressMessageID - c.send(resultMsg) - return - } - } -} - func (c *TelegramController) isAudioDocument(document *tgbotapi.Document) bool { // Проверяем MIME-тип документа if document.MimeType != "" { diff --git a/internal/entity/job.go b/internal/entity/job.go index ef7de4a..86e074a 100644 --- a/internal/entity/job.go +++ b/internal/entity/job.go @@ -7,6 +7,7 @@ import ( type TranscribeJob struct { Id string State string + Source string FileID *string IsError bool ErrorText *string @@ -15,6 +16,8 @@ type TranscribeJob struct { DelayTime *time.Time RecognitionOpID *string // ID операции распознавания в Yandex Cloud TranscriptionText *string // Результат распознавания + TgChatId *int64 // Telegram: в какой чат отправить результат распознавания + TgReplyMessageId *int // Telegram: с каким сообщением связать результат распознавания CreatedAt time.Time UpdatedAt time.Time } @@ -27,6 +30,14 @@ const ( StateFailed = "failed" ) +const ( + SourceUnknown = "unknown" + SourceApi = "api" + SourceTelegram = "telegram" +) + +// Переводит задачу в новое состояние, при этом очищает все +// служебные поля предыдущего состояния, как-то время задержки, информацию о воркере и тд func (j *TranscribeJob) MoveToState(state string) { j.State = state j.DelayTime = nil diff --git a/internal/service/transcribe.go b/internal/service/transcribe.go index 5629c8b..266cf11 100644 --- a/internal/service/transcribe.go +++ b/internal/service/transcribe.go @@ -1,6 +1,7 @@ package service import ( + "errors" "fmt" "io" "log/slog" @@ -26,6 +27,7 @@ type TranscribeService struct { metaviewer contract.AudioMetaViewer converter contract.AudioFileConverter recognizer contract.AudioRecognizer + tgSender contract.TelegramMessageSender storagePath string logger *slog.Logger } @@ -36,6 +38,7 @@ func NewTranscribeService( metaviewer contract.AudioMetaViewer, converter contract.AudioFileConverter, recognizer contract.AudioRecognizer, + tgSender contract.TelegramMessageSender, storagePath string, logger *slog.Logger, ) *TranscribeService { @@ -45,12 +48,47 @@ func NewTranscribeService( metaviewer: metaviewer, converter: converter, recognizer: recognizer, + tgSender: tgSender, storagePath: storagePath, logger: logger, } } -func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) (*entity.TranscribeJob, error) { +func (s *TranscribeService) CreateJobFromTelegram(file io.Reader, fileName string, chatId int64, replyMsgId int) (*entity.TranscribeJob, error) { + jobId := uuid.NewString() + now := time.Now() + + job := &entity.TranscribeJob{ + Id: jobId, + State: entity.StateCreated, + Source: entity.SourceTelegram, + TgChatId: &chatId, + TgReplyMessageId: &replyMsgId, + IsError: false, + CreatedAt: now, + UpdatedAt: now, + } + + return s.createTranscribeJob(job, file, fileName) +} + +func (s *TranscribeService) CreateJobFromApi(file io.Reader, fileName string) (*entity.TranscribeJob, error) { + jobId := uuid.NewString() + now := time.Now() + + job := &entity.TranscribeJob{ + Id: jobId, + State: entity.StateCreated, + Source: entity.SourceApi, + IsError: false, + CreatedAt: now, + UpdatedAt: now, + } + + return s.createTranscribeJob(job, file, fileName) +} + +func (s *TranscribeService) createTranscribeJob(job *entity.TranscribeJob, file io.Reader, fileName string) (*entity.TranscribeJob, error) { // Генерируем UUID для файла fileId := uuid.NewString() @@ -119,42 +157,25 @@ func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) return nil, err } - jobId := uuid.NewString() - now := time.Now() - - // Создаем запись в таблице transcribe_jobs - job := &entity.TranscribeJob{ - Id: jobId, - State: entity.StateCreated, - FileID: &fileId, - IsError: false, - CreatedAt: now, - UpdatedAt: now, - } + job.FileID = &fileId if err := s.jobRepo.Create(job); err != nil { - s.logger.Error("Failed to create job record", "error", err, "job_id", jobId) + s.logger.Error("Failed to create job record", "error", err, "job_id", job.Id) return nil, err } - s.logger.Info("Transcribe job created successfully", "job_id", jobId, "file_id", fileId) + s.logger.Info("Transcribe job created successfully", "job_id", job.Id, "file_id", fileId) + return job, nil } func (s *TranscribeService) FindAndRunConversionJob() error { - acquisitionId := uuid.NewString() - rottingTime := time.Now().Add(-1 * time.Hour) - - job, err := s.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime) + job, err := s.findJob(entity.StateCreated, time.Hour) if err != nil { - if _, ok := err.(*contract.JobNotFoundError); ok { - return &contract.NoopJobError{State: entity.StateCreated} - } - s.logger.Error("Failed to find and acquire conversion job", "error", err) return err } - s.logger.Info("Starting conversion job", "job_id", job.Id, "acquisition_id", acquisitionId) + s.logger.Info("Starting conversion job", "job_id", job.Id) srcFile, err := s.fileRepo.GetByID(*job.FileID) if err != nil { @@ -195,7 +216,7 @@ func (s *TranscribeService) FindAndRunConversionJob() error { "error", err, "job_id", job.Id, "duration", conversionDuration) - return err + return s.failJob(job, err, "сбой конвертации файла") } stat, err := os.Stat(destFilePath) @@ -241,23 +262,16 @@ func (s *TranscribeService) FindAndRunConversionJob() error { } func (s *TranscribeService) FindAndRunTranscribeJob() error { - acquisitionId := uuid.NewString() - rottingTime := time.Now().Add(-1 * time.Hour) - - jobRecord, err := s.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime) + job, err := s.findJob(entity.StateConverted, time.Hour) if err != nil { - if _, ok := err.(*contract.JobNotFoundError); ok { - return &contract.NoopJobError{State: entity.StateConverted} - } - s.logger.Error("Failed to find and acquire transcribe job", "error", err) return err } - s.logger.Info("Starting transcribe job", "job_id", jobRecord.Id, "acquisition_id", acquisitionId) + s.logger.Info("Starting transcribe job", "job_id", job.Id) - fileRecord, err := s.fileRepo.GetByID(*jobRecord.FileID) + fileRecord, err := s.fileRepo.GetByID(*job.FileID) if err != nil { - s.logger.Error("Failed to get file record", "error", err, "file_id", *jobRecord.FileID) + s.logger.Error("Failed to get file record", "error", err, "file_id", *job.FileID) return err } @@ -273,24 +287,24 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error { destFileId := uuid.NewString() destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3) - s.logger.Info("Starting recognition", "job_id", jobRecord.Id, "file_path", filePath) + s.logger.Info("Starting recognition", "job_id", job.Id, "file_path", filePath) // Запускаем асинхронное распознавание operationID, err := s.recognizer.Recognize(file, destFileRecord.FileName) if err != nil { - s.logger.Error("Failed to start recognition", "error", err, "job_id", jobRecord.Id) + s.logger.Error("Failed to start recognition", "error", err, "job_id", job.Id) return err } s.logger.Info("Recognition started", - "job_id", jobRecord.Id, + "job_id", job.Id, "operation_id", operationID) // Обновляем задачу с ID операции распознавания - jobRecord.FileID = &destFileId - jobRecord.RecognitionOpID = &operationID + job.FileID = &destFileId + job.RecognitionOpID = &operationID delayTime := time.Now().Add(10 * time.Second) - jobRecord.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) + job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) err = s.fileRepo.Create(destFileRecord) if err != nil { @@ -298,27 +312,20 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error { return err } - err = s.jobRepo.Save(jobRecord) + err = s.jobRepo.Save(job) if err != nil { - s.logger.Error("Failed to save job", "error", err, "job_id", jobRecord.Id) + s.logger.Error("Failed to save job", "error", err, "job_id", job.Id) return err } - s.logger.Info("Transcribe job updated successfully", "job_id", jobRecord.Id) + s.logger.Info("Transcribe job updated successfully", "job_id", job.Id) return nil } func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { - acquisitionId := uuid.NewString() - rottingTime := time.Now().Add(-24 * time.Hour) - - job, err := s.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime) + job, err := s.findJob(entity.StateTranscribe, 24*time.Hour) if err != nil { - if _, ok := err.(*contract.JobNotFoundError); ok { - return &contract.NoopJobError{State: entity.StateTranscribe} - } - s.logger.Error("Failed to find and acquire transcribe check job", "error", err) - return fmt.Errorf("failed find and acquire job: %s, %w", entity.StateTranscribe, err) + return err } if job.RecognitionOpID == nil { @@ -339,7 +346,7 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { if recResult.IsInProgress() { // Операция еще не завершена, оставляем в статусе обработки s.logger.Info("Operation in progress", "job_id", job.Id, "operation_id", opId) - delayTime := time.Now().Add(10 * time.Second) + delayTime := time.Now().Add(5 * time.Second) job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) err := s.jobRepo.Save(job) if err != nil { @@ -355,13 +362,7 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { "job_id", job.Id, "operation_id", opId, "error_message", errorText) - job.Fail(errorText) - err := s.jobRepo.Save(job) - if err != nil { - s.logger.Error("Failed to save failed job", "error", err, "job_id", job.Id) - return err - } - return nil + return s.failJob(job, errors.New(errorText), "сбой при распознавании файла") } // Операция завершена, получаем результат @@ -371,20 +372,85 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { return err } - s.logger.Info("Operation completed successfully", + s.logger.Info("Transcribe operation completed successfully", "job_id", job.Id, "operation_id", opId, "text_length", len(transcriptionText)) + // Завершаем задачу + return s.completeJob(job, transcriptionText) +} + +func (s *TranscribeService) findJob(state string, expiration time.Duration) (job *entity.TranscribeJob, err error) { + acquisitionId := uuid.NewString() + rottingTime := time.Now().Add(-1 * expiration) + + job, err = s.jobRepo.FindAndAcquire(state, acquisitionId, rottingTime) + if err != nil { + if _, ok := err.(*contract.JobNotFoundError); ok { + return nil, &contract.NoopJobError{State: state} + } + s.logger.Error("Failed to find and acquire job", "state", state, "error", err) + return nil, fmt.Errorf("failed find and acquire job: %s, %w", state, err) + } + + return job, nil +} + +func (s *TranscribeService) completeJob(job *entity.TranscribeJob, transcriptionText string) error { // Обновляем задачу с результатом job.Done(transcriptionText) - err = s.jobRepo.Save(job) + // Сохраняем задачу в базу + err := s.jobRepo.Save(job) if err != nil { - s.logger.Error("Failed to save completed job", "error", err, "job_id", job.Id) - return err + s.logger.Error("Failed to save job", "error", err, "job_id", job.Id) + return fmt.Errorf("failed to save job: %w", err) + } + + // Отправляем распознанный текст обратно пользователю + switch job.Source { + case entity.SourceTelegram: + if job.TgChatId == nil { + s.logger.Error("Telegram chat not specified", "job_id", job.Id) + return fmt.Errorf("tg chat id not specified, job id: %s", job.Id) + } + err := s.tgSender.Send(transcriptionText, *job.TgChatId, job.TgReplyMessageId) + if err != nil { + s.logger.Error("Failed to sent transcription text to client", "job_id", job.Id) + return fmt.Errorf("failed to sent message to client, job id: %s, err: %w", job.Id, err) + } + } + + return nil +} + +func (s *TranscribeService) failJob(job *entity.TranscribeJob, jobErr error, humanErrorText string) error { + // Обновляем задачу с результатом + job.Fail(jobErr.Error()) + + // Сохраняем задачу в базу + err := s.jobRepo.Save(job) + if err != nil { + s.logger.Error("Failed to save job", "error", err, "job_id", job.Id) + return fmt.Errorf("failed to save job: %w", err) + } + + // Отправляем распознанный текст обратно пользователю + switch job.Source { + case entity.SourceTelegram: + if job.TgChatId == nil { + s.logger.Error("Telegram chat not specified", "job_id", job.Id) + return fmt.Errorf("tg chat id not specified, job id: %s", job.Id) + } + + errorMessage := fmt.Sprintf("При обработке задачи произошла ошибка: %s.\nПожалуйста, попробуйте еще раз.", humanErrorText) + err := s.tgSender.Send(errorMessage, *job.TgChatId, job.TgReplyMessageId) + if err != nil { + s.logger.Error("Failed to sent message to client", "job_id", job.Id) + return fmt.Errorf("failed to sent message to client, job id: %s, err: %w", job.Id, err) + } } - s.logger.Info("Transcribe check job completed successfully", "job_id", job.Id) return nil } diff --git a/main.go b/main.go index 5e76c99..581b7b9 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( ffmpegmv "git.vakhrushev.me/av/transcriber/internal/adapter/metaviewer/ffmpeg" "git.vakhrushev.me/av/transcriber/internal/adapter/recognizer/yandex" "git.vakhrushev.me/av/transcriber/internal/adapter/repo/sqlite" + "git.vakhrushev.me/av/transcriber/internal/adapter/telegram" "git.vakhrushev.me/av/transcriber/internal/config" httpcontroller "git.vakhrushev.me/av/transcriber/internal/controller/http" tgcontroller "git.vakhrushev.me/av/transcriber/internal/controller/tg" @@ -101,6 +102,12 @@ func main() { metaviewer := ffmpegmv.NewFfmpegMetaViewer() converter := ffmpegconv.NewFfmpegConverter() + tgSender, err := telegram.NewTelegramMessageSender(cfg.Telegram.BotToken, logger) + if err != nil { + logger.Error("failed to create audio telegram sender", "error", err) + os.Exit(1) + } + recognizer, err := yandex.NewYandexAudioRecognizerService(yandex.YandexAudioRecognizerConfig{ Region: cfg.Yandex.ObjStorageRegion, AccessKey: cfg.Yandex.ObjStorageAccessKey, @@ -123,6 +130,7 @@ func main() { metaviewer, converter, recognizer, + tgSender, cfg.Storage.Path, logger, ) diff --git a/migrations/003_add_telegram_fields_to_transcribe_jobs.sql b/migrations/003_add_telegram_fields_to_transcribe_jobs.sql new file mode 100644 index 0000000..3617f59 --- /dev/null +++ b/migrations/003_add_telegram_fields_to_transcribe_jobs.sql @@ -0,0 +1,9 @@ +-- +goose Up +ALTER TABLE transcribe_jobs ADD COLUMN source TEXT NOT NULL DEFAULT 'unknown'; +ALTER TABLE transcribe_jobs ADD COLUMN tg_chat_id INTEGER; +ALTER TABLE transcribe_jobs ADD COLUMN tg_reply_message_id INTEGER; + +-- +goose Down +ALTER TABLE transcribe_jobs DROP COLUMN source; +ALTER TABLE transcribe_jobs DROP COLUMN tg_chat_id; +ALTER TABLE transcribe_jobs DROP COLUMN tg_reply_message_id;