391 lines
12 KiB
Go
391 lines
12 KiB
Go
package service
|
||
|
||
import (
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"git.vakhrushev.me/av/transcriber/internal/contract"
	"git.vakhrushev.me/av/transcriber/internal/entity"
	"git.vakhrushev.me/av/transcriber/internal/metrics"
	"github.com/google/uuid"
)
|
||
|
||
// defaultAudioExt is the extension (without the leading dot) substituted when
// a file name carries no extension of its own.
const defaultAudioExt = "audio"
|
||
|
||
type TranscribeService struct {
|
||
jobRepo contract.TranscriptJobRepository
|
||
fileRepo contract.FileRepository
|
||
metaviewer contract.AudioMetaViewer
|
||
converter contract.AudioFileConverter
|
||
recognizer contract.AudioRecognizer
|
||
storagePath string
|
||
logger *slog.Logger
|
||
}
|
||
|
||
func NewTranscribeService(
|
||
jobRepo contract.TranscriptJobRepository,
|
||
fileRepo contract.FileRepository,
|
||
metaviewer contract.AudioMetaViewer,
|
||
converter contract.AudioFileConverter,
|
||
recognizer contract.AudioRecognizer,
|
||
storagePath string,
|
||
logger *slog.Logger,
|
||
) *TranscribeService {
|
||
return &TranscribeService{
|
||
jobRepo: jobRepo,
|
||
fileRepo: fileRepo,
|
||
metaviewer: metaviewer,
|
||
converter: converter,
|
||
recognizer: recognizer,
|
||
storagePath: storagePath,
|
||
logger: logger,
|
||
}
|
||
}
|
||
|
||
func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) (*entity.TranscribeJob, error) {
|
||
// Генерируем UUID для файла
|
||
fileId := uuid.NewString()
|
||
|
||
// Определяем расширение файла
|
||
ext := filepath.Ext(fileName)
|
||
if ext == "" {
|
||
ext = fmt.Sprintf(".%s", defaultAudioExt) // fallback если расширение не определено
|
||
}
|
||
|
||
// Создаем путь для сохранения файла
|
||
storageFileName := fmt.Sprintf("%s%s", fileId, ext)
|
||
storageFilePath := filepath.Join(s.storagePath, storageFileName)
|
||
|
||
s.logger.Info("Creating transcribe job",
|
||
"file_id", fileId,
|
||
"file_name", fileName,
|
||
"storage_path", storageFilePath)
|
||
|
||
// Создаем файл на диске
|
||
dst, err := os.Create(storageFilePath)
|
||
if err != nil {
|
||
s.logger.Error("Failed to create file", "error", err, "path", storageFilePath)
|
||
return nil, err
|
||
}
|
||
defer dst.Close()
|
||
|
||
// Копируем содержимое загруженного файла
|
||
size, err := io.Copy(dst, file)
|
||
if err != nil {
|
||
s.logger.Error("Failed to copy file content", "error", err)
|
||
return nil, err
|
||
}
|
||
|
||
if err := dst.Close(); err != nil {
|
||
s.logger.Error("Failed to close file", "error", err)
|
||
return nil, err
|
||
}
|
||
|
||
info, err := s.metaviewer.GetInfo(storageFilePath)
|
||
if err != nil {
|
||
s.logger.Error("Failed to get file info", "error", err, "path", storageFilePath)
|
||
return nil, err
|
||
}
|
||
|
||
s.logger.Info("File uploaded successfully",
|
||
"file_id", fileId,
|
||
"size", size,
|
||
"duration_seconds", info.Seconds)
|
||
|
||
metrics.InputFileDurationHistogram.WithLabelValues().Observe(float64(info.Seconds))
|
||
metrics.InputFileSizeHistogram.WithLabelValues(ext).Observe(float64(size))
|
||
|
||
// Создаем запись в таблице files
|
||
fileRecord := &entity.File{
|
||
Id: fileId,
|
||
Storage: entity.StorageLocal,
|
||
FileName: storageFileName,
|
||
Size: size,
|
||
CreatedAt: time.Now(),
|
||
}
|
||
|
||
if err := s.fileRepo.Create(fileRecord); err != nil {
|
||
// Удаляем файл если не удалось создать запись в БД
|
||
os.Remove(storageFilePath)
|
||
s.logger.Error("Failed to create file record", "error", err, "file_id", fileId)
|
||
return nil, err
|
||
}
|
||
|
||
jobId := uuid.NewString()
|
||
now := time.Now()
|
||
|
||
// Создаем запись в таблице transcribe_jobs
|
||
job := &entity.TranscribeJob{
|
||
Id: jobId,
|
||
State: entity.StateCreated,
|
||
FileID: &fileId,
|
||
IsError: false,
|
||
CreatedAt: now,
|
||
UpdatedAt: now,
|
||
}
|
||
|
||
if err := s.jobRepo.Create(job); err != nil {
|
||
s.logger.Error("Failed to create job record", "error", err, "job_id", jobId)
|
||
return nil, err
|
||
}
|
||
|
||
s.logger.Info("Transcribe job created successfully", "job_id", jobId, "file_id", fileId)
|
||
return job, nil
|
||
}
|
||
|
||
func (s *TranscribeService) FindAndRunConversionJob() error {
|
||
acquisitionId := uuid.NewString()
|
||
rottingTime := time.Now().Add(-1 * time.Hour)
|
||
|
||
job, err := s.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime)
|
||
if err != nil {
|
||
if _, ok := err.(*contract.JobNotFoundError); ok {
|
||
return &contract.NoopJobError{State: entity.StateCreated}
|
||
}
|
||
s.logger.Error("Failed to find and acquire conversion job", "error", err)
|
||
return err
|
||
}
|
||
|
||
s.logger.Info("Starting conversion job", "job_id", job.Id, "acquisition_id", acquisitionId)
|
||
|
||
srcFile, err := s.fileRepo.GetByID(*job.FileID)
|
||
if err != nil {
|
||
s.logger.Error("Failed to get source file", "error", err, "file_id", *job.FileID)
|
||
return err
|
||
}
|
||
|
||
srcFilePath := filepath.Join(s.storagePath, srcFile.FileName)
|
||
|
||
destFileId := uuid.NewString()
|
||
destFileName := fmt.Sprintf("%s%s", destFileId, ".ogg")
|
||
destFilePath := filepath.Join(s.storagePath, destFileName)
|
||
|
||
// Получаем расширение исходного файла для метрики
|
||
srcExt := strings.TrimPrefix(filepath.Ext(srcFile.FileName), ".")
|
||
if srcExt == "" {
|
||
srcExt = defaultAudioExt
|
||
}
|
||
|
||
s.logger.Info("Converting file",
|
||
"job_id", job.Id,
|
||
"src_path", srcFilePath,
|
||
"dest_path", destFilePath,
|
||
"src_format", srcExt)
|
||
|
||
// Измеряем время конвертации
|
||
startTime := time.Now()
|
||
err = s.converter.Convert(srcFilePath, destFilePath)
|
||
conversionDuration := time.Since(startTime)
|
||
|
||
// Записываем метрику времени конвертации
|
||
metrics.ConversionDurationHistogram.
|
||
WithLabelValues(srcExt, "ogg", strconv.FormatBool(err != nil)).
|
||
Observe(conversionDuration.Seconds())
|
||
|
||
if err != nil {
|
||
s.logger.Error("File conversion failed",
|
||
"error", err,
|
||
"job_id", job.Id,
|
||
"duration", conversionDuration)
|
||
return err
|
||
}
|
||
|
||
stat, err := os.Stat(destFilePath)
|
||
if err != nil {
|
||
s.logger.Error("Failed to stat converted file", "error", err, "path", destFilePath)
|
||
return err
|
||
}
|
||
|
||
s.logger.Info("File conversion completed",
|
||
"job_id", job.Id,
|
||
"duration", conversionDuration,
|
||
"output_size", stat.Size())
|
||
|
||
// Записываем метрику размера выходного файла
|
||
metrics.OutputFileSizeHistogram.WithLabelValues("ogg").Observe(float64(stat.Size()))
|
||
|
||
// Создаем запись в таблице files
|
||
destFileRecord := &entity.File{
|
||
Id: destFileId,
|
||
Storage: entity.StorageLocal,
|
||
FileName: destFileName,
|
||
Size: stat.Size(),
|
||
CreatedAt: time.Now(),
|
||
}
|
||
|
||
job.FileID = &destFileId
|
||
job.MoveToState(entity.StateConverted)
|
||
|
||
err = s.fileRepo.Create(destFileRecord)
|
||
if err != nil {
|
||
s.logger.Error("Failed to create converted file record", "error", err, "file_id", destFileId)
|
||
return err
|
||
}
|
||
|
||
err = s.jobRepo.Save(job)
|
||
if err != nil {
|
||
s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
|
||
return err
|
||
}
|
||
|
||
s.logger.Info("Conversion job completed successfully", "job_id", job.Id)
|
||
return nil
|
||
}
|
||
|
||
func (s *TranscribeService) FindAndRunTranscribeJob() error {
|
||
acquisitionId := uuid.NewString()
|
||
rottingTime := time.Now().Add(-1 * time.Hour)
|
||
|
||
jobRecord, err := s.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime)
|
||
if err != nil {
|
||
if _, ok := err.(*contract.JobNotFoundError); ok {
|
||
return &contract.NoopJobError{State: entity.StateConverted}
|
||
}
|
||
s.logger.Error("Failed to find and acquire transcribe job", "error", err)
|
||
return err
|
||
}
|
||
|
||
s.logger.Info("Starting transcribe job", "job_id", jobRecord.Id, "acquisition_id", acquisitionId)
|
||
|
||
fileRecord, err := s.fileRepo.GetByID(*jobRecord.FileID)
|
||
if err != nil {
|
||
s.logger.Error("Failed to get file record", "error", err, "file_id", *jobRecord.FileID)
|
||
return err
|
||
}
|
||
|
||
filePath := filepath.Join(s.storagePath, fileRecord.FileName)
|
||
|
||
file, err := os.Open(filePath)
|
||
if err != nil {
|
||
s.logger.Error("Failed to open file", "error", err, "path", filePath)
|
||
return err
|
||
}
|
||
defer file.Close()
|
||
|
||
destFileId := uuid.NewString()
|
||
destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3)
|
||
|
||
s.logger.Info("Starting recognition", "job_id", jobRecord.Id, "file_path", filePath)
|
||
|
||
// Запускаем асинхронное распознавание
|
||
operationID, err := s.recognizer.Recognize(file, destFileRecord.FileName)
|
||
if err != nil {
|
||
s.logger.Error("Failed to start recognition", "error", err, "job_id", jobRecord.Id)
|
||
return err
|
||
}
|
||
|
||
s.logger.Info("Recognition started",
|
||
"job_id", jobRecord.Id,
|
||
"operation_id", operationID)
|
||
|
||
// Обновляем задачу с ID операции распознавания
|
||
jobRecord.FileID = &destFileId
|
||
jobRecord.RecognitionOpID = &operationID
|
||
delayTime := time.Now().Add(10 * time.Second)
|
||
jobRecord.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
|
||
|
||
err = s.fileRepo.Create(destFileRecord)
|
||
if err != nil {
|
||
s.logger.Error("Failed to create S3 file record", "error", err, "file_id", destFileId)
|
||
return err
|
||
}
|
||
|
||
err = s.jobRepo.Save(jobRecord)
|
||
if err != nil {
|
||
s.logger.Error("Failed to save job", "error", err, "job_id", jobRecord.Id)
|
||
return err
|
||
}
|
||
|
||
s.logger.Info("Transcribe job updated successfully", "job_id", jobRecord.Id)
|
||
return nil
|
||
}
|
||
|
||
func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
|
||
acquisitionId := uuid.NewString()
|
||
rottingTime := time.Now().Add(-24 * time.Hour)
|
||
|
||
job, err := s.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime)
|
||
if err != nil {
|
||
if _, ok := err.(*contract.JobNotFoundError); ok {
|
||
return &contract.NoopJobError{State: entity.StateTranscribe}
|
||
}
|
||
s.logger.Error("Failed to find and acquire transcribe check job", "error", err)
|
||
return fmt.Errorf("failed find and acquire job: %s, %w", entity.StateTranscribe, err)
|
||
}
|
||
|
||
if job.RecognitionOpID == nil {
|
||
s.logger.Error("Recognition operation ID not found", "job_id", job.Id)
|
||
return fmt.Errorf("recogniton opId not found for job: %s", job.Id)
|
||
}
|
||
|
||
opId := *job.RecognitionOpID
|
||
|
||
// Проверяем статус операции
|
||
s.logger.Info("Checking operation status", "job_id", job.Id, "operation_id", opId)
|
||
recResult, err := s.recognizer.CheckRecognitionStatus(opId)
|
||
if err != nil {
|
||
s.logger.Error("Failed to check recognition status", "error", err, "operation_id", opId)
|
||
return err
|
||
}
|
||
|
||
if recResult.IsInProgress() {
|
||
// Операция еще не завершена, оставляем в статусе обработки
|
||
s.logger.Info("Operation in progress", "job_id", job.Id, "operation_id", opId)
|
||
delayTime := time.Now().Add(10 * time.Second)
|
||
job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
|
||
err := s.jobRepo.Save(job)
|
||
if err != nil {
|
||
s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
if recResult.IsFailed() {
|
||
errorText := recResult.GetError()
|
||
s.logger.Error("Operation failed",
|
||
"job_id", job.Id,
|
||
"operation_id", opId,
|
||
"error_message", errorText)
|
||
job.Fail(errorText)
|
||
err := s.jobRepo.Save(job)
|
||
if err != nil {
|
||
s.logger.Error("Failed to save failed job", "error", err, "job_id", job.Id)
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Операция завершена, получаем результат
|
||
transcriptionText, err := s.recognizer.GetRecognitionText(opId)
|
||
if err != nil {
|
||
s.logger.Error("Failed to get recognition text", "error", err, "operation_id", opId)
|
||
return err
|
||
}
|
||
|
||
s.logger.Info("Operation completed successfully",
|
||
"job_id", job.Id,
|
||
"operation_id", opId,
|
||
"text_length", len(transcriptionText))
|
||
|
||
// Обновляем задачу с результатом
|
||
job.Done(transcriptionText)
|
||
|
||
err = s.jobRepo.Save(job)
|
||
if err != nil {
|
||
s.logger.Error("Failed to save completed job", "error", err, "job_id", job.Id)
|
||
return err
|
||
}
|
||
|
||
s.logger.Info("Transcribe check job completed successfully", "job_id", job.Id)
|
||
return nil
|
||
}
|