Refactor project structure

This commit is contained in:
2025-08-12 14:15:26 +03:00
parent 983de269ad
commit e1cb261570
8 changed files with 30 additions and 30 deletions

View File

@@ -0,0 +1,291 @@
package service
import (
"fmt"
"io"
"log"
"os"
"path/filepath"
"time"
"git.vakhrushev.me/av/transcriber/internal/contract"
"git.vakhrushev.me/av/transcriber/internal/entity"
"git.vakhrushev.me/av/transcriber/internal/repo/ffmpeg"
"git.vakhrushev.me/av/transcriber/internal/service/s3"
"git.vakhrushev.me/av/transcriber/internal/service/speechkit"
"github.com/google/uuid"
)
const baseStorageDir = "data/files"
type TranscribeService struct {
jobRepo contract.TranscriptJobRepository
fileRepo contract.FileRepository
}
func NewTranscribeService(jobRepo contract.TranscriptJobRepository, fileRepo contract.FileRepository) *TranscribeService {
return &TranscribeService{jobRepo: jobRepo, fileRepo: fileRepo}
}
func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) (*entity.TranscribeJob, error) {
// Генерируем UUID для файла
fileId := uuid.NewString()
// Определяем расширение файла
ext := filepath.Ext(fileName)
if ext == "" {
ext = ".audio" // fallback если расширение не определено
}
// Создаем путь для сохранения файла
storageFileName := fmt.Sprintf("%s%s", fileId, ext)
storageFilePath := filepath.Join(baseStorageDir, storageFileName)
// Создаем файл на диске
dst, err := os.Create(storageFilePath)
if err != nil {
return nil, err
}
defer dst.Close()
// Копируем содержимое загруженного файла
size, err := io.Copy(dst, file)
if err != nil {
return nil, err
}
// Создаем запись в таблице files
fileRecord := &entity.File{
Id: fileId,
Storage: entity.StorageLocal,
FileName: storageFileName,
Size: size,
CreatedAt: time.Now(),
}
if err := s.fileRepo.Create(fileRecord); err != nil {
// Удаляем файл если не удалось создать запись в БД
os.Remove(storageFilePath)
return nil, err
}
jobId := uuid.NewString()
now := time.Now()
// Создаем запись в таблице transcribe_jobs
job := &entity.TranscribeJob{
Id: jobId,
State: entity.StateCreated,
FileID: &fileId,
IsError: false,
CreatedAt: now,
UpdatedAt: now,
}
if err := s.jobRepo.Create(job); err != nil {
return nil, err
}
return job, nil
}
func (s *TranscribeService) FindAndRunConversionJob() error {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-1 * time.Hour)
job, err := s.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime)
if err != nil {
if _, ok := err.(*contract.JobNotFoundError); ok {
return nil
}
return err
}
srcFile, err := s.fileRepo.GetByID(*job.FileID)
if err != nil {
return err
}
srcFilePath := filepath.Join(baseStorageDir, srcFile.FileName)
destFileId := uuid.NewString()
destFileName := fmt.Sprintf("%s%s", destFileId, ".ogg")
destFilePath := filepath.Join(baseStorageDir, destFileName)
conv := ffmpeg.NewFileConverter()
err = conv.Convert(srcFilePath, destFilePath)
if err != nil {
return err
}
stat, err := os.Stat(destFilePath)
if err != nil {
return err
}
// Создаем запись в таблице files
destFileRecord := &entity.File{
Id: destFileId,
Storage: entity.StorageLocal,
FileName: destFileName,
Size: stat.Size(),
CreatedAt: time.Now(),
}
job.FileID = &destFileId
job.MoveToState(entity.StateConverted)
err = s.fileRepo.Create(destFileRecord)
if err != nil {
return err
}
err = s.jobRepo.Save(job)
if err != nil {
return err
}
return nil
}
func (s *TranscribeService) FindAndRunTranscribeJob() error {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-1 * time.Hour)
jobRecord, err := s.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime)
if err != nil {
if _, ok := err.(*contract.JobNotFoundError); ok {
return nil
}
return err
}
fileRecord, err := s.fileRepo.GetByID(*jobRecord.FileID)
if err != nil {
return err
}
filePath := filepath.Join(baseStorageDir, fileRecord.FileName)
destFileId := uuid.NewString()
destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3)
// Создаем S3 сервис
s3Service, err := s3.NewS3Service()
if err != nil {
return err
}
// Загружаем файл на S3
err = s3Service.UploadFile(filePath, destFileRecord.FileName)
if err != nil {
return err
}
// Создаем SpeechKit сервис
speechKitService, err := speechkit.NewSpeechKitService()
if err != nil {
return err
}
// Формируем S3 URI для файла
s3URI := s3Service.FileUrl(destFileRecord.FileName)
// Запускаем асинхронное распознавание
operationID, err := speechKitService.RecognizeFileFromS3(s3URI)
if err != nil {
return err
}
// Обновляем задачу с ID операции распознавания
jobRecord.FileID = &destFileId
jobRecord.RecognitionOpID = &operationID
delayTime := time.Now().Add(10 * time.Second)
jobRecord.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
err = s.fileRepo.Create(destFileRecord)
if err != nil {
return err
}
err = s.jobRepo.Save(jobRecord)
if err != nil {
return err
}
return nil
}
func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-24 * time.Hour)
job, err := s.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime)
if err != nil {
if _, ok := err.(*contract.JobNotFoundError); ok {
return nil
}
return err
}
if job.RecognitionOpID == nil {
return fmt.Errorf("recogniton opId not found for job: %s", job.Id)
}
// Создаем SpeechKit сервис
speechKitService, err := speechkit.NewSpeechKitService()
if err != nil {
return err
}
defer speechKitService.Close()
opId := *job.RecognitionOpID
// Проверяем статус операции
log.Printf("Check operation status: id %s\n", opId)
operation, err := speechKitService.CheckOperationStatus(opId)
if err != nil {
return err
}
if !operation.Done {
// Операция еще не завершена, оставляем в статусе обработки
log.Printf("Operation in progress: id %s\n", opId)
delayTime := time.Now().Add(10 * time.Second)
job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
err := s.jobRepo.Save(job)
if err != nil {
return err
}
return nil
}
if opErr := operation.GetError(); opErr != nil {
errorText := fmt.Sprintf("operation failed: code %d, message: %s", opErr.Code, opErr.Message)
log.Printf("Operation failed: id %s, message %s\n", opId, errorText)
job.Fail(errorText)
err := s.jobRepo.Save(job)
if err != nil {
return err
}
return nil
}
// Операция завершена, получаем результат
transcriptionText, err := speechKitService.GetRecognitionText(*job.RecognitionOpID)
if err != nil {
return err
}
log.Printf("Operation done: id %s\n", opId)
// Обновляем задачу с результатом
job.Done(transcriptionText)
err = s.jobRepo.Save(job)
if err != nil {
return err
}
return nil
}