Files
transcriber/internal/service/transcribe.go

318 lines
8.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"fmt"
"io"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"git.vakhrushev.me/av/transcriber/internal/contract"
"git.vakhrushev.me/av/transcriber/internal/entity"
"git.vakhrushev.me/av/transcriber/internal/metrics"
"github.com/google/uuid"
)
const (
baseStorageDir = "data/files"
defaultAudioExt = "audio"
)
type TranscribeService struct {
jobRepo contract.TranscriptJobRepository
fileRepo contract.FileRepository
metaviewer contract.AudioMetaViewer
converter contract.AudioFileConverter
recognizer contract.AudioRecognizer
}
func NewTranscribeService(
jobRepo contract.TranscriptJobRepository,
fileRepo contract.FileRepository,
metaviewer contract.AudioMetaViewer,
converter contract.AudioFileConverter,
recognizer contract.AudioRecognizer,
) *TranscribeService {
return &TranscribeService{
jobRepo: jobRepo,
fileRepo: fileRepo,
metaviewer: metaviewer,
converter: converter,
recognizer: recognizer,
}
}
func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) (*entity.TranscribeJob, error) {
// Генерируем UUID для файла
fileId := uuid.NewString()
// Определяем расширение файла
ext := filepath.Ext(fileName)
if ext == "" {
ext = fmt.Sprintf(".%s", defaultAudioExt) // fallback если расширение не определено
}
// Создаем путь для сохранения файла
storageFileName := fmt.Sprintf("%s%s", fileId, ext)
storageFilePath := filepath.Join(baseStorageDir, storageFileName)
// Создаем файл на диске
dst, err := os.Create(storageFilePath)
if err != nil {
return nil, err
}
defer dst.Close()
// Копируем содержимое загруженного файла
size, err := io.Copy(dst, file)
if err != nil {
return nil, err
}
if err := dst.Close(); err != nil {
return nil, err
}
info, err := s.metaviewer.GetInfo(storageFilePath)
if err != nil {
return nil, err
}
metrics.InputFileDurationHistogram.WithLabelValues().Observe(float64(info.Seconds))
metrics.InputFileSizeHistogram.WithLabelValues(ext).Observe(float64(size))
// Создаем запись в таблице files
fileRecord := &entity.File{
Id: fileId,
Storage: entity.StorageLocal,
FileName: storageFileName,
Size: size,
CreatedAt: time.Now(),
}
if err := s.fileRepo.Create(fileRecord); err != nil {
// Удаляем файл если не удалось создать запись в БД
os.Remove(storageFilePath)
return nil, err
}
jobId := uuid.NewString()
now := time.Now()
// Создаем запись в таблице transcribe_jobs
job := &entity.TranscribeJob{
Id: jobId,
State: entity.StateCreated,
FileID: &fileId,
IsError: false,
CreatedAt: now,
UpdatedAt: now,
}
if err := s.jobRepo.Create(job); err != nil {
return nil, err
}
return job, nil
}
func (s *TranscribeService) FindAndRunConversionJob() error {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-1 * time.Hour)
job, err := s.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime)
if err != nil {
if _, ok := err.(*contract.JobNotFoundError); ok {
return &contract.NoopJobError{State: entity.StateCreated}
}
return err
}
srcFile, err := s.fileRepo.GetByID(*job.FileID)
if err != nil {
return err
}
srcFilePath := filepath.Join(baseStorageDir, srcFile.FileName)
destFileId := uuid.NewString()
destFileName := fmt.Sprintf("%s%s", destFileId, ".ogg")
destFilePath := filepath.Join(baseStorageDir, destFileName)
// Получаем расширение исходного файла для метрики
srcExt := strings.TrimPrefix(filepath.Ext(srcFile.FileName), ".")
if srcExt == "" {
srcExt = defaultAudioExt
}
// Измеряем время конвертации
startTime := time.Now()
err = s.converter.Convert(srcFilePath, destFilePath)
conversionDuration := time.Since(startTime)
// Записываем метрику времени конвертации
metrics.ConversionDurationHistogram.
WithLabelValues(srcExt, "ogg", strconv.FormatBool(err != nil)).
Observe(conversionDuration.Seconds())
if err != nil {
return err
}
stat, err := os.Stat(destFilePath)
if err != nil {
return err
}
// Записываем метрику размера выходного файла
metrics.OutputFileSizeHistogram.WithLabelValues("ogg").Observe(float64(stat.Size()))
// Создаем запись в таблице files
destFileRecord := &entity.File{
Id: destFileId,
Storage: entity.StorageLocal,
FileName: destFileName,
Size: stat.Size(),
CreatedAt: time.Now(),
}
job.FileID = &destFileId
job.MoveToState(entity.StateConverted)
err = s.fileRepo.Create(destFileRecord)
if err != nil {
return err
}
err = s.jobRepo.Save(job)
if err != nil {
return err
}
return nil
}
func (s *TranscribeService) FindAndRunTranscribeJob() error {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-1 * time.Hour)
jobRecord, err := s.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime)
if err != nil {
if _, ok := err.(*contract.JobNotFoundError); ok {
return &contract.NoopJobError{State: entity.StateConverted}
}
return err
}
fileRecord, err := s.fileRepo.GetByID(*jobRecord.FileID)
if err != nil {
return err
}
filePath := filepath.Join(baseStorageDir, fileRecord.FileName)
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
destFileId := uuid.NewString()
destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3)
// Запускаем асинхронное распознавание
operationID, err := s.recognizer.Recognize(file, destFileRecord.FileName)
if err != nil {
return err
}
// Обновляем задачу с ID операции распознавания
jobRecord.FileID = &destFileId
jobRecord.RecognitionOpID = &operationID
delayTime := time.Now().Add(10 * time.Second)
jobRecord.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
err = s.fileRepo.Create(destFileRecord)
if err != nil {
return err
}
err = s.jobRepo.Save(jobRecord)
if err != nil {
return err
}
return nil
}
func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-24 * time.Hour)
job, err := s.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime)
if err != nil {
if _, ok := err.(*contract.JobNotFoundError); ok {
return &contract.NoopJobError{State: entity.StateTranscribe}
}
return fmt.Errorf("failed find and acquire job: %s, %w", entity.StateTranscribe, err)
}
if job.RecognitionOpID == nil {
return fmt.Errorf("recogniton opId not found for job: %s", job.Id)
}
opId := *job.RecognitionOpID
// Проверяем статус операции
log.Printf("Check operation status: id %s\n", opId)
recResult, err := s.recognizer.CheckRecognitionStatus(opId)
if err != nil {
return err
}
if recResult.IsInProgress() {
// Операция еще не завершена, оставляем в статусе обработки
log.Printf("Operation in progress: id %s\n", opId)
delayTime := time.Now().Add(10 * time.Second)
job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
err := s.jobRepo.Save(job)
if err != nil {
return err
}
return nil
}
if recResult.IsFailed() {
errorText := recResult.GetError()
log.Printf("Operation failed: id %s, message %s\n", opId, errorText)
job.Fail(errorText)
err := s.jobRepo.Save(job)
if err != nil {
return err
}
return nil
}
// Операция завершена, получаем результат
transcriptionText, err := s.recognizer.GetRecognitionText(opId)
if err != nil {
return err
}
log.Printf("Operation done: id %s\n", opId)
// Обновляем задачу с результатом
job.Done(transcriptionText)
err = s.jobRepo.Save(job)
if err != nil {
return err
}
return nil
}