Dependency inversion for s3 service
This commit is contained in:
103
internal/adapter/s3/s3.go
Normal file
103
internal/adapter/s3/s3.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
)
|
||||
|
||||
type S3Config struct {
|
||||
Region string
|
||||
AccessKey string
|
||||
SecretKey string
|
||||
BucketName string
|
||||
Endpoint string
|
||||
}
|
||||
|
||||
type YandexS3Service struct {
|
||||
client *s3.Client
|
||||
uploader *manager.Uploader
|
||||
bucketName string
|
||||
endpoint string
|
||||
}
|
||||
|
||||
func NewYandexS3Service(cfg S3Config) (*YandexS3Service, error) {
|
||||
if cfg.Region == "" || cfg.AccessKey == "" || cfg.SecretKey == "" || cfg.BucketName == "" {
|
||||
return nil, fmt.Errorf("missing required S3 configuration parameters")
|
||||
}
|
||||
|
||||
// Создаем конфигурацию
|
||||
awsCfg, err := config.LoadDefaultConfig(context.Background(),
|
||||
config.WithRegion(cfg.Region),
|
||||
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(cfg.AccessKey, cfg.SecretKey, "")),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load AWS config: %w", err)
|
||||
}
|
||||
|
||||
// Создаем клиент S3
|
||||
var client *s3.Client
|
||||
if cfg.Endpoint != "" {
|
||||
// Кастомный endpoint (например, для MinIO)
|
||||
client = s3.NewFromConfig(awsCfg, func(o *s3.Options) {
|
||||
o.BaseEndpoint = aws.String(cfg.Endpoint)
|
||||
o.UsePathStyle = true
|
||||
})
|
||||
} else {
|
||||
// Стандартный AWS S3
|
||||
client = s3.NewFromConfig(awsCfg)
|
||||
}
|
||||
|
||||
uploader := manager.NewUploader(client)
|
||||
|
||||
return &YandexS3Service{
|
||||
client: client,
|
||||
uploader: uploader,
|
||||
bucketName: cfg.BucketName,
|
||||
endpoint: cfg.Endpoint,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *YandexS3Service) UploadFile(filePath, fileName string) error {
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file %s: %w", filePath, err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
_, err = s.uploader.Upload(context.Background(), &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.bucketName),
|
||||
Key: aws.String(fileName),
|
||||
Body: file,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload file to S3: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *YandexS3Service) FileUrl(fileName string) string {
|
||||
endpoint := strings.TrimRight(s.endpoint, "/")
|
||||
return fmt.Sprintf("%s/%s/%s", endpoint, s.bucketName, fileName)
|
||||
}
|
||||
|
||||
// test service
|
||||
|
||||
type TestS3service struct {
|
||||
}
|
||||
|
||||
func (s *TestS3service) UploadFile(filePath, fileName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *TestS3service) FileUrl(fileName string) string {
|
||||
return fileName
|
||||
}
|
@@ -3,6 +3,11 @@ package contract
|
||||
type ObjectStorage interface {
|
||||
}
|
||||
|
||||
type YandexS3Uploader interface {
|
||||
UploadFile(filePath, fileName string) error
|
||||
FileUrl(fileName string) string
|
||||
}
|
||||
|
||||
type AudioFileConverter interface {
|
||||
Convert(src, dest string) error
|
||||
}
|
||||
|
@@ -16,6 +16,7 @@ import (
|
||||
"time"
|
||||
|
||||
"git.vakhrushev.me/av/transcriber/internal/adapter/ffmpeg"
|
||||
"git.vakhrushev.me/av/transcriber/internal/adapter/s3"
|
||||
"git.vakhrushev.me/av/transcriber/internal/adapter/sqlite"
|
||||
"git.vakhrushev.me/av/transcriber/internal/entity"
|
||||
"git.vakhrushev.me/av/transcriber/internal/service"
|
||||
@@ -58,8 +59,9 @@ func setupTestRouter(t *testing.T) (*gin.Engine, *TranscribeHandler) {
|
||||
jobRepo := sqlite.NewTranscriptJobRepository(db, gq)
|
||||
|
||||
converter := ffmpeg.NewFfmpegConverter()
|
||||
s3Service := &s3.TestS3service{}
|
||||
|
||||
trsService := service.NewTranscribeService(jobRepo, fileRepo, converter)
|
||||
trsService := service.NewTranscribeService(jobRepo, fileRepo, converter, s3Service)
|
||||
|
||||
handler := NewTranscribeHandler(jobRepo, trsService)
|
||||
|
||||
|
@@ -1,87 +0,0 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
)
|
||||
|
||||
type S3Service struct {
|
||||
client *s3.Client
|
||||
uploader *manager.Uploader
|
||||
bucketName string
|
||||
}
|
||||
|
||||
func NewS3Service() (*S3Service, error) {
|
||||
region := os.Getenv("AWS_REGION")
|
||||
accessKey := os.Getenv("AWS_ACCESS_KEY_ID")
|
||||
secretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
|
||||
bucketName := os.Getenv("S3_BUCKET_NAME")
|
||||
endpoint := os.Getenv("S3_ENDPOINT")
|
||||
|
||||
if region == "" || accessKey == "" || secretKey == "" || bucketName == "" {
|
||||
return nil, fmt.Errorf("missing required S3 environment variables")
|
||||
}
|
||||
|
||||
// Создаем конфигурацию
|
||||
cfg, err := config.LoadDefaultConfig(context.TODO(),
|
||||
config.WithRegion(region),
|
||||
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load AWS config: %w", err)
|
||||
}
|
||||
|
||||
// Создаем клиент S3
|
||||
var client *s3.Client
|
||||
if endpoint != "" {
|
||||
// Кастомный endpoint (например, для MinIO)
|
||||
client = s3.NewFromConfig(cfg, func(o *s3.Options) {
|
||||
o.BaseEndpoint = aws.String(endpoint)
|
||||
o.UsePathStyle = true
|
||||
})
|
||||
} else {
|
||||
// Стандартный AWS S3
|
||||
client = s3.NewFromConfig(cfg)
|
||||
}
|
||||
|
||||
uploader := manager.NewUploader(client)
|
||||
|
||||
return &S3Service{
|
||||
client: client,
|
||||
uploader: uploader,
|
||||
bucketName: bucketName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *S3Service) UploadFile(filePath, fileName string) error {
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file %s: %w", filePath, err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
_, err = s.uploader.Upload(context.TODO(), &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.bucketName),
|
||||
Key: aws.String(fileName),
|
||||
Body: file,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload file to S3: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *S3Service) FileUrl(fileName string) string {
|
||||
endpoint := strings.TrimRight(os.Getenv("S3_ENDPOINT"), "/")
|
||||
bucketName := os.Getenv("S3_BUCKET_NAME")
|
||||
return fmt.Sprintf("%s/%s/%s", endpoint, bucketName, fileName)
|
||||
}
|
@@ -8,10 +8,9 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.vakhrushev.me/av/transcriber/internal/adapter/speechkit"
|
||||
"git.vakhrushev.me/av/transcriber/internal/contract"
|
||||
"git.vakhrushev.me/av/transcriber/internal/entity"
|
||||
"git.vakhrushev.me/av/transcriber/internal/service/s3"
|
||||
"git.vakhrushev.me/av/transcriber/internal/service/speechkit"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
@@ -21,13 +20,20 @@ type TranscribeService struct {
|
||||
jobRepo contract.TranscriptJobRepository
|
||||
fileRepo contract.FileRepository
|
||||
converter contract.AudioFileConverter
|
||||
s3Service contract.YandexS3Uploader
|
||||
}
|
||||
|
||||
func NewTranscribeService(jobRepo contract.TranscriptJobRepository, fileRepo contract.FileRepository, converter contract.AudioFileConverter) *TranscribeService {
|
||||
func NewTranscribeService(
|
||||
jobRepo contract.TranscriptJobRepository,
|
||||
fileRepo contract.FileRepository,
|
||||
converter contract.AudioFileConverter,
|
||||
s3Service contract.YandexS3Uploader,
|
||||
) *TranscribeService {
|
||||
return &TranscribeService{
|
||||
jobRepo: jobRepo,
|
||||
fileRepo: fileRepo,
|
||||
converter: converter,
|
||||
s3Service: s3Service,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,14 +179,8 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error {
|
||||
destFileId := uuid.NewString()
|
||||
destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3)
|
||||
|
||||
// Создаем S3 сервис
|
||||
s3Service, err := s3.NewS3Service()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Загружаем файл на S3
|
||||
err = s3Service.UploadFile(filePath, destFileRecord.FileName)
|
||||
err = s.s3Service.UploadFile(filePath, destFileRecord.FileName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -192,7 +192,7 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error {
|
||||
}
|
||||
|
||||
// Формируем S3 URI для файла
|
||||
s3URI := s3Service.FileUrl(destFileRecord.FileName)
|
||||
s3URI := s.s3Service.FileUrl(destFileRecord.FileName)
|
||||
|
||||
// Запускаем асинхронное распознавание
|
||||
operationID, err := speechKitService.RecognizeFileFromS3(s3URI)
|
||||
|
17
main.go
17
main.go
@@ -13,6 +13,7 @@ import (
|
||||
"time"
|
||||
|
||||
"git.vakhrushev.me/av/transcriber/internal/adapter/ffmpeg"
|
||||
"git.vakhrushev.me/av/transcriber/internal/adapter/s3"
|
||||
"git.vakhrushev.me/av/transcriber/internal/adapter/sqlite"
|
||||
httpcontroller "git.vakhrushev.me/av/transcriber/internal/controller/http"
|
||||
"git.vakhrushev.me/av/transcriber/internal/controller/worker"
|
||||
@@ -58,7 +59,21 @@ func main() {
|
||||
|
||||
converter := ffmpeg.NewFfmpegConverter()
|
||||
|
||||
transcribeService := service.NewTranscribeService(jobRepo, fileRepo, converter)
|
||||
// Создаем S3 сервис
|
||||
s3Config := s3.S3Config{
|
||||
Region: os.Getenv("AWS_REGION"),
|
||||
AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"),
|
||||
SecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
|
||||
BucketName: os.Getenv("S3_BUCKET_NAME"),
|
||||
Endpoint: os.Getenv("S3_ENDPOINT"),
|
||||
}
|
||||
|
||||
s3Service, err := s3.NewYandexS3Service(s3Config)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create S3 service: %v", err)
|
||||
}
|
||||
|
||||
transcribeService := service.NewTranscribeService(jobRepo, fileRepo, converter, s3Service)
|
||||
|
||||
// Создаем воркеры
|
||||
conversionWorker := worker.NewConversionWorker(transcribeService)
|
||||
|
Reference in New Issue
Block a user