diff --git a/internal/controller/http/transcribe.go b/internal/controller/http/transcribe.go index e76f4eb..93e4cde 100644 --- a/internal/controller/http/transcribe.go +++ b/internal/controller/http/transcribe.go @@ -125,3 +125,24 @@ func (h *TranscribeHandler) GetTranscribeJobStatus(c *gin.Context) { CreatedAt: job.CreatedAt, }) } + +func (h *TranscribeHandler) RunConversionJob(c *gin.Context) { + acquisitionId := uuid.NewString() + rottingTime := time.Now().Add(-1 * time.Hour) + + job, err := h.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + job.MoveToState(entity.StateConverted) + + err = h.jobRepo.Save(job) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + c.Status(http.StatusOK) +} diff --git a/internal/entity/job.go b/internal/entity/job.go index b7da8c9..fab7ecb 100644 --- a/internal/entity/job.go +++ b/internal/entity/job.go @@ -21,3 +21,9 @@ const ( StateUploaded = "uploaded" StatusFailed = "failed" ) + +func (j *TranscribeJob) MoveToState(state string) { + j.State = state + j.Worker = nil + j.AcquiredAt = nil +} diff --git a/internal/repo/contracts.go b/internal/repo/contracts.go index f2866a2..5666afc 100644 --- a/internal/repo/contracts.go +++ b/internal/repo/contracts.go @@ -1,6 +1,10 @@ package repo -import "git.vakhrushev.me/av/transcriber/internal/entity" +import ( + "time" + + "git.vakhrushev.me/av/transcriber/internal/entity" +) type FileRepository interface { Create(file *entity.File) error @@ -9,7 +13,9 @@ type FileRepository interface { type TranscriptJobRepository interface { Create(job *entity.TranscribeJob) error + Save(job *entity.TranscribeJob) error GetByID(id string) (*entity.TranscribeJob, error) + FindAndAcquire(state, acquisitionId string, rottingTime time.Time) (*entity.TranscribeJob, error) } type ObjectStorage interface { diff --git a/internal/repo/sqlite/transcript_job_sqlite.go b/internal/repo/sqlite/transcript_job_sqlite.go index 76487cd..f9ee623 100644 --- a/internal/repo/sqlite/transcript_job_sqlite.go +++ b/internal/repo/sqlite/transcript_job_sqlite.go @@ -3,6 +3,8 @@ package sqlite import ( "database/sql" "fmt" + "log" + "time" "git.vakhrushev.me/av/transcriber/internal/entity" "github.com/doug-martin/goqu/v9" @@ -42,6 +44,29 @@ func (repo *TranscriptJobRepository) Create(job *entity.TranscribeJob) error { return nil } +func (repo *TranscriptJobRepository) Save(job *entity.TranscribeJob) error { + record := goqu.Record{ + "state": job.State, + "file_id": job.FileID, + "is_error": job.IsError, + "error_text": job.ErrorText, + "worker": job.Worker, + "acquired_at": job.AcquiredAt, + } + query := repo.gq.Update("transcribe_jobs").Set(record).Where(goqu.C("id").Eq(job.Id)) + sql, args, err := query.ToSQL() + if err != nil { + return fmt.Errorf("failed to build query: %w", err) + } + + _, err = repo.db.Exec(sql, args...) + if err != nil { + return fmt.Errorf("failed to update transcribe job: %w", err) + } + + return nil +} + func (repo *TranscriptJobRepository) GetByID(id string) (*entity.TranscribeJob, error) { query := repo.gq.From("transcribe_jobs").Select( "id", @@ -75,3 +100,81 @@ func (repo *TranscriptJobRepository) GetByID(id string) (*entity.TranscribeJob, return &job, nil } + +func (repo *TranscriptJobRepository) FindAndAcquire(state, acquisitionId string, rottingTime time.Time) (*entity.TranscribeJob, error) { + updateQuery := repo.gq.Update("transcribe_jobs"). + Set( + goqu.Record{ + "worker": acquisitionId, + "acquired_at": time.Now(), + }, + ). + Where( + goqu.C("id").Eq( + repo.gq.From("transcribe_jobs").Select("id"). + Where( + goqu.And( + goqu.C("state").Eq(state), + goqu.C("is_error").Eq(0), + goqu.Or( + goqu.C("worker").IsNull(), + goqu.C("acquired_at").Lt(rottingTime), + ), + ), + ). + Limit(1), + ), + ) + + sql, args, err := updateQuery.ToSQL() + if err != nil { + return nil, fmt.Errorf("failed to build query: %w", err) + } + + log.Printf("aquire sql: %s", sql) + + result, err := repo.db.Exec(sql, args...) + if err != nil { + return nil, fmt.Errorf("failed to aquire job with state %s: %w", state, err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return nil, fmt.Errorf("failed check affected rows: %w", err) + } + if rowsAffected != 1 { + return nil, fmt.Errorf("unexpected affected rows count: %d", rowsAffected) + } + + selectQuery := repo.gq.From("transcribe_jobs").Select( + "id", + "state", + "file_id", + "is_error", + "error_text", + "worker", + "acquired_at", + "created_at", + ).Where(goqu.C("worker").Eq(acquisitionId)) + + sql, args, err = selectQuery.ToSQL() + if err != nil { + return nil, fmt.Errorf("failed to build query: %w", err) + } + + var job entity.TranscribeJob + err = repo.db.QueryRow(sql, args...).Scan( + &job.Id, + &job.State, + &job.FileID, + &job.IsError, + &job.ErrorText, + &job.Worker, + &job.AcquiredAt, + &job.CreatedAt, + ) + if err != nil { + return nil, fmt.Errorf("failed to get transcribe job: %w", err) + } + + return &job, nil +} diff --git a/internal/worker/worker.go b/internal/worker/worker.go new file mode 100644 index 0000000..7c7a354 --- /dev/null +++ b/internal/worker/worker.go @@ -0,0 +1,41 @@ +package worker + +import ( + "git.vakhrushev.me/av/transcriber/internal/repo" +) + +type ConversionWorker struct { + jobRepo repo.TranscriptJobRepository + fileRepo repo.FileRepository + worker string +} + +func NewConversionWorker(jobRepo repo.TranscriptJobRepository, fileRepo repo.FileRepository, worker string) *ConversionWorker { + return &ConversionWorker{jobRepo: jobRepo, fileRepo: fileRepo} +} + +func (w *ConversionWorker) Run() { + w.handleJob() + // for { + // err := w.handleJob() + // if err != nil { + // continue + // } + // } +} + +func (w *ConversionWorker) handleJob() error { + // job, err := w.jobRepo.FindAndAcquire(entity.StateCreated, w.worker) + // if err != nil { + // return err + // } + + // job.MoveToState(entity.StateConverted) + + // err = w.jobRepo.Save(job) + // if err != nil { + // return err + // } + + return nil +} diff --git a/main.go b/main.go index e1961a0..dd545d5 100644 --- a/main.go +++ b/main.go @@ -52,6 +52,8 @@ func main() { { api.POST("/transcribe/audio", transcribeHandler.CreateTranscribeJob) api.GET("/transcribe/:id", transcribeHandler.GetTranscribeJobStatus) + + api.POST("/transcribe/convert", transcribeHandler.RunConversionJob) } // Добавляем middleware для обработки больших файлов