372 lines
15 KiB
Go
372 lines
15 KiB
Go
// Package worker — владелец машины состояний. Поллит qBittorrent по
|
||
// категории, переводит задачи между состояниями и сериализует команды
|
||
// транспортов (cancel/retry), чтобы два транспорта не гонялись за одно
|
||
// состояние.
|
||
//
|
||
// Ф1 ведёт задачу downloading → completed, плюс stuck/failed по таймаутам и
|
||
// ошибкам qBittorrent. Ф3 продолжает: completed → recognizing (вызов
|
||
// recognize) → review; команды ревью (apply/refine/reject/defer/undo,
|
||
// переключение типа, пометка «игнор») раскладывают файлы хардлинками через
|
||
// layout. Распознавание зовётся в поллинг-цикле, команды — из транспортов;
|
||
// всё под per-download блокировкой w.mu.
|
||
package worker
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log/slog"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"git.vakhrushev.me/av/jellybit/internal/layout"
|
||
"git.vakhrushev.me/av/jellybit/internal/qbt"
|
||
"git.vakhrushev.me/av/jellybit/internal/recognize"
|
||
"git.vakhrushev.me/av/jellybit/internal/store"
|
||
)
|
||
|
||
// Store — нужная worker часть хранилища.
|
||
type Store interface {
|
||
ListDownloadsByState(ctx context.Context, states ...store.State) ([]store.Download, error)
|
||
GetDownload(ctx context.Context, id int64) (*store.Download, error)
|
||
SetDownloadState(ctx context.Context, id int64, state store.State, errCode, errMsg string) error
|
||
|
||
// Discovery (усыновление раздач по категории/тегу).
|
||
ExistsByInfohash(ctx context.Context, infohash string) (bool, error)
|
||
CreateDownload(ctx context.Context, d *store.Download) (int64, error)
|
||
|
||
// Ф3: распознавание, ревью, раскладка.
|
||
CreateRecognition(ctx context.Context, r *store.Recognition, reasons []string) (int64, error)
|
||
GetCurrentRecognition(ctx context.Context, downloadID int64) (*store.Recognition, error)
|
||
AddHint(ctx context.Context, downloadID int64, text string) error
|
||
ListHints(ctx context.Context, downloadID int64) ([]string, error)
|
||
SetOverride(ctx context.Context, downloadID int64, field, value string) error
|
||
ListOverrides(ctx context.Context, downloadID int64) (map[string]string, error)
|
||
CreateFileLinks(ctx context.Context, links []store.FileLink) error
|
||
LatestBatchID(ctx context.Context, downloadID int64) (string, error)
|
||
ListFileLinksByBatch(ctx context.Context, batchID string) ([]store.FileLink, error)
|
||
DeleteFileLinksByBatch(ctx context.Context, batchID string) error
|
||
|
||
// Кандидаты базы метаданных (ручной выбор в review).
|
||
CreateCandidates(ctx context.Context, cands []store.MetadataCandidate) error
|
||
ListCandidatesByRecognition(ctx context.Context, recognitionID int64) ([]store.MetadataCandidate, error)
|
||
GetCandidate(ctx context.Context, id int64) (*store.MetadataCandidate, error)
|
||
SetCandidateChosen(ctx context.Context, recognitionID, candidateID int64) error
|
||
}
|
||
|
||
// QBittorrent — нужная worker часть клиента qBittorrent.
|
||
type QBittorrent interface {
|
||
Torrents(ctx context.Context, category string) ([]qbt.Torrent, error)
|
||
Add(ctx context.Context, ar qbt.AddRequest) error
|
||
Files(ctx context.Context, hash string) ([]qbt.File, error)
|
||
}
|
||
|
||
// Recognizer — распознаватель (recognize.Recognizer).
|
||
type Recognizer interface {
|
||
Recognize(ctx context.Context, in recognize.Input) (recognize.Result, error)
|
||
}
|
||
|
||
// Layouter — раскладчик хардлинками (layout.Layouter).
|
||
type Layouter interface {
|
||
BuildLinks(p layout.Plan) ([]layout.Link, error)
|
||
Apply(ctx context.Context, links []layout.Link) ([]layout.Result, error)
|
||
Undo(ctx context.Context, links []layout.Link) (int, error)
|
||
}
|
||
|
||
// NotifyEvent — повод позвать пользователя.
|
||
type NotifyEvent string
|
||
|
||
const (
|
||
EventReview NotifyEvent = "review" // задача ждёт подтверждения
|
||
EventDone NotifyEvent = "done" // раскладка завершена
|
||
)
|
||
|
||
// Notifier — исходящие пинги (Telegram). Вызывается неблокирующе.
|
||
type Notifier interface {
|
||
Notify(ctx context.Context, downloadID int64, event NotifyEvent)
|
||
}
|
||
|
||
// Scanner — триггер пересканирования медиатеки Jellyfin. Вызывается
|
||
// неблокирующе после успешной раскладки, чтобы новые файлы быстрее появились
|
||
// в проигрывателе.
|
||
type Scanner interface {
|
||
RefreshLibraries(ctx context.Context) error
|
||
}
|
||
|
||
// Config — параметры воркера.
|
||
type Config struct {
|
||
Category string
|
||
Tag string // метка для усыновления существующих раздач (discovery)
|
||
SavePath string
|
||
PathMap map[string]string // трансляция save_path qBit → хост-путь (обычно пусто)
|
||
PollInterval time.Duration
|
||
StuckAfter time.Duration // stalledDL дольше → stuck
|
||
MagnetTimeout time.Duration // metaDL дольше → failed
|
||
}
|
||
|
||
// Worker — поллер и владелец переходов.
|
||
type Worker struct {
|
||
store Store
|
||
qbt QBittorrent
|
||
recognizer Recognizer
|
||
layouter Layouter
|
||
cfg Config
|
||
log *slog.Logger
|
||
|
||
mu sync.Mutex // сериализует переходы (поллинг + команды)
|
||
now func() time.Time // подменяется в тестах
|
||
newID func() string // генератор apply_batch_id (подменяется в тестах)
|
||
notifier Notifier // опц. исходящие пинги
|
||
scanner Scanner // опц. пересканирование Jellyfin
|
||
}
|
||
|
||
// SetNotifier подключает исходящие пинги (до запуска Run).
|
||
func (w *Worker) SetNotifier(n Notifier) { w.notifier = n }
|
||
|
||
// SetScanner подключает пересканирование Jellyfin (до запуска Run).
|
||
func (w *Worker) SetScanner(s Scanner) { w.scanner = s }
|
||
|
||
// New собирает воркер. recognizer/layouter могут быть nil (Ф1 без Ф3-ступеней
|
||
// распознавания и раскладки) — тогда completed-задачи не двигаются дальше.
|
||
func New(st Store, qb QBittorrent, rec Recognizer, lay Layouter, cfg Config, log *slog.Logger) *Worker {
|
||
return &Worker{
|
||
store: st,
|
||
qbt: qb,
|
||
recognizer: rec,
|
||
layouter: lay,
|
||
cfg: cfg,
|
||
log: log,
|
||
now: time.Now,
|
||
newID: defaultBatchID,
|
||
}
|
||
}
|
||
|
||
// defaultBatchID — уникальный идентификатор батча раскладки.
|
||
func defaultBatchID() string {
|
||
return fmt.Sprintf("b-%d", time.Now().UnixNano())
|
||
}
|
||
|
||
// Run крутит цикл поллинга до отмены ctx.
|
||
func (w *Worker) Run(ctx context.Context) {
|
||
w.log.Info("worker started", "poll_interval", w.cfg.PollInterval, "category", w.cfg.Category)
|
||
t := time.NewTicker(w.cfg.PollInterval)
|
||
defer t.Stop()
|
||
|
||
w.pollOnce(ctx)
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
w.log.Info("worker stopped")
|
||
return
|
||
case <-t.C:
|
||
w.pollOnce(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (w *Worker) pollOnce(ctx context.Context) {
|
||
if err := w.Poll(ctx); err != nil {
|
||
w.log.Warn("poll failed", "err", err)
|
||
}
|
||
// Ф3: распознаём завершённые загрузки (и перезапускаем по подсказке).
|
||
if w.recognizer != nil {
|
||
w.recognizePending(ctx)
|
||
}
|
||
}
|
||
|
||
// Poll сверяет активные задачи с состоянием qBittorrent и двигает их.
|
||
// Листаем все торренты (а не только свою категорию), чтобы reconcile нашёл и
|
||
// усыновлённые по тегу раздачи, а discovery — увидел новые.
|
||
func (w *Worker) Poll(ctx context.Context) error {
|
||
torrents, err := w.qbt.Torrents(ctx, "")
|
||
if err != nil {
|
||
return fmt.Errorf("poll: list torrents: %w", err)
|
||
}
|
||
byHash := make(map[string]qbt.Torrent, len(torrents)*2)
|
||
for _, t := range torrents {
|
||
for _, h := range []string{t.Hash, t.InfohashV1, t.InfohashV2} {
|
||
if h != "" {
|
||
byHash[strings.ToLower(h)] = t
|
||
}
|
||
}
|
||
}
|
||
|
||
w.mu.Lock()
|
||
defer w.mu.Unlock()
|
||
|
||
// Усыновляем новые раздачи с нашей категорией/тегом до reconcile.
|
||
w.discover(ctx, torrents)
|
||
|
||
active, err := w.store.ListDownloadsByState(ctx, store.StateDownloading)
|
||
if err != nil {
|
||
return fmt.Errorf("poll: list active: %w", err)
|
||
}
|
||
for _, d := range active {
|
||
if !d.Infohash.Valid {
|
||
continue // нечем сопоставить (в Ф1 не случается: magnet всегда с infohash)
|
||
}
|
||
t, ok := byHash[strings.ToLower(d.Infohash.String)]
|
||
if !ok {
|
||
w.log.Warn("active download not found in qbittorrent",
|
||
"download_id", d.ID, "infohash", d.Infohash.String)
|
||
continue
|
||
}
|
||
w.reconcile(ctx, d, t)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// reconcile двигает одну задачу по состоянию её торрента. Вызывается под
|
||
// w.mu.
|
||
func (w *Worker) reconcile(ctx context.Context, d store.Download, t qbt.Torrent) {
|
||
switch classify(t.State) {
|
||
case classReady:
|
||
w.transition(ctx, d, store.StateCompleted, "", "")
|
||
case classErrored:
|
||
w.transition(ctx, d, store.StateFailed, "qbit_error", "qBittorrent state: "+t.State)
|
||
case classDownloading:
|
||
w.checkTimeouts(ctx, d, t)
|
||
case classBusy:
|
||
// moving/checking — ждём, файлы ещё не на финальном месте.
|
||
}
|
||
}
|
||
|
||
// checkTimeouts помечает зависшие задачи. Возраст считаем от created_at:
|
||
// для metaDL это время с момента добавления (огрублённо, но достаточно).
|
||
func (w *Worker) checkTimeouts(ctx context.Context, d store.Download, t qbt.Torrent) {
|
||
created, err := d.CreatedTime()
|
||
if err != nil {
|
||
w.log.Warn("cannot parse created_at", "download_id", d.ID, "value", d.CreatedAt, "err", err)
|
||
return
|
||
}
|
||
age := w.now().Sub(created)
|
||
|
||
switch {
|
||
case isMeta(t.State) && w.cfg.MagnetTimeout > 0 && age > w.cfg.MagnetTimeout:
|
||
w.transition(ctx, d, store.StateFailed, "magnet_timeout",
|
||
fmt.Sprintf("no metadata after %s", age.Truncate(time.Second)))
|
||
case isStalledDL(t.State) && w.cfg.StuckAfter > 0 && age > w.cfg.StuckAfter:
|
||
w.transition(ctx, d, store.StateStuck, "stalled",
|
||
fmt.Sprintf("stalled for %s", age.Truncate(time.Second)))
|
||
}
|
||
}
|
||
|
||
// transition пишет новое состояние и логирует переход.
|
||
func (w *Worker) transition(ctx context.Context, d store.Download, state store.State, code, msg string) {
|
||
if err := w.store.SetDownloadState(ctx, d.ID, state, code, msg); err != nil {
|
||
w.log.Error("state transition failed",
|
||
"download_id", d.ID, "from", d.State, "to", state, "err", err)
|
||
return
|
||
}
|
||
w.log.Info("state transition",
|
||
"download_id", d.ID, "from", d.State, "to", state, "code", code)
|
||
|
||
// Пинги — неблокирующе и в отдельном контексте: вызов уходит в сеть, а
|
||
// мы под w.mu (Notify читает состояние уже после освобождения замка).
|
||
if w.notifier != nil {
|
||
switch state {
|
||
case store.StateReview:
|
||
go w.notifier.Notify(context.Background(), d.ID, EventReview)
|
||
case store.StateDone:
|
||
go w.notifier.Notify(context.Background(), d.ID, EventDone)
|
||
}
|
||
}
|
||
|
||
// Раскладка завершена — просим Jellyfin пересканировать библиотеку, чтобы
|
||
// новые файлы быстрее появились в проигрывателе. Тоже неблокирующе и вне
|
||
// w.mu; недоступность Jellyfin не влияет на состояние задачи.
|
||
if w.scanner != nil && state == store.StateDone {
|
||
id := d.ID
|
||
go func() {
|
||
if err := w.scanner.RefreshLibraries(context.Background()); err != nil {
|
||
w.log.Warn("jellyfin: library refresh failed", "download_id", id, "err", err)
|
||
}
|
||
}()
|
||
}
|
||
}
|
||
|
||
// Cancel отклоняет задачу. Торрент в qBittorrent не трогаем — он продолжает
|
||
// раздачу (источник неприкосновенен).
|
||
func (w *Worker) Cancel(ctx context.Context, id int64) error {
|
||
w.mu.Lock()
|
||
defer w.mu.Unlock()
|
||
|
||
d, err := w.store.GetDownload(ctx, id)
|
||
if err != nil {
|
||
return fmt.Errorf("cancel: %w", err)
|
||
}
|
||
if d.State.IsTerminal() {
|
||
return fmt.Errorf("cancel: download %d is already terminal (%s)", id, d.State)
|
||
}
|
||
if err := w.store.SetDownloadState(ctx, id, store.StateCancelled, "", ""); err != nil {
|
||
return fmt.Errorf("cancel: %w", err)
|
||
}
|
||
w.log.Info("download cancelled", "download_id", id, "from", d.State)
|
||
return nil
|
||
}
|
||
|
||
// Retry повторяет застрявшую/упавшую задачу: заново отдаёт источник в
|
||
// qBittorrent и возвращает в downloading.
|
||
func (w *Worker) Retry(ctx context.Context, id int64) error {
|
||
w.mu.Lock()
|
||
defer w.mu.Unlock()
|
||
|
||
d, err := w.store.GetDownload(ctx, id)
|
||
if err != nil {
|
||
return fmt.Errorf("retry: %w", err)
|
||
}
|
||
if d.State != store.StateFailed && d.State != store.StateStuck {
|
||
return fmt.Errorf("retry: download %d is %s, only failed/stuck are retriable", id, d.State)
|
||
}
|
||
if d.SourceType == store.SourceMagnet {
|
||
if err := w.qbt.Add(ctx, qbt.AddRequest{
|
||
URLs: []string{d.SourceRef},
|
||
Category: w.cfg.Category,
|
||
SavePath: w.cfg.SavePath,
|
||
}); err != nil {
|
||
return fmt.Errorf("retry: add to qbittorrent: %w", err)
|
||
}
|
||
}
|
||
if err := w.store.SetDownloadState(ctx, id, store.StateDownloading, "", ""); err != nil {
|
||
return fmt.Errorf("retry: %w", err)
|
||
}
|
||
w.log.Info("download retried", "download_id", id, "from", d.State)
|
||
return nil
|
||
}
|
||
|
||
// class — класс состояния торрента qBittorrent.
|
||
type class int
|
||
|
||
const (
|
||
classDownloading class = iota // ещё качается
|
||
classReady // готов к раскладке
|
||
classErrored // ошибка
|
||
classBusy // moving/checking — переходный момент, ждём
|
||
)
|
||
|
||
// classify относит состояние qBittorrent к классу (см. architecture.md,
|
||
// «Завершение в qBittorrent»). Учитываем и v5-имена (stopped* вместо
|
||
// paused*).
|
||
func classify(state string) class {
|
||
switch state {
|
||
case "uploading", "stalledUP", "pausedUP", "stoppedUP", "queuedUP", "forcedUP":
|
||
return classReady
|
||
case "error", "missingFiles":
|
||
return classErrored
|
||
case "moving", "checkingUP", "checkingResumeData", "allocating":
|
||
return classBusy
|
||
default:
|
||
// downloading, stalledDL, metaDL, forcedMetaDL, queuedDL, checkingDL,
|
||
// forcedDL, pausedDL, stoppedDL, unknown — считаем «ещё качается».
|
||
return classDownloading
|
||
}
|
||
}
|
||
|
||
func isMeta(state string) bool {
|
||
return state == "metaDL" || state == "forcedMetaDL"
|
||
}
|
||
|
||
func isStalledDL(state string) bool {
|
||
return state == "stalledDL"
|
||
}
|