Change logger to slog
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
|
"log/slog"
|
||||||
"mime/multipart"
|
"mime/multipart"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
@@ -63,7 +64,12 @@ func setupTestRouter(t *testing.T) (*gin.Engine, *TranscribeHandler) {
|
|||||||
converter := ffmpegconv.NewFfmpegConverter()
|
converter := ffmpegconv.NewFfmpegConverter()
|
||||||
recognizer := &recognizer.MemoryAudioRecognizer{}
|
recognizer := &recognizer.MemoryAudioRecognizer{}
|
||||||
|
|
||||||
trsService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer)
|
// Создаем тестовый логгер
|
||||||
|
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
|
||||||
|
Level: slog.LevelError, // Только ошибки в тестах
|
||||||
|
}))
|
||||||
|
|
||||||
|
trsService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer, logger)
|
||||||
|
|
||||||
handler := NewTranscribeHandler(jobRepo, trsService)
|
handler := NewTranscribeHandler(jobRepo, trsService)
|
||||||
|
|
||||||
|
@@ -2,7 +2,7 @@ package worker
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
"log/slog"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -19,10 +19,19 @@ type Worker interface {
|
|||||||
type CallbackWorker struct {
|
type CallbackWorker struct {
|
||||||
name string
|
name string
|
||||||
f func() error
|
f func() error
|
||||||
|
logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCallbackWorker(name string, f func() error) *CallbackWorker {
|
func NewCallbackWorker(name string, f func() error, logger *slog.Logger) *CallbackWorker {
|
||||||
return &CallbackWorker{name, f}
|
if logger == nil {
|
||||||
|
logger = slog.Default()
|
||||||
|
}
|
||||||
|
|
||||||
|
return &CallbackWorker{
|
||||||
|
name: name,
|
||||||
|
f: f,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *CallbackWorker) Name() string {
|
func (w *CallbackWorker) Name() string {
|
||||||
@@ -30,12 +39,12 @@ func (w *CallbackWorker) Name() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *CallbackWorker) Start(ctx context.Context) {
|
func (w *CallbackWorker) Start(ctx context.Context) {
|
||||||
log.Printf("%s started", w.Name())
|
w.logger.Info("Worker started", "worker_name", w.Name())
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.Printf("%s received shutdown signal", w.Name())
|
w.logger.Info("Worker received shutdown signal", "worker_name", w.Name())
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
err := w.f()
|
err := w.f()
|
||||||
@@ -44,13 +53,13 @@ func (w *CallbackWorker) Start(ctx context.Context) {
|
|||||||
metrics.WorkerJobCounter.WithLabelValues(w.Name(), strconv.FormatBool(err != nil)).Inc()
|
metrics.WorkerJobCounter.WithLabelValues(w.Name(), strconv.FormatBool(err != nil)).Inc()
|
||||||
}
|
}
|
||||||
if err != nil && !isNoop {
|
if err != nil && !isNoop {
|
||||||
log.Printf("%s error: %v", w.Name(), err)
|
w.logger.Error("Worker error", "worker_name", w.Name(), "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ждем 1 секунду перед следующей итерацией
|
// Ждем 1 секунду перед следующей итерацией
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.Printf("%s received shutdown signal during sleep", w.Name())
|
w.logger.Info("Worker received shutdown signal during sleep", "worker_name", w.Name())
|
||||||
return
|
return
|
||||||
case <-time.After(1 * time.Second):
|
case <-time.After(1 * time.Second):
|
||||||
// Продолжаем работу
|
// Продолжаем работу
|
||||||
|
@@ -3,7 +3,7 @@ package service
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -28,6 +28,7 @@ type TranscribeService struct {
|
|||||||
metaviewer contract.AudioMetaViewer
|
metaviewer contract.AudioMetaViewer
|
||||||
converter contract.AudioFileConverter
|
converter contract.AudioFileConverter
|
||||||
recognizer contract.AudioRecognizer
|
recognizer contract.AudioRecognizer
|
||||||
|
logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTranscribeService(
|
func NewTranscribeService(
|
||||||
@@ -36,6 +37,7 @@ func NewTranscribeService(
|
|||||||
metaviewer contract.AudioMetaViewer,
|
metaviewer contract.AudioMetaViewer,
|
||||||
converter contract.AudioFileConverter,
|
converter contract.AudioFileConverter,
|
||||||
recognizer contract.AudioRecognizer,
|
recognizer contract.AudioRecognizer,
|
||||||
|
logger *slog.Logger,
|
||||||
) *TranscribeService {
|
) *TranscribeService {
|
||||||
return &TranscribeService{
|
return &TranscribeService{
|
||||||
jobRepo: jobRepo,
|
jobRepo: jobRepo,
|
||||||
@@ -43,6 +45,7 @@ func NewTranscribeService(
|
|||||||
metaviewer: metaviewer,
|
metaviewer: metaviewer,
|
||||||
converter: converter,
|
converter: converter,
|
||||||
recognizer: recognizer,
|
recognizer: recognizer,
|
||||||
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,9 +63,15 @@ func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string)
|
|||||||
storageFileName := fmt.Sprintf("%s%s", fileId, ext)
|
storageFileName := fmt.Sprintf("%s%s", fileId, ext)
|
||||||
storageFilePath := filepath.Join(baseStorageDir, storageFileName)
|
storageFilePath := filepath.Join(baseStorageDir, storageFileName)
|
||||||
|
|
||||||
|
s.logger.Info("Creating transcribe job",
|
||||||
|
"file_id", fileId,
|
||||||
|
"file_name", fileName,
|
||||||
|
"storage_path", storageFilePath)
|
||||||
|
|
||||||
// Создаем файл на диске
|
// Создаем файл на диске
|
||||||
dst, err := os.Create(storageFilePath)
|
dst, err := os.Create(storageFilePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to create file", "error", err, "path", storageFilePath)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer dst.Close()
|
defer dst.Close()
|
||||||
@@ -70,18 +79,26 @@ func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string)
|
|||||||
// Копируем содержимое загруженного файла
|
// Копируем содержимое загруженного файла
|
||||||
size, err := io.Copy(dst, file)
|
size, err := io.Copy(dst, file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to copy file content", "error", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := dst.Close(); err != nil {
|
if err := dst.Close(); err != nil {
|
||||||
|
s.logger.Error("Failed to close file", "error", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := s.metaviewer.GetInfo(storageFilePath)
|
info, err := s.metaviewer.GetInfo(storageFilePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to get file info", "error", err, "path", storageFilePath)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Info("File uploaded successfully",
|
||||||
|
"file_id", fileId,
|
||||||
|
"size", size,
|
||||||
|
"duration_seconds", info.Seconds)
|
||||||
|
|
||||||
metrics.InputFileDurationHistogram.WithLabelValues().Observe(float64(info.Seconds))
|
metrics.InputFileDurationHistogram.WithLabelValues().Observe(float64(info.Seconds))
|
||||||
metrics.InputFileSizeHistogram.WithLabelValues(ext).Observe(float64(size))
|
metrics.InputFileSizeHistogram.WithLabelValues(ext).Observe(float64(size))
|
||||||
|
|
||||||
@@ -97,6 +114,7 @@ func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string)
|
|||||||
if err := s.fileRepo.Create(fileRecord); err != nil {
|
if err := s.fileRepo.Create(fileRecord); err != nil {
|
||||||
// Удаляем файл если не удалось создать запись в БД
|
// Удаляем файл если не удалось создать запись в БД
|
||||||
os.Remove(storageFilePath)
|
os.Remove(storageFilePath)
|
||||||
|
s.logger.Error("Failed to create file record", "error", err, "file_id", fileId)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -114,9 +132,11 @@ func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := s.jobRepo.Create(job); err != nil {
|
if err := s.jobRepo.Create(job); err != nil {
|
||||||
|
s.logger.Error("Failed to create job record", "error", err, "job_id", jobId)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Info("Transcribe job created successfully", "job_id", jobId, "file_id", fileId)
|
||||||
return job, nil
|
return job, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -129,11 +149,15 @@ func (s *TranscribeService) FindAndRunConversionJob() error {
|
|||||||
if _, ok := err.(*contract.JobNotFoundError); ok {
|
if _, ok := err.(*contract.JobNotFoundError); ok {
|
||||||
return &contract.NoopJobError{State: entity.StateCreated}
|
return &contract.NoopJobError{State: entity.StateCreated}
|
||||||
}
|
}
|
||||||
|
s.logger.Error("Failed to find and acquire conversion job", "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Info("Starting conversion job", "job_id", job.Id, "acquisition_id", acquisitionId)
|
||||||
|
|
||||||
srcFile, err := s.fileRepo.GetByID(*job.FileID)
|
srcFile, err := s.fileRepo.GetByID(*job.FileID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to get source file", "error", err, "file_id", *job.FileID)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -149,6 +173,12 @@ func (s *TranscribeService) FindAndRunConversionJob() error {
|
|||||||
srcExt = defaultAudioExt
|
srcExt = defaultAudioExt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Info("Converting file",
|
||||||
|
"job_id", job.Id,
|
||||||
|
"src_path", srcFilePath,
|
||||||
|
"dest_path", destFilePath,
|
||||||
|
"src_format", srcExt)
|
||||||
|
|
||||||
// Измеряем время конвертации
|
// Измеряем время конвертации
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
err = s.converter.Convert(srcFilePath, destFilePath)
|
err = s.converter.Convert(srcFilePath, destFilePath)
|
||||||
@@ -160,14 +190,24 @@ func (s *TranscribeService) FindAndRunConversionJob() error {
|
|||||||
Observe(conversionDuration.Seconds())
|
Observe(conversionDuration.Seconds())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("File conversion failed",
|
||||||
|
"error", err,
|
||||||
|
"job_id", job.Id,
|
||||||
|
"duration", conversionDuration)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
stat, err := os.Stat(destFilePath)
|
stat, err := os.Stat(destFilePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to stat converted file", "error", err, "path", destFilePath)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Info("File conversion completed",
|
||||||
|
"job_id", job.Id,
|
||||||
|
"duration", conversionDuration,
|
||||||
|
"output_size", stat.Size())
|
||||||
|
|
||||||
// Записываем метрику размера выходного файла
|
// Записываем метрику размера выходного файла
|
||||||
metrics.OutputFileSizeHistogram.WithLabelValues("ogg").Observe(float64(stat.Size()))
|
metrics.OutputFileSizeHistogram.WithLabelValues("ogg").Observe(float64(stat.Size()))
|
||||||
|
|
||||||
@@ -185,14 +225,17 @@ func (s *TranscribeService) FindAndRunConversionJob() error {
|
|||||||
|
|
||||||
err = s.fileRepo.Create(destFileRecord)
|
err = s.fileRepo.Create(destFileRecord)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to create converted file record", "error", err, "file_id", destFileId)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.jobRepo.Save(job)
|
err = s.jobRepo.Save(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Info("Conversion job completed successfully", "job_id", job.Id)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -205,11 +248,15 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error {
|
|||||||
if _, ok := err.(*contract.JobNotFoundError); ok {
|
if _, ok := err.(*contract.JobNotFoundError); ok {
|
||||||
return &contract.NoopJobError{State: entity.StateConverted}
|
return &contract.NoopJobError{State: entity.StateConverted}
|
||||||
}
|
}
|
||||||
|
s.logger.Error("Failed to find and acquire transcribe job", "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Info("Starting transcribe job", "job_id", jobRecord.Id, "acquisition_id", acquisitionId)
|
||||||
|
|
||||||
fileRecord, err := s.fileRepo.GetByID(*jobRecord.FileID)
|
fileRecord, err := s.fileRepo.GetByID(*jobRecord.FileID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to get file record", "error", err, "file_id", *jobRecord.FileID)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -217,6 +264,7 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error {
|
|||||||
|
|
||||||
file, err := os.Open(filePath)
|
file, err := os.Open(filePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to open file", "error", err, "path", filePath)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
@@ -224,12 +272,19 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error {
|
|||||||
destFileId := uuid.NewString()
|
destFileId := uuid.NewString()
|
||||||
destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3)
|
destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3)
|
||||||
|
|
||||||
|
s.logger.Info("Starting recognition", "job_id", jobRecord.Id, "file_path", filePath)
|
||||||
|
|
||||||
// Запускаем асинхронное распознавание
|
// Запускаем асинхронное распознавание
|
||||||
operationID, err := s.recognizer.Recognize(file, destFileRecord.FileName)
|
operationID, err := s.recognizer.Recognize(file, destFileRecord.FileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to start recognition", "error", err, "job_id", jobRecord.Id)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Info("Recognition started",
|
||||||
|
"job_id", jobRecord.Id,
|
||||||
|
"operation_id", operationID)
|
||||||
|
|
||||||
// Обновляем задачу с ID операции распознавания
|
// Обновляем задачу с ID операции распознавания
|
||||||
jobRecord.FileID = &destFileId
|
jobRecord.FileID = &destFileId
|
||||||
jobRecord.RecognitionOpID = &operationID
|
jobRecord.RecognitionOpID = &operationID
|
||||||
@@ -238,14 +293,17 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error {
|
|||||||
|
|
||||||
err = s.fileRepo.Create(destFileRecord)
|
err = s.fileRepo.Create(destFileRecord)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to create S3 file record", "error", err, "file_id", destFileId)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.jobRepo.Save(jobRecord)
|
err = s.jobRepo.Save(jobRecord)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to save job", "error", err, "job_id", jobRecord.Id)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Info("Transcribe job updated successfully", "job_id", jobRecord.Id)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -258,29 +316,33 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
|
|||||||
if _, ok := err.(*contract.JobNotFoundError); ok {
|
if _, ok := err.(*contract.JobNotFoundError); ok {
|
||||||
return &contract.NoopJobError{State: entity.StateTranscribe}
|
return &contract.NoopJobError{State: entity.StateTranscribe}
|
||||||
}
|
}
|
||||||
|
s.logger.Error("Failed to find and acquire transcribe check job", "error", err)
|
||||||
return fmt.Errorf("failed find and acquire job: %s, %w", entity.StateTranscribe, err)
|
return fmt.Errorf("failed find and acquire job: %s, %w", entity.StateTranscribe, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if job.RecognitionOpID == nil {
|
if job.RecognitionOpID == nil {
|
||||||
|
s.logger.Error("Recognition operation ID not found", "job_id", job.Id)
|
||||||
return fmt.Errorf("recogniton opId not found for job: %s", job.Id)
|
return fmt.Errorf("recogniton opId not found for job: %s", job.Id)
|
||||||
}
|
}
|
||||||
|
|
||||||
opId := *job.RecognitionOpID
|
opId := *job.RecognitionOpID
|
||||||
|
|
||||||
// Проверяем статус операции
|
// Проверяем статус операции
|
||||||
log.Printf("Check operation status: id %s\n", opId)
|
s.logger.Info("Checking operation status", "job_id", job.Id, "operation_id", opId)
|
||||||
recResult, err := s.recognizer.CheckRecognitionStatus(opId)
|
recResult, err := s.recognizer.CheckRecognitionStatus(opId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to check recognition status", "error", err, "operation_id", opId)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if recResult.IsInProgress() {
|
if recResult.IsInProgress() {
|
||||||
// Операция еще не завершена, оставляем в статусе обработки
|
// Операция еще не завершена, оставляем в статусе обработки
|
||||||
log.Printf("Operation in progress: id %s\n", opId)
|
s.logger.Info("Operation in progress", "job_id", job.Id, "operation_id", opId)
|
||||||
delayTime := time.Now().Add(10 * time.Second)
|
delayTime := time.Now().Add(10 * time.Second)
|
||||||
job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
|
job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime)
|
||||||
err := s.jobRepo.Save(job)
|
err := s.jobRepo.Save(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to save job", "error", err, "job_id", job.Id)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -288,10 +350,14 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
|
|||||||
|
|
||||||
if recResult.IsFailed() {
|
if recResult.IsFailed() {
|
||||||
errorText := recResult.GetError()
|
errorText := recResult.GetError()
|
||||||
log.Printf("Operation failed: id %s, message %s\n", opId, errorText)
|
s.logger.Error("Operation failed",
|
||||||
|
"job_id", job.Id,
|
||||||
|
"operation_id", opId,
|
||||||
|
"error_message", errorText)
|
||||||
job.Fail(errorText)
|
job.Fail(errorText)
|
||||||
err := s.jobRepo.Save(job)
|
err := s.jobRepo.Save(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to save failed job", "error", err, "job_id", job.Id)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -300,18 +366,24 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error {
|
|||||||
// Операция завершена, получаем результат
|
// Операция завершена, получаем результат
|
||||||
transcriptionText, err := s.recognizer.GetRecognitionText(opId)
|
transcriptionText, err := s.recognizer.GetRecognitionText(opId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to get recognition text", "error", err, "operation_id", opId)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Operation done: id %s\n", opId)
|
s.logger.Info("Operation completed successfully",
|
||||||
|
"job_id", job.Id,
|
||||||
|
"operation_id", opId,
|
||||||
|
"text_length", len(transcriptionText))
|
||||||
|
|
||||||
// Обновляем задачу с результатом
|
// Обновляем задачу с результатом
|
||||||
job.Done(transcriptionText)
|
job.Done(transcriptionText)
|
||||||
|
|
||||||
err = s.jobRepo.Save(job)
|
err = s.jobRepo.Save(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to save completed job", "error", err, "job_id", job.Id)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.logger.Info("Transcribe check job completed successfully", "job_id", job.Id)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
107
main.go
107
main.go
@@ -4,7 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
@@ -29,40 +29,48 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
// Создаем структурированный логгер
|
||||||
|
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
||||||
|
Level: slog.LevelInfo,
|
||||||
|
}))
|
||||||
|
slog.SetDefault(logger)
|
||||||
|
|
||||||
// Загружаем переменные окружения из .env файла
|
// Загружаем переменные окружения из .env файла
|
||||||
if err := godotenv.Load(); err != nil {
|
if err := godotenv.Load(); err != nil {
|
||||||
log.Println("Warning: .env file not found, using system environment variables")
|
logger.Warn("Warning: .env file not found, using system environment variables")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Создаем директории если они не существуют
|
// Создаем директории если они не существуют
|
||||||
if err := os.MkdirAll("data/files", 0755); err != nil {
|
if err := os.MkdirAll("data/files", 0755); err != nil {
|
||||||
log.Fatal("Failed to create data/files directory:", err)
|
logger.Error("Failed to create data/files directory", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err := sql.Open("sqlite3", "data/transcriber.db")
|
db, err := sql.Open("sqlite3", "data/transcriber.db")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to open database: %v", err)
|
logger.Error("failed to open database", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
if err := db.Ping(); err != nil {
|
if err := db.Ping(); err != nil {
|
||||||
log.Fatalf("failed to ping database: %v", err)
|
logger.Error("failed to ping database", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
gq := goqu.New("sqlite3", db)
|
gq := goqu.New("sqlite3", db)
|
||||||
|
|
||||||
// Запускаем миграции
|
// Запускаем миграции
|
||||||
if err := RunMigrations(db, "migrations"); err != nil {
|
if err := RunMigrations(db, "migrations", logger); err != nil {
|
||||||
log.Fatal("Failed to run migrations:", err)
|
logger.Error("Failed to run migrations", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Создаем репозитории
|
// Создаем репозитории
|
||||||
|
|
||||||
fileRepo := sqlite.NewFileRepository(db, gq)
|
fileRepo := sqlite.NewFileRepository(db, gq)
|
||||||
jobRepo := sqlite.NewTranscriptJobRepository(db, gq)
|
jobRepo := sqlite.NewTranscriptJobRepository(db, gq)
|
||||||
|
|
||||||
// Создаем адаптеры
|
// Создаем адаптеры
|
||||||
|
|
||||||
metaviewer := ffmpegmv.NewFfmpegMetaViewer()
|
metaviewer := ffmpegmv.NewFfmpegMetaViewer()
|
||||||
converter := ffmpegconv.NewFfmpegConverter()
|
converter := ffmpegconv.NewFfmpegConverter()
|
||||||
|
|
||||||
@@ -76,19 +84,18 @@ func main() {
|
|||||||
FolderID: os.Getenv("YANDEX_CLOUD_FOLDER_ID"),
|
FolderID: os.Getenv("YANDEX_CLOUD_FOLDER_ID"),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to create audio recognizer: %v", err)
|
logger.Error("failed to create audio recognizer", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
defer recognizer.Close()
|
defer recognizer.Close()
|
||||||
|
|
||||||
// Создаем сервисы
|
// Создаем сервисы
|
||||||
|
transcribeService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer, logger)
|
||||||
transcribeService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer)
|
|
||||||
|
|
||||||
// Создаем воркеры
|
// Создаем воркеры
|
||||||
|
conversionWorker := worker.NewCallbackWorker("conversion_worker", transcribeService.FindAndRunConversionJob, logger)
|
||||||
conversionWorker := worker.NewCallbackWorker("conversion_worker", transcribeService.FindAndRunConversionJob)
|
transcribeWorker := worker.NewCallbackWorker("transcribe_worker", transcribeService.FindAndRunTranscribeJob, logger)
|
||||||
transcribeWorker := worker.NewCallbackWorker("transcribe_worker", transcribeService.FindAndRunTranscribeJob)
|
checkWorker := worker.NewCallbackWorker("check_worker", transcribeService.FindAndRunTranscribeCheckJob, logger)
|
||||||
checkWorker := worker.NewCallbackWorker("check_worker", transcribeService.FindAndRunTranscribeCheckJob)
|
|
||||||
|
|
||||||
workers := []worker.Worker{
|
workers := []worker.Worker{
|
||||||
conversionWorker,
|
conversionWorker,
|
||||||
@@ -109,13 +116,18 @@ func main() {
|
|||||||
go func(worker worker.Worker) {
|
go func(worker worker.Worker) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
worker.Start(ctx)
|
worker.Start(ctx)
|
||||||
log.Printf("%s stopped", worker.Name())
|
logger.Info("Worker stopped", "worker_name", worker.Name())
|
||||||
}(w)
|
}(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Создаем Gin middleware для логирования
|
||||||
|
gin.SetMode(gin.DebugMode)
|
||||||
|
router := gin.New()
|
||||||
|
router.Use(ginSlogMiddleware(logger))
|
||||||
|
router.Use(gin.Recovery())
|
||||||
|
|
||||||
// Запускаем HTTP сервер для API (создание задач и проверка статуса)
|
// Запускаем HTTP сервер для API (создание задач и проверка статуса)
|
||||||
transcribeHandler := httpcontroller.NewTranscribeHandler(jobRepo, transcribeService)
|
transcribeHandler := httpcontroller.NewTranscribeHandler(jobRepo, transcribeService)
|
||||||
router := gin.Default()
|
|
||||||
|
|
||||||
// Настраиваем роуты только для создания задач и проверки статуса
|
// Настраиваем роуты только для создания задач и проверки статуса
|
||||||
api := router.Group("/api")
|
api := router.Group("/api")
|
||||||
@@ -148,9 +160,9 @@ func main() {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
log.Println("Starting HTTP server on :8080")
|
logger.Info("Starting HTTP server", "port", 8080)
|
||||||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||||
log.Printf("HTTP server error: %v", err)
|
logger.Error("HTTP server error", "error", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@@ -158,24 +170,24 @@ func main() {
|
|||||||
sigChan := make(chan os.Signal, 1)
|
sigChan := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
||||||
log.Println("Transcriber service started with background workers")
|
logger.Info("Transcriber service started with background workers")
|
||||||
log.Println("Workers: ConversionWorker, TranscribeWorker, CheckWorker")
|
logger.Info("Workers: ConversionWorker, TranscribeWorker, CheckWorker")
|
||||||
log.Println("Press Ctrl+C to stop...")
|
logger.Info("Press Ctrl+C to stop...")
|
||||||
|
|
||||||
// Ждем сигнал завершения
|
// Ждем сигнал завершения
|
||||||
<-sigChan
|
<-sigChan
|
||||||
log.Println("Received shutdown signal, initiating graceful shutdown...")
|
logger.Info("Received shutdown signal, initiating graceful shutdown...")
|
||||||
|
|
||||||
// Создаем контекст с таймаутом для graceful shutdown HTTP сервера
|
// Создаем контекст с таймаутом для graceful shutdown HTTP сервера
|
||||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer shutdownCancel()
|
defer shutdownCancel()
|
||||||
|
|
||||||
// Останавливаем HTTP сервер
|
// Останавливаем HTTP сервер
|
||||||
log.Println("Shutting down HTTP server...")
|
logger.Info("Shutting down HTTP server...")
|
||||||
if err := srv.Shutdown(shutdownCtx); err != nil {
|
if err := srv.Shutdown(shutdownCtx); err != nil {
|
||||||
log.Printf("HTTP server forced to shutdown: %v", err)
|
logger.Error("HTTP server forced to shutdown", "error", err)
|
||||||
} else {
|
} else {
|
||||||
log.Println("HTTP server stopped gracefully")
|
logger.Info("HTTP server stopped gracefully")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Отменяем контекст для остановки воркеров
|
// Отменяем контекст для остановки воркеров
|
||||||
@@ -191,15 +203,15 @@ func main() {
|
|||||||
// Ждем завершения всех воркеров или таймаута в 10 секунд
|
// Ждем завершения всех воркеров или таймаута в 10 секунд
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
log.Println("All workers stopped gracefully")
|
logger.Info("All workers stopped gracefully")
|
||||||
case <-time.After(10 * time.Second):
|
case <-time.After(10 * time.Second):
|
||||||
log.Println("Timeout reached, forcing shutdown")
|
logger.Warn("Timeout reached, forcing shutdown")
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("Transcriber service stopped")
|
logger.Info("Transcriber service stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunMigrations(db *sql.DB, migrationsDir string) error {
|
func RunMigrations(db *sql.DB, migrationsDir string, logger *slog.Logger) error {
|
||||||
if err := goose.SetDialect("sqlite3"); err != nil {
|
if err := goose.SetDialect("sqlite3"); err != nil {
|
||||||
return fmt.Errorf("failed to set goose dialect: %w", err)
|
return fmt.Errorf("failed to set goose dialect: %w", err)
|
||||||
}
|
}
|
||||||
@@ -208,6 +220,37 @@ func RunMigrations(db *sql.DB, migrationsDir string) error {
|
|||||||
return fmt.Errorf("failed to run migrations: %w", err)
|
return fmt.Errorf("failed to run migrations: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("Migrations completed successfully")
|
logger.Info("Migrations completed successfully")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ginSlogMiddleware создает middleware для Gin, который использует slog для логирования
|
||||||
|
func ginSlogMiddleware(logger *slog.Logger) gin.HandlerFunc {
|
||||||
|
return func(c *gin.Context) {
|
||||||
|
start := time.Now()
|
||||||
|
path := c.Request.URL.Path
|
||||||
|
raw := c.Request.URL.RawQuery
|
||||||
|
|
||||||
|
// Обрабатываем запрос
|
||||||
|
c.Next()
|
||||||
|
|
||||||
|
// Логируем после обработки
|
||||||
|
latency := time.Since(start)
|
||||||
|
clientIP := c.ClientIP()
|
||||||
|
method := c.Request.Method
|
||||||
|
statusCode := c.Writer.Status()
|
||||||
|
|
||||||
|
if raw != "" {
|
||||||
|
path = path + "?" + raw
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("HTTP request",
|
||||||
|
"method", method,
|
||||||
|
"path", path,
|
||||||
|
"status", statusCode,
|
||||||
|
"latency", latency,
|
||||||
|
"client_ip", clientIP,
|
||||||
|
"user_agent", c.Request.UserAgent(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user