4e077d878e
категории
94 lines
3.4 KiB
Go
94 lines
3.4 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
|
|
"git.vakhrushev.me/av/jellybit/internal/qbt"
|
|
"git.vakhrushev.me/av/jellybit/internal/store"
|
|
)
|
|
|
|
// discover усыновляет новые раздачи: для каждого торрента с нашей категорией
|
|
// ИЛИ тегом, чьего infohash ещё нет в БД, заводит задачу downloading. Дальше
|
|
// её ведёт обычный reconcile. Вызывается под w.mu.
|
|
//
|
|
// Корректность при гонке с Ingest (другая горутина): Ingest пишет строку в
|
|
// БД до добавления в qBit и ставит idempotency_key=infohash, на который есть
|
|
// UNIQUE-индекс. Поэтому даже если тик и Ingest столкнутся в окне «проверил →
|
|
// вставляю», второй INSERT упадёт на индексе, и adopt просто пропустит.
|
|
func (w *Worker) discover(ctx context.Context, torrents []qbt.Torrent) {
|
|
for _, t := range torrents {
|
|
if w.tracked(t) {
|
|
w.adopt(ctx, t)
|
|
}
|
|
}
|
|
}
|
|
|
|
// tracked — относится ли торрент к jellybit (категория или тег из конфига).
|
|
func (w *Worker) tracked(t qbt.Torrent) bool {
|
|
if w.cfg.Category != "" && t.Category == w.cfg.Category {
|
|
return true
|
|
}
|
|
return hasTag(t.Tags, w.cfg.Tag)
|
|
}
|
|
|
|
// adopt заводит задачу под торрент, если его ещё не видели.
|
|
func (w *Worker) adopt(ctx context.Context, t qbt.Torrent) {
|
|
infohash := firstInfohash(t)
|
|
if infohash == "" {
|
|
return // нечем идентифицировать (напр. ещё metaDL без хэша)
|
|
}
|
|
exists, err := w.store.ExistsByInfohash(ctx, infohash)
|
|
if err != nil {
|
|
w.log.Warn("discover: exists check failed", "infohash", infohash, "err", err)
|
|
return
|
|
}
|
|
if exists {
|
|
return // уже усыновлён ранее (или обработан) — не трогаем
|
|
}
|
|
|
|
d := &store.Download{
|
|
SourceType: store.SourceMagnet,
|
|
SourceRef: "magnet:?xt=urn:btih:" + infohash,
|
|
Infohash: store.NullString(infohash),
|
|
IdempotencyKey: store.NullString(infohash),
|
|
State: store.StateDownloading,
|
|
}
|
|
id, err := w.store.CreateDownload(ctx, d)
|
|
if err != nil {
|
|
// Гонка: Ingest/другой тик мог вставить запись между проверкой и
|
|
// вставкой — UNIQUE-индекс это отсёк. Если запись появилась, всё ок.
|
|
if ex, _ := w.store.ExistsByInfohash(ctx, infohash); ex {
|
|
return
|
|
}
|
|
w.log.Error("discover: adopt failed", "infohash", infohash, "err", err)
|
|
return
|
|
}
|
|
w.log.Info("discover: adopted torrent",
|
|
"download_id", id, "infohash", infohash, "name", t.Name,
|
|
"category", t.Category, "tags", t.Tags)
|
|
}
|
|
|
|
// hasTag сообщает, есть ли tag среди списка тегов qBit (через запятую).
|
|
func hasTag(tags, tag string) bool {
|
|
if tag == "" {
|
|
return false
|
|
}
|
|
for _, x := range strings.Split(tags, ",") {
|
|
if strings.TrimSpace(x) == tag {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// firstInfohash возвращает первый непустой infohash торрента (нижний регистр).
|
|
func firstInfohash(t qbt.Torrent) string {
|
|
for _, h := range []string{t.Hash, t.InfohashV1, t.InfohashV2} {
|
|
if h != "" {
|
|
return strings.ToLower(h)
|
|
}
|
|
}
|
|
return ""
|
|
}
|