Add conversion job logic

This commit is contained in:
2025-08-09 17:38:28 +03:00
parent 7357b69a7b
commit d353f206fc
6 changed files with 180 additions and 1 deletions

View File

@@ -125,3 +125,24 @@ func (h *TranscribeHandler) GetTranscribeJobStatus(c *gin.Context) {
CreatedAt: job.CreatedAt, CreatedAt: job.CreatedAt,
}) })
} }
// RunConversionJob claims a single transcribe job that is ready for
// conversion and advances it to the converted state.
//
// It acquires one job in StateCreated — or one whose previous acquisition
// is older than an hour and is therefore considered abandoned — moves it to
// StateConverted, and persists it (which also releases the worker lock).
//
// NOTE(review): no actual media conversion happens here yet; the job is
// only moved between states — confirm the conversion step is added later.
func (h *TranscribeHandler) RunConversionJob(c *gin.Context) {
	acquisitionId := uuid.NewString()
	// Acquisitions older than this are treated as rotten (abandoned) and
	// may be re-claimed.
	rottingTime := time.Now().Add(-1 * time.Hour)

	job, err := h.jobRepo.FindAndAcquire(entity.StateCreated, acquisitionId, rottingTime)
	if err != nil {
		// Acquisition failures are server-side (DB error, or no job found),
		// not malformed client input, so respond 500 rather than 400.
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	job.MoveToState(entity.StateConverted)
	if err := h.jobRepo.Save(job); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	c.Status(http.StatusOK)
}

View File

@@ -21,3 +21,9 @@ const (
StateUploaded = "uploaded" StateUploaded = "uploaded"
StatusFailed = "failed" StatusFailed = "failed"
) )
// MoveToState transitions the job into the given state and releases any
// worker acquisition: the worker id and acquisition timestamp are cleared,
// so the job is no longer considered held by anyone.
func (j *TranscribeJob) MoveToState(state string) {
	j.State = state
	// Clearing both fields marks the job as unacquired for
	// acquisition queries (worker IS NULL).
	j.Worker = nil
	j.AcquiredAt = nil
}

View File

