Fix operation check

This commit is contained in:
2025-08-11 17:30:23 +03:00
parent bb93f99911
commit f625e21418
3 changed files with 61 additions and 15 deletions

View File

@@ -92,7 +92,7 @@ func (h *TranscribeHandler) CreateTranscribeJob(c *gin.Context) {
} }
// Создаем запись в таблице transcribe_jobs // Создаем запись в таблице transcribe_jobs
jobId := uuid.New().String() jobId := uuid.NewString()
now := time.Now() now := time.Now()
job := &entity.TranscribeJob{ job := &entity.TranscribeJob{
Id: jobId, Id: jobId,
@@ -293,7 +293,8 @@ func (h *TranscribeHandler) RunRecognitionJob(c *gin.Context) {
// Обновляем задачу с ID операции распознавания // Обновляем задачу с ID операции распознавания
job.RecognitionOpID = &operationID job.RecognitionOpID = &operationID
job.MoveToState(entity.StateTranscribe) delayTime := time.Now().Add(time.Minute)
job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
err = h.jobRepo.Save(job) err = h.jobRepo.Save(job)
if err != nil { if err != nil {
@@ -306,7 +307,7 @@ func (h *TranscribeHandler) RunRecognitionJob(c *gin.Context) {
func (h *TranscribeHandler) RunRecognitionCheckJob(c *gin.Context) { func (h *TranscribeHandler) RunRecognitionCheckJob(c *gin.Context) {
acquisitionId := uuid.NewString() acquisitionId := uuid.NewString()
rottingTime := time.Now().Add(-1 * time.Hour) rottingTime := time.Now().Add(-24 * time.Hour)
job, err := h.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime) job, err := h.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime)
if err != nil { if err != nil {
@@ -325,6 +326,7 @@ func (h *TranscribeHandler) RunRecognitionCheckJob(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize SpeechKit service: " + err.Error()}) c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize SpeechKit service: " + err.Error()})
return return
} }
defer speechKitService.Close()
// Проверяем статус операции // Проверяем статус операции
operation, err := speechKitService.CheckOperationStatus(*job.RecognitionOpID) operation, err := speechKitService.CheckOperationStatus(*job.RecognitionOpID)
@@ -334,8 +336,24 @@ func (h *TranscribeHandler) RunRecognitionCheckJob(c *gin.Context) {
} }
if !operation.Done { if !operation.Done {
// Операция еще не завершена, переводим в состояние ожидания // Операция еще не завершена, оставляем в статусе обработки
err = h.jobRepo.Save(job) delayTime := time.Now().Add(10 * time.Second)
job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
err := h.jobRepo.Save(job)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()})
return
}
c.Status(http.StatusOK)
return
}
if opErr := operation.GetError(); opErr != nil {
job.IsError = true
errorText := fmt.Sprintf("Operation failed: code %d, message: %s", opErr.Code, opErr.Message)
job.ErrorText = &errorText
job.MoveToState(entity.StateFailed)
err := h.jobRepo.Save(job)
if err != nil { if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()}) c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()})
return return

View File

