Добавил "усыновление" существующих торрентов при добавлении тега или
категории
This commit is contained in:
@@ -0,0 +1,93 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"git.vakhrushev.me/av/jellybit/internal/qbt"
|
||||
"git.vakhrushev.me/av/jellybit/internal/store"
|
||||
)
|
||||
|
||||
// discover усыновляет новые раздачи: для каждого торрента с нашей категорией
|
||||
// ИЛИ тегом, чьего infohash ещё нет в БД, заводит задачу downloading. Дальше
|
||||
// её ведёт обычный reconcile. Вызывается под w.mu.
|
||||
//
|
||||
// Корректность при гонке с Ingest (другая горутина): Ingest пишет строку в
|
||||
// БД до добавления в qBit и ставит idempotency_key=infohash, на который есть
|
||||
// UNIQUE-индекс. Поэтому даже если тик и Ingest столкнутся в окне «проверил →
|
||||
// вставляю», второй INSERT упадёт на индексе, и adopt просто пропустит.
|
||||
func (w *Worker) discover(ctx context.Context, torrents []qbt.Torrent) {
|
||||
for _, t := range torrents {
|
||||
if w.tracked(t) {
|
||||
w.adopt(ctx, t)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// tracked — относится ли торрент к jellybit (категория или тег из конфига).
|
||||
func (w *Worker) tracked(t qbt.Torrent) bool {
|
||||
if w.cfg.Category != "" && t.Category == w.cfg.Category {
|
||||
return true
|
||||
}
|
||||
return hasTag(t.Tags, w.cfg.Tag)
|
||||
}
|
||||
|
||||
// adopt заводит задачу под торрент, если его ещё не видели.
|
||||
func (w *Worker) adopt(ctx context.Context, t qbt.Torrent) {
|
||||
infohash := firstInfohash(t)
|
||||
if infohash == "" {
|
||||
return // нечем идентифицировать (напр. ещё metaDL без хэша)
|
||||
}
|
||||
exists, err := w.store.ExistsByInfohash(ctx, infohash)
|
||||
if err != nil {
|
||||
w.log.Warn("discover: exists check failed", "infohash", infohash, "err", err)
|
||||
return
|
||||
}
|
||||
if exists {
|
||||
return // уже усыновлён ранее (или обработан) — не трогаем
|
||||
}
|
||||
|
||||
d := &store.Download{
|
||||
SourceType: store.SourceMagnet,
|
||||
SourceRef: "magnet:?xt=urn:btih:" + infohash,
|
||||
Infohash: store.NullString(infohash),
|
||||
IdempotencyKey: store.NullString(infohash),
|
||||
State: store.StateDownloading,
|
||||
}
|
||||
id, err := w.store.CreateDownload(ctx, d)
|
||||
if err != nil {
|
||||
// Гонка: Ingest/другой тик мог вставить запись между проверкой и
|
||||
// вставкой — UNIQUE-индекс это отсёк. Если запись появилась, всё ок.
|
||||
if ex, _ := w.store.ExistsByInfohash(ctx, infohash); ex {
|
||||
return
|
||||
}
|
||||
w.log.Error("discover: adopt failed", "infohash", infohash, "err", err)
|
||||
return
|
||||
}
|
||||
w.log.Info("discover: adopted torrent",
|
||||
"download_id", id, "infohash", infohash, "name", t.Name,
|
||||
"category", t.Category, "tags", t.Tags)
|
||||
}
|
||||
|
||||
// hasTag сообщает, есть ли tag среди списка тегов qBit (через запятую).
|
||||
func hasTag(tags, tag string) bool {
|
||||
if tag == "" {
|
||||
return false
|
||||
}
|
||||
for _, x := range strings.Split(tags, ",") {
|
||||
if strings.TrimSpace(x) == tag {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// firstInfohash возвращает первый непустой infohash торрента (нижний регистр).
|
||||
func firstInfohash(t qbt.Torrent) string {
|
||||
for _, h := range []string{t.Hash, t.InfohashV1, t.InfohashV2} {
|
||||
if h != "" {
|
||||
return strings.ToLower(h)
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -0,0 +1,146 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.vakhrushev.me/av/jellybit/internal/qbt"
|
||||
"git.vakhrushev.me/av/jellybit/internal/store"
|
||||
)
|
||||
|
||||
const ihDisc = "7931aa3ed6666746012f5739d099b5bc64d72a16"
|
||||
|
||||
func emptyStore() *fakeStore {
|
||||
return &fakeStore{downloads: map[int64]*store.Download{}}
|
||||
}
|
||||
|
||||
// findByInfohash возвращает усыновлённую задачу по infohash.
|
||||
func findByInfohash(st *fakeStore, infohash string) *store.Download {
|
||||
for _, d := range st.downloads {
|
||||
if d.Infohash.String == infohash {
|
||||
return d
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestDiscover_AdoptsByCategory(t *testing.T) {
|
||||
st := emptyStore()
|
||||
w := newTestWorker(st, &fakeQbt{})
|
||||
w.discover(context.Background(), []qbt.Torrent{
|
||||
{Hash: ihDisc, Name: "Avatar", Category: "jellybit", State: "stalledUP"},
|
||||
})
|
||||
|
||||
d := findByInfohash(st, ihDisc)
|
||||
if d == nil {
|
||||
t.Fatal("раздача с категорией jellybit не усыновлена")
|
||||
}
|
||||
if d.State != store.StateDownloading || d.SourceType != store.SourceMagnet {
|
||||
t.Errorf("adopted = %+v", d)
|
||||
}
|
||||
if d.IdempotencyKey.String != ihDisc {
|
||||
t.Errorf("idempotency_key = %q", d.IdempotencyKey.String)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDiscover_AdoptsByTag(t *testing.T) {
|
||||
st := emptyStore()
|
||||
w := newTestWorker(st, &fakeQbt{})
|
||||
w.cfg.Tag = "jellybit"
|
||||
// Категория чужая, но тег наш — усыновляем (не трогая категорию).
|
||||
w.discover(context.Background(), []qbt.Torrent{
|
||||
{Hash: ihDisc, Name: "Fargo", Category: "movies", Tags: "hd, jellybit, rus", State: "uploading"},
|
||||
})
|
||||
if findByInfohash(st, ihDisc) == nil {
|
||||
t.Fatal("раздача с тегом jellybit не усыновлена")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDiscover_SkipsUntracked(t *testing.T) {
|
||||
st := emptyStore()
|
||||
w := newTestWorker(st, &fakeQbt{})
|
||||
w.cfg.Tag = "jellybit"
|
||||
w.discover(context.Background(), []qbt.Torrent{
|
||||
{Hash: ihDisc, Category: "movies", Tags: "hd, rus"},
|
||||
})
|
||||
if len(st.downloads) != 0 {
|
||||
t.Errorf("чужая раздача не должна усыновляться: %+v", st.downloads)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDiscover_SkipsExisting(t *testing.T) {
|
||||
st := emptyStore()
|
||||
// Уже есть задача (напр. терминальная done) — не переусыновляем.
|
||||
st.downloads[1] = &store.Download{
|
||||
ID: 1, State: store.StateDone, Infohash: store.NullString(ihDisc),
|
||||
}
|
||||
w := newTestWorker(st, &fakeQbt{})
|
||||
w.discover(context.Background(), []qbt.Torrent{
|
||||
{Hash: ihDisc, Category: "jellybit"},
|
||||
})
|
||||
if len(st.downloads) != 1 {
|
||||
t.Errorf("существующий infohash не должен порождать новую задачу: %d", len(st.downloads))
|
||||
}
|
||||
}
|
||||
|
||||
func TestDiscover_SkipsNoInfohash(t *testing.T) {
|
||||
st := emptyStore()
|
||||
w := newTestWorker(st, &fakeQbt{})
|
||||
w.discover(context.Background(), []qbt.Torrent{{Category: "jellybit"}})
|
||||
if len(st.downloads) != 0 {
|
||||
t.Error("без infohash усыновлять нечего")
|
||||
}
|
||||
}
|
||||
|
||||
// TestPoll_AdoptsAndCompletes — сценарий пользователя целиком: помеченная и
|
||||
// уже скачанная раздача за один тик усыновляется и доходит до completed.
|
||||
func TestPoll_AdoptsAndCompletes(t *testing.T) {
|
||||
st := emptyStore()
|
||||
qb := &fakeQbt{torrents: []qbt.Torrent{
|
||||
{Hash: ihDisc, Name: "Avatar", Category: "other", Tags: "jellybit", State: "stalledUP"},
|
||||
}}
|
||||
w := newTestWorker(st, qb)
|
||||
w.cfg.Tag = "jellybit"
|
||||
|
||||
if err := w.Poll(context.Background()); err != nil {
|
||||
t.Fatalf("Poll: %v", err)
|
||||
}
|
||||
d := findByInfohash(st, ihDisc)
|
||||
if d == nil {
|
||||
t.Fatal("не усыновлено")
|
||||
}
|
||||
if d.State != store.StateCompleted {
|
||||
t.Errorf("state = %q, want completed (готовая раздача)", d.State)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHasTag(t *testing.T) {
|
||||
cases := []struct {
|
||||
tags, tag string
|
||||
want bool
|
||||
}{
|
||||
{"jellybit", "jellybit", true},
|
||||
{"hd, jellybit, rus", "jellybit", true},
|
||||
{"hd,rus", "jellybit", false},
|
||||
{"jellybit-extra", "jellybit", false},
|
||||
{"", "jellybit", false},
|
||||
{"jellybit", "", false},
|
||||
}
|
||||
for _, c := range cases {
|
||||
if got := hasTag(c.tags, c.tag); got != c.want {
|
||||
t.Errorf("hasTag(%q,%q) = %v, want %v", c.tags, c.tag, got, c.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFirstInfohash(t *testing.T) {
|
||||
if got := firstInfohash(qbt.Torrent{Hash: "ABC"}); got != "abc" {
|
||||
t.Errorf("got %q", got)
|
||||
}
|
||||
if got := firstInfohash(qbt.Torrent{InfohashV2: "DEF"}); got != "def" {
|
||||
t.Errorf("got %q", got)
|
||||
}
|
||||
if got := firstInfohash(qbt.Torrent{}); got != "" {
|
||||
t.Errorf("got %q, want empty", got)
|
||||
}
|
||||
}
|
||||
@@ -105,6 +105,23 @@ func (m *memStore) ListDownloadsByState(_ context.Context, states ...store.State
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (m *memStore) ExistsByInfohash(_ context.Context, infohash string) (bool, error) {
|
||||
for _, d := range m.downloads {
|
||||
if d.Infohash.Valid && d.Infohash.String == infohash {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (m *memStore) CreateDownload(_ context.Context, d *store.Download) (int64, error) {
|
||||
id := int64(len(m.downloads) + 1)
|
||||
cp := *d
|
||||
cp.ID = id
|
||||
m.downloads[id] = &cp
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (m *memStore) GetDownload(_ context.Context, id int64) (*store.Download, error) {
|
||||
d, ok := m.downloads[id]
|
||||
if !ok {
|
||||
|
||||
@@ -31,6 +31,10 @@ type Store interface {
|
||||
GetDownload(ctx context.Context, id int64) (*store.Download, error)
|
||||
SetDownloadState(ctx context.Context, id int64, state store.State, errCode, errMsg string) error
|
||||
|
||||
// Discovery (усыновление раздач по категории/тегу).
|
||||
ExistsByInfohash(ctx context.Context, infohash string) (bool, error)
|
||||
CreateDownload(ctx context.Context, d *store.Download) (int64, error)
|
||||
|
||||
// Ф3: распознавание, ревью, раскладка.
|
||||
CreateRecognition(ctx context.Context, r *store.Recognition, reasons []string) (int64, error)
|
||||
GetCurrentRecognition(ctx context.Context, downloadID int64) (*store.Recognition, error)
|
||||
@@ -85,6 +89,7 @@ type Notifier interface {
|
||||
// Config — параметры воркера.
|
||||
type Config struct {
|
||||
Category string
|
||||
Tag string // метка для усыновления существующих раздач (discovery)
|
||||
SavePath string
|
||||
PollInterval time.Duration
|
||||
StuckAfter time.Duration // stalledDL дольше → stuck
|
||||
@@ -158,8 +163,10 @@ func (w *Worker) pollOnce(ctx context.Context) {
|
||||
}
|
||||
|
||||
// Poll сверяет активные задачи с состоянием qBittorrent и двигает их.
|
||||
// Листаем все торренты (а не только свою категорию), чтобы reconcile нашёл и
|
||||
// усыновлённые по тегу раздачи, а discovery — увидел новые.
|
||||
func (w *Worker) Poll(ctx context.Context) error {
|
||||
torrents, err := w.qbt.Torrents(ctx, w.cfg.Category)
|
||||
torrents, err := w.qbt.Torrents(ctx, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("poll: list torrents: %w", err)
|
||||
}
|
||||
@@ -175,6 +182,9 @@ func (w *Worker) Poll(ctx context.Context) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
// Усыновляем новые раздачи с нашей категорией/тегом до reconcile.
|
||||
w.discover(ctx, torrents)
|
||||
|
||||
active, err := w.store.ListDownloadsByState(ctx, store.StateDownloading)
|
||||
if err != nil {
|
||||
return fmt.Errorf("poll: list active: %w", err)
|
||||
|
||||
@@ -51,6 +51,23 @@ func (f *fakeStore) GetDownload(_ context.Context, id int64) (*store.Download, e
|
||||
return &cp, nil
|
||||
}
|
||||
|
||||
func (f *fakeStore) ExistsByInfohash(_ context.Context, infohash string) (bool, error) {
|
||||
for _, d := range f.downloads {
|
||||
if d.Infohash.Valid && d.Infohash.String == infohash {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (f *fakeStore) CreateDownload(_ context.Context, d *store.Download) (int64, error) {
|
||||
id := int64(len(f.downloads) + 1)
|
||||
cp := *d
|
||||
cp.ID = id
|
||||
f.downloads[id] = &cp
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (f *fakeStore) SetDownloadState(_ context.Context, id int64, st store.State, code, msg string) error {
|
||||
d, ok := f.downloads[id]
|
||||
if !ok {
|
||||
|
||||
Reference in New Issue
Block a user