diff --git a/internal/controller/http/transcribe_test.go b/internal/controller/http/transcribe_test.go index 3fba6bb..0f1b1ff 100644 --- a/internal/controller/http/transcribe_test.go +++ b/internal/controller/http/transcribe_test.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/json" "io" + "log/slog" "mime/multipart" "net/http" "net/http/httptest" @@ -63,7 +64,12 @@ func setupTestRouter(t *testing.T) (*gin.Engine, *TranscribeHandler) { converter := ffmpegconv.NewFfmpegConverter() recognizer := &recognizer.MemoryAudioRecognizer{} - trsService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer) + // Создаем тестовый логгер + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelError, // Только ошибки в тестах + })) + + trsService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer, logger) handler := NewTranscribeHandler(jobRepo, trsService) diff --git a/internal/controller/worker/worker.go b/internal/controller/worker/worker.go index 7f1c485..1c23af3 100644 --- a/internal/controller/worker/worker.go +++ b/internal/controller/worker/worker.go @@ -2,7 +2,7 @@ package worker import ( "context" - "log" + "log/slog" "strconv" "time" @@ -17,12 +17,21 @@ type Worker interface { } type CallbackWorker struct { - name string - f func() error + name string + f func() error + logger *slog.Logger } -func NewCallbackWorker(name string, f func() error) *CallbackWorker { - return &CallbackWorker{name, f} +func NewCallbackWorker(name string, f func() error, logger *slog.Logger) *CallbackWorker { + if logger == nil { + logger = slog.Default() + } + + return &CallbackWorker{ + name: name, + f: f, + logger: logger, + } } func (w *CallbackWorker) Name() string { @@ -30,12 +39,12 @@ func (w *CallbackWorker) Name() string { } func (w *CallbackWorker) Start(ctx context.Context) { - log.Printf("%s started", w.Name()) + w.logger.Info("Worker started", "worker_name", w.Name()) for { select { case <-ctx.Done(): - log.Printf("%s received shutdown signal", w.Name()) + w.logger.Info("Worker received shutdown signal", "worker_name", w.Name()) return default: err := w.f() @@ -44,13 +53,13 @@ func (w *CallbackWorker) Start(ctx context.Context) { metrics.WorkerJobCounter.WithLabelValues(w.Name(), strconv.FormatBool(err != nil)).Inc() } if err != nil && !isNoop { - log.Printf("%s error: %v", w.Name(), err) + w.logger.Error("Worker error", "worker_name", w.Name(), "error", err) } // Ждем 1 секунду перед следующей итерацией select { case <-ctx.Done(): - log.Printf("%s received shutdown signal during sleep", w.Name()) + w.logger.Info("Worker received shutdown signal during sleep", "worker_name", w.Name()) return case <-time.After(1 * time.Second): // Продолжаем работу diff --git a/internal/service/transcribe.go b/internal/service/transcribe.go index aa56450..95a5db1 100644 --- a/internal/service/transcribe.go +++ b/internal/service/transcribe.go @@ -3,7 +3,7 @@ package service import ( "fmt" "io" - "log" + "log/slog" "os" "path/filepath" "strconv" @@ -28,6 +28,7 @@ type TranscribeService struct { metaviewer contract.AudioMetaViewer converter contract.AudioFileConverter recognizer contract.AudioRecognizer + logger *slog.Logger } func NewTranscribeService( @@ -36,6 +37,7 @@ func NewTranscribeService( metaviewer contract.AudioMetaViewer, converter contract.AudioFileConverter, recognizer contract.AudioRecognizer, + logger *slog.Logger, ) *TranscribeService { return &TranscribeService{ jobRepo: jobRepo, @@ -43,6 +45,7 @@ func NewTranscribeService( metaviewer: metaviewer, converter: converter, recognizer: recognizer, + logger: logger, } } @@ -60,9 +63,15 @@ func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) storageFileName := fmt.Sprintf("%s%s", fileId, ext) storageFilePath := filepath.Join(baseStorageDir, storageFileName) + s.logger.Info("Creating transcribe job", + "file_id", fileId, + "file_name", fileName, + "storage_path", storageFilePath) + // Создаем файл на диске dst, err := os.Create(storageFilePath) if err != nil { + s.logger.Error("Failed to create file", "error", err, "path", storageFilePath) return nil, err } defer dst.Close() @@ -70,18 +79,26 @@ func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) // Копируем содержимое загруженного файла size, err := io.Copy(dst, file) if err != nil { + s.logger.Error("Failed to copy file content", "error", err) return nil, err } if err := dst.Close(); err != nil { + s.logger.Error("Failed to close file", "error", err) return nil, err } info, err := s.metaviewer.GetInfo(storageFilePath) if err != nil { + s.logger.Error("Failed to get file info", "error", err, "path", storageFilePath) return nil, err } + s.logger.Info("File uploaded successfully", + "file_id", fileId, + "size", size, + "duration_seconds", info.Seconds) + metrics.InputFileDurationHistogram.WithLabelValues().Observe(float64(info.Seconds)) metrics.InputFileSizeHistogram.WithLabelValues(ext).Observe(float64(size)) @@ -97,6 +114,7 @@ func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) if err := s.fileRepo.Create(fileRecord); err != nil { // Удаляем файл если не удалось создать запись в БД os.Remove(storageFilePath) + s.logger.Error("Failed to create file record", "error", err, "file_id", fileId) return nil, err } @@ -114,9 +132,11 @@ func (s *TranscribeService) CreateTranscribeJob(file io.Reader, fileName string) } if err := s.jobRepo.Create(job); err != nil { + s.logger.Error("Failed to create job record", "error", err, "job_id", jobId) return nil, err } + s.logger.Info("Transcribe job created successfully", "job_id", jobId, "file_id", fileId) return job, nil } @@ -129,11 +149,15 @@ func (s *TranscribeService) FindAndRunConversionJob() error { if _, ok := err.(*contract.JobNotFoundError); ok { return &contract.NoopJobError{State: entity.StateCreated} } + s.logger.Error("Failed to find and acquire conversion job", "error", err) return err } + s.logger.Info("Starting conversion job", "job_id", job.Id, "acquisition_id", acquisitionId) + srcFile, err := s.fileRepo.GetByID(*job.FileID) if err != nil { + s.logger.Error("Failed to get source file", "error", err, "file_id", *job.FileID) return err } @@ -149,6 +173,12 @@ func (s *TranscribeService) FindAndRunConversionJob() error { srcExt = defaultAudioExt } + s.logger.Info("Converting file", + "job_id", job.Id, + "src_path", srcFilePath, + "dest_path", destFilePath, + "src_format", srcExt) + // Измеряем время конвертации startTime := time.Now() err = s.converter.Convert(srcFilePath, destFilePath) @@ -160,14 +190,24 @@ func (s *TranscribeService) FindAndRunConversionJob() error { Observe(conversionDuration.Seconds()) if err != nil { + s.logger.Error("File conversion failed", + "error", err, + "job_id", job.Id, + "duration", conversionDuration) return err } stat, err := os.Stat(destFilePath) if err != nil { + s.logger.Error("Failed to stat converted file", "error", err, "path", destFilePath) return err } + s.logger.Info("File conversion completed", + "job_id", job.Id, + "duration", conversionDuration, + "output_size", stat.Size()) + // Записываем метрику размера выходного файла metrics.OutputFileSizeHistogram.WithLabelValues("ogg").Observe(float64(stat.Size())) @@ -185,14 +225,17 @@ func (s *TranscribeService) FindAndRunConversionJob() error { err = s.fileRepo.Create(destFileRecord) if err != nil { + s.logger.Error("Failed to create converted file record", "error", err, "file_id", destFileId) return err } err = s.jobRepo.Save(job) if err != nil { + s.logger.Error("Failed to save job", "error", err, "job_id", job.Id) return err } + s.logger.Info("Conversion job completed successfully", "job_id", job.Id) return nil } @@ -205,11 +248,15 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error { if _, ok := err.(*contract.JobNotFoundError); ok { return &contract.NoopJobError{State: entity.StateConverted} } + s.logger.Error("Failed to find and acquire transcribe job", "error", err) return err } + s.logger.Info("Starting transcribe job", "job_id", jobRecord.Id, "acquisition_id", acquisitionId) + fileRecord, err := s.fileRepo.GetByID(*jobRecord.FileID) if err != nil { + s.logger.Error("Failed to get file record", "error", err, "file_id", *jobRecord.FileID) return err } @@ -217,6 +264,7 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error { file, err := os.Open(filePath) if err != nil { + s.logger.Error("Failed to open file", "error", err, "path", filePath) return err } defer file.Close() @@ -224,12 +272,19 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error { destFileId := uuid.NewString() destFileRecord := fileRecord.CopyWithStorage(destFileId, entity.StorageS3) + s.logger.Info("Starting recognition", "job_id", jobRecord.Id, "file_path", filePath) + // Запускаем асинхронное распознавание operationID, err := s.recognizer.Recognize(file, destFileRecord.FileName) if err != nil { + s.logger.Error("Failed to start recognition", "error", err, "job_id", jobRecord.Id) return err } + s.logger.Info("Recognition started", + "job_id", jobRecord.Id, + "operation_id", operationID) + // Обновляем задачу с ID операции распознавания jobRecord.FileID = &destFileId jobRecord.RecognitionOpID = &operationID @@ -238,14 +293,17 @@ func (s *TranscribeService) FindAndRunTranscribeJob() error { err = s.fileRepo.Create(destFileRecord) if err != nil { + s.logger.Error("Failed to create S3 file record", "error", err, "file_id", destFileId) return err } err = s.jobRepo.Save(jobRecord) if err != nil { + s.logger.Error("Failed to save job", "error", err, "job_id", jobRecord.Id) return err } + s.logger.Info("Transcribe job updated successfully", "job_id", jobRecord.Id) return nil } @@ -258,29 +316,33 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { if _, ok := err.(*contract.JobNotFoundError); ok { return &contract.NoopJobError{State: entity.StateTranscribe} } + s.logger.Error("Failed to find and acquire transcribe check job", "error", err) return fmt.Errorf("failed find and acquire job: %s, %w", entity.StateTranscribe, err) } if job.RecognitionOpID == nil { + s.logger.Error("Recognition operation ID not found", "job_id", job.Id) return fmt.Errorf("recogniton opId not found for job: %s", job.Id) } opId := *job.RecognitionOpID // Проверяем статус операции - log.Printf("Check operation status: id %s\n", opId) + s.logger.Info("Checking operation status", "job_id", job.Id, "operation_id", opId) recResult, err := s.recognizer.CheckRecognitionStatus(opId) if err != nil { + s.logger.Error("Failed to check recognition status", "error", err, "operation_id", opId) return err } if recResult.IsInProgress() { // Операция еще не завершена, оставляем в статусе обработки - log.Printf("Operation in progress: id %s\n", opId) + s.logger.Info("Operation in progress", "job_id", job.Id, "operation_id", opId) delayTime := time.Now().Add(10 * time.Second) job.MoveToStateAndDelay(entity.StateTranscribe, &delayTime) err := s.jobRepo.Save(job) if err != nil { + s.logger.Error("Failed to save job", "error", err, "job_id", job.Id) return err } return nil @@ -288,10 +350,14 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { if recResult.IsFailed() { errorText := recResult.GetError() - log.Printf("Operation failed: id %s, message %s\n", opId, errorText) + s.logger.Error("Operation failed", + "job_id", job.Id, + "operation_id", opId, + "error_message", errorText) job.Fail(errorText) err := s.jobRepo.Save(job) if err != nil { + s.logger.Error("Failed to save failed job", "error", err, "job_id", job.Id) return err } return nil @@ -300,18 +366,24 @@ func (s *TranscribeService) FindAndRunTranscribeCheckJob() error { // Операция завершена, получаем результат transcriptionText, err := s.recognizer.GetRecognitionText(opId) if err != nil { + s.logger.Error("Failed to get recognition text", "error", err, "operation_id", opId) return err } - log.Printf("Operation done: id %s\n", opId) + s.logger.Info("Operation completed successfully", + "job_id", job.Id, + "operation_id", opId, + "text_length", len(transcriptionText)) // Обновляем задачу с результатом job.Done(transcriptionText) err = s.jobRepo.Save(job) if err != nil { + s.logger.Error("Failed to save completed job", "error", err, "job_id", job.Id) return err } + s.logger.Info("Transcribe check job completed successfully", "job_id", job.Id) return nil } diff --git a/main.go b/main.go index de894bb..e2a5085 100644 --- a/main.go +++ b/main.go @@ -4,7 +4,7 @@ import ( "context" "database/sql" "fmt" - "log" + "log/slog" "net/http" "os" "os/signal" @@ -29,40 +29,48 @@ import ( ) func main() { + // Создаем структурированный логгер + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + slog.SetDefault(logger) + // Загружаем переменные окружения из .env файла if err := godotenv.Load(); err != nil { - log.Println("Warning: .env file not found, using system environment variables") + logger.Warn("Warning: .env file not found, using system environment variables") } // Создаем директории если они не существуют if err := os.MkdirAll("data/files", 0755); err != nil { - log.Fatal("Failed to create data/files directory:", err) + logger.Error("Failed to create data/files directory", "error", err) + os.Exit(1) } db, err := sql.Open("sqlite3", "data/transcriber.db") if err != nil { - log.Fatalf("failed to open database: %v", err) + logger.Error("failed to open database", "error", err) + os.Exit(1) } defer db.Close() if err := db.Ping(); err != nil { - log.Fatalf("failed to ping database: %v", err) + logger.Error("failed to ping database", "error", err) + os.Exit(1) } gq := goqu.New("sqlite3", db) // Запускаем миграции - if err := RunMigrations(db, "migrations"); err != nil { - log.Fatal("Failed to run migrations:", err) + if err := RunMigrations(db, "migrations", logger); err != nil { + logger.Error("Failed to run migrations", "error", err) + os.Exit(1) } // Создаем репозитории - fileRepo := sqlite.NewFileRepository(db, gq) jobRepo := sqlite.NewTranscriptJobRepository(db, gq) // Создаем адаптеры - metaviewer := ffmpegmv.NewFfmpegMetaViewer() converter := ffmpegconv.NewFfmpegConverter() @@ -76,19 +84,18 @@ func main() { FolderID: os.Getenv("YANDEX_CLOUD_FOLDER_ID"), }) if err != nil { - log.Fatalf("failed to create audio recognizer: %v", err) + logger.Error("failed to create audio recognizer", "error", err) + os.Exit(1) } defer recognizer.Close() // Создаем сервисы - - transcribeService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer) + transcribeService := service.NewTranscribeService(jobRepo, fileRepo, metaviewer, converter, recognizer, logger) // Создаем воркеры - - conversionWorker := worker.NewCallbackWorker("conversion_worker", transcribeService.FindAndRunConversionJob) - transcribeWorker := worker.NewCallbackWorker("transcribe_worker", transcribeService.FindAndRunTranscribeJob) - checkWorker := worker.NewCallbackWorker("check_worker", transcribeService.FindAndRunTranscribeCheckJob) + conversionWorker := worker.NewCallbackWorker("conversion_worker", transcribeService.FindAndRunConversionJob, logger) + transcribeWorker := worker.NewCallbackWorker("transcribe_worker", transcribeService.FindAndRunTranscribeJob, logger) + checkWorker := worker.NewCallbackWorker("check_worker", transcribeService.FindAndRunTranscribeCheckJob, logger) workers := []worker.Worker{ conversionWorker, @@ -109,13 +116,18 @@ func main() { go func(worker worker.Worker) { defer wg.Done() worker.Start(ctx) - log.Printf("%s stopped", worker.Name()) + logger.Info("Worker stopped", "worker_name", worker.Name()) }(w) } + // Создаем Gin middleware для логирования + gin.SetMode(gin.DebugMode) + router := gin.New() + router.Use(ginSlogMiddleware(logger)) + router.Use(gin.Recovery()) + // Запускаем HTTP сервер для API (создание задач и проверка статуса) transcribeHandler := httpcontroller.NewTranscribeHandler(jobRepo, transcribeService) - router := gin.Default() // Настраиваем роуты только для создания задач и проверки статуса api := router.Group("/api") @@ -148,9 +160,9 @@ func main() { wg.Add(1) go func() { defer wg.Done() - log.Println("Starting HTTP server on :8080") + logger.Info("Starting HTTP server", "port", 8080) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Printf("HTTP server error: %v", err) + logger.Error("HTTP server error", "error", err) } }() @@ -158,24 +170,24 @@ func main() { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - log.Println("Transcriber service started with background workers") - log.Println("Workers: ConversionWorker, TranscribeWorker, CheckWorker") - log.Println("Press Ctrl+C to stop...") + logger.Info("Transcriber service started with background workers") + logger.Info("Workers: ConversionWorker, TranscribeWorker, CheckWorker") + logger.Info("Press Ctrl+C to stop...") // Ждем сигнал завершения <-sigChan - log.Println("Received shutdown signal, initiating graceful shutdown...") + logger.Info("Received shutdown signal, initiating graceful shutdown...") // Создаем контекст с таймаутом для graceful shutdown HTTP сервера shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) defer shutdownCancel() // Останавливаем HTTP сервер - log.Println("Shutting down HTTP server...") + logger.Info("Shutting down HTTP server...") if err := srv.Shutdown(shutdownCtx); err != nil { - log.Printf("HTTP server forced to shutdown: %v", err) + logger.Error("HTTP server forced to shutdown", "error", err) } else { - log.Println("HTTP server stopped gracefully") + logger.Info("HTTP server stopped gracefully") } // Отменяем контекст для остановки воркеров @@ -191,15 +203,15 @@ func main() { // Ждем завершения всех воркеров или таймаута в 10 секунд select { case <-done: - log.Println("All workers stopped gracefully") + logger.Info("All workers stopped gracefully") case <-time.After(10 * time.Second): - log.Println("Timeout reached, forcing shutdown") + logger.Warn("Timeout reached, forcing shutdown") } - log.Println("Transcriber service stopped") + logger.Info("Transcriber service stopped") } -func RunMigrations(db *sql.DB, migrationsDir string) error { +func RunMigrations(db *sql.DB, migrationsDir string, logger *slog.Logger) error { if err := goose.SetDialect("sqlite3"); err != nil { return fmt.Errorf("failed to set goose dialect: %w", err) } @@ -208,6 +220,37 @@ func RunMigrations(db *sql.DB, migrationsDir string) error { return fmt.Errorf("failed to run migrations: %w", err) } - log.Println("Migrations completed successfully") + logger.Info("Migrations completed successfully") return nil } + +// ginSlogMiddleware создает middleware для Gin, который использует slog для логирования +func ginSlogMiddleware(logger *slog.Logger) gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + path := c.Request.URL.Path + raw := c.Request.URL.RawQuery + + // Обрабатываем запрос + c.Next() + + // Логируем после обработки + latency := time.Since(start) + clientIP := c.ClientIP() + method := c.Request.Method + statusCode := c.Writer.Status() + + if raw != "" { + path = path + "?" + raw + } + + logger.Info("HTTP request", + "method", method, + "path", path, + "status", statusCode, + "latency", latency, + "client_ip", clientIP, + "user_agent", c.Request.UserAgent(), + ) + } +}