Files
transcriber/internal/service/transcribe.go
2025-08-17 15:26:59 +03:00

457 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"errors"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"git.vakhrushev.me/av/transcriber/internal/contract"
"git.vakhrushev.me/av/transcriber/internal/entity"
"git.vakhrushev.me/av/transcriber/internal/metrics"
"github.com/google/uuid"
)
const (
// defaultAudioExt is the fallback extension, written without a leading dot,
// used when a file name carries no extension of its own. It is used both for
// the stored file name and as the source-format metric label.
defaultAudioExt = "audio"
)
// TranscribeService drives a transcription job through its stages: storing
// the uploaded audio, converting it, starting recognition, polling the
// recognition operation and delivering the outcome.
type TranscribeService struct {
// jobRepo persists transcription jobs and hands out jobs to work on.
jobRepo contract.TranscriptJobRepository
// fileRepo persists records about files that belong to jobs.
fileRepo contract.FileRepository
// metaviewer reads metadata (such as duration) of a stored audio file.
metaviewer contract.AudioMetaViewer
// converter converts a source audio file into the destination file.
converter contract.AudioFileConverter
// recognizer starts recognition operations and reports their status/result.
recognizer contract.AudioRecognizer
// tgSender sends messages to a Telegram chat.
tgSender contract.TelegramMessageSender
// storagePath is the local directory where uploaded and converted files live.
storagePath string
// logger receives structured log records from the service.
logger *slog.Logger
}
// NewTranscribeService returns a TranscribeService that uses the given
// repositories, audio tools, Telegram sender, local storage directory and
// logger. The arguments are stored as-is; no validation is performed.
func NewTranscribeService(
	jobRepo contract.TranscriptJobRepository,
	fileRepo contract.FileRepository,
	metaviewer contract.AudioMetaViewer,
	converter contract.AudioFileConverter,
	recognizer contract.AudioRecognizer,
	tgSender contract.TelegramMessageSender,
	storagePath string,
	logger *slog.Logger,
) *TranscribeService {
	svc := new(TranscribeService)

	// Persistence.
	svc.jobRepo = jobRepo
	svc.fileRepo = fileRepo

	// Audio processing pipeline.
	svc.metaviewer = metaviewer
	svc.converter = converter
	svc.recognizer = recognizer

	// Delivery, storage and diagnostics.
	svc.tgSender = tgSender
	svc.storagePath = storagePath
	svc.logger = logger

	return svc
}
// CreateJobFromTelegram creates a new transcription job for audio received
// via Telegram. The chat id and the id of the message to reply to are stored
// on the job. The content of file is saved to local storage under a generated
// name; fileName is only used to pick the extension.
//
// It returns the created job, or the first error hit while storing the file
// or persisting the records.
func (s *TranscribeService) CreateJobFromTelegram(file io.Reader, fileName string, chatId int64, replyMsgId int) (*entity.TranscribeJob, error) {
	var job entity.TranscribeJob

	job.Id = uuid.NewString()
	job.State = entity.StateCreated
	job.Source = entity.SourceTelegram
	job.TgChatId = &chatId
	job.TgReplyMessageId = &replyMsgId
	job.IsError = false

	// Creation and update timestamps start out identical.
	createdAt := time.Now()
	job.CreatedAt = createdAt
	job.UpdatedAt = createdAt

	return s.createTranscribeJob(&job, file, fileName)
}
// CreateJobFromApi creates a new transcription job for audio uploaded through
// the API. The content of file is saved to local storage under a generated
// name; fileName is only used to pick the extension.
//
// It returns the created job, or the first error hit while storing the file
// or persisting the records.
func (s *TranscribeService) CreateJobFromApi(file io.Reader, fileName string) (*entity.TranscribeJob, error) {
	var job entity.TranscribeJob

	job.Id = uuid.NewString()
	job.State = entity.StateCreated
	job.Source = entity.SourceApi
	job.IsError = false

	// Creation and update timestamps start out identical.
	createdAt := time.Now()
	job.CreatedAt = createdAt
	job.UpdatedAt = createdAt

	return s.createTranscribeJob(&job, file, fileName)
}
// createTranscribeJob stores the uploaded content under a generated name in
// the storage directory, records input metrics, creates the file record and
// then persists job pointing at that file.
//
// fileName is used only to derive the extension; when it has none,
// defaultAudioExt is used. The stored file is removed again if anything fails
// before its file record has been created, so rejected uploads do not pile up
// on disk. Once the file record exists the file is kept, even if saving the
// job fails, so the record never points at a missing file.
//
// It returns job on success, or nil and the underlying error.
func (s *TranscribeService) createTranscribeJob(job *entity.TranscribeJob, file io.Reader, fileName string) (*entity.TranscribeJob, error) {
	// Generate an id for the stored file.
	fileId := uuid.NewString()

	// Determine the extension, falling back when the name has none.
	ext := filepath.Ext(fileName)
	if ext == "" {
		ext = fmt.Sprintf(".%s", defaultAudioExt)
	}

	// Build the on-disk location of the stored file.
	storageFileName := fmt.Sprintf("%s%s", fileId, ext)
	storageFilePath := filepath.Join(s.storagePath, storageFileName)

	s.logger.Info("Creating transcribe job",
		"file_id", fileId,
		"file_name", fileName,
		"storage_path", storageFilePath)

	dst, err := os.Create(storageFilePath)
	if err != nil {
		s.logger.Error("Failed to create file", "error", err, "path", storageFilePath)
		return nil, err
	}

	// removeStored deletes the stored file on a best-effort basis; a failure
	// is logged but must not hide the error that triggered the cleanup.
	removeStored := func() {
		if rmErr := os.Remove(storageFilePath); rmErr != nil {
			s.logger.Warn("Failed to remove stored file", "error", rmErr, "path", storageFilePath)
		}
	}

	// Copy the upload and close the file exactly once, whatever the copy
	// result, so the descriptor is released before any cleanup.
	size, copyErr := io.Copy(dst, file)
	closeErr := dst.Close()
	if copyErr != nil {
		removeStored()
		s.logger.Error("Failed to copy file content", "error", copyErr)
		return nil, copyErr
	}
	if closeErr != nil {
		removeStored()
		s.logger.Error("Failed to close file", "error", closeErr)
		return nil, closeErr
	}

	info, err := s.metaviewer.GetInfo(storageFilePath)
	if err != nil {
		removeStored()
		s.logger.Error("Failed to get file info", "error", err, "path", storageFilePath)
		return nil, err
	}

	s.logger.Info("File uploaded successfully",
		"file_id", fileId,
		"size", size,
		"duration_seconds", info.Seconds)

	metrics.InputFileDurationHistogram.WithLabelValues().Observe(float64(info.Seconds))
	metrics.InputFileSizeHistogram.WithLabelValues(ext).Observe(float64(size))

	// Create the record describing the stored file.
	fileRecord := &entity.File{
		Id:        fileId,
		Storage:   entity.StorageLocal,
		FileName:  storageFileName,
		Size:      size,
		CreatedAt: time.Now(),
	}
	if err = s.fileRepo.Create(fileRecord); err != nil {
		// Without a record nothing references the file, so drop it.
		removeStored()
		s.logger.Error("Failed to create file record", "error", err, "file_id", fileId)
		return nil, err
	}

	job.FileID = &fileId
	if err = s.jobRepo.Create(job); err != nil {
		// The file record already exists, so the file itself is left in place.
		s.logger.Error("Failed to create job record", "error", err, "job_id", job.Id)
		return nil, err
	}

	s.logger.Info("Transcribe job created successfully", "job_id", job.Id, "file_id", fileId)
	return job, nil
}
// FindAndRunConversionJob acquires one job in the created state, converts its
// source file to an ".ogg" file in the storage directory, registers the
// converted file and moves the job to the converted state.
//
// The error from findJob is returned unchanged when no job could be acquired.
// A conversion failure marks the job as failed via failJob. Any other failure
// is returned as-is and leaves the job state untouched. A converted file that
// ends up without a file record is removed from disk on a best-effort basis.
func (s *TranscribeService) FindAndRunConversionJob() error {
	job, err := s.findJob(entity.StateCreated, time.Hour)
	if err != nil {
		return err
	}

	s.logger.Info("Starting conversion job", "job_id", job.Id)

	// A job without a file reference cannot be converted; report it instead
	// of panicking on the dereference below.
	if job.FileID == nil {
		s.logger.Error("Source file ID not found", "job_id", job.Id)
		return fmt.Errorf("file id not found for job: %s", job.Id)
	}

	srcFile, err := s.fileRepo.GetByID(*job.FileID)
	if err != nil {
		s.logger.Error("Failed to get source file", "error", err, "file_id", *job.FileID)
		return err
	}

	srcFilePath := filepath.Join(s.storagePath, srcFile.FileName)

	destFileId := uuid.NewString()
	destFileName := fmt.Sprintf("%s%s", destFileId, ".ogg")
	destFilePath := filepath.Join(s.storagePath, destFileName)

	// removeDest deletes the conversion output on a best-effort basis. The
	// file may legitimately be absent (e.g. the converter never created it).
	removeDest := func() {
		if rmErr := os.Remove(destFilePath); rmErr != nil && !errors.Is(rmErr, os.ErrNotExist) {
			s.logger.Warn("Failed to remove converted file", "error", rmErr, "path", destFilePath)
		}
	}

	// Source extension (without the dot) is used as a metric label.
	srcExt := strings.TrimPrefix(filepath.Ext(srcFile.FileName), ".")
	if srcExt == "" {
		srcExt = defaultAudioExt
	}

	s.logger.Info("Converting file",
		"job_id", job.Id,
		"src_path", srcFilePath,
		"dest_path", destFilePath,
		"src_format", srcExt)

	// Measure how long the conversion takes.
	startTime := time.Now()
	err = s.converter.Convert(srcFilePath, destFilePath)
	conversionDuration := time.Since(startTime)

	// Record the duration, labelled with whether the conversion failed.
	metrics.ConversionDurationHistogram.
		WithLabelValues(srcExt, "ogg", strconv.FormatBool(err != nil)).
		Observe(conversionDuration.Seconds())

	if err != nil {
		// Drop whatever partial output the converter may have left behind.
		removeDest()
		s.logger.Error("File conversion failed",
			"error", err,
			"job_id", job.Id,
			"duration", conversionDuration)
		return s.failJob(job, err, "сбой конвертации файла")
	}

	stat, err := os.Stat(destFilePath)
	if err != nil {
		removeDest()
		s.logger.Error("Failed to stat converted file", "error", err, "path", destFilePath)
		return err
	}

	s.logger.Info("File conversion completed",
		"job_id", job.Id,
		"duration", conversionDuration,
		"output_size", stat.Size())

	// Record the size of the converted file.
	metrics.OutputFileSizeHistogram.WithLabelValues("ogg").Observe(float64(stat.Size()))

	// Create the record describing the converted file.
	destFileRecord := &entity.File{
		Id:        destFileId,
		Storage:   entity.StorageLocal,
		FileName:  destFileName,
		Size:      stat.Size(),
		CreatedAt: time.Now(),
	}
	if err = s.fileRepo.Create(destFileRecord); err != nil {
		// Nothing references the converted file without its record.
		removeDest()
		s.logger.Error("Failed to create converted file record", "error", err, "file_id", destFileId)
		return err
	}

	// Point the job at the converted file and advance its state.
	job.FileID = &destFileId
	job.MoveToState(entity.StateConverted)

	if err = s.jobRepo.Save(job); err != nil {
		s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
		return err
	}

	s.logger.Info("Conversion job completed successfully", "job_id", job.Id)
	return nil
}
// FindAndRunTranscribeJob acquires one job in the converted state, hands its
// file to the recognizer and stores the returned operation id on the job. The
// job is moved to the transcribe state with a 10 second delay.
//
// The error from findJob is returned unchanged when no job could be acquired.
// Every other failure is logged and returned as-is.
func (s *TranscribeService) FindAndRunTranscribeJob() error {
	job, err := s.findJob(entity.StateConverted, time.Hour)
	if err != nil {
		return err
	}

	s.logger.Info("Starting transcribe job", "job_id", job.Id)

	// A job without a file reference cannot be recognised; report it instead
	// of panicking on the dereference below.
	if job.FileID == nil {
		s.logger.Error("File ID not found", "job_id", job.Id)
		return fmt.Errorf("file id not found for job: %s", job.Id)
	}

	fileRecord, err := s.fileRepo.GetByID(*job.FileID)
	if err != nil {
		s.logger.Error("Failed to get file record", "error", err, "file_id", *job.FileID)
		return err
	}

	filePath := filepath.Join(s.storagePath, fileRecord.FileName)

	file, err := os.Open(filePath)
	if err != nil {
		s.logger.Error("Failed to open file", "error", err, "path", filePath)
		return err
	}
	defer file.Close()

	// The recognizer input gets its own file record with S3 storage.
	destFileId := uuid.NewString()
	destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3)

	s.logger.Info("Starting recognition", "job_id", job.Id, "file_path", filePath)

	// Start the recognition; the result is fetched later by operation id.
	operationID, err := s.recognizer.Recognize(file, destFileRecord.FileName)
	if err != nil {
		s.logger.Error("Failed to start recognition", "error", err, "job_id", job.Id)
		return err
	}

	s.logger.Info("Recognition started",
		"job_id", job.Id,
		"operation_id", operationID)

	// Remember the recognition operation and delay the job before it moves on.
	job.FileID = &destFileId
	job.RecognitionOpID = &operationID
	delayTime := time.Now().Add(10 * time.Second)
	job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)

	if err = s.fileRepo.Create(destFileRecord); err != nil {
		s.logger.Error("Failed to create S3 file record", "error", err, "file_id", destFileId)
		return err
	}

	if err = s.jobRepo.Save(job); err != nil {
		s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
		return err
	}

	s.logger.Info("Transcribe job updated successfully", "job_id", job.Id)
	return nil
}
// FindAndRunTranscribeCheckJob acquires one job in the transcribe state and
// checks its recognition operation:
//
//   - still in progress: the job is re-saved in the same state with a 5 second
//     delay;
//   - failed: the job is marked as failed via failJob;
//   - otherwise: the recognised text is fetched and the job is completed via
//     completeJob.
//
// The error from findJob is returned unchanged when no job could be acquired.
// A job without a recognition operation id yields an error.
func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
	job, err := s.findJob(entity.StateTranscribe, 24*time.Hour)
	if err != nil {
		return err
	}

	if job.RecognitionOpID == nil {
		s.logger.Error("Recognition operation ID not found", "job_id", job.Id)
		return fmt.Errorf("recognition opId not found for job: %s", job.Id)
	}
	opId := *job.RecognitionOpID

	// Ask the recognizer for the current status of the operation.
	s.logger.Info("Checking operation status", "job_id", job.Id, "operation_id", opId)
	recResult, err := s.recognizer.CheckRecognitionStatus(opId)
	if err != nil {
		s.logger.Error("Failed to check recognition status", "error", err, "operation_id", opId)
		return err
	}

	if recResult.IsInProgress() {
		// Not finished yet: keep the job in the same state and delay it.
		s.logger.Info("Operation in progress", "job_id", job.Id, "operation_id", opId)
		delayTime := time.Now().Add(5 * time.Second)
		job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
		if err = s.jobRepo.Save(job); err != nil {
			s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
			return err
		}
		return nil
	}

	if recResult.IsFailed() {
		errorText := recResult.GetError()
		s.logger.Error("Operation failed",
			"job_id", job.Id,
			"operation_id", opId,
			"error_message", errorText)
		return s.failJob(job, errors.New(errorText), "сбой при распознавании файла")
	}

	// The operation has finished: fetch the recognised text.
	transcriptionText, err := s.recognizer.GetRecognitionText(opId)
	if err != nil {
		s.logger.Error("Failed to get recognition text", "error", err, "operation_id", opId)
		return err
	}

	s.logger.Info("Transcribe operation completed successfully",
		"job_id", job.Id,
		"operation_id", opId,
		"text_length", len(transcriptionText))

	// Finish the job and deliver the text.
	return s.completeJob(job, transcriptionText)
}
// findJob asks the job repository to find and acquire one job in the given
// state, using a freshly generated acquisition id and a cut-off time of
// now minus expiration.
// NOTE(review): the cut-off presumably lets jobs whose earlier acquisition is
// older than expiration be picked up again — confirm against the repository.
//
// When the repository reports that no job is available (a
// *contract.JobNotFoundError anywhere in the error chain), findJob returns a
// *contract.NoopJobError carrying the state. Any other repository error is
// logged and returned wrapped.
func (s *TranscribeService) findJob(state string, expiration time.Duration) (job *entity.TranscribeJob, err error) {
	acquisitionId := uuid.NewString()
	rottingTime := time.Now().Add(-1 * expiration)

	job, err = s.jobRepo.FindAndAcquire(state, acquisitionId, rottingTime)
	if err == nil {
		return job, nil
	}

	// errors.As also matches when the repository wraps the not-found error,
	// which a plain type assertion would miss.
	var notFound *contract.JobNotFoundError
	if errors.As(err, &notFound) {
		return nil, &contract.NoopJobError{State: state}
	}

	s.logger.Error("Failed to find and acquire job", "state", state, "error", err)
	return nil, fmt.Errorf("failed find and acquire job: %s, %w", state, err)
}
// completeJob marks job as done with transcriptionText, saves it, and, for
// jobs that came from Telegram, sends the text to the job's chat as a reply
// to the stored message id.
//
// The job is saved before the text is sent, so a delivery failure returns an
// error while the job is already stored as done. A Telegram job without a
// chat id yields an error.
func (s *TranscribeService) completeJob(job *entity.TranscribeJob, transcriptionText string) error {
	// Put the result on the job.
	job.Done(transcriptionText)

	// Persist the job.
	if err := s.jobRepo.Save(job); err != nil {
		s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
		return fmt.Errorf("failed to save job: %w", err)
	}

	// Deliver the recognised text back to the user.
	switch job.Source {
	case entity.SourceTelegram:
		if job.TgChatId == nil {
			s.logger.Error("Telegram chat not specified", "job_id", job.Id)
			return fmt.Errorf("tg chat id not specified, job id: %s", job.Id)
		}
		if err := s.tgSender.Send(transcriptionText, *job.TgChatId, job.TgReplyMessageId); err != nil {
			// Include the cause so a delivery failure can be diagnosed from the log.
			s.logger.Error("Failed to sent transcription text to client", "error", err, "job_id", job.Id)
			return fmt.Errorf("failed to sent message to client, job id: %s, err: %w", job.Id, err)
		}
	}

	return nil
}
// failJob marks job as failed with the text of jobErr, saves it, and, for
// jobs that came from Telegram, sends the user a notice built from
// humanErrorText as a reply to the stored message id.
//
// jobErr must be non-nil. The job is saved before the notice is sent, so a
// delivery failure returns an error while the job is already stored as
// failed. A Telegram job without a chat id yields an error. On success nil is
// returned even though the job itself failed.
func (s *TranscribeService) failJob(job *entity.TranscribeJob, jobErr error, humanErrorText string) error {
	// Put the failure on the job.
	job.Fail(jobErr.Error())

	// Persist the job.
	if err := s.jobRepo.Save(job); err != nil {
		s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
		return fmt.Errorf("failed to save job: %w", err)
	}

	// Tell the user that processing failed.
	switch job.Source {
	case entity.SourceTelegram:
		if job.TgChatId == nil {
			s.logger.Error("Telegram chat not specified", "job_id", job.Id)
			return fmt.Errorf("tg chat id not specified, job id: %s", job.Id)
		}
		errorMessage := fmt.Sprintf("При обработке задачи произошла ошибка: %s.\nПожалуйста, попробуйте еще раз.", humanErrorText)
		if err := s.tgSender.Send(errorMessage, *job.TgChatId, job.TgReplyMessageId); err != nil {
			// Include the cause so a delivery failure can be diagnosed from the log.
			s.logger.Error("Failed to sent message to client", "error", err, "job_id", job.Id)
			return fmt.Errorf("failed to sent message to client, job id: %s, err: %w", job.Id, err)
		}
	}

	return nil
}