Add initial audio recognition requests

This commit is contained in:
2025-08-11 15:26:55 +03:00
parent c1da998c02
commit 672d8573fc
9 changed files with 396 additions and 64 deletions

View File

@@ -12,6 +12,7 @@ import (
"git.vakhrushev.me/av/transcriber/internal/repo"
"git.vakhrushev.me/av/transcriber/internal/repo/ffmpeg"
"git.vakhrushev.me/av/transcriber/internal/service/s3"
"git.vakhrushev.me/av/transcriber/internal/service/speechkit"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
@@ -31,9 +32,10 @@ type CreateTranscribeJobResponse struct {
}
type GetTranscribeJobResponse struct {
JobID string `json:"job_id"`
State string `json:"status"`
CreatedAt time.Time `json:"created_at"`
JobID string `json:"job_id"`
State string `json:"status"`
CreatedAt time.Time `json:"created_at"`
TranscriptionText *string `json:"transcription_text,omitempty"`
}
func (h *TranscribeHandler) CreateTranscribeJob(c *gin.Context) {
@@ -123,9 +125,10 @@ func (h *TranscribeHandler) GetTranscribeJobStatus(c *gin.Context) {
}
c.JSON(http.StatusOK, GetTranscribeJobResponse{
JobID: job.Id,
State: job.State,
CreatedAt: job.CreatedAt,
JobID: job.Id,
State: job.State,
CreatedAt: job.CreatedAt,
TranscriptionText: job.TranscriptionText,
})
}
@@ -233,7 +236,7 @@ func (h *TranscribeHandler) RunUploadJob(c *gin.Context) {
}
job.FileID = &destFileId
job.MoveToState(entity.StateTranscribeReady)
job.MoveToState(entity.StateUploaded)
// Сохраняем информацию о загрузке файла на S3
err = h.fileRepo.Create(destFileRecord)
@@ -251,3 +254,113 @@ func (h *TranscribeHandler) RunUploadJob(c *gin.Context) {
c.Status(http.StatusOK)
}
func (h *TranscribeHandler) RunRecognitionJob(c *gin.Context) {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-1 * time.Hour)
job, err := h.jobRepo.FindAndAcquire(entity.StateUploaded, acquisitionId, rottingTime)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
fileRecord, err := h.fileRepo.GetByID(*job.FileID)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// Создаем SpeechKit сервис
speechKitService, err := speechkit.NewSpeechKitService()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize SpeechKit service: " + err.Error()})
return
}
// Формируем S3 URI для файла
bucketName := os.Getenv("S3_BUCKET_NAME")
s3URI := fmt.Sprintf("https://storage.yandexcloud.net/%s/%s", bucketName, fileRecord.FileName)
// Запускаем асинхронное распознавание
operationID, err := speechKitService.RecognizeFileFromS3(s3URI)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to start recognition: " + err.Error()})
return
}
// Обновляем задачу с ID операции распознавания
job.RecognitionOpID = &operationID
job.MoveToState(entity.StateTranscribe)
err = h.jobRepo.Save(job)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()})
return
}
c.Status(http.StatusOK)
}
func (h *TranscribeHandler) RunRecognitionCheckJob(c *gin.Context) {
acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-1 * time.Hour)
job, err := h.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if job.RecognitionOpID == nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "No recognition operation ID found"})
return
}
// Создаем SpeechKit сервис
speechKitService, err := speechkit.NewSpeechKitService()
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize SpeechKit service: " + err.Error()})
return
}
// Проверяем статус операции
operation, err := speechKitService.CheckOperationStatus(*job.RecognitionOpID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to check operation status: " + err.Error()})
return
}
if !operation.Done {
// Операция еще не завершена, переводим в состояние ожидания
err = h.jobRepo.Save(job)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()})
return
}
c.Status(http.StatusOK)
return
}
// Операция завершена, получаем результат
responses, err := speechKitService.GetRecognitionResult(*job.RecognitionOpID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get recognition result: " + err.Error()})
return
}
// Извлекаем текст из результатов
transcriptionText := speechkit.ExtractTranscriptionText(responses)
// Обновляем задачу с результатом
job.TranscriptionText = &transcriptionText
job.MoveToState(entity.StateDone)
err = h.jobRepo.Save(job)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()})
return
}
c.Status(http.StatusOK)
}

View File

