diff --git a/internal/controller/http/transcribe.go b/internal/controller/http/transcribe.go index 4ebdd7e..f681811 100644 --- a/internal/controller/http/transcribe.go +++ b/internal/controller/http/transcribe.go @@ -1,29 +1,24 @@ package http import ( - "fmt" - "io" + "log" "net/http" - "os" - "path/filepath" "time" - "git.vakhrushev.me/av/transcriber/internal/entity" "git.vakhrushev.me/av/transcriber/internal/repo" - "git.vakhrushev.me/av/transcriber/internal/repo/ffmpeg" - "git.vakhrushev.me/av/transcriber/internal/service/s3" - "git.vakhrushev.me/av/transcriber/internal/service/speechkit" + "git.vakhrushev.me/av/transcriber/internal/service/transcribe" "github.com/gin-gonic/gin" - "github.com/google/uuid" ) +const baseStorageDir = "data/files" + type TranscribeHandler struct { - jobRepo repo.TranscriptJobRepository - fileRepo repo.FileRepository + jobRepo repo.TranscriptJobRepository + trsService *transcribe.TranscribeService } -func NewTranscribeHandler(jobRepo repo.TranscriptJobRepository, fileRepo repo.FileRepository) *TranscribeHandler { - return &TranscribeHandler{jobRepo: jobRepo, fileRepo: fileRepo} +func NewTranscribeHandler(jobRepo repo.TranscriptJobRepository, trsService *transcribe.TranscribeService) *TranscribeHandler { + return &TranscribeHandler{jobRepo: jobRepo, trsService: trsService} } type CreateTranscribeJobResponse struct { @@ -47,64 +42,10 @@ func (h *TranscribeHandler) CreateTranscribeJob(c *gin.Context) { } defer file.Close() - // Генерируем UUID для файла - fileId := uuid.New().String() - - // Определяем расширение файла - ext := filepath.Ext(header.Filename) - if ext == "" { - ext = ".audio" // fallback если расширение не определено - } - - // Создаем путь для сохранения файла - fileName := fmt.Sprintf("%s%s", fileId, ext) - filePath := filepath.Join("data", "files", fileName) - - // Создаем файл на диске - dst, err := os.Create(filePath) + job, err := h.trsService.CreateTranscribeJob(file, header.Filename) if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create file"}) - return - } - defer dst.Close() - - // Копируем содержимое загруженного файла - size, err := io.Copy(dst, file) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to save file"}) - return - } - - // Создаем запись в таблице files - fileRecord := &entity.File{ - Id: fileId, - Storage: entity.StorageLocal, - FileName: fileName, - Size: size, - CreatedAt: time.Now(), - } - - if err := h.fileRepo.Create(fileRecord); err != nil { - // Удаляем файл если не удалось создать запись в БД - os.Remove(filePath) - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to save file record"}) - return - } - - // Создаем запись в таблице transcribe_jobs - jobId := uuid.NewString() - now := time.Now() - job := &entity.TranscribeJob{ - Id: jobId, - State: entity.StateCreated, - FileID: &fileId, - IsError: false, - CreatedAt: now, - UpdatedAt: now, - } - - if err := h.jobRepo.Create(job); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create transcribe job"}) + log.Printf("Err: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create transcibe job"}) return } @@ -135,59 +76,7 @@ func (h *TranscribeHandler) GetTranscribeJobStatus(c *gin.Context) { } func (h *TranscribeHandler) RunConversionJob(c *gin.Context) { - acquisitionId := uuid.NewString() - rottingTime := time.Now().Add(-1 * time.Hour) - - job, err := h.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - srcFile, err := h.fileRepo.GetByID(*job.FileID) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - srcFilePath := filepath.Join("data", "files", srcFile.FileName) - - destFileId := uuid.New().String() - destFileName := fmt.Sprintf("%s%s", destFileId, ".ogg") - destFilePath := filepath.Join("data", "files", destFileName) - - conv := ffmpeg.NewFileConverter() - err = conv.Convert(srcFilePath, destFilePath) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - stat, err := os.Stat(destFilePath) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - // Создаем запись в таблице files - destFileRecord := &entity.File{ - Id: destFileId, - Storage: entity.StorageLocal, - FileName: destFileName, - Size: stat.Size(), - CreatedAt: time.Now(), - } - - job.FileID = &destFileId - job.MoveToState(entity.StateConverted) - - err = h.fileRepo.Create(destFileRecord) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - err = h.jobRepo.Save(job) + err := h.trsService.FindAndRunConversionJob() if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return @@ -196,191 +85,22 @@ func (h *TranscribeHandler) RunConversionJob(c *gin.Context) { c.Status(http.StatusOK) } -func (h *TranscribeHandler) RunUploadJob(c *gin.Context) { - acquisitionId := uuid.NewString() - rottingTime := time.Now().Add(-1 * time.Hour) - - job, err := h.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime) +func (h *TranscribeHandler) RunTranscribeJob(c *gin.Context) { + err := h.trsService.FindAndRunTranscribeJob() if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - fileRecord, err := h.fileRepo.GetByID(*job.FileID) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - filePath := filepath.Join("data", "files", fileRecord.FileName) - - destFileId := uuid.New().String() - destFileRecord := &entity.File{ - Id: destFileId, - Storage: entity.StorageS3, - FileName: fileRecord.FileName, - Size: fileRecord.Size, - CreatedAt: time.Now(), - } - - // Создаем S3 сервис - s3Service, err := s3.NewS3Service() - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize S3 service: " + err.Error()}) - return - } - - // Загружаем файл на S3 - err = s3Service.UploadFile(filePath, destFileRecord.FileName) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to upload file to S3: " + err.Error()}) - return - } - - job.FileID = &destFileId - job.MoveToState(entity.StateUploaded) - - // Сохраняем информацию о загрузке файла на S3 - err = h.fileRepo.Create(destFileRecord) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update file record: " + err.Error()}) - return - } - - // Обновляем состояние задачи - err = h.jobRepo.Save(job) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job state: " + err.Error()}) - return - } - - c.Status(http.StatusOK) -} - -func (h *TranscribeHandler) RunRecognitionJob(c *gin.Context) { - acquisitionId := uuid.NewString() - rottingTime := time.Now().Add(-1 * time.Hour) - - job, err := h.jobRepo.FindAndAcquire(entity.StateUploaded, acquisitionId, rottingTime) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - fileRecord, err := h.fileRepo.GetByID(*job.FileID) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - // Создаем SpeechKit сервис - speechKitService, err := speechkit.NewSpeechKitService() - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize SpeechKit service: " + err.Error()}) - return - } - - // Формируем S3 URI для файла - bucketName := os.Getenv("S3_BUCKET_NAME") - s3URI := fmt.Sprintf("https://storage.yandexcloud.net/%s/%s", bucketName, fileRecord.FileName) - - // Запускаем асинхронное распознавание - operationID, err := speechKitService.RecognizeFileFromS3(s3URI) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to start recognition: " + err.Error()}) - return - } - - // Обновляем задачу с ID операции распознавания - job.RecognitionOpID = &operationID - delayTime := time.Now().Add(time.Minute) - job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) - - err = h.jobRepo.Save(job) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()}) - return - } - c.Status(http.StatusOK) } func (h *TranscribeHandler) RunRecognitionCheckJob(c *gin.Context) { - acquisitionId := uuid.NewString() - rottingTime := time.Now().Add(-24 * time.Hour) - - job, err := h.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime) + err := h.trsService.FindAndRunTranscribeCheckJob() if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - if job.RecognitionOpID == nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "No recognition operation ID found"}) - return - } - - // Создаем SpeechKit сервис - speechKitService, err := speechkit.NewSpeechKitService() - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize SpeechKit service: " + err.Error()}) - return - } - defer speechKitService.Close() - - // Проверяем статус операции - operation, err := speechKitService.CheckOperationStatus(*job.RecognitionOpID) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to check operation status: " + err.Error()}) - return - } - - if !operation.Done { - // Операция еще не завершена, оставляем в статусе обработки - delayTime := time.Now().Add(10 * time.Second) - job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) - err := h.jobRepo.Save(job) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()}) - return - } - c.Status(http.StatusOK) - return - } - - if opErr := operation.GetError(); opErr != nil { - job.IsError = true - errorText := fmt.Sprintf("Operation failed: code %d, message: %s", opErr.Code, opErr.Message) - job.ErrorText = &errorText - job.MoveToState(entity.StateFailed) - err := h.jobRepo.Save(job) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()}) - return - } - c.Status(http.StatusOK) - return - } - - // Операция завершена, получаем результат - responses, err := speechKitService.GetRecognitionResult(*job.RecognitionOpID) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get recognition result: " + err.Error()}) - return - } - - // Извлекаем текст из результатов - transcriptionText := speechkit.ExtractTranscriptionText(responses) - - // Обновляем задачу с результатом - job.TranscriptionText = &transcriptionText - job.MoveToState(entity.StateDone) - - err = h.jobRepo.Save(job) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()}) - return - } - c.Status(http.StatusOK) } diff --git a/internal/controller/http/transcribe_test.go b/internal/controller/http/transcribe_test.go index df8b031..b7261b5 100644 --- a/internal/controller/http/transcribe_test.go +++ b/internal/controller/http/transcribe_test.go @@ -9,16 +9,20 @@ import ( "net/http" "net/http/httptest" "os" + "path" "path/filepath" + "runtime" "testing" "time" "git.vakhrushev.me/av/transcriber/internal/entity" "git.vakhrushev.me/av/transcriber/internal/repo/sqlite" + "git.vakhrushev.me/av/transcriber/internal/service/transcribe" "github.com/doug-martin/goqu/v9" _ "github.com/doug-martin/goqu/v9/dialect/sqlite3" "github.com/gin-gonic/gin" _ "github.com/mattn/go-sqlite3" + "github.com/pressly/goose/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -30,32 +34,15 @@ func setupTestDB(t *testing.T) (*sql.DB, *goqu.Database) { gq := goqu.New("sqlite3", db) - // Создаем таблицы - createFilesTable := ` - CREATE TABLE files ( - id TEXT PRIMARY KEY, - storage TEXT NOT NULL, - size INTEGER NOT NULL, - created_at DATETIME NOT NULL - );` - - createJobsTable := ` - CREATE TABLE transcribe_jobs ( - id TEXT PRIMARY KEY, - state TEXT NOT NULL, - file_id TEXT, - is_error BOOLEAN NOT NULL DEFAULT 0, - error_text TEXT, - worker TEXT, - acquired_at DATETIME, - created_at DATETIME NOT NULL, - FOREIGN KEY (file_id) REFERENCES files(id) - );` - - _, err = db.Exec(createFilesTable) + err = goose.SetDialect("sqlite3") require.NoError(t, err) - _, err = db.Exec(createJobsTable) + _, b, _, _ := runtime.Caller(0) + + migpath, err := filepath.Abs(path.Join(b, "../../../../migrations")) + require.NoError(t, err) + + err = goose.Up(db, migpath) require.NoError(t, err) return db, gq @@ -69,15 +56,17 @@ func setupTestRouter(t *testing.T) (*gin.Engine, *TranscribeHandler) { fileRepo := sqlite.NewFileRepository(db, gq) jobRepo := sqlite.NewTranscriptJobRepository(db, gq) - handler := NewTranscribeHandler(jobRepo, fileRepo) + trsService := transcribe.NewTranscribeService(jobRepo, fileRepo) + + handler := NewTranscribeHandler(jobRepo, trsService) router := gin.New() router.MaxMultipartMemory = 32 << 20 // 32 MiB api := router.Group("/api") { - api.POST("/transcribe/audio", handler.CreateTranscribeJob) - api.GET("/transcribe/:id", handler.GetTranscribeJobStatus) + api.POST("/audio", handler.CreateTranscribeJob) + api.GET("/status/:id", handler.GetTranscribeJobStatus) } return router, handler @@ -106,7 +95,7 @@ func createMultipartRequest(t *testing.T, audioFilePath string) (*http.Request, require.NoError(t, err) // Создаем HTTP запрос - req, err := http.NewRequest("POST", "/api/transcribe/audio", &buf) + req, err := http.NewRequest("POST", "/api/audio", &buf) require.NoError(t, err) req.Header.Set("Content-Type", writer.FormDataContentType()) @@ -182,7 +171,7 @@ func TestCreateTranscribeJob_NoFile(t *testing.T) { router, _ := setupTestRouter(t) // Создаем запрос без файла - req, err := http.NewRequest("POST", "/api/transcribe/audio", nil) + req, err := http.NewRequest("POST", "/api/audio", nil) require.NoError(t, err) // Выполняем запрос @@ -333,7 +322,7 @@ func TestGetTranscribeJobStatus_Success(t *testing.T) { require.NoError(t, err) // Создаем запрос - req, err := http.NewRequest("GET", "/api/transcribe/test-job-id", nil) + req, err := http.NewRequest("GET", "/api/status/test-job-id", nil) require.NoError(t, err) // Выполняем запрос @@ -356,7 +345,7 @@ func TestGetTranscribeJobStatus_NotFound(t *testing.T) { router, _ := setupTestRouter(t) // Создаем запрос с несуществующим ID - req, err := http.NewRequest("GET", "/api/transcribe/non-existent-id", nil) + req, err := http.NewRequest("GET", "/api/status/non-existent-id", nil) require.NoError(t, err) // Выполняем запрос diff --git a/internal/entity/file.go b/internal/entity/file.go index a329e93..20d445b 100644 --- a/internal/entity/file.go +++ b/internal/entity/file.go @@ -16,3 +16,13 @@ type File struct { Size int64 CreatedAt time.Time } + +func (f *File) CopyWithStorage(newId, storage string) *File { + return &File{ + Id: newId, + Storage: storage, + FileName: f.FileName, + Size: f.Size, + CreatedAt: time.Now(), + } +} diff --git a/internal/entity/job.go b/internal/entity/job.go index 8bbdefb..ada1e99 100644 --- a/internal/entity/job.go +++ b/internal/entity/job.go @@ -22,7 +22,6 @@ type TranscribeJob struct { const ( StateCreated = "created" StateConverted = "converted" - StateUploaded = "uploaded" StateTranscribe = "transcribe" StateDone = "done" StateFailed = "failed" diff --git a/internal/service/s3/s3.go b/internal/service/s3/s3.go index c0eac8c..688718f 100644 --- a/internal/service/s3/s3.go +++ b/internal/service/s3/s3.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -78,3 +79,9 @@ func (s *S3Service) UploadFile(filePath, fileName string) error { return nil } + +func (s *S3Service) FileUrl(fileName string) string { + endpoint := strings.TrimRight(os.Getenv("S3_ENDPOINT"), "/") + bucketName := os.Getenv("S3_BUCKET_NAME") + return fmt.Sprintf("%s/%s/%s", endpoint, bucketName, fileName) +} diff --git a/internal/service/speechkit/speechkit.go b/internal/service/speechkit/speechkit.go index 4864fb1..495fee9 100644 --- a/internal/service/speechkit/speechkit.go +++ b/internal/service/speechkit/speechkit.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "os" + "strings" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -17,6 +18,8 @@ import ( const ( SpeechKitEndpoint = "stt.api.cloud.yandex.net:443" OperationEndpoint = "operation.api.cloud.yandex.net:443" + + RecognitionModel = "deferred-general" ) type SpeechKitService struct { @@ -91,7 +94,7 @@ func (s *SpeechKitService) RecognizeFileFromS3(s3URI string) (string, error) { Uri: s3URI, }, RecognitionModel: &stt.RecognitionModelOptions{ - Model: "general", // Используем общую модель + Model: RecognitionModel, AudioFormat: &stt.AudioFormatOptions{ AudioFormat: &stt.AudioFormatOptions_ContainerAudio{ ContainerAudio: &stt.ContainerAudio{ @@ -121,7 +124,7 @@ func (s *SpeechKitService) RecognizeFileFromS3(s3URI string) (string, error) { } // GetRecognitionResult получает результат распознавания по ID операции -func (s *SpeechKitService) GetRecognitionResult(operationID string) ([]*stt.StreamingResponse, error) { +func (s *SpeechKitService) GetRecognitionText(operationID string) (string, error) { ctx := context.Background() // Добавляем авторизацию и folder_id в контекст @@ -134,22 +137,28 @@ func (s *SpeechKitService) GetRecognitionResult(operationID string) ([]*stt.Stre stream, err := s.sttClient.GetRecognition(ctx, req) if err != nil { - return nil, fmt.Errorf("failed to get recognition stream: %w", err) + return "", fmt.Errorf("failed to get recognition stream: %w", err) } - var responses []*stt.StreamingResponse + var sb strings.Builder + for { resp, err := stream.Recv() if err != nil { if err.Error() == "EOF" { break } - return nil, fmt.Errorf("failed to receive recognition response: %w", err) + return "", fmt.Errorf("failed to receive recognition response: %w", err) + } + if final := resp.GetFinal(); final != nil { + for _, alt := range final.Alternatives { + sb.WriteString(alt.Text) + sb.WriteString(" ") + } } - responses = append(responses, resp) } - return responses, nil + return sb.String(), nil } // CheckOperationStatus проверяет статус операции распознавания @@ -171,18 +180,3 @@ func (s *SpeechKitService) CheckOperationStatus(operationID string) (*operation. return op, nil } - -// ExtractTranscriptionText извлекает текст из результатов распознавания -func ExtractTranscriptionText(responses []*stt.StreamingResponse) string { - var fullText string - - for _, resp := range responses { - if final := resp.GetFinal(); final != nil { - for _, alt := range final.Alternatives { - fullText += alt.Text + " " - } - } - } - - return fullText -} diff --git a/internal/service/transcribe/transcribe.go b/internal/service/transcribe/transcribe.go new file mode 100644 index 0000000..9ce09b0 --- /dev/null +++ b/internal/service/transcribe/transcribe.go @@ -0,0 +1,277 @@ +package transcribe + +import ( + "fmt" + "io" + "os" + "path/filepath" + "time" + + "git.vakhrushev.me/av/transcriber/internal/entity" + "git.vakhrushev.me/av/transcriber/internal/repo" + "git.vakhrushev.me/av/transcriber/internal/repo/ffmpeg" + "git.vakhrushev.me/av/transcriber/internal/service/s3" + "git.vakhrushev.me/av/transcriber/internal/service/speechkit" + "github.com/google/uuid" +) + +const baseStorageDir = "data/files" + +type TranscribeService struct { + jobRepo repo.TranscriptJobRepository + fileRepo repo.FileRepository +} + +func NewTranscribeService(jobRepo repo.TranscriptJobRepository, fileRepo repo.FileRepository) *TranscribeService { + return &TranscribeService{jobRepo: jobRepo, fileRepo: fileRepo} +} + +func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) (*entity.TranscribeJob, error) { + // Генерируем UUID для файла + fileId := uuid.New().String() + + // Определяем расширение файла + ext := filepath.Ext(fileName) + if ext == "" { + ext = ".audio" // fallback если расширение не определено + } + + // Создаем путь для сохранения файла + storageFileName := fmt.Sprintf("%s%s", fileId, ext) + storageFilePath := filepath.Join(baseStorageDir, storageFileName) + + // Создаем файл на диске + dst, err := os.Create(storageFilePath) + if err != nil { + return nil, err + } + defer dst.Close() + + // Копируем содержимое загруженного файла + size, err := io.Copy(dst, file) + if err != nil { + return nil, err + } + + // Создаем запись в таблице files + fileRecord := &entity.File{ + Id: fileId, + Storage: entity.StorageLocal, + FileName: storageFileName, + Size: size, + CreatedAt: time.Now(), + } + + if err := s.fileRepo.Create(fileRecord); err != nil { + // Удаляем файл если не удалось создать запись в БД + os.Remove(storageFilePath) + return nil, err + } + + jobId := uuid.NewString() + now := time.Now() + + // Создаем запись в таблице transcribe_jobs + job := &entity.TranscribeJob{ + Id: jobId, + State: entity.StateCreated, + FileID: &fileId, + IsError: false, + CreatedAt: now, + UpdatedAt: now, + } + + if err := s.jobRepo.Create(job); err != nil { + return nil, err + } + + return job, nil +} + +func (s *TranscribeService) FindAndRunConversionJob() error { + acquisitionId := uuid.NewString() + rottingTime := time.Now().Add(-1 * time.Hour) + + job, err := s.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime) + if err != nil { + return err + } + + srcFile, err := s.fileRepo.GetByID(*job.FileID) + if err != nil { + return err + } + + srcFilePath := filepath.Join(baseStorageDir, srcFile.FileName) + + destFileId := uuid.NewString() + destFileName := fmt.Sprintf("%s%s", destFileId, ".ogg") + destFilePath := filepath.Join(baseStorageDir, destFileName) + + conv := ffmpeg.NewFileConverter() + err = conv.Convert(srcFilePath, destFilePath) + if err != nil { + return err + } + + stat, err := os.Stat(destFilePath) + if err != nil { + return err + } + + // Создаем запись в таблице files + destFileRecord := &entity.File{ + Id: destFileId, + Storage: entity.StorageLocal, + FileName: destFileName, + Size: stat.Size(), + CreatedAt: time.Now(), + } + + job.FileID = &destFileId + job.MoveToState(entity.StateConverted) + + err = s.fileRepo.Create(destFileRecord) + if err != nil { + return err + } + + err = s.jobRepo.Save(job) + if err != nil { + return err + } + + return nil +} + +func (s *TranscribeService) FindAndRunTranscribeJob() error { + acquisitionId := uuid.NewString() + rottingTime := time.Now().Add(-1 * time.Hour) + + jobRecord, err := s.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime) + if err != nil { + return err + } + + fileRecord, err := s.fileRepo.GetByID(*jobRecord.FileID) + if err != nil { + return err + } + + filePath := filepath.Join(baseStorageDir, fileRecord.FileName) + + destFileId := uuid.NewString() + destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3) + + // Создаем S3 сервис + s3Service, err := s3.NewS3Service() + if err != nil { + return err + } + + // Загружаем файл на S3 + err = s3Service.UploadFile(filePath, destFileRecord.FileName) + if err != nil { + return err + } + + // Создаем SpeechKit сервис + speechKitService, err := speechkit.NewSpeechKitService() + if err != nil { + return err + } + + // Формируем S3 URI для файла + s3URI := s3Service.FileUrl(destFileRecord.FileName) + + // Запускаем асинхронное распознавание + operationID, err := speechKitService.RecognizeFileFromS3(s3URI) + if err != nil { + return err + } + + // Обновляем задачу с ID операции распознавания + jobRecord.FileID = &destFileId + jobRecord.RecognitionOpID = &operationID + delayTime := time.Now().Add(time.Minute) + jobRecord.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) + + err = s.fileRepo.Create(destFileRecord) + if err != nil { + return err + } + + err = s.jobRepo.Save(jobRecord) + if err != nil { + return err + } + + return nil +} + +func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { + acquisitionId := uuid.NewString() + rottingTime := time.Now().Add(-24 * time.Hour) + + job, err := s.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime) + if err != nil { + return err + } + + if job.RecognitionOpID == nil { + return fmt.Errorf("recogniton opId not found for job: %s", job.Id) + } + + // Создаем SpeechKit сервис + speechKitService, err := speechkit.NewSpeechKitService() + if err != nil { + return err + } + defer speechKitService.Close() + + // Проверяем статус операции + operation, err := speechKitService.CheckOperationStatus(*job.RecognitionOpID) + if err != nil { + return err + } + + if !operation.Done { + // Операция еще не завершена, оставляем в статусе обработки + delayTime := time.Now().Add(10 * time.Second) + job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) + err := s.jobRepo.Save(job) + if err != nil { + return err + } + return nil + } + + if opErr := operation.GetError(); opErr != nil { + job.IsError = true + errorText := fmt.Sprintf("Operation failed: code %d, message: %s", opErr.Code, opErr.Message) + job.ErrorText = &errorText + job.MoveToState(entity.StateFailed) + err := s.jobRepo.Save(job) + if err != nil { + return err + } + return nil + } + + // Операция завершена, получаем результат + transcriptionText, err := speechKitService.GetRecognitionText(*job.RecognitionOpID) + if err != nil { + return err + } + + // Обновляем задачу с результатом + job.TranscriptionText = &transcriptionText + job.MoveToState(entity.StateDone) + + err = s.jobRepo.Save(job) + if err != nil { + return err + } + + return nil +} diff --git a/main.go b/main.go index ee0f83b..2950313 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "git.vakhrushev.me/av/transcriber/internal/controller/http" "git.vakhrushev.me/av/transcriber/internal/repo/sqlite" + "git.vakhrushev.me/av/transcriber/internal/service/transcribe" "github.com/doug-martin/goqu/v9" _ "github.com/doug-martin/goqu/v9/dialect/sqlite3" "github.com/gin-gonic/gin" @@ -47,8 +48,10 @@ func main() { fileRepo := sqlite.NewFileRepository(db, gq) jobRepo := sqlite.NewTranscriptJobRepository(db, gq) + transcribeService := transcribe.NewTranscribeService(jobRepo, fileRepo) + // Инициализируем обработчики - transcribeHandler := http.NewTranscribeHandler(jobRepo, fileRepo) + transcribeHandler := http.NewTranscribeHandler(jobRepo, transcribeService) // Создаем Gin роутер r := gin.Default() @@ -56,13 +59,12 @@ func main() { // Настраиваем роуты api := r.Group("/api") { - api.POST("/transcribe/audio", transcribeHandler.CreateTranscribeJob) - api.GET("/transcribe/:id", transcribeHandler.GetTranscribeJobStatus) + api.POST("/audio", transcribeHandler.CreateTranscribeJob) + api.GET("/status/:id", transcribeHandler.GetTranscribeJobStatus) - api.POST("/transcribe/convert", transcribeHandler.RunConversionJob) - api.POST("/transcribe/upload", transcribeHandler.RunUploadJob) - api.POST("/transcribe/recognize", transcribeHandler.RunRecognitionJob) - api.POST("/transcribe/check", transcribeHandler.RunRecognitionCheckJob) + api.POST("/convert", transcribeHandler.RunConversionJob) + api.POST("/transcribe", transcribeHandler.RunTranscribeJob) + api.POST("/check", transcribeHandler.RunRecognitionCheckJob) } // Добавляем middleware для обработки больших файлов