diff --git a/internal/controller/http/transcribe.go b/internal/controller/http/transcribe.go index d437b55..4ebdd7e 100644 --- a/internal/controller/http/transcribe.go +++ b/internal/controller/http/transcribe.go @@ -92,7 +92,7 @@ func (h *TranscribeHandler) CreateTranscribeJob(c *gin.Context) { } // Создаем запись в таблице transcribe_jobs - jobId := uuid.New().String() + jobId := uuid.NewString() now := time.Now() job := &entity.TranscribeJob{ Id: jobId, @@ -293,7 +293,8 @@ func (h *TranscribeHandler) RunRecognitionJob(c *gin.Context) { // Обновляем задачу с ID операции распознавания job.RecognitionOpID = &operationID - job.MoveToState(entity.StateTranscribe) + delayTime := time.Now().Add(time.Minute) + job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) err = h.jobRepo.Save(job) if err != nil { @@ -306,7 +307,7 @@ func (h *TranscribeHandler) RunRecognitionJob(c *gin.Context) { func (h *TranscribeHandler) RunRecognitionCheckJob(c *gin.Context) { acquisitionId := uuid.NewString() - rottingTime := time.Now().Add(-1 * time.Hour) + rottingTime := time.Now().Add(-24 * time.Hour) job, err := h.jobRepo.FindAndAcquire(entity.StateTranscribe, acquisitionId, rottingTime) if err != nil { @@ -325,6 +326,7 @@ func (h *TranscribeHandler) RunRecognitionCheckJob(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize SpeechKit service: " + err.Error()}) return } + defer speechKitService.Close() // Проверяем статус операции operation, err := speechKitService.CheckOperationStatus(*job.RecognitionOpID) @@ -334,8 +336,24 @@ func (h *TranscribeHandler) RunRecognitionCheckJob(c *gin.Context) { } if !operation.Done { - // Операция еще не завершена, переводим в состояние ожидания - err = h.jobRepo.Save(job) + // Операция еще не завершена, оставляем в статусе обработки + delayTime := time.Now().Add(10 * time.Second) + job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) + err := h.jobRepo.Save(job) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()}) + return + } + c.Status(http.StatusOK) + return + } + + if opErr := operation.GetError(); opErr != nil { + job.IsError = true + errorText := fmt.Sprintf("Operation failed: code %d, message: %s", opErr.Code, opErr.Message) + job.ErrorText = &errorText + job.MoveToState(entity.StateFailed) + err := h.jobRepo.Save(job) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job: " + err.Error()}) return diff --git a/internal/entity/job.go b/internal/entity/job.go index 3db7f4e..8bbdefb 100644 --- a/internal/entity/job.go +++ b/internal/entity/job.go @@ -25,7 +25,7 @@ const ( StateUploaded = "uploaded" StateTranscribe = "transcribe" StateDone = "done" - StatusFailed = "failed" + StateFailed = "failed" ) func (j *TranscribeJob) MoveToState(state string) { diff --git a/internal/service/speechkit/speechkit.go b/internal/service/speechkit/speechkit.go index 57070e7..4864fb1 100644 --- a/internal/service/speechkit/speechkit.go +++ b/internal/service/speechkit/speechkit.go @@ -3,6 +3,7 @@ package speechkit import ( "context" "fmt" + "log" "os" "google.golang.org/grpc" @@ -15,10 +16,12 @@ import ( const ( SpeechKitEndpoint = "stt.api.cloud.yandex.net:443" + OperationEndpoint = "operation.api.cloud.yandex.net:443" ) type SpeechKitService struct { - conn *grpc.ClientConn + sttConn *grpc.ClientConn + opConn *grpc.ClientConn sttClient stt.AsyncRecognizerClient opClient operation.OperationServiceClient apiKey string @@ -33,18 +36,26 @@ func NewSpeechKitService() (*SpeechKitService, error) { return nil, fmt.Errorf("missing required Yandex Cloud environment variables") } - // Создаем защищенное соединение + // Создаем защищенное соединение для SpeechKit creds := credentials.NewTLS(nil) - conn, err := grpc.NewClient(SpeechKitEndpoint, grpc.WithTransportCredentials(creds)) + sttConn, err := grpc.NewClient(SpeechKitEndpoint, grpc.WithTransportCredentials(creds)) if err != nil { return nil, fmt.Errorf("failed to connect to SpeechKit: %w", err) } - sttClient := stt.NewAsyncRecognizerClient(conn) - opClient := operation.NewOperationServiceClient(conn) + // Создаем защищенное соединение для Operations API + opConn, err := grpc.NewClient(OperationEndpoint, grpc.WithTransportCredentials(creds)) + if err != nil { + sttConn.Close() + return nil, fmt.Errorf("failed to connect to Operations API: %w", err) + } + + sttClient := stt.NewAsyncRecognizerClient(sttConn) + opClient := operation.NewOperationServiceClient(opConn) return &SpeechKitService{ - conn: conn, + sttConn: sttConn, + opConn: opConn, sttClient: sttClient, opClient: opClient, apiKey: apiKey, @@ -53,15 +64,26 @@ func NewSpeechKitService() (*SpeechKitService, error) { } func (s *SpeechKitService) Close() error { - return s.conn.Close() + var err1, err2 error + if s.sttConn != nil { + err1 = s.sttConn.Close() + } + if s.opConn != nil { + err2 = s.opConn.Close() + } + if err1 != nil { + return err1 + } + return err2 } // RecognizeFileFromS3 запускает асинхронное распознавание файла из S3 func (s *SpeechKitService) RecognizeFileFromS3(s3URI string) (string, error) { ctx := context.Background() - // Добавляем авторизацию в контекст + // Добавляем авторизацию и folder_id в контекст ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey) + ctx = metadata.AppendToOutgoingContext(ctx, "x-folder-id", s.folderID) // Создаем запрос на распознавание req := &stt.RecognizeFileRequest{ @@ -102,8 +124,9 @@ func (s *SpeechKitService) RecognizeFileFromS3(s3URI string) (string, error) { func (s *SpeechKitService) GetRecognitionResult(operationID string) ([]*stt.StreamingResponse, error) { ctx := context.Background() - // Добавляем авторизацию в контекст + // Добавляем авторизацию и folder_id в контекст ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey) + ctx = metadata.AppendToOutgoingContext(ctx, "x-folder-id", s.folderID) req := &stt.GetRecognitionRequest{ OperationId: operationID, @@ -133,6 +156,11 @@ func (s *SpeechKitService) GetRecognitionResult(operationID string) ([]*stt.Stre func (s *SpeechKitService) CheckOperationStatus(operationID string) (*operation.Operation, error) { ctx := context.Background() + ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey) + ctx = metadata.AppendToOutgoingContext(ctx, "x-folder-id", s.folderID) + + log.Printf("Check operation status: id %s\n", operationID) + op, err := s.opClient.Get(ctx, &operation.GetOperationRequest{ OperationId: operationID, })