Files
jellybit/internal/worker/worker.go
T

338 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package worker — владелец машины состояний. Поллит qBittorrent по
// категории, переводит задачи между состояниями и сериализует команды
// транспортов (cancel/retry), чтобы два транспорта не гонялись за одно
// состояние.
//
// Ф1 ведёт задачу downloading → completed, плюс stuck/failed по таймаутам и
// ошибкам qBittorrent. Ф3 продолжает: completed → recognizing (вызов
// recognize) → review; команды ревью (apply/refine/reject/defer/undo,
// переключение типа, пометка «игнор») раскладывают файлы хардлинками через
// layout. Распознавание зовётся в поллинг-цикле, команды — из транспортов;
// всё под per-download блокировкой w.mu.
package worker
import (
"context"
"fmt"
"log/slog"
"strings"
"sync"
"time"
"git.vakhrushev.me/av/jellybit/internal/layout"
"git.vakhrushev.me/av/jellybit/internal/qbt"
"git.vakhrushev.me/av/jellybit/internal/recognize"
"git.vakhrushev.me/av/jellybit/internal/store"
)
// Store — нужная worker часть хранилища.
type Store interface {
ListDownloadsByState(ctx context.Context, states ...store.State) ([]store.Download, error)
GetDownload(ctx context.Context, id int64) (*store.Download, error)
SetDownloadState(ctx context.Context, id int64, state store.State, errCode, errMsg string) error
// Ф3: распознавание, ревью, раскладка.
CreateRecognition(ctx context.Context, r *store.Recognition, reasons []string) (int64, error)
GetCurrentRecognition(ctx context.Context, downloadID int64) (*store.Recognition, error)
AddHint(ctx context.Context, downloadID int64, text string) error
ListHints(ctx context.Context, downloadID int64) ([]string, error)
SetOverride(ctx context.Context, downloadID int64, field, value string) error
ListOverrides(ctx context.Context, downloadID int64) (map[string]string, error)
CreateFileLinks(ctx context.Context, links []store.FileLink) error
LatestBatchID(ctx context.Context, downloadID int64) (string, error)
ListFileLinksByBatch(ctx context.Context, batchID string) ([]store.FileLink, error)
DeleteFileLinksByBatch(ctx context.Context, batchID string) error
// Кандидаты базы метаданных (ручной выбор в review).
CreateCandidates(ctx context.Context, cands []store.MetadataCandidate) error
ListCandidatesByRecognition(ctx context.Context, recognitionID int64) ([]store.MetadataCandidate, error)
GetCandidate(ctx context.Context, id int64) (*store.MetadataCandidate, error)
SetCandidateChosen(ctx context.Context, recognitionID, candidateID int64) error
}
// QBittorrent — нужная worker часть клиента qBittorrent.
type QBittorrent interface {
Torrents(ctx context.Context, category string) ([]qbt.Torrent, error)
Add(ctx context.Context, ar qbt.AddRequest) error
Files(ctx context.Context, hash string) ([]qbt.File, error)
}
// Recognizer — распознаватель (recognize.Recognizer).
type Recognizer interface {
Recognize(ctx context.Context, in recognize.Input) (recognize.Result, error)
}
// Layouter — раскладчик хардлинками (layout.Layouter).
type Layouter interface {
BuildLinks(p layout.Plan) ([]layout.Link, error)
Apply(ctx context.Context, links []layout.Link) ([]layout.Result, error)
Undo(ctx context.Context, links []layout.Link) (int, error)
}
// NotifyEvent — повод позвать пользователя.
type NotifyEvent string
const (
EventReview NotifyEvent = "review" // задача ждёт подтверждения
EventDone NotifyEvent = "done" // раскладка завершена
)
// Notifier — исходящие пинги (Telegram). Вызывается неблокирующе.
type Notifier interface {
Notify(ctx context.Context, downloadID int64, event NotifyEvent)
}
// Config — параметры воркера.
type Config struct {
Category string
SavePath string
PollInterval time.Duration
StuckAfter time.Duration // stalledDL дольше → stuck
MagnetTimeout time.Duration // metaDL дольше → failed
}
// Worker — поллер и владелец переходов.
type Worker struct {
store Store
qbt QBittorrent
recognizer Recognizer
layouter Layouter
cfg Config
log *slog.Logger
mu sync.Mutex // сериализует переходы (поллинг + команды)
now func() time.Time // подменяется в тестах
newID func() string // генератор apply_batch_id (подменяется в тестах)
notifier Notifier // опц. исходящие пинги
}
// SetNotifier подключает исходящие пинги (до запуска Run).
func (w *Worker) SetNotifier(n Notifier) { w.notifier = n }
// New собирает воркер. recognizer/layouter могут быть nil (Ф1 без Ф3-ступеней
// распознавания и раскладки) — тогда completed-задачи не двигаются дальше.
func New(st Store, qb QBittorrent, rec Recognizer, lay Layouter, cfg Config, log *slog.Logger) *Worker {
return &Worker{
store: st,
qbt: qb,
recognizer: rec,
layouter: lay,
cfg: cfg,
log: log,
now: time.Now,
newID: defaultBatchID,
}
}
// defaultBatchID — уникальный идентификатор батча раскладки.
func defaultBatchID() string {
return fmt.Sprintf("b-%d", time.Now().UnixNano())
}
// Run крутит цикл поллинга до отмены ctx.
func (w *Worker) Run(ctx context.Context) {
w.log.Info("worker started", "poll_interval", w.cfg.PollInterval, "category", w.cfg.Category)
t := time.NewTicker(w.cfg.PollInterval)
defer t.Stop()
w.pollOnce(ctx)
for {
select {
case <-ctx.Done():
w.log.Info("worker stopped")
return
case <-t.C:
w.pollOnce(ctx)
}
}
}
func (w *Worker) pollOnce(ctx context.Context) {
if err := w.Poll(ctx); err != nil {
w.log.Warn("poll failed", "err", err)
}
// Ф3: распознаём завершённые загрузки (и перезапускаем по подсказке).
if w.recognizer != nil {
w.recognizePending(ctx)
}
}
// Poll сверяет активные задачи с состоянием qBittorrent и двигает их.
func (w *Worker) Poll(ctx context.Context) error {
torrents, err := w.qbt.Torrents(ctx, w.cfg.Category)
if err != nil {
return fmt.Errorf("poll: list torrents: %w", err)
}
byHash := make(map[string]qbt.Torrent, len(torrents)*2)
for _, t := range torrents {
for _, h := range []string{t.Hash, t.InfohashV1, t.InfohashV2} {
if h != "" {
byHash[strings.ToLower(h)] = t
}
}
}
w.mu.Lock()
defer w.mu.Unlock()
active, err := w.store.ListDownloadsByState(ctx, store.StateDownloading)
if err != nil {
return fmt.Errorf("poll: list active: %w", err)
}
for _, d := range active {
if !d.Infohash.Valid {
continue // нечем сопоставить (в Ф1 не случается: magnet всегда с infohash)
}
t, ok := byHash[strings.ToLower(d.Infohash.String)]
if !ok {
w.log.Warn("active download not found in qbittorrent",
"download_id", d.ID, "infohash", d.Infohash.String)
continue
}
w.reconcile(ctx, d, t)
}
return nil
}
// reconcile двигает одну задачу по состоянию её торрента. Вызывается под
// w.mu.
func (w *Worker) reconcile(ctx context.Context, d store.Download, t qbt.Torrent) {
switch classify(t.State) {
case classReady:
w.transition(ctx, d, store.StateCompleted, "", "")
case classErrored:
w.transition(ctx, d, store.StateFailed, "qbit_error", "qBittorrent state: "+t.State)
case classDownloading:
w.checkTimeouts(ctx, d, t)
case classBusy:
// moving/checking — ждём, файлы ещё не на финальном месте.
}
}
// checkTimeouts помечает зависшие задачи. Возраст считаем от created_at:
// для metaDL это время с момента добавления (огрублённо, но достаточно).
func (w *Worker) checkTimeouts(ctx context.Context, d store.Download, t qbt.Torrent) {
created, err := d.CreatedTime()
if err != nil {
w.log.Warn("cannot parse created_at", "download_id", d.ID, "value", d.CreatedAt, "err", err)
return
}
age := w.now().Sub(created)
switch {
case isMeta(t.State) && w.cfg.MagnetTimeout > 0 && age > w.cfg.MagnetTimeout:
w.transition(ctx, d, store.StateFailed, "magnet_timeout",
fmt.Sprintf("no metadata after %s", age.Truncate(time.Second)))
case isStalledDL(t.State) && w.cfg.StuckAfter > 0 && age > w.cfg.StuckAfter:
w.transition(ctx, d, store.StateStuck, "stalled",
fmt.Sprintf("stalled for %s", age.Truncate(time.Second)))
}
}
// transition пишет новое состояние и логирует переход.
func (w *Worker) transition(ctx context.Context, d store.Download, state store.State, code, msg string) {
if err := w.store.SetDownloadState(ctx, d.ID, state, code, msg); err != nil {
w.log.Error("state transition failed",
"download_id", d.ID, "from", d.State, "to", state, "err", err)
return
}
w.log.Info("state transition",
"download_id", d.ID, "from", d.State, "to", state, "code", code)
// Пинги — неблокирующе и в отдельном контексте: вызов уходит в сеть, а
// мы под w.mu (Notify читает состояние уже после освобождения замка).
if w.notifier != nil {
switch state {
case store.StateReview:
go w.notifier.Notify(context.Background(), d.ID, EventReview)
case store.StateDone:
go w.notifier.Notify(context.Background(), d.ID, EventDone)
}
}
}
// Cancel отклоняет задачу. Торрент в qBittorrent не трогаем — он продолжает
// раздачу (источник неприкосновенен).
func (w *Worker) Cancel(ctx context.Context, id int64) error {
w.mu.Lock()
defer w.mu.Unlock()
d, err := w.store.GetDownload(ctx, id)
if err != nil {
return fmt.Errorf("cancel: %w", err)
}
if d.State.IsTerminal() {
return fmt.Errorf("cancel: download %d is already terminal (%s)", id, d.State)
}
if err := w.store.SetDownloadState(ctx, id, store.StateCancelled, "", ""); err != nil {
return fmt.Errorf("cancel: %w", err)
}
w.log.Info("download cancelled", "download_id", id, "from", d.State)
return nil
}
// Retry повторяет застрявшую/упавшую задачу: заново отдаёт источник в
// qBittorrent и возвращает в downloading.
func (w *Worker) Retry(ctx context.Context, id int64) error {
w.mu.Lock()
defer w.mu.Unlock()
d, err := w.store.GetDownload(ctx, id)
if err != nil {
return fmt.Errorf("retry: %w", err)
}
if d.State != store.StateFailed && d.State != store.StateStuck {
return fmt.Errorf("retry: download %d is %s, only failed/stuck are retriable", id, d.State)
}
if d.SourceType == store.SourceMagnet {
if err := w.qbt.Add(ctx, qbt.AddRequest{
URLs: []string{d.SourceRef},
Category: w.cfg.Category,
SavePath: w.cfg.SavePath,
}); err != nil {
return fmt.Errorf("retry: add to qbittorrent: %w", err)
}
}
if err := w.store.SetDownloadState(ctx, id, store.StateDownloading, "", ""); err != nil {
return fmt.Errorf("retry: %w", err)
}
w.log.Info("download retried", "download_id", id, "from", d.State)
return nil
}
// class — класс состояния торрента qBittorrent.
type class int
const (
classDownloading class = iota // ещё качается
classReady // готов к раскладке
classErrored // ошибка
classBusy // moving/checking — переходный момент, ждём
)
// classify относит состояние qBittorrent к классу (см. architecture.md,
// «Завершение в qBittorrent»). Учитываем и v5-имена (stopped* вместо
// paused*).
func classify(state string) class {
switch state {
case "uploading", "stalledUP", "pausedUP", "stoppedUP", "queuedUP", "forcedUP":
return classReady
case "error", "missingFiles":
return classErrored
case "moving", "checkingUP", "checkingResumeData", "allocating":
return classBusy
default:
// downloading, stalledDL, metaDL, forcedMetaDL, queuedDL, checkingDL,
// forcedDL, pausedDL, stoppedDL, unknown — считаем «ещё качается».
return classDownloading
}
}
func isMeta(state string) bool {
return state == "metaDL" || state == "forcedMetaDL"
}
func isStalledDL(state string) bool {
return state == "stalledDL"
}