@@ -5,24 +5,25 @@ import (
)
type TranscribeJob struct {
Id string
State string
FileID *string
IsError bool
ErrorText *string
Worker *string
AcquiredAt *time.Time
CreatedAt time.Time
Id string
State string
FileID *string
IsError bool
ErrorText *string
Worker *string
AcquiredAt *time.Time
CreatedAt time.Time
RecognitionOpID *string // ID операции распознавания в Yandex Cloud
TranscriptionText *string // Результат распознавания
}
const (
StateCreated = "created"
StateConverted = "converted"
StateUploaded = "uploaded"
StateTranscribeReady = "transcribe_ready"
StateTranscribeWait = "transcribe_wait"
StateDone = "done"
StatusFailed = "failed"
StateCreated = "created"
StateConverted = "converted"
StateUploaded = "uploaded"
StateTranscribe = "transcribe"
StateDone = "done"
StatusFailed = "failed"
)
func (j *TranscribeJob) MoveToState(state string) {

View File

@@ -21,14 +21,16 @@ func NewTranscriptJobRepository(db *sql.DB, gq *goqu.Database) *TranscriptJobRep
func (repo *TranscriptJobRepository) Create(job *entity.TranscribeJob) error {
record := goqu.Record{
"id": job.Id,
"state": job.State,
"file_id": job.FileID,
"is_error": job.IsError,
"error_text": job.ErrorText,
"worker": job.Worker,
"acquired_at": job.AcquiredAt,
"created_at": job.CreatedAt,
"id": job.Id,
"state": job.State,
"file_id": job.FileID,
"is_error": job.IsError,
"error_text": job.ErrorText,
"worker": job.Worker,
"acquired_at": job.AcquiredAt,
"created_at": job.CreatedAt,
"recognition_op_id": job.RecognitionOpID,
"transcription_text": job.TranscriptionText,
}
query := repo.gq.Insert("transcribe_jobs").Rows(record)
sql, args, err := query.ToSQL()
@@ -46,12 +48,14 @@ func (repo *TranscriptJobRepository) Create(job *entity.TranscribeJob) error {
func (repo *TranscriptJobRepository) Save(job *entity.TranscribeJob) error {
record := goqu.Record{
"state": job.State,
"file_id": job.FileID,
"is_error": job.IsError,
"error_text": job.ErrorText,
"worker": job.Worker,
"acquired_at": job.AcquiredAt,
"state": job.State,
"file_id": job.FileID,
"is_error": job.IsError,
"error_text": job.ErrorText,
"worker": job.Worker,
"acquired_at": job.AcquiredAt,
"recognition_op_id": job.RecognitionOpID,
"transcription_text": job.TranscriptionText,
}
query := repo.gq.Update("transcribe_jobs").Set(record).Where(goqu.C("id").Eq(job.Id))
sql, args, err := query.ToSQL()
@@ -77,6 +81,8 @@ func (repo *TranscriptJobRepository) GetByID(id string) (*entity.TranscribeJob,
"worker",
"acquired_at",
"created_at",
"recognition_op_id",
"transcription_text",
).Where(goqu.C("id").Eq(id))
sql, args, err := query.ToSQL()
if err != nil {
@@ -93,6 +99,8 @@ func (repo *TranscriptJobRepository) GetByID(id string) (*entity.TranscribeJob,
&job.Worker,
&job.AcquiredAt,
&job.CreatedAt,
&job.RecognitionOpID,
&job.TranscriptionText,
)
if err != nil {
return nil, fmt.Errorf("failed to get transcribe job: %w", err)
@@ -154,6 +162,8 @@ func (repo *TranscriptJobRepository) FindAndAcquire(state, acquisitionId string,
"worker",
"acquired_at",
"created_at",
"recognition_op_id",
"transcription_text",
).Where(goqu.C("worker").Eq(acquisitionId))
sql, args, err = selectQuery.ToSQL()
@@ -171,6 +181,8 @@ func (repo *TranscriptJobRepository) FindAndAcquire(state, acquisitionId string,
&job.Worker,
&job.AcquiredAt,
&job.CreatedAt,
&job.RecognitionOpID,
&job.TranscriptionText,
)
if err != nil {
return nil, fmt.Errorf("failed to get transcribe job: %w", err)

View File

@@ -0,0 +1,160 @@
package speechkit
import (
"context"
"fmt"
"os"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
stt "github.com/yandex-cloud/go-genproto/yandex/cloud/ai/stt/v3"
"github.com/yandex-cloud/go-genproto/yandex/cloud/operation"
)
const (
SpeechKitEndpoint = "stt.api.cloud.yandex.net:443"
)
type SpeechKitService struct {
conn *grpc.ClientConn
sttClient stt.AsyncRecognizerClient
opClient operation.OperationServiceClient
apiKey string
folderID string
}
func NewSpeechKitService() (*SpeechKitService, error) {
apiKey := os.Getenv("YANDEX_CLOUD_API_KEY")
folderID := os.Getenv("YANDEX_CLOUD_FOLDER_ID")
if apiKey == "" || folderID == "" {
return nil, fmt.Errorf("missing required Yandex Cloud environment variables")
}
// Создаем защищенное соединение
creds := credentials.NewTLS(nil)
conn, err := grpc.NewClient(SpeechKitEndpoint, grpc.WithTransportCredentials(creds))
if err != nil {
return nil, fmt.Errorf("failed to connect to SpeechKit: %w", err)
}
sttClient := stt.NewAsyncRecognizerClient(conn)
opClient := operation.NewOperationServiceClient(conn)
return &SpeechKitService{
conn: conn,
sttClient: sttClient,
opClient: opClient,
apiKey: apiKey,
folderID: folderID,
}, nil
}
func (s *SpeechKitService) Close() error {
return s.conn.Close()
}
// RecognizeFileFromS3 запускает асинхронное распознавание файла из S3
func (s *SpeechKitService) RecognizeFileFromS3(s3URI string) (string, error) {
ctx := context.Background()
// Добавляем авторизацию в контекст
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey)
// Создаем запрос на распознавание
req := &stt.RecognizeFileRequest{
AudioSource: &stt.RecognizeFileRequest_Uri{
Uri: s3URI,
},
RecognitionModel: &stt.RecognitionModelOptions{
Model: "general", // Используем общую модель
AudioFormat: &stt.AudioFormatOptions{
AudioFormat: &stt.AudioFormatOptions_ContainerAudio{
ContainerAudio: &stt.ContainerAudio{
ContainerAudioType: stt.ContainerAudio_OGG_OPUS,
},
},
},
TextNormalization: &stt.TextNormalizationOptions{
TextNormalization: stt.TextNormalizationOptions_TEXT_NORMALIZATION_ENABLED,
ProfanityFilter: false,
LiteratureText: true,
},
AudioProcessingType: stt.RecognitionModelOptions_FULL_DATA,
},
SpeakerLabeling: &stt.SpeakerLabelingOptions{
SpeakerLabeling: stt.SpeakerLabelingOptions_SPEAKER_LABELING_ENABLED,
},
}
// Отправляем запрос
op, err := s.sttClient.RecognizeFile(ctx, req)
if err != nil {
return "", fmt.Errorf("failed to start recognition: %w", err)
}
return op.Id, nil
}
// GetRecognitionResult получает результат распознавания по ID операции
func (s *SpeechKitService) GetRecognitionResult(operationID string) ([]*stt.StreamingResponse, error) {
ctx := context.Background()
// Добавляем авторизацию в контекст
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey)
req := &stt.GetRecognitionRequest{
OperationId: operationID,
}
stream, err := s.sttClient.GetRecognition(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get recognition stream: %w", err)
}
var responses []*stt.StreamingResponse
for {
resp, err := stream.Recv()
if err != nil {
if err.Error() == "EOF" {
break
}
return nil, fmt.Errorf("failed to receive recognition response: %w", err)
}
responses = append(responses, resp)
}
return responses, nil
}
// CheckOperationStatus проверяет статус операции распознавания
func (s *SpeechKitService) CheckOperationStatus(operationID string) (*operation.Operation, error) {
ctx := context.Background()
op, err := s.opClient.Get(ctx, &operation.GetOperationRequest{
OperationId: operationID,
})
if err != nil {
return nil, fmt.Errorf("failed to get operation status: %w", err)
}
return op, nil
}
// ExtractTranscriptionText извлекает текст из результатов распознавания
func ExtractTranscriptionText(responses []*stt.StreamingResponse) string {
var fullText string
for _, resp := range responses {
if final := resp.GetFinal(); final != nil {
for _, alt := range final.Alternatives {
fullText += alt.Text + " "
}
}
}
return fullText
}