@@ -1,6 +1,10 @@
package repo package repo
import "git.vakhrushev.me/av/transcriber/internal/entity" import (
"time"
"git.vakhrushev.me/av/transcriber/internal/entity"
)
type FileRepository interface { type FileRepository interface {
Create(file *entity.File) error Create(file *entity.File) error
@@ -9,7 +13,9 @@ type FileRepository interface {
type TranscriptJobRepository interface { type TranscriptJobRepository interface {
Create(job *entity.TranscribeJob) error Create(job *entity.TranscribeJob) error
Save(job *entity.TranscribeJob) error
GetByID(id string) (*entity.TranscribeJob, error) GetByID(id string) (*entity.TranscribeJob, error)
FindAndAcquire(state, acquisitionId string, rottingTime time.Time) (*entity.TranscribeJob, error)
} }
type ObjectStorage interface { type ObjectStorage interface {

View File

@@ -3,6 +3,8 @@ package sqlite
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"log"
"time"
"git.vakhrushev.me/av/transcriber/internal/entity" "git.vakhrushev.me/av/transcriber/internal/entity"
"github.com/doug-martin/goqu/v9" "github.com/doug-martin/goqu/v9"
@@ -42,6 +44,29 @@ func (repo *TranscriptJobRepository) Create(job *entity.TranscribeJob) error {
return nil return nil
} }
// Save persists the mutable fields of an existing transcribe job, keyed by
// its id, in a single UPDATE.
//
// Saving an id that does not exist is not reported as an error: the UPDATE
// simply affects zero rows. Returns a wrapped error if the query cannot be
// built or executed.
func (repo *TranscriptJobRepository) Save(job *entity.TranscribeJob) error {
	record := goqu.Record{
		"state":       job.State,
		"file_id":     job.FileID,
		"is_error":    job.IsError,
		"error_text":  job.ErrorText,
		"worker":      job.Worker,
		"acquired_at": job.AcquiredAt,
	}
	query := repo.gq.Update("transcribe_jobs").Set(record).Where(goqu.C("id").Eq(job.Id))

	// Named sqlText (not "sql") so the database/sql package import is not
	// shadowed inside this function.
	sqlText, args, err := query.ToSQL()
	if err != nil {
		return fmt.Errorf("failed to build query: %w", err)
	}

	if _, err = repo.db.Exec(sqlText, args...); err != nil {
		return fmt.Errorf("failed to update transcribe job: %w", err)
	}
	return nil
}
func (repo *TranscriptJobRepository) GetByID(id string) (*entity.TranscribeJob, error) { func (repo *TranscriptJobRepository) GetByID(id string) (*entity.TranscribeJob, error) {
query := repo.gq.From("transcribe_jobs").Select( query := repo.gq.From("transcribe_jobs").Select(
"id", "id",
@@ -75,3 +100,81 @@ func (repo *TranscriptJobRepository) GetByID(id string) (*entity.TranscribeJob,
return &job, nil return &job, nil
} }
// FindAndAcquire atomically claims a single job in the given state and
// returns it.
//
// A job is acquirable when it is in the requested state, has no error, and
// is either unclaimed (worker IS NULL) or was last acquired before
// rottingTime — i.e. its previous acquisition is considered abandoned. The
// claim is made by stamping the job with acquisitionId and the current time
// in one UPDATE; the stamped row is then read back by worker id.
//
// When no job matches (zero rows updated), an error is returned; callers
// cannot currently distinguish "nothing to do" from a real failure except
// by the error text.
func (repo *TranscriptJobRepository) FindAndAcquire(state, acquisitionId string, rottingTime time.Time) (*entity.TranscribeJob, error) {
	updateQuery := repo.gq.Update("transcribe_jobs").
		Set(
			goqu.Record{
				"worker":      acquisitionId,
				"acquired_at": time.Now(),
			},
		).
		Where(
			// UPDATE ... WHERE id = (subselect ... LIMIT 1): the subselect
			// picks exactly one acquirable row to stamp.
			goqu.C("id").Eq(
				repo.gq.From("transcribe_jobs").Select("id").
					Where(
						goqu.And(
							goqu.C("state").Eq(state),
							goqu.C("is_error").Eq(0),
							goqu.Or(
								goqu.C("worker").IsNull(),
								goqu.C("acquired_at").Lt(rottingTime),
							),
						),
					).
					Limit(1),
			),
		)

	// Named sqlText (not "sql") to avoid shadowing the database/sql import.
	sqlText, args, err := updateQuery.ToSQL()
	if err != nil {
		return nil, fmt.Errorf("failed to build query: %w", err)
	}
	// Debug output; consider removing once the acquisition flow is stable.
	log.Printf("acquire sql: %s", sqlText)

	result, err := repo.db.Exec(sqlText, args...)
	if err != nil {
		return nil, fmt.Errorf("failed to acquire job with state %s: %w", state, err)
	}
	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return nil, fmt.Errorf("failed check affected rows: %w", err)
	}
	if rowsAffected != 1 {
		// 0 means no acquirable job right now; >1 would indicate a broken
		// uniqueness assumption in the subselect.
		return nil, fmt.Errorf("unexpected affected rows count: %d", rowsAffected)
	}

	selectQuery := repo.gq.From("transcribe_jobs").Select(
		"id",
		"state",
		"file_id",
		"is_error",
		"error_text",
		"worker",
		"acquired_at",
		"created_at",
	).Where(goqu.C("worker").Eq(acquisitionId))

	sqlText, args, err = selectQuery.ToSQL()
	if err != nil {
		return nil, fmt.Errorf("failed to build query: %w", err)
	}

	var job entity.TranscribeJob
	err = repo.db.QueryRow(sqlText, args...).Scan(
		&job.Id,
		&job.State,
		&job.FileID,
		&job.IsError,
		&job.ErrorText,
		&job.Worker,
		&job.AcquiredAt,
		&job.CreatedAt,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to get transcribe job: %w", err)
	}
	return &job, nil
}

41
internal/worker/worker.go Normal file
View File

@@ -0,0 +1,41 @@
package worker
import (
"git.vakhrushev.me/av/transcriber/internal/repo"
)
// ConversionWorker drives the conversion stage of the pipeline by acquiring
// transcribe jobs and advancing their state.
type ConversionWorker struct {
	jobRepo repo.TranscriptJobRepository
	fileRepo repo.FileRepository
	// worker identifies this instance when acquiring jobs.
	worker string
}
// NewConversionWorker builds a ConversionWorker from its repositories and
// the worker identifier used when acquiring jobs.
func NewConversionWorker(jobRepo repo.TranscriptJobRepository, fileRepo repo.FileRepository, worker string) *ConversionWorker {
	// Bug fix: the worker argument was previously dropped, leaving the
	// worker field permanently empty.
	return &ConversionWorker{jobRepo: jobRepo, fileRepo: fileRepo, worker: worker}
}
// Run executes the worker.
//
// Currently it performs a single handleJob pass and discards the returned
// error; the commented-out loop below sketches the intended continuous
// polling behavior — TODO confirm before enabling.
func (w *ConversionWorker) Run() {
	w.handleJob()
	// for {
	// 	err := w.handleJob()
	// 	if err != nil {
	// 		continue
	// 	}
	// }
}
// handleJob is intended to acquire one job, convert it, and save the new
// state. The sketched implementation is still commented out, so for now it
// is a no-op that always reports success.
func (w *ConversionWorker) handleJob() error {
	// job, err := w.jobRepo.FindAndAcquire(entity.StateCreated, w.worker)
	// if err != nil {
	// 	return err
	// }
	// job.MoveToState(entity.StateConverted)
	// err = w.jobRepo.Save(job)
	// if err != nil {
	// 	return err
	// }
	return nil
}

View File

@@ -52,6 +52,8 @@ func main() {
{ {
api.POST("/transcribe/audio", transcribeHandler.CreateTranscribeJob) api.POST("/transcribe/audio", transcribeHandler.CreateTranscribeJob)
api.GET("/transcribe/:id", transcribeHandler.GetTranscribeJobStatus) api.GET("/transcribe/:id", transcribeHandler.GetTranscribeJobStatus)
api.POST("/transcribe/convert", transcribeHandler.RunConversionJob)
} }
// Добавляем middleware для обработки больших файлов // Добавляем middleware для обработки больших файлов