From 4e077d878ec8112379386dafd605a31862f92e65 Mon Sep 17 00:00:00 2001 From: Anton Vakhrushev Date: Sun, 14 Jun 2026 17:06:59 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB=20"?= =?UTF-8?q?=D1=83=D1=81=D1=8B=D0=BD=D0=BE=D0=B2=D0=BB=D0=B5=D0=BD=D0=B8?= =?UTF-8?q?=D0=B5"=20=D1=81=D1=83=D1=89=D0=B5=D1=81=D1=82=D0=B2=D1=83?= =?UTF-8?q?=D1=8E=D1=89=D0=B8=D1=85=20=D1=82=D0=BE=D1=80=D1=80=D0=B5=D0=BD?= =?UTF-8?q?=D1=82=D0=BE=D0=B2=20=D0=BF=D1=80=D0=B8=20=D0=B4=D0=BE=D0=B1?= =?UTF-8?q?=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD=D0=B8=D0=B8=20=D1=82=D0=B5=D0=B3?= =?UTF-8?q?=D0=B0=20=D0=B8=D0=BB=D0=B8=20=D0=BA=D0=B0=D1=82=D0=B5=D0=B3?= =?UTF-8?q?=D0=BE=D1=80=D0=B8=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/jellybit/serve.go | 1 + config.example.toml | 3 +- internal/config/config.go | 13 ++- internal/qbt/qbt.go | 1 + internal/store/download.go | 12 +++ internal/store/recognition_test.go | 24 +++++ internal/worker/discover.go | 93 ++++++++++++++++++ internal/worker/discover_test.go | 146 +++++++++++++++++++++++++++++ internal/worker/review_test.go | 17 ++++ internal/worker/worker.go | 12 ++- internal/worker/worker_test.go | 17 ++++ 11 files changed, 333 insertions(+), 6 deletions(-) create mode 100644 internal/worker/discover.go create mode 100644 internal/worker/discover_test.go diff --git a/cmd/jellybit/serve.go b/cmd/jellybit/serve.go index 125c2b3..8ad108f 100644 --- a/cmd/jellybit/serve.go +++ b/cmd/jellybit/serve.go @@ -107,6 +107,7 @@ func runServe(args []string) error { wrk := worker.New(st, qb, recognizer, layouter, worker.Config{ Category: cfg.QBittorrent.Category, + Tag: cfg.QBittorrent.Tag, SavePath: cfg.QBittorrent.SavePath, PollInterval: cfg.Worker.PollInterval.Std(), StuckAfter: cfg.Worker.StuckAfter.Std(), diff --git a/config.example.toml b/config.example.toml index 1090f4a..322604f 100644 --- a/config.example.toml +++ b/config.example.toml @@ -6,7 +6,8 @@ url = "http://qbit:8989" # по имени сервиса в общей docker-сети username = "admin" password = "" -category = "jellybit" +category = "jellybit" # категория для добавляемых jellybit раздач (push) +tag = "jellybit" # тег для усыновления существующих раздач (pull, не двигает файлы) savepath = "/srv/media/downloads" # qBit кладёт загрузки сюда (задаём при добавлении) path_map = {} # фолбэк трансляции путей; обычно пуст diff --git a/internal/config/config.go b/internal/config/config.go index 4558242..1d46026 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -26,10 +26,15 @@ type Config struct { // QBittorrent — доступ к qBittorrent WebUI и раскладка путей загрузок. type QBittorrent struct { - URL string `toml:"url"` - Username string `toml:"username"` - Password string `toml:"password"` - Category string `toml:"category"` + URL string `toml:"url"` + Username string `toml:"username"` + Password string `toml:"password"` + // Category — категория для добавляемых jellybit раздач (push, savepath). + Category string `toml:"category"` + // Tag — метка для усыновления существующих раздач (pull, не трогает + // категорию/savepath). Discovery подхватывает раздачи с этой категорией + // ИЛИ этим тегом. + Tag string `toml:"tag"` SavePath string `toml:"savepath"` PathMap map[string]string `toml:"path_map"` } diff --git a/internal/qbt/qbt.go b/internal/qbt/qbt.go index 500ae3f..398d7dc 100644 --- a/internal/qbt/qbt.go +++ b/internal/qbt/qbt.go @@ -47,6 +47,7 @@ type Torrent struct { SavePath string `json:"save_path"` ContentPath string `json:"content_path"` Category string `json:"category"` + Tags string `json:"tags"` // через запятую Progress float64 `json:"progress"` AmountLeft int64 `json:"amount_left"` AddedOn int64 `json:"added_on"` diff --git a/internal/store/download.go b/internal/store/download.go index edbb614..2ab35f8 100644 --- a/internal/store/download.go +++ b/internal/store/download.go @@ -159,6 +159,18 @@ func (s *Store) FindActiveByInfohash(ctx context.Context, infohash string) (*Dow return &d, nil } +// ExistsByInfohash сообщает, есть ли хоть одна загрузка (в любом состоянии) +// с данным infohash. Discovery усыновляет раздачу только если её ещё не +// видели — так готовые задачи не переобрабатываются на каждом тике. +func (s *Store) ExistsByInfohash(ctx context.Context, infohash string) (bool, error) { + var n int + if err := s.DB.GetContext(ctx, &n, + `SELECT COUNT(1) FROM download WHERE infohash = ?`, infohash); err != nil { + return false, fmt.Errorf("exists by infohash: %w", err) + } + return n > 0, nil +} + // SetDownloadState переводит загрузку в новое состояние. Ключ // идемпотентности пересчитывается из текущего infohash: для терминального // состояния снимается (NULL), иначе равен infohash — так partial unique diff --git a/internal/store/recognition_test.go b/internal/store/recognition_test.go index eafe583..a621115 100644 --- a/internal/store/recognition_test.go +++ b/internal/store/recognition_test.go @@ -212,6 +212,30 @@ func TestCandidates_Lifecycle(t *testing.T) { } } +func TestExistsByInfohash(t *testing.T) { + st := newTestStore(t) + ctx := context.Background() + const ih = "aabbccddeeff00112233445566778899aabbccdd" + + exists, err := st.ExistsByInfohash(ctx, ih) + if err != nil || exists { + t.Fatalf("пусто: exists=%v err=%v", exists, err) + } + if _, err := st.CreateDownload(ctx, newDownloading(ih)); err != nil { + t.Fatal(err) + } + exists, err = st.ExistsByInfohash(ctx, ih) + if err != nil || !exists { + t.Fatalf("после вставки: exists=%v err=%v", exists, err) + } + // Терминальное состояние тоже считается «видели» (не реусыновляем). + id, _ := st.CreateDownload(ctx, newDownloading("ffffffffffffffffffffffffffffffffffffffff")) + _ = st.SetDownloadState(ctx, id, StateDone, "", "") + if ex, _ := st.ExistsByInfohash(ctx, "ffffffffffffffffffffffffffffffffffffffff"); !ex { + t.Error("done-задача должна считаться существующей") + } +} + func TestGetCandidate_None(t *testing.T) { st := newTestStore(t) c, err := st.GetCandidate(context.Background(), 999) diff --git a/internal/worker/discover.go b/internal/worker/discover.go new file mode 100644 index 0000000..e8f175c --- /dev/null +++ b/internal/worker/discover.go @@ -0,0 +1,93 @@ +package worker + +import ( + "context" + "strings" + + "git.vakhrushev.me/av/jellybit/internal/qbt" + "git.vakhrushev.me/av/jellybit/internal/store" +) + +// discover усыновляет новые раздачи: для каждого торрента с нашей категорией +// ИЛИ тегом, чьего infohash ещё нет в БД, заводит задачу downloading. Дальше +// её ведёт обычный reconcile. Вызывается под w.mu. +// +// Корректность при гонке с Ingest (другая горутина): Ingest пишет строку в +// БД до добавления в qBit и ставит idempotency_key=infohash, на который есть +// UNIQUE-индекс. Поэтому даже если тик и Ingest столкнутся в окне «проверил → +// вставляю», второй INSERT упадёт на индексе, и adopt просто пропустит. +func (w *Worker) discover(ctx context.Context, torrents []qbt.Torrent) { + for _, t := range torrents { + if w.tracked(t) { + w.adopt(ctx, t) + } + } +} + +// tracked — относится ли торрент к jellybit (категория или тег из конфига). +func (w *Worker) tracked(t qbt.Torrent) bool { + if w.cfg.Category != "" && t.Category == w.cfg.Category { + return true + } + return hasTag(t.Tags, w.cfg.Tag) +} + +// adopt заводит задачу под торрент, если его ещё не видели. +func (w *Worker) adopt(ctx context.Context, t qbt.Torrent) { + infohash := firstInfohash(t) + if infohash == "" { + return // нечем идентифицировать (напр. ещё metaDL без хэша) + } + exists, err := w.store.ExistsByInfohash(ctx, infohash) + if err != nil { + w.log.Warn("discover: exists check failed", "infohash", infohash, "err", err) + return + } + if exists { + return // уже усыновлён ранее (или обработан) — не трогаем + } + + d := &store.Download{ + SourceType: store.SourceMagnet, + SourceRef: "magnet:?xt=urn:btih:" + infohash, + Infohash: store.NullString(infohash), + IdempotencyKey: store.NullString(infohash), + State: store.StateDownloading, + } + id, err := w.store.CreateDownload(ctx, d) + if err != nil { + // Гонка: Ingest/другой тик мог вставить запись между проверкой и + // вставкой — UNIQUE-индекс это отсёк. Если запись появилась, всё ок. + if ex, _ := w.store.ExistsByInfohash(ctx, infohash); ex { + return + } + w.log.Error("discover: adopt failed", "infohash", infohash, "err", err) + return + } + w.log.Info("discover: adopted torrent", + "download_id", id, "infohash", infohash, "name", t.Name, + "category", t.Category, "tags", t.Tags) +} + +// hasTag сообщает, есть ли tag среди списка тегов qBit (через запятую). +func hasTag(tags, tag string) bool { + if tag == "" { + return false + } + for _, x := range strings.Split(tags, ",") { + if strings.TrimSpace(x) == tag { + return true + } + } + return false +} + +// firstInfohash возвращает первый непустой infohash торрента (нижний регистр). +func firstInfohash(t qbt.Torrent) string { + for _, h := range []string{t.Hash, t.InfohashV1, t.InfohashV2} { + if h != "" { + return strings.ToLower(h) + } + } + return "" +} diff --git a/internal/worker/discover_test.go b/internal/worker/discover_test.go new file mode 100644 index 0000000..60a8c16 --- /dev/null +++ b/internal/worker/discover_test.go @@ -0,0 +1,146 @@ +package worker + +import ( + "context" + "testing" + + "git.vakhrushev.me/av/jellybit/internal/qbt" + "git.vakhrushev.me/av/jellybit/internal/store" +) + +const ihDisc = "7931aa3ed6666746012f5739d099b5bc64d72a16" + +func emptyStore() *fakeStore { + return &fakeStore{downloads: map[int64]*store.Download{}} +} + +// findByInfohash возвращает усыновлённую задачу по infohash. +func findByInfohash(st *fakeStore, infohash string) *store.Download { + for _, d := range st.downloads { + if d.Infohash.String == infohash { + return d + } + } + return nil +} + +func TestDiscover_AdoptsByCategory(t *testing.T) { + st := emptyStore() + w := newTestWorker(st, &fakeQbt{}) + w.discover(context.Background(), []qbt.Torrent{ + {Hash: ihDisc, Name: "Avatar", Category: "jellybit", State: "stalledUP"}, + }) + + d := findByInfohash(st, ihDisc) + if d == nil { + t.Fatal("раздача с категорией jellybit не усыновлена") + } + if d.State != store.StateDownloading || d.SourceType != store.SourceMagnet { + t.Errorf("adopted = %+v", d) + } + if d.IdempotencyKey.String != ihDisc { + t.Errorf("idempotency_key = %q", d.IdempotencyKey.String) + } +} + +func TestDiscover_AdoptsByTag(t *testing.T) { + st := emptyStore() + w := newTestWorker(st, &fakeQbt{}) + w.cfg.Tag = "jellybit" + // Категория чужая, но тег наш — усыновляем (не трогая категорию). + w.discover(context.Background(), []qbt.Torrent{ + {Hash: ihDisc, Name: "Fargo", Category: "movies", Tags: "hd, jellybit, rus", State: "uploading"}, + }) + if findByInfohash(st, ihDisc) == nil { + t.Fatal("раздача с тегом jellybit не усыновлена") + } +} + +func TestDiscover_SkipsUntracked(t *testing.T) { + st := emptyStore() + w := newTestWorker(st, &fakeQbt{}) + w.cfg.Tag = "jellybit" + w.discover(context.Background(), []qbt.Torrent{ + {Hash: ihDisc, Category: "movies", Tags: "hd, rus"}, + }) + if len(st.downloads) != 0 { + t.Errorf("чужая раздача не должна усыновляться: %+v", st.downloads) + } +} + +func TestDiscover_SkipsExisting(t *testing.T) { + st := emptyStore() + // Уже есть задача (напр. терминальная done) — не переусыновляем. + st.downloads[1] = &store.Download{ + ID: 1, State: store.StateDone, Infohash: store.NullString(ihDisc), + } + w := newTestWorker(st, &fakeQbt{}) + w.discover(context.Background(), []qbt.Torrent{ + {Hash: ihDisc, Category: "jellybit"}, + }) + if len(st.downloads) != 1 { + t.Errorf("существующий infohash не должен порождать новую задачу: %d", len(st.downloads)) + } +} + +func TestDiscover_SkipsNoInfohash(t *testing.T) { + st := emptyStore() + w := newTestWorker(st, &fakeQbt{}) + w.discover(context.Background(), []qbt.Torrent{{Category: "jellybit"}}) + if len(st.downloads) != 0 { + t.Error("без infohash усыновлять нечего") + } +} + +// TestPoll_AdoptsAndCompletes — сценарий пользователя целиком: помеченная и +// уже скачанная раздача за один тик усыновляется и доходит до completed. +func TestPoll_AdoptsAndCompletes(t *testing.T) { + st := emptyStore() + qb := &fakeQbt{torrents: []qbt.Torrent{ + {Hash: ihDisc, Name: "Avatar", Category: "other", Tags: "jellybit", State: "stalledUP"}, + }} + w := newTestWorker(st, qb) + w.cfg.Tag = "jellybit" + + if err := w.Poll(context.Background()); err != nil { + t.Fatalf("Poll: %v", err) + } + d := findByInfohash(st, ihDisc) + if d == nil { + t.Fatal("не усыновлено") + } + if d.State != store.StateCompleted { + t.Errorf("state = %q, want completed (готовая раздача)", d.State) + } +} + +func TestHasTag(t *testing.T) { + cases := []struct { + tags, tag string + want bool + }{ + {"jellybit", "jellybit", true}, + {"hd, jellybit, rus", "jellybit", true}, + {"hd,rus", "jellybit", false}, + {"jellybit-extra", "jellybit", false}, + {"", "jellybit", false}, + {"jellybit", "", false}, + } + for _, c := range cases { + if got := hasTag(c.tags, c.tag); got != c.want { + t.Errorf("hasTag(%q,%q) = %v, want %v", c.tags, c.tag, got, c.want) + } + } +} + +func TestFirstInfohash(t *testing.T) { + if got := firstInfohash(qbt.Torrent{Hash: "ABC"}); got != "abc" { + t.Errorf("got %q", got) + } + if got := firstInfohash(qbt.Torrent{InfohashV2: "DEF"}); got != "def" { + t.Errorf("got %q", got) + } + if got := firstInfohash(qbt.Torrent{}); got != "" { + t.Errorf("got %q, want empty", got) + } +} diff --git a/internal/worker/review_test.go b/internal/worker/review_test.go index 7dec1d0..3fbb1d1 100644 --- a/internal/worker/review_test.go +++ b/internal/worker/review_test.go @@ -105,6 +105,23 @@ func (m *memStore) ListDownloadsByState(_ context.Context, states ...store.State return out, nil } +func (m *memStore) ExistsByInfohash(_ context.Context, infohash string) (bool, error) { + for _, d := range m.downloads { + if d.Infohash.Valid && d.Infohash.String == infohash { + return true, nil + } + } + return false, nil +} + +func (m *memStore) CreateDownload(_ context.Context, d *store.Download) (int64, error) { + id := int64(len(m.downloads) + 1) + cp := *d + cp.ID = id + m.downloads[id] = &cp + return id, nil +} + func (m *memStore) GetDownload(_ context.Context, id int64) (*store.Download, error) { d, ok := m.downloads[id] if !ok { diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 11f8921..9721e77 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -31,6 +31,10 @@ type Store interface { GetDownload(ctx context.Context, id int64) (*store.Download, error) SetDownloadState(ctx context.Context, id int64, state store.State, errCode, errMsg string) error + // Discovery (усыновление раздач по категории/тегу). + ExistsByInfohash(ctx context.Context, infohash string) (bool, error) + CreateDownload(ctx context.Context, d *store.Download) (int64, error) + // Ф3: распознавание, ревью, раскладка. CreateRecognition(ctx context.Context, r *store.Recognition, reasons []string) (int64, error) GetCurrentRecognition(ctx context.Context, downloadID int64) (*store.Recognition, error) @@ -85,6 +89,7 @@ type Notifier interface { // Config — параметры воркера. type Config struct { Category string + Tag string // метка для усыновления существующих раздач (discovery) SavePath string PollInterval time.Duration StuckAfter time.Duration // stalledDL дольше → stuck @@ -158,8 +163,10 @@ func (w *Worker) pollOnce(ctx context.Context) { } // Poll сверяет активные задачи с состоянием qBittorrent и двигает их. +// Листаем все торренты (а не только свою категорию), чтобы reconcile нашёл и +// усыновлённые по тегу раздачи, а discovery — увидел новые. func (w *Worker) Poll(ctx context.Context) error { - torrents, err := w.qbt.Torrents(ctx, w.cfg.Category) + torrents, err := w.qbt.Torrents(ctx, "") if err != nil { return fmt.Errorf("poll: list torrents: %w", err) } @@ -175,6 +182,9 @@ func (w *Worker) Poll(ctx context.Context) error { w.mu.Lock() defer w.mu.Unlock() + // Усыновляем новые раздачи с нашей категорией/тегом до reconcile. + w.discover(ctx, torrents) + active, err := w.store.ListDownloadsByState(ctx, store.StateDownloading) if err != nil { return fmt.Errorf("poll: list active: %w", err) diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index 6119649..d8d2daa 100644 --- a/internal/worker/worker_test.go +++ b/internal/worker/worker_test.go @@ -51,6 +51,23 @@ func (f *fakeStore) GetDownload(_ context.Context, id int64) (*store.Download, e return &cp, nil } +func (f *fakeStore) ExistsByInfohash(_ context.Context, infohash string) (bool, error) { + for _, d := range f.downloads { + if d.Infohash.Valid && d.Infohash.String == infohash { + return true, nil + } + } + return false, nil +} + +func (f *fakeStore) CreateDownload(_ context.Context, d *store.Download) (int64, error) { + id := int64(len(f.downloads) + 1) + cp := *d + cp.ID = id + f.downloads[id] = &cp + return id, nil +} + func (f *fakeStore) SetDownloadState(_ context.Context, id int64, st store.State, code, msg string) error { d, ok := f.downloads[id] if !ok {