@@ -25,7 +25,7 @@ const (
StateUploaded = "uploaded" StateUploaded = "uploaded"
StateTranscribe = "transcribe" StateTranscribe = "transcribe"
StateDone = "done" StateDone = "done"
StatusFailed = "failed" StateFailed = "failed"
) )
func (j *TranscribeJob) MoveToState(state string) { func (j *TranscribeJob) MoveToState(state string) {

View File

@@ -3,6 +3,7 @@ package speechkit
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"os" "os"
"google.golang.org/grpc" "google.golang.org/grpc"
@@ -15,10 +16,12 @@ import (
const ( const (
SpeechKitEndpoint = "stt.api.cloud.yandex.net:443" SpeechKitEndpoint = "stt.api.cloud.yandex.net:443"
OperationEndpoint = "operation.api.cloud.yandex.net:443"
) )
type SpeechKitService struct { type SpeechKitService struct {
conn *grpc.ClientConn sttConn *grpc.ClientConn
opConn *grpc.ClientConn
sttClient stt.AsyncRecognizerClient sttClient stt.AsyncRecognizerClient
opClient operation.OperationServiceClient opClient operation.OperationServiceClient
apiKey string apiKey string
@@ -33,18 +36,26 @@ func NewSpeechKitService() (*SpeechKitService, error) {
return nil, fmt.Errorf("missing required Yandex Cloud environment variables") return nil, fmt.Errorf("missing required Yandex Cloud environment variables")
} }
// Создаем защищенное соединение // Создаем защищенное соединение для SpeechKit
creds := credentials.NewTLS(nil) creds := credentials.NewTLS(nil)
conn, err := grpc.NewClient(SpeechKitEndpoint, grpc.WithTransportCredentials(creds)) sttConn, err := grpc.NewClient(SpeechKitEndpoint, grpc.WithTransportCredentials(creds))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to connect to SpeechKit: %w", err) return nil, fmt.Errorf("failed to connect to SpeechKit: %w", err)
} }
sttClient := stt.NewAsyncRecognizerClient(conn) // Создаем защищенное соединение для Operations API
opClient := operation.NewOperationServiceClient(conn) opConn, err := grpc.NewClient(OperationEndpoint, grpc.WithTransportCredentials(creds))
if err != nil {
sttConn.Close()
return nil, fmt.Errorf("failed to connect to Operations API: %w", err)
}
sttClient := stt.NewAsyncRecognizerClient(sttConn)
opClient := operation.NewOperationServiceClient(opConn)
return &SpeechKitService{ return &SpeechKitService{
conn: conn, sttConn: sttConn,
opConn: opConn,
sttClient: sttClient, sttClient: sttClient,
opClient: opClient, opClient: opClient,
apiKey: apiKey, apiKey: apiKey,
@@ -53,15 +64,26 @@ func NewSpeechKitService() (*SpeechKitService, error) {
} }
func (s *SpeechKitService) Close() error { func (s *SpeechKitService) Close() error {
return s.conn.Close() var err1, err2 error
if s.sttConn != nil {
err1 = s.sttConn.Close()
}
if s.opConn != nil {
err2 = s.opConn.Close()
}
if err1 != nil {
return err1
}
return err2
} }
// RecognizeFileFromS3 запускает асинхронное распознавание файла из S3 // RecognizeFileFromS3 запускает асинхронное распознавание файла из S3
func (s *SpeechKitService) RecognizeFileFromS3(s3URI string) (string, error) { func (s *SpeechKitService) RecognizeFileFromS3(s3URI string) (string, error) {
ctx := context.Background() ctx := context.Background()
// Добавляем авторизацию в контекст // Добавляем авторизацию и folder_id в контекст
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey) ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey)
ctx = metadata.AppendToOutgoingContext(ctx, "x-folder-id", s.folderID)
// Создаем запрос на распознавание // Создаем запрос на распознавание
req := &stt.RecognizeFileRequest{ req := &stt.RecognizeFileRequest{
@@ -102,8 +124,9 @@ func (s *SpeechKitService) RecognizeFileFromS3(s3URI string) (string, error) {
func (s *SpeechKitService) GetRecognitionResult(operationID string) ([]*stt.StreamingResponse, error) { func (s *SpeechKitService) GetRecognitionResult(operationID string) ([]*stt.StreamingResponse, error) {
ctx := context.Background() ctx := context.Background()
// Добавляем авторизацию в контекст // Добавляем авторизацию и folder_id в контекст
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey) ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey)
ctx = metadata.AppendToOutgoingContext(ctx, "x-folder-id", s.folderID)
req := &stt.GetRecognitionRequest{ req := &stt.GetRecognitionRequest{
OperationId: operationID, OperationId: operationID,
@@ -133,6 +156,11 @@ func (s *SpeechKitService) GetRecognitionResult(operationID string) ([]*stt.Stre
func (s *SpeechKitService) CheckOperationStatus(operationID string) (*operation.Operation, error) { func (s *SpeechKitService) CheckOperationStatus(operationID string) (*operation.Operation, error) {
ctx := context.Background() ctx := context.Background()
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey)
ctx = metadata.AppendToOutgoingContext(ctx, "x-folder-id", s.folderID)
log.Printf("Check operation status: id %s\n", operationID)
op, err := s.opClient.Get(ctx, &operation.GetOperationRequest{ op, err := s.opClient.Get(ctx, &operation.GetOperationRequest{
OperationId: operationID, OperationId: operationID,
}) })