Upload files into yandex object storage (s3)
This commit is contained in:
@@ -11,6 +11,7 @@ import (
|
||||
"git.vakhrushev.me/av/transcriber/internal/entity"
|
||||
"git.vakhrushev.me/av/transcriber/internal/repo"
|
||||
"git.vakhrushev.me/av/transcriber/internal/repo/ffmpeg"
|
||||
"git.vakhrushev.me/av/transcriber/internal/service/s3"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
@@ -147,7 +148,6 @@ func (h *TranscribeHandler) RunConversionJob(c *gin.Context) {
|
||||
srcFilePath := filepath.Join("data", "files", srcFile.FileName)
|
||||
|
||||
destFileId := uuid.New().String()
|
||||
|
||||
destFileName := fmt.Sprintf("%s%s", destFileId, ".ogg")
|
||||
destFilePath := filepath.Join("data", "files", destFileName)
|
||||
|
||||
@@ -190,3 +190,64 @@ func (h *TranscribeHandler) RunConversionJob(c *gin.Context) {
|
||||
|
||||
c.Status(http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *TranscribeHandler) RunUploadJob(c *gin.Context) {
|
||||
acquisitionId := uuid.NewString()
|
||||
rottingTime := time.Now().Add(-1 * time.Hour)
|
||||
|
||||
job, err := h.jobRepo.FindAndAcquire(entity.StateConverted, acquisitionId, rottingTime)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
fileRecord, err := h.fileRepo.GetByID(*job.FileID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
filePath := filepath.Join("data", "files", fileRecord.FileName)
|
||||
|
||||
destFileId := uuid.New().String()
|
||||
destFileRecord := &entity.File{
|
||||
Id: destFileId,
|
||||
Storage: entity.StorageS3,
|
||||
FileName: fileRecord.FileName,
|
||||
Size: fileRecord.Size,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
// Создаем S3 сервис
|
||||
s3Service, err := s3.NewS3Service()
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to initialize S3 service: " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
// Загружаем файл на S3
|
||||
err = s3Service.UploadFile(filePath, destFileRecord.FileName)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to upload file to S3: " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
job.FileID = &destFileId
|
||||
job.MoveToState(entity.StateTranscribeReady)
|
||||
|
||||
// Сохраняем информацию о загрузке файла на S3
|
||||
err = h.fileRepo.Create(destFileRecord)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update file record: " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
// Обновляем состояние задачи
|
||||
err = h.jobRepo.Save(job)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update job state: " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
c.Status(http.StatusOK)
|
||||
}
|
||||
|
@@ -16,10 +16,13 @@ type TranscribeJob struct {
|
||||
}
|
||||
|
||||
const (
|
||||
StateCreated = "created"
|
||||
StateConverted = "converted"
|
||||
StateUploaded = "uploaded"
|
||||
StatusFailed = "failed"
|
||||
StateCreated = "created"
|
||||
StateConverted = "converted"
|
||||
StateUploaded = "uploaded"
|
||||
StateTranscribeReady = "transcribe_ready"
|
||||
StateTranscribeWait = "transcribe_wait"
|
||||
StateDone = "done"
|
||||
StatusFailed = "failed"
|
||||
)
|
||||
|
||||
func (j *TranscribeJob) MoveToState(state string) {
|
||||
|
80
internal/service/s3/s3.go
Normal file
80
internal/service/s3/s3.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
)
|
||||
|
||||
type S3Service struct {
|
||||
client *s3.Client
|
||||
uploader *manager.Uploader
|
||||
bucketName string
|
||||
}
|
||||
|
||||
func NewS3Service() (*S3Service, error) {
|
||||
region := os.Getenv("AWS_REGION")
|
||||
accessKey := os.Getenv("AWS_ACCESS_KEY_ID")
|
||||
secretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")
|
||||
bucketName := os.Getenv("S3_BUCKET_NAME")
|
||||
endpoint := os.Getenv("S3_ENDPOINT")
|
||||
|
||||
if region == "" || accessKey == "" || secretKey == "" || bucketName == "" {
|
||||
return nil, fmt.Errorf("missing required S3 environment variables")
|
||||
}
|
||||
|
||||
// Создаем конфигурацию
|
||||
cfg, err := config.LoadDefaultConfig(context.TODO(),
|
||||
config.WithRegion(region),
|
||||
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load AWS config: %w", err)
|
||||
}
|
||||
|
||||
// Создаем клиент S3
|
||||
var client *s3.Client
|
||||
if endpoint != "" {
|
||||
// Кастомный endpoint (например, для MinIO)
|
||||
client = s3.NewFromConfig(cfg, func(o *s3.Options) {
|
||||
o.BaseEndpoint = aws.String(endpoint)
|
||||
o.UsePathStyle = true
|
||||
})
|
||||
} else {
|
||||
// Стандартный AWS S3
|
||||
client = s3.NewFromConfig(cfg)
|
||||
}
|
||||
|
||||
uploader := manager.NewUploader(client)
|
||||
|
||||
return &S3Service{
|
||||
client: client,
|
||||
uploader: uploader,
|
||||
bucketName: bucketName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *S3Service) UploadFile(filePath, fileName string) error {
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file %s: %w", filePath, err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
_, err = s.uploader.Upload(context.TODO(), &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.bucketName),
|
||||
Key: aws.String(fileName),
|
||||
Body: file,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload file to S3: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user