// Package ingest — use-case приёма загрузки, общий для всех транспортов // (HTTP, Telegram, CLI). Принимает источник + контекст, отдаёт источник в // qBittorrent и заводит/находит задачу в БД. package ingest import ( "context" "fmt" "log/slog" "strings" "git.vakhrushev.me/av/jellybit/internal/magnet" "git.vakhrushev.me/av/jellybit/internal/qbt" "git.vakhrushev.me/av/jellybit/internal/store" ) // Store — нужная ingest часть хранилища. type Store interface { FindActiveByInfohash(ctx context.Context, infohash string) (*store.Download, error) CreateDownload(ctx context.Context, d *store.Download) (int64, error) SetDownloadState(ctx context.Context, id int64, state store.State, errCode, errMsg string) error } // QBittorrent — нужная ingest часть клиента qBittorrent. type QBittorrent interface { Add(ctx context.Context, ar qbt.AddRequest) error } // Config — параметры добавления в qBittorrent. type Config struct { Category string SavePath string } // Service — реализация приёма. type Service struct { store Store qbt QBittorrent cfg Config log *slog.Logger } // New собирает сервис приёма. func New(st Store, qb QBittorrent, cfg Config, log *slog.Logger) *Service { return &Service{store: st, qbt: qb, cfg: cfg, log: log} } // Request — входной запрос приёма. type Request struct { Source string // пока — magnet-ссылка Context string // подсказка для распознавания (опц.) } // Result — итог приёма. type Result struct { DownloadID int64 Infohash string State store.State Deduplicated bool // присоединились к уже активной задаче, нового добавления не было } // Ingest принимает источник: извлекает infohash, дедуплицирует по активной // задаче, иначе заводит задачу и отдаёт источник в qBittorrent. func (s *Service) Ingest(ctx context.Context, req Request) (Result, error) { source := strings.TrimSpace(req.Source) info, err := magnet.Parse(source) if err != nil { // Ф1: поддержан только magnet. .torrent/url — следующий заход. return Result{}, fmt.Errorf("ingest: %w", err) } if existing, err := s.store.FindActiveByInfohash(ctx, info.Infohash); err != nil { return Result{}, fmt.Errorf("ingest: lookup active: %w", err) } else if existing != nil { s.log.Info("ingest: attached to active download", "download_id", existing.ID, "infohash", info.Infohash, "state", existing.State) return Result{ DownloadID: existing.ID, Infohash: info.Infohash, State: existing.State, Deduplicated: true, }, nil } d := &store.Download{ SourceType: store.SourceMagnet, SourceRef: source, Context: req.Context, Infohash: store.NullString(info.Infohash), IdempotencyKey: store.NullString(info.Infohash), State: store.StateDownloading, } id, err := s.store.CreateDownload(ctx, d) if err != nil { return Result{}, fmt.Errorf("ingest: create download: %w", err) } addErr := s.qbt.Add(ctx, qbt.AddRequest{ URLs: []string{source}, Category: s.cfg.Category, SavePath: s.cfg.SavePath, }) if addErr != nil { s.log.Warn("ingest: qbittorrent add failed, marking download failed", "download_id", id, "infohash", info.Infohash, "err", addErr) // Задача уже в БД — помечаем failed, чтобы worker её не подхватил. if setErr := s.store.SetDownloadState(ctx, id, store.StateFailed, "qbit_add", addErr.Error()); setErr != nil { s.log.Error("ingest: failed to mark download failed after qbit error", "download_id", id, "err", setErr) } return Result{DownloadID: id, Infohash: info.Infohash, State: store.StateFailed}, fmt.Errorf("ingest: add to qbittorrent: %w", addErr) } s.log.Info("ingest: download accepted", "download_id", id, "infohash", info.Infohash, "category", s.cfg.Category) return Result{ DownloadID: id, Infohash: info.Infohash, State: store.StateDownloading, }, nil }