Extract logic into transcribe service

This commit is contained in:
2025-08-12 10:59:51 +03:00
parent f625e21418
commit 2c9a5f4bfb
8 changed files with 354 additions and 356 deletions

View File

@@ -1,29 +1,24 @@
package http
import (
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"time"
"git.vakhrushev.me/av/transcriber/internal/entity"
"git.vakhrushev.me/av/transcriber/internal/repo"
"git.vakhrushev.me/av/transcriber/internal/repo/ffmpeg"
"git.vakhrushev.me/av/transcriber/internal/service/s3"
"git.vakhrushev.me/av/transcriber/internal/service/speechkit"
"git.vakhrushev.me/av/transcriber/internal/service/transcribe"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
const baseStorageDir = "data/files"
type TranscribeHandler struct {
jobRepo repo.TranscriptJobRepository
fileRepo repo.FileRepository
jobRepo repo.TranscriptJobRepository
trsService *transcribe.TranscribeService
}
func NewTranscribeHandler(jobRepo repo.TranscriptJobRepository, fileRepo repo.FileRepository) *TranscribeHandler {
return &TranscribeHandler{jobRepo: jobRepo, fileRepo: fileRepo}
func NewTranscribeHandler(jobRepo repo.TranscriptJobRepository, trsService *transcribe.TranscribeService) *TranscribeHandler {
return &TranscribeHandler{jobRepo: jobRepo, trsService: trsService}
}
type CreateTranscribeJobResponse struct {
@@ -47,64 +42,10 @@ func (h *TranscribeHandler) CreateTranscribeJob(c *gin.Context) {
}
defer file.Close()
// Генерируем UUID для файла
fileId := uuid.New().String()
// Определяем расширение файла
ext := filepath.Ext(header.Filename)
if ext == "" {
ext = ".audio" // fallback если расширение не определено
}
// Создаем путь для сохранения файла
fileName := fmt.Sprintf("%s%s", fileId, ext)
filePath := filepath.Join("data", "files", fileName)
// Создаем файл на диске
dst, err := os.Create(filePath)
job, err := h.trsService.CreateTranscribeJob(file, header.Filename)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create file"})
return
}
defer dst.Close()
// Копируем содержимое загруженного файла
size, err := io.Copy(dst, file)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to save file"})
return
}
// Создаем запись в таблице files
fileRecord := &entity.File{
Id: fileId,
Storage: entity.StorageLocal,
FileName: fileName,
Size: size,
CreatedAt: time.Now(),
}
if err := h.fileRepo.Create(fileRecord); err != nil {
// Удаляем файл если не удалось создать запись в БД
os.Remove(filePath)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to save file record"})
return
}
// Создаем запись в таблице transcribe_jobs
jobId := uuid.NewString()
now := time.Now()
job := &entity.TranscribeJob{
Id: jobId,
State: entity.StateCreated,
FileID: &fileId,
IsError: false,
CreatedAt: now,
UpdatedAt: now,
}
if err := h.jobRepo.Create(job); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create transcribe job"})
log.Printf("Err: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create transcribe job"})
return
}
@@ -135,59 +76,7 @@ func (h *TranscribeHandler) GetTranscribeJobStatus(c *gin.Context) {
}
func (h *TranscribeHandler) RunConversionJob(c *gin.Context) {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-1 * time.Hour)
job, err := h.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
srcFile, err := h.fileRepo.GetByID(*job.FileID)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
srcFilePath := filepath.Join("data", "files", srcFile.FileName)
destFileId := uuid.New().String()
destFileName := fmt.Sprintf("%s%s", destFileId, ".ogg")
destFilePath := filepath.Join("data", "files", destFileName)
conv := ffmpeg.NewFileConverter()
err = conv.Convert(srcFilePath, destFilePath)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
stat, err := os.Stat(destFilePath)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// Создаем запись в таблице files
destFileRecord := &entity.File{
Id: destFileId,
Storage: entity.StorageLocal,
FileName: destFileName,
Size: stat.Size(),
CreatedAt: time.Now(),
}
job.FileID = &destFileId
job.MoveToState(entity.StateConverted)
err = h.fileRepo.Create(destFileRecord)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
err = h.jobRepo.Save(job)
err := h.trsService.FindAndRunConversionJob()
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
@@ -196,191 +85,22 @@ func (h *TranscribeHandler) RunConversionJob(c *gin.Context) {
c.Status(http.StatusOK)
}
func (h *TranscribeHandler) RunUploadJob(c *gin.Context) {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-1 * time.Hour)
job, err := h.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime)
func (h *TranscribeHandler) RunTranscribeJob(c *gin.Context) {
err := h.trsService.FindAndRunTranscribeJob()
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
fileRecord, err := h.fileRepo.GetByID(*job.FileID)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
filePath := filepath.Join("data", "files", fileRecord.FileName)
destFileId := uuid.New().String()
destFileRecord := &entity.File{
Id: destFileId,
Storage: entity.StorageS3,
FileName: fileRecord.FileName,
Size: fileRecord.Size,
CreatedAt: time.Now(),
}
// Создаем S3 сервис
s3Service, err := s3.NewS3Service()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize S3 service: " + err.Error()})
return
}
// Загружаем файл на S3
err = s3Service.UploadFile(filePath, destFileRecord.FileName)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to upload file to S3: " + err.Error()})
return
}
job.FileID = &destFileId
job.MoveToState(entity.StateUploaded)
// Сохраняем информацию о загрузке файла на S3
err = h.fileRepo.Create(destFileRecord)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update file record: " + err.Error()})
return
}
// Обновляем состояние задачи
err = h.jobRepo.Save(job)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job state: " + err.Error()})
return
}
c.Status(http.StatusOK)
}
func (h *TranscribeHandler) RunRecognitionJob(c *gin.Context) {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-1 * time.Hour)
job, err := h.jobRepo.FindAndAcquire(entity.StateUploaded, acquisitionId, rottingTime)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
fileRecord, err := h.fileRepo.GetByID(*job.FileID)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// Создаем SpeechKit сервис
speechKitService, err := speechkit.NewSpeechKitService()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize SpeechKit service: " + err.Error()})
return
}
// Формируем S3 URI для файла
bucketName := os.Getenv("S3_BUCKET_NAME")
s3URI := fmt.Sprintf("https://storage.yandexcloud.net/%s/%s", bucketName, fileRecord.FileName)
// Запускаем асинхронное распознавание
operationID, err := speechKitService.RecognizeFileFromS3(s3URI)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to start recognition: " + err.Error()})
return
}
// Обновляем задачу с ID операции распознавания
job.RecognitionOpID = &operationID
delayTime := time.Now().Add(time.Minute)
job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
err = h.jobRepo.Save(job)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()})
return
}
c.Status(http.StatusOK)
}
func (h *TranscribeHandler) RunRecognitionCheckJob(c *gin.Context) {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-24 * time.Hour)
job, err := h.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime)
err := h.trsService.FindAndRunTranscribeCheckJob()
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if job.RecognitionOpID == nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "No recognition operation ID found"})
return
}
// Создаем SpeechKit сервис
speechKitService, err := speechkit.NewSpeechKitService()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize SpeechKit service: " + err.Error()})
return
}
defer speechKitService.Close()
// Проверяем статус операции
operation, err := speechKitService.CheckOperationStatus(*job.RecognitionOpID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to check operation status: " + err.Error()})
return
}
if !operation.Done {
// Операция еще не завершена, оставляем в статусе обработки
delayTime := time.Now().Add(10 * time.Second)
job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
err := h.jobRepo.Save(job)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()})
return
}
c.Status(http.StatusOK)
return
}
if opErr := operation.GetError(); opErr != nil {
job.IsError = true
errorText := fmt.Sprintf("Operation failed: code %d, message: %s", opErr.Code, opErr.Message)
job.ErrorText = &errorText
job.MoveToState(entity.StateFailed)
err := h.jobRepo.Save(job)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()})
return
}
c.Status(http.StatusOK)
return
}
// Операция завершена, получаем результат
responses, err := speechKitService.GetRecognitionResult(*job.RecognitionOpID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get recognition result: " + err.Error()})
return
}
// Извлекаем текст из результатов
transcriptionText := speechkit.ExtractTranscriptionText(responses)
// Обновляем задачу с результатом
job.TranscriptionText = &transcriptionText
job.MoveToState(entity.StateDone)
err = h.jobRepo.Save(job)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()})
return
}
c.Status(http.StatusOK)
}

