From f6b5e835a498e757f449096c7efe6a705036bf45 Mon Sep 17 00:00:00 2001 From: Anton Vakhrushev Date: Wed, 13 Aug 2025 09:20:45 +0300 Subject: [PATCH] Replace all infra services with interfaces --- internal/adapter/yandex/memory.go | 20 ++++ internal/adapter/yandex/recognizer.go | 95 +++++++++++++++++++ internal/adapter/{s3 => yandex}/s3.go | 27 ++---- .../{speechkit => yandex}/speechkit.go | 30 +++--- internal/contract/contract.go | 13 ++- internal/controller/http/transcribe_test.go | 6 +- internal/entity/recognition.go | 78 +++++++++++++++ internal/service/transcribe.go | 53 +++-------- main.go | 23 +++-- 9 files changed, 260 insertions(+), 85 deletions(-) create mode 100644 internal/adapter/yandex/memory.go create mode 100644 internal/adapter/yandex/recognizer.go rename internal/adapter/{s3 => yandex}/s3.go (80%) rename internal/adapter/{speechkit => yandex}/speechkit.go (88%) create mode 100644 internal/entity/recognition.go diff --git a/internal/adapter/yandex/memory.go b/internal/adapter/yandex/memory.go new file mode 100644 index 0000000..069323a --- /dev/null +++ b/internal/adapter/yandex/memory.go @@ -0,0 +1,20 @@ +package yandex + +import ( + "git.vakhrushev.me/av/transcriber/internal/entity" + "github.com/google/uuid" +) + +type MemoryAudioRecognizer struct{} + +func (r *MemoryAudioRecognizer) RecognizeFile(filePath string) (operationID string, err error) { + return uuid.NewString(), nil +} + +func (r *MemoryAudioRecognizer) GetRecognitionText(operationID string) (string, error) { + return "Foo bar, Baz.", nil +} + +func (r *MemoryAudioRecognizer) CheckRecognitionStatus(operationID string) (*entity.RecognitionResult, error) { + return entity.NewCompletedResult(), nil +} diff --git a/internal/adapter/yandex/recognizer.go b/internal/adapter/yandex/recognizer.go new file mode 100644 index 0000000..e711211 --- /dev/null +++ b/internal/adapter/yandex/recognizer.go @@ -0,0 +1,95 @@ +package yandex + +import ( + "fmt" + "path/filepath" + + "git.vakhrushev.me/av/transcriber/internal/entity" +) + +type YandexAudioRecognizerConfig struct { + // s3 + Region string + AccessKey string + SecretKey string + BucketName string + Endpoint string + // speech kit + ApiKey string + FolderID string +} + +type YandexAudioRecognizerService struct { + s3Sevice *yandexS3Service + sttService *speechKitService +} + +func NewYandexAudioRecognizerService(cfg YandexAudioRecognizerConfig) (*YandexAudioRecognizerService, error) { + s3, err := newYandexS3Service(s3Config{ + Region: cfg.Region, + AccessKey: cfg.AccessKey, + SecretKey: cfg.SecretKey, + BucketName: cfg.BucketName, + Endpoint: cfg.Endpoint, + }) + if err != nil { + return nil, err + } + + stt, err := newSpeechKitService(speechKitConfig{ + ApiKey: cfg.ApiKey, + FolderID: cfg.FolderID, + }) + if err != nil { + return nil, err + } + + return &YandexAudioRecognizerService{ + s3Sevice: s3, + sttService: stt, + }, nil +} + +func (s *YandexAudioRecognizerService) Close() error { + return s.sttService.Close() +} + +func (s *YandexAudioRecognizerService) RecognizeFile(filePath string) (string, error) { + fileName := filepath.Base(filePath) + + err := s.s3Sevice.uploadFile(filePath, fileName) + if err != nil { + return "", err + } + + uri := s.s3Sevice.fileUrl(fileName) + + opId, err := s.sttService.recognizeFileFromS3(uri) + if err != nil { + return "", err + } + + return opId, nil +} + +func (s *YandexAudioRecognizerService) GetRecognitionText(operationID string) (string, error) { + return s.sttService.getRecognitionText(operationID) +} + +func (s *YandexAudioRecognizerService) CheckRecognitionStatus(operationID string) (*entity.RecognitionResult, error) { + operation, err := s.sttService.checkOperationStatus(operationID) + if err != nil { + return nil, err + } + + if !operation.Done { + return entity.NewInProgressResult(), nil + } + + if opErr := operation.GetError(); opErr != nil { + errorText := fmt.Sprintf("operation failed: code %d, message: %s", opErr.Code, opErr.Message) + return entity.NewFailedResult(errorText), nil + } + + return entity.NewCompletedResult(), nil +} diff --git a/internal/adapter/s3/s3.go b/internal/adapter/yandex/s3.go similarity index 80% rename from internal/adapter/s3/s3.go rename to internal/adapter/yandex/s3.go index b2d3070..042ccd4 100644 --- a/internal/adapter/s3/s3.go +++ b/internal/adapter/yandex/s3.go @@ -1,4 +1,4 @@ -package s3 +package yandex import ( "context" @@ -13,7 +13,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" ) -type S3Config struct { +type s3Config struct { Region string AccessKey string SecretKey string @@ -21,14 +21,14 @@ type S3Config struct { Endpoint string } -type YandexS3Service struct { +type yandexS3Service struct { client *s3.Client uploader *manager.Uploader bucketName string endpoint string } -func NewYandexS3Service(cfg S3Config) (*YandexS3Service, error) { +func newYandexS3Service(cfg s3Config) (*yandexS3Service, error) { if cfg.Region == "" || cfg.AccessKey == "" || cfg.SecretKey == "" || cfg.BucketName == "" { return nil, fmt.Errorf("missing required S3 configuration parameters") } @@ -57,7 +57,7 @@ func NewYandexS3Service(cfg S3Config) (*YandexS3Service, error) { uploader := manager.NewUploader(client) - return &YandexS3Service{ + return &yandexS3Service{ client: client, uploader: uploader, bucketName: cfg.BucketName, @@ -65,7 +65,7 @@ func NewYandexS3Service(cfg S3Config) (*YandexS3Service, error) { }, nil } -func (s *YandexS3Service) UploadFile(filePath, fileName string) error { +func (s *yandexS3Service) uploadFile(filePath, fileName string) error { file, err := os.Open(filePath) if err != nil { return fmt.Errorf("failed to open file %s: %w", filePath, err) @@ -84,20 +84,7 @@ func (s *YandexS3Service) UploadFile(filePath, fileName string) error { return nil } -func (s *YandexS3Service) FileUrl(fileName string) string { +func (s *yandexS3Service) fileUrl(fileName string) string { endpoint := strings.TrimRight(s.endpoint, "/") return fmt.Sprintf("%s/%s/%s", endpoint, s.bucketName, fileName) } - -// test service - -type TestS3service struct { -} - -func (s *TestS3service) UploadFile(filePath, fileName string) error { - return nil -} - -func (s *TestS3service) FileUrl(fileName string) string { - return fileName -} diff --git a/internal/adapter/speechkit/speechkit.go b/internal/adapter/yandex/speechkit.go similarity index 88% rename from internal/adapter/speechkit/speechkit.go rename to internal/adapter/yandex/speechkit.go index beccc03..af84e6e 100644 --- a/internal/adapter/speechkit/speechkit.go +++ b/internal/adapter/yandex/speechkit.go @@ -1,9 +1,8 @@ -package speechkit +package yandex import ( "context" "fmt" - "os" "strings" "google.golang.org/grpc" @@ -21,7 +20,12 @@ const ( RecognitionModel = "deferred-general" ) -type SpeechKitService struct { +type speechKitConfig struct { + ApiKey string + FolderID string +} + +type speechKitService struct { sttConn *grpc.ClientConn opConn *grpc.ClientConn sttClient stt.AsyncRecognizerClient @@ -30,9 +34,9 @@ type SpeechKitService struct { folderID string } -func NewSpeechKitService() (*SpeechKitService, error) { - apiKey := os.Getenv("YANDEX_CLOUD_API_KEY") - folderID := os.Getenv("YANDEX_CLOUD_FOLDER_ID") +func newSpeechKitService(cfg speechKitConfig) (*speechKitService, error) { + apiKey := cfg.ApiKey + folderID := cfg.FolderID if apiKey == "" || folderID == "" { return nil, fmt.Errorf("missing required Yandex Cloud environment variables") @@ -55,7 +59,7 @@ func NewSpeechKitService() (*SpeechKitService, error) { sttClient := stt.NewAsyncRecognizerClient(sttConn) opClient := operation.NewOperationServiceClient(opConn) - return &SpeechKitService{ + return &speechKitService{ sttConn: sttConn, opConn: opConn, sttClient: sttClient, @@ -65,7 +69,7 @@ func NewSpeechKitService() (*SpeechKitService, error) { }, nil } -func (s *SpeechKitService) Close() error { +func (s *speechKitService) Close() error { var err1, err2 error if s.sttConn != nil { err1 = s.sttConn.Close() @@ -79,8 +83,8 @@ func (s *SpeechKitService) Close() error { return err2 } -// RecognizeFileFromS3 запускает асинхронное распознавание файла из S3 -func (s *SpeechKitService) RecognizeFileFromS3(s3URI string) (string, error) { +// recognizeFileFromS3 запускает асинхронное распознавание файла из S3 +func (s *speechKitService) recognizeFileFromS3(s3URI string) (string, error) { ctx := context.Background() // Добавляем авторизацию и folder_id в контекст @@ -123,7 +127,7 @@ func (s *SpeechKitService) RecognizeFileFromS3(s3URI string) (string, error) { } // GetRecognitionResult получает результат распознавания по ID операции -func (s *SpeechKitService) GetRecognitionText(operationID string) (string, error) { +func (s *speechKitService) getRecognitionText(operationID string) (string, error) { ctx := context.Background() // Добавляем авторизацию и folder_id в контекст @@ -162,8 +166,8 @@ func (s *SpeechKitService) GetRecognitionText(operationID string) (string, error return sb.String(), nil } -// CheckOperationStatus проверяет статус операции распознавания -func (s *SpeechKitService) CheckOperationStatus(operationID string) (*operation.Operation, error) { +// checkOperationStatus проверяет статус операции распознавания +func (s *speechKitService) checkOperationStatus(operationID string) (*operation.Operation, error) { ctx := context.Background() ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Api-Key "+s.apiKey) diff --git a/internal/contract/contract.go b/internal/contract/contract.go index 96c23f0..76453db 100644 --- a/internal/contract/contract.go +++ b/internal/contract/contract.go @@ -1,6 +1,11 @@ package contract -type ObjectStorage interface { +import ( + "git.vakhrushev.me/av/transcriber/internal/entity" +) + +type AudioFileConverter interface { + Convert(src, dest string) error } type YandexS3Uploader interface { @@ -8,6 +13,8 @@ type YandexS3Uploader interface { FileUrl(fileName string) string } -type AudioFileConverter interface { - Convert(src, dest string) error +type AudioRecognizer interface { + RecognizeFile(filePath string) (operationID string, err error) + GetRecognitionText(operationID string) (string, error) + CheckRecognitionStatus(operationID string) (*entity.RecognitionResult, error) } diff --git a/internal/controller/http/transcribe_test.go b/internal/controller/http/transcribe_test.go index 26b0316..a7babe3 100644 --- a/internal/controller/http/transcribe_test.go +++ b/internal/controller/http/transcribe_test.go @@ -16,8 +16,8 @@ import ( "time" "git.vakhrushev.me/av/transcriber/internal/adapter/ffmpeg" - "git.vakhrushev.me/av/transcriber/internal/adapter/s3" "git.vakhrushev.me/av/transcriber/internal/adapter/sqlite" + "git.vakhrushev.me/av/transcriber/internal/adapter/yandex" "git.vakhrushev.me/av/transcriber/internal/entity" "git.vakhrushev.me/av/transcriber/internal/service" "github.com/doug-martin/goqu/v9" @@ -59,9 +59,9 @@ func setupTestRouter(t *testing.T) (*gin.Engine, *TranscribeHandler) { jobRepo := sqlite.NewTranscriptJobRepository(db, gq) converter := ffmpeg.NewFfmpegConverter() - s3Service := &s3.TestS3service{} + recognizer := &yandex.MemoryAudioRecognizer{} - trsService := service.NewTranscribeService(jobRepo, fileRepo, converter, s3Service) + trsService := service.NewTranscribeService(jobRepo, fileRepo, converter, recognizer) handler := NewTranscribeHandler(jobRepo, trsService) diff --git a/internal/entity/recognition.go b/internal/entity/recognition.go new file mode 100644 index 0000000..6c4614f --- /dev/null +++ b/internal/entity/recognition.go @@ -0,0 +1,78 @@ +package entity + +// RecognitionStatus представляет статус операции транскрипции +type RecognitionStatus int + +const ( + // RecognitionStatusInProgress - операция в процессе выполнения + RecognitionStatusInProgress RecognitionStatus = iota + // RecognitionStatusCompleted - операция завершена успешно + RecognitionStatusCompleted + // RecognitionStatusFailed - операция завершена с ошибкой + RecognitionStatusFailed +) + +// String возвращает строковое представление статуса +func (s RecognitionStatus) String() string { + switch s { + case RecognitionStatusInProgress: + return "in_progress" + case RecognitionStatusCompleted: + return "completed" + case RecognitionStatusFailed: + return "failed" + default: + return "unknown" + } +} + +// RecognitionResult представляет результат операции транскрипции +type RecognitionResult struct { + Status RecognitionStatus + Error string // Текст ошибки (заполняется при StatusFailed) +} + +// NewInProgressResult создает результат для операции в процессе выполнения +func NewInProgressResult() *RecognitionResult { + return &RecognitionResult{ + Status: RecognitionStatusInProgress, + } +} + +// NewCompletedResult создает результат для успешно завершенной операции +func NewCompletedResult() *RecognitionResult { + return &RecognitionResult{ + Status: RecognitionStatusCompleted, + } +} + +// NewFailedResult создает результат для операции, завершенной с ошибкой +func NewFailedResult(errorText string) *RecognitionResult { + return &RecognitionResult{ + Status: RecognitionStatusFailed, + Error: errorText, + } +} + +// IsInProgress проверяет, находится ли операция в процессе выполнения +func (r *RecognitionResult) IsInProgress() bool { + return r.Status == RecognitionStatusInProgress +} + +// IsCompleted проверяет, завершена ли операция успешно +func (r *RecognitionResult) IsCompleted() bool { + return r.Status == RecognitionStatusCompleted +} + +// IsFailed проверяет, завершена ли операция с ошибкой +func (r *RecognitionResult) IsFailed() bool { + return r.Status == RecognitionStatusFailed +} + +// GetError возвращает текст ошибки (только для операций, завершенных с ошибкой) +func (r *RecognitionResult) GetError() string { + if r.IsFailed() { + return r.Error + } + return "" +} diff --git a/internal/service/transcribe.go b/internal/service/transcribe.go index c25b7e1..57db918 100644 --- a/internal/service/transcribe.go +++ b/internal/service/transcribe.go @@ -8,7 +8,6 @@ import ( "path/filepath" "time" - "git.vakhrushev.me/av/transcriber/internal/adapter/speechkit" "git.vakhrushev.me/av/transcriber/internal/contract" "git.vakhrushev.me/av/transcriber/internal/entity" "github.com/google/uuid" @@ -17,23 +16,23 @@ import ( const baseStorageDir = "data/files" type TranscribeService struct { - jobRepo contract.TranscriptJobRepository - fileRepo contract.FileRepository - converter contract.AudioFileConverter - s3Service contract.YandexS3Uploader + jobRepo contract.TranscriptJobRepository + fileRepo contract.FileRepository + converter contract.AudioFileConverter + recognizer contract.AudioRecognizer } func NewTranscribeService( jobRepo contract.TranscriptJobRepository, fileRepo contract.FileRepository, converter contract.AudioFileConverter, - s3Service contract.YandexS3Uploader, + recognizer contract.AudioRecognizer, ) *TranscribeService { return &TranscribeService{ - jobRepo: jobRepo, - fileRepo: fileRepo, - converter: converter, - s3Service: s3Service, + jobRepo: jobRepo, + fileRepo: fileRepo, + converter: converter, + recognizer: recognizer, } } @@ -179,23 +178,8 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error { destFileId := uuid.NewString() destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3) - // Загружаем файл на S3 - err = s.s3Service.UploadFile(filePath, destFileRecord.FileName) - if err != nil { - return err - } - - // Создаем SpeechKit сервис - speechKitService, err := speechkit.NewSpeechKitService() - if err != nil { - return err - } - - // Формируем S3 URI для файла - s3URI := s.s3Service.FileUrl(destFileRecord.FileName) - // Запускаем асинхронное распознавание - operationID, err := speechKitService.RecognizeFileFromS3(s3URI) + operationID, err := s.recognizer.RecognizeFile(filePath) if err != nil { return err } @@ -235,23 +219,16 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { return fmt.Errorf("recogniton opId not found for job: %s", job.Id) } - // Создаем SpeechKit сервис - speechKitService, err := speechkit.NewSpeechKitService() - if err != nil { - return err - } - defer speechKitService.Close() - opId := *job.RecognitionOpID // Проверяем статус операции log.Printf("Check operation status: id %s\n", opId) - operation, err := speechKitService.CheckOperationStatus(opId) + recResult, err := s.recognizer.CheckRecognitionStatus(opId) if err != nil { return err } - if !operation.Done { + if recResult.IsInProgress() { // Операция еще не завершена, оставляем в статусе обработки log.Printf("Operation in progress: id %s\n", opId) delayTime := time.Now().Add(10 * time.Second) @@ -263,8 +240,8 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { return nil } - if opErr := operation.GetError(); opErr != nil { - errorText := fmt.Sprintf("operation failed: code %d, message: %s", opErr.Code, opErr.Message) + if recResult.IsFailed() { + errorText := recResult.GetError() log.Printf("Operation failed: id %s, message %s\n", opId, errorText) job.Fail(errorText) err := s.jobRepo.Save(job) @@ -275,7 +252,7 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { } // Операция завершена, получаем результат - transcriptionText, err := speechKitService.GetRecognitionText(*job.RecognitionOpID) + transcriptionText, err := s.recognizer.GetRecognitionText(opId) if err != nil { return err } diff --git a/main.go b/main.go index 34fbdfb..f0df933 100644 --- a/main.go +++ b/main.go @@ -13,8 +13,8 @@ import ( "time" "git.vakhrushev.me/av/transcriber/internal/adapter/ffmpeg" - "git.vakhrushev.me/av/transcriber/internal/adapter/s3" "git.vakhrushev.me/av/transcriber/internal/adapter/sqlite" + "git.vakhrushev.me/av/transcriber/internal/adapter/yandex" httpcontroller "git.vakhrushev.me/av/transcriber/internal/controller/http" "git.vakhrushev.me/av/transcriber/internal/controller/worker" "git.vakhrushev.me/av/transcriber/internal/service" @@ -54,28 +54,35 @@ func main() { log.Fatal("Failed to run migrations:", err) } + // Создаем репозитории + fileRepo := sqlite.NewFileRepository(db, gq) jobRepo := sqlite.NewTranscriptJobRepository(db, gq) + // Создаем адаптеры + converter := ffmpeg.NewFfmpegConverter() - // Создаем S3 сервис - s3Config := s3.S3Config{ + recognizer, err := yandex.NewYandexAudioRecognizerService(yandex.YandexAudioRecognizerConfig{ Region: os.Getenv("AWS_REGION"), AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"), SecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), BucketName: os.Getenv("S3_BUCKET_NAME"), Endpoint: os.Getenv("S3_ENDPOINT"), - } - - s3Service, err := s3.NewYandexS3Service(s3Config) + ApiKey: os.Getenv("YANDEX_CLOUD_API_KEY"), + FolderID: os.Getenv("YANDEX_CLOUD_FOLDER_ID"), + }) if err != nil { - log.Fatalf("failed to create S3 service: %v", err) + log.Fatalf("failed to create audio recognizer: %v", err) } + defer recognizer.Close() - transcribeService := service.NewTranscribeService(jobRepo, fileRepo, converter, s3Service) + // Создаем сервисы + + transcribeService := service.NewTranscribeService(jobRepo, fileRepo, converter, recognizer) // Создаем воркеры + conversionWorker := worker.NewConversionWorker(transcribeService) transcribeWorker := worker.NewTranscribeWorker(transcribeService) checkWorker := worker.NewCheckWorker(transcribeService)