121 lines
4.1 KiB
Go
121 lines
4.1 KiB
Go
// Package ingest — use-case приёма загрузки, общий для всех транспортов
|
|
// (HTTP, Telegram, CLI). Принимает источник + контекст, отдаёт источник в
|
|
// qBittorrent и заводит/находит задачу в БД.
|
|
package ingest
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
|
|
"git.vakhrushev.me/av/jellybit/internal/magnet"
|
|
"git.vakhrushev.me/av/jellybit/internal/qbt"
|
|
"git.vakhrushev.me/av/jellybit/internal/store"
|
|
)
|
|
|
|
// Store — нужная ingest часть хранилища.
|
|
type Store interface {
|
|
FindActiveByInfohash(ctx context.Context, infohash string) (*store.Download, error)
|
|
CreateDownload(ctx context.Context, d *store.Download) (int64, error)
|
|
SetDownloadState(ctx context.Context, id int64, state store.State, errCode, errMsg string) error
|
|
}
|
|
|
|
// QBittorrent — нужная ingest часть клиента qBittorrent.
|
|
type QBittorrent interface {
|
|
Add(ctx context.Context, ar qbt.AddRequest) error
|
|
}
|
|
|
|
// Config — параметры добавления в qBittorrent.
|
|
type Config struct {
|
|
Category string
|
|
SavePath string
|
|
}
|
|
|
|
// Service — реализация приёма.
|
|
type Service struct {
|
|
store Store
|
|
qbt QBittorrent
|
|
cfg Config
|
|
log *slog.Logger
|
|
}
|
|
|
|
// New собирает сервис приёма.
|
|
func New(st Store, qb QBittorrent, cfg Config, log *slog.Logger) *Service {
|
|
return &Service{store: st, qbt: qb, cfg: cfg, log: log}
|
|
}
|
|
|
|
// Request — входной запрос приёма.
|
|
type Request struct {
|
|
Source string // пока — magnet-ссылка
|
|
Context string // подсказка для распознавания (опц.)
|
|
}
|
|
|
|
// Result — итог приёма.
|
|
type Result struct {
|
|
DownloadID int64
|
|
Infohash string
|
|
State store.State
|
|
Deduplicated bool // присоединились к уже активной задаче, нового добавления не было
|
|
}
|
|
|
|
// Ingest принимает источник: извлекает infohash, дедуплицирует по активной
|
|
// задаче, иначе заводит задачу и отдаёт источник в qBittorrent.
|
|
func (s *Service) Ingest(ctx context.Context, req Request) (Result, error) {
|
|
source := strings.TrimSpace(req.Source)
|
|
info, err := magnet.Parse(source)
|
|
if err != nil {
|
|
// Ф1: поддержан только magnet. .torrent/url — следующий заход.
|
|
return Result{}, fmt.Errorf("ingest: %w", err)
|
|
}
|
|
|
|
if existing, err := s.store.FindActiveByInfohash(ctx, info.Infohash); err != nil {
|
|
return Result{}, fmt.Errorf("ingest: lookup active: %w", err)
|
|
} else if existing != nil {
|
|
s.log.Info("ingest: attached to active download",
|
|
"download_id", existing.ID, "infohash", info.Infohash, "state", existing.State)
|
|
return Result{
|
|
DownloadID: existing.ID,
|
|
Infohash: info.Infohash,
|
|
State: existing.State,
|
|
Deduplicated: true,
|
|
}, nil
|
|
}
|
|
|
|
d := &store.Download{
|
|
SourceType: store.SourceMagnet,
|
|
SourceRef: source,
|
|
Context: req.Context,
|
|
Infohash: store.NullString(info.Infohash),
|
|
IdempotencyKey: store.NullString(info.Infohash),
|
|
State: store.StateDownloading,
|
|
}
|
|
id, err := s.store.CreateDownload(ctx, d)
|
|
if err != nil {
|
|
return Result{}, fmt.Errorf("ingest: create download: %w", err)
|
|
}
|
|
|
|
addErr := s.qbt.Add(ctx, qbt.AddRequest{
|
|
URLs: []string{source},
|
|
Category: s.cfg.Category,
|
|
SavePath: s.cfg.SavePath,
|
|
})
|
|
if addErr != nil {
|
|
// Задача уже в БД — помечаем failed, чтобы worker её не подхватил.
|
|
if setErr := s.store.SetDownloadState(ctx, id, store.StateFailed, "qbit_add", addErr.Error()); setErr != nil {
|
|
s.log.Error("ingest: failed to mark download failed after qbit error",
|
|
"download_id", id, "err", setErr)
|
|
}
|
|
return Result{DownloadID: id, Infohash: info.Infohash, State: store.StateFailed},
|
|
fmt.Errorf("ingest: add to qbittorrent: %w", addErr)
|
|
}
|
|
|
|
s.log.Info("ingest: download accepted",
|
|
"download_id", id, "infohash", info.Infohash, "category", s.cfg.Category)
|
|
return Result{
|
|
DownloadID: id,
|
|
Infohash: info.Infohash,
|
|
State: store.StateDownloading,
|
|
}, nil
|
|
}
|