package service import ( "fmt" "io" "log" "os" "path/filepath" "strconv" "strings" "time" "git.vakhrushev.me/av/transcriber/internal/contract" "git.vakhrushev.me/av/transcriber/internal/entity" "git.vakhrushev.me/av/transcriber/internal/metrics" "github.com/google/uuid" ) const ( baseStorageDir = "data/files" defaultAudioExt = "audio" ) type TranscribeService struct { jobRepo contract.TranscriptJobRepository fileRepo contract.FileRepository converter contract.AudioFileConverter recognizer contract.AudioRecognizer } func NewTranscribeService( jobRepo contract.TranscriptJobRepository, fileRepo contract.FileRepository, converter contract.AudioFileConverter, recognizer contract.AudioRecognizer, ) *TranscribeService { return &TranscribeService{ jobRepo: jobRepo, fileRepo: fileRepo, converter: converter, recognizer: recognizer, } } func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) (*entity.TranscribeJob, error) { // Генерируем UUID для файла fileId := uuid.NewString() // Определяем расширение файла ext := filepath.Ext(fileName) if ext == "" { ext = fmt.Sprintf(".%s", defaultAudioExt) // fallback если расширение не определено } // Создаем путь для сохранения файла storageFileName := fmt.Sprintf("%s%s", fileId, ext) storageFilePath := filepath.Join(baseStorageDir, storageFileName) // Создаем файл на диске dst, err := os.Create(storageFilePath) if err != nil { return nil, err } defer dst.Close() // Копируем содержимое загруженного файла size, err := io.Copy(dst, file) if err != nil { return nil, err } if err := dst.Close(); err != nil { return nil, err } metrics.InputFileSizeHistogram.WithLabelValues(ext).Observe(float64(size)) // Создаем запись в таблице files fileRecord := &entity.File{ Id: fileId, Storage: entity.StorageLocal, FileName: storageFileName, Size: size, CreatedAt: time.Now(), } if err := s.fileRepo.Create(fileRecord); err != nil { // Удаляем файл если не удалось создать запись в БД os.Remove(storageFilePath) return nil, err } jobId := uuid.NewString() now := time.Now() // Создаем запись в таблице transcribe_jobs job := &entity.TranscribeJob{ Id: jobId, State: entity.StateCreated, FileID: &fileId, IsError: false, CreatedAt: now, UpdatedAt: now, } if err := s.jobRepo.Create(job); err != nil { return nil, err } return job, nil } func (s *TranscribeService) FindAndRunConversionJob() error { acquisitionId := uuid.NewString() rottingTime := time.Now().Add(-1 * time.Hour) job, err := s.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime) if err != nil { if _, ok := err.(*contract.JobNotFoundError); ok { return nil } return err } srcFile, err := s.fileRepo.GetByID(*job.FileID) if err != nil { return err } srcFilePath := filepath.Join(baseStorageDir, srcFile.FileName) destFileId := uuid.NewString() destFileName := fmt.Sprintf("%s%s", destFileId, ".ogg") destFilePath := filepath.Join(baseStorageDir, destFileName) // Получаем расширение исходного файла для метрики srcExt := strings.TrimPrefix(filepath.Ext(srcFile.FileName), ".") if srcExt == "" { srcExt = defaultAudioExt } // Измеряем время конвертации startTime := time.Now() err = s.converter.Convert(srcFilePath, destFilePath) conversionDuration := time.Since(startTime) // Записываем метрику времени конвертации metrics.ConversionDurationHistogram. WithLabelValues(srcExt, "ogg", strconv.FormatBool(err != nil)). Observe(conversionDuration.Seconds()) if err != nil { return err } stat, err := os.Stat(destFilePath) if err != nil { return err } // Записываем метрику размера выходного файла metrics.OutputFileSizeHistogram.WithLabelValues("ogg").Observe(float64(stat.Size())) // Создаем запись в таблице files destFileRecord := &entity.File{ Id: destFileId, Storage: entity.StorageLocal, FileName: destFileName, Size: stat.Size(), CreatedAt: time.Now(), } job.FileID = &destFileId job.MoveToState(entity.StateConverted) err = s.fileRepo.Create(destFileRecord) if err != nil { return err } err = s.jobRepo.Save(job) if err != nil { return err } return nil } func (s *TranscribeService) FindAndRunTranscribeJob() error { acquisitionId := uuid.NewString() rottingTime := time.Now().Add(-1 * time.Hour) jobRecord, err := s.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime) if err != nil { if _, ok := err.(*contract.JobNotFoundError); ok { return nil } return err } fileRecord, err := s.fileRepo.GetByID(*jobRecord.FileID) if err != nil { return err } filePath := filepath.Join(baseStorageDir, fileRecord.FileName) file, err := os.Open(filePath) if err != nil { return err } defer file.Close() destFileId := uuid.NewString() destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3) // Запускаем асинхронное распознавание operationID, err := s.recognizer.Recognize(file, destFileRecord.FileName) if err != nil { return err } // Обновляем задачу с ID операции распознавания jobRecord.FileID = &destFileId jobRecord.RecognitionOpID = &operationID delayTime := time.Now().Add(10 * time.Second) jobRecord.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) err = s.fileRepo.Create(destFileRecord) if err != nil { return err } err = s.jobRepo.Save(jobRecord) if err != nil { return err } return nil } func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { acquisitionId := uuid.NewString() rottingTime := time.Now().Add(-24 * time.Hour) job, err := s.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime) if err != nil { if _, ok := err.(*contract.JobNotFoundError); ok { return nil } return fmt.Errorf("failed find and acquire job: %s, %w", entity.StateTranscribe, err) } if job.RecognitionOpID == nil { return fmt.Errorf("recogniton opId not found for job: %s", job.Id) } opId := *job.RecognitionOpID // Проверяем статус операции log.Printf("Check operation status: id %s\n", opId) recResult, err := s.recognizer.CheckRecognitionStatus(opId) if err != nil { return err } if recResult.IsInProgress() { // Операция еще не завершена, оставляем в статусе обработки log.Printf("Operation in progress: id %s\n", opId) delayTime := time.Now().Add(10 * time.Second) job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) err := s.jobRepo.Save(job) if err != nil { return err } return nil } if recResult.IsFailed() { errorText := recResult.GetError() log.Printf("Operation failed: id %s, message %s\n", opId, errorText) job.Fail(errorText) err := s.jobRepo.Save(job) if err != nil { return err } return nil } // Операция завершена, получаем результат transcriptionText, err := s.recognizer.GetRecognitionText(opId) if err != nil { return err } log.Printf("Operation done: id %s\n", opId) // Обновляем задачу с результатом job.Done(transcriptionText) err = s.jobRepo.Save(job) if err != nil { return err } return nil }