View File

@@ -9,16 +9,20 @@ import (
"net/http"
"net/http/httptest"
"os"
"path"
"path/filepath"
"runtime"
"testing"
"time"
"git.vakhrushev.me/av/transcriber/internal/entity"
"git.vakhrushev.me/av/transcriber/internal/repo/sqlite"
"git.vakhrushev.me/av/transcriber/internal/service/transcribe"
"github.com/doug-martin/goqu/v9"
_ "github.com/doug-martin/goqu/v9/dialect/sqlite3"
"github.com/gin-gonic/gin"
_ "github.com/mattn/go-sqlite3"
"github.com/pressly/goose/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -30,32 +34,15 @@ func setupTestDB(t *testing.T) (*sql.DB, *goqu.Database) {
gq := goqu.New("sqlite3", db)
// Создаем таблицы
createFilesTable := `
CREATE TABLE files (
id TEXT PRIMARY KEY,
storage TEXT NOT NULL,
size INTEGER NOT NULL,
created_at DATETIME NOT NULL
);`
createJobsTable := `
CREATE TABLE transcribe_jobs (
id TEXT PRIMARY KEY,
state TEXT NOT NULL,
file_id TEXT,
is_error BOOLEAN NOT NULL DEFAULT 0,
error_text TEXT,
worker TEXT,
acquired_at DATETIME,
created_at DATETIME NOT NULL,
FOREIGN KEY (file_id) REFERENCES files(id)
);`
_, err = db.Exec(createFilesTable)
err = goose.SetDialect("sqlite3")
require.NoError(t, err)
_, err = db.Exec(createJobsTable)
_, b, _, _ := runtime.Caller(0)
migpath, err := filepath.Abs(path.Join(b, "../../../../migrations"))
require.NoError(t, err)
err = goose.Up(db, migpath)
require.NoError(t, err)
return db, gq
@@ -69,15 +56,17 @@ func setupTestRouter(t *testing.T) (*gin.Engine, *TranscribeHandler) {
fileRepo := sqlite.NewFileRepository(db, gq)
jobRepo := sqlite.NewTranscriptJobRepository(db, gq)
handler := NewTranscribeHandler(jobRepo, fileRepo)
trsService := transcribe.NewTranscribeService(jobRepo, fileRepo)
handler := NewTranscribeHandler(jobRepo, trsService)
router := gin.New()
router.MaxMultipartMemory = 32 << 20 // 32 MiB
api := router.Group("/api")
{
api.POST("/transcribe/audio", handler.CreateTranscribeJob)
api.GET("/transcribe/:id", handler.GetTranscribeJobStatus)
api.POST("/audio", handler.CreateTranscribeJob)
api.GET("/status/:id", handler.GetTranscribeJobStatus)
}
return router, handler
@@ -106,7 +95,7 @@ func createMultipartRequest(t *testing.T, audioFilePath string) (*http.Request,
require.NoError(t, err)
// Создаем HTTP запрос
req, err := http.NewRequest("POST", "/api/transcribe/audio", &buf)
req, err := http.NewRequest("POST", "/api/audio", &buf)
require.NoError(t, err)
req.Header.Set("Content-Type", writer.FormDataContentType())
@@ -182,7 +171,7 @@ func TestCreateTranscribeJob_NoFile(t *testing.T) {
router, _ := setupTestRouter(t)
// Создаем запрос без файла
req, err := http.NewRequest("POST", "/api/transcribe/audio", nil)
req, err := http.NewRequest("POST", "/api/audio", nil)
require.NoError(t, err)
// Выполняем запрос
@@ -333,7 +322,7 @@ func TestGetTranscribeJobStatus_Success(t *testing.T) {
require.NoError(t, err)
// Создаем запрос
req, err := http.NewRequest("GET", "/api/transcribe/test-job-id", nil)
req, err := http.NewRequest("GET", "/api/status/test-job-id", nil)
require.NoError(t, err)
// Выполняем запрос
@@ -356,7 +345,7 @@ func TestGetTranscribeJobStatus_NotFound(t *testing.T) {
router, _ := setupTestRouter(t)
// Создаем запрос с несуществующим ID
req, err := http.NewRequest("GET", "/api/transcribe/non-existent-id", nil)
req, err := http.NewRequest("GET", "/api/status/non-existent-id", nil)
require.NoError(t, err)
// Выполняем запрос