// Package worker — владелец машины состояний. Поллит qBittorrent по // категории, переводит задачи между состояниями и сериализует команды // транспортов (cancel/retry), чтобы два транспорта не гонялись за одно // состояние. // // Ф1 ведёт задачу downloading → completed, плюс stuck/failed по таймаутам и // ошибкам qBittorrent. Ф3 продолжает: completed → recognizing (вызов // recognize) → review; команды ревью (apply/refine/reject/defer/undo, // переключение типа, пометка «игнор») раскладывают файлы хардлинками через // layout. Распознавание зовётся в поллинг-цикле, команды — из транспортов; // всё под per-download блокировкой w.mu. package worker import ( "context" "fmt" "log/slog" "strings" "sync" "time" "git.vakhrushev.me/av/jellybit/internal/layout" "git.vakhrushev.me/av/jellybit/internal/qbt" "git.vakhrushev.me/av/jellybit/internal/recognize" "git.vakhrushev.me/av/jellybit/internal/store" ) // Store — нужная worker часть хранилища. type Store interface { ListDownloadsByState(ctx context.Context, states ...store.State) ([]store.Download, error) GetDownload(ctx context.Context, id int64) (*store.Download, error) SetDownloadState(ctx context.Context, id int64, state store.State, errCode, errMsg string) error // Ф3: распознавание, ревью, раскладка. CreateRecognition(ctx context.Context, r *store.Recognition, reasons []string) (int64, error) GetCurrentRecognition(ctx context.Context, downloadID int64) (*store.Recognition, error) AddHint(ctx context.Context, downloadID int64, text string) error ListHints(ctx context.Context, downloadID int64) ([]string, error) SetOverride(ctx context.Context, downloadID int64, field, value string) error ListOverrides(ctx context.Context, downloadID int64) (map[string]string, error) CreateFileLinks(ctx context.Context, links []store.FileLink) error LatestBatchID(ctx context.Context, downloadID int64) (string, error) ListFileLinksByBatch(ctx context.Context, batchID string) ([]store.FileLink, error) DeleteFileLinksByBatch(ctx context.Context, batchID string) error } // QBittorrent — нужная worker часть клиента qBittorrent. type QBittorrent interface { Torrents(ctx context.Context, category string) ([]qbt.Torrent, error) Add(ctx context.Context, ar qbt.AddRequest) error Files(ctx context.Context, hash string) ([]qbt.File, error) } // Recognizer — распознаватель (recognize.Recognizer). type Recognizer interface { Recognize(ctx context.Context, in recognize.Input) (recognize.Result, error) } // Layouter — раскладчик хардлинками (layout.Layouter). type Layouter interface { BuildLinks(p layout.Plan) ([]layout.Link, error) Apply(ctx context.Context, links []layout.Link) ([]layout.Result, error) Undo(ctx context.Context, links []layout.Link) (int, error) } // Config — параметры воркера. type Config struct { Category string SavePath string PollInterval time.Duration StuckAfter time.Duration // stalledDL дольше → stuck MagnetTimeout time.Duration // metaDL дольше → failed } // Worker — поллер и владелец переходов. type Worker struct { store Store qbt QBittorrent recognizer Recognizer layouter Layouter cfg Config log *slog.Logger mu sync.Mutex // сериализует переходы (поллинг + команды) now func() time.Time // подменяется в тестах newID func() string // генератор apply_batch_id (подменяется в тестах) } // New собирает воркер. recognizer/layouter могут быть nil (Ф1 без Ф3-ступеней // распознавания и раскладки) — тогда completed-задачи не двигаются дальше. func New(st Store, qb QBittorrent, rec Recognizer, lay Layouter, cfg Config, log *slog.Logger) *Worker { return &Worker{ store: st, qbt: qb, recognizer: rec, layouter: lay, cfg: cfg, log: log, now: time.Now, newID: defaultBatchID, } } // defaultBatchID — уникальный идентификатор батча раскладки. func defaultBatchID() string { return fmt.Sprintf("b-%d", time.Now().UnixNano()) } // Run крутит цикл поллинга до отмены ctx. func (w *Worker) Run(ctx context.Context) { w.log.Info("worker started", "poll_interval", w.cfg.PollInterval, "category", w.cfg.Category) t := time.NewTicker(w.cfg.PollInterval) defer t.Stop() w.pollOnce(ctx) for { select { case <-ctx.Done(): w.log.Info("worker stopped") return case <-t.C: w.pollOnce(ctx) } } } func (w *Worker) pollOnce(ctx context.Context) { if err := w.Poll(ctx); err != nil { w.log.Warn("poll failed", "err", err) } // Ф3: распознаём завершённые загрузки (и перезапускаем по подсказке). if w.recognizer != nil { w.recognizePending(ctx) } } // Poll сверяет активные задачи с состоянием qBittorrent и двигает их. func (w *Worker) Poll(ctx context.Context) error { torrents, err := w.qbt.Torrents(ctx, w.cfg.Category) if err != nil { return fmt.Errorf("poll: list torrents: %w", err) } byHash := make(map[string]qbt.Torrent, len(torrents)*2) for _, t := range torrents { for _, h := range []string{t.Hash, t.InfohashV1, t.InfohashV2} { if h != "" { byHash[strings.ToLower(h)] = t } } } w.mu.Lock() defer w.mu.Unlock() active, err := w.store.ListDownloadsByState(ctx, store.StateDownloading) if err != nil { return fmt.Errorf("poll: list active: %w", err) } for _, d := range active { if !d.Infohash.Valid { continue // нечем сопоставить (в Ф1 не случается: magnet всегда с infohash) } t, ok := byHash[strings.ToLower(d.Infohash.String)] if !ok { w.log.Warn("active download not found in qbittorrent", "download_id", d.ID, "infohash", d.Infohash.String) continue } w.reconcile(ctx, d, t) } return nil } // reconcile двигает одну задачу по состоянию её торрента. Вызывается под // w.mu. func (w *Worker) reconcile(ctx context.Context, d store.Download, t qbt.Torrent) { switch classify(t.State) { case classReady: w.transition(ctx, d, store.StateCompleted, "", "") case classErrored: w.transition(ctx, d, store.StateFailed, "qbit_error", "qBittorrent state: "+t.State) case classDownloading: w.checkTimeouts(ctx, d, t) case classBusy: // moving/checking — ждём, файлы ещё не на финальном месте. } } // checkTimeouts помечает зависшие задачи. Возраст считаем от created_at: // для metaDL это время с момента добавления (огрублённо, но достаточно). func (w *Worker) checkTimeouts(ctx context.Context, d store.Download, t qbt.Torrent) { created, err := d.CreatedTime() if err != nil { w.log.Warn("cannot parse created_at", "download_id", d.ID, "value", d.CreatedAt, "err", err) return } age := w.now().Sub(created) switch { case isMeta(t.State) && w.cfg.MagnetTimeout > 0 && age > w.cfg.MagnetTimeout: w.transition(ctx, d, store.StateFailed, "magnet_timeout", fmt.Sprintf("no metadata after %s", age.Truncate(time.Second))) case isStalledDL(t.State) && w.cfg.StuckAfter > 0 && age > w.cfg.StuckAfter: w.transition(ctx, d, store.StateStuck, "stalled", fmt.Sprintf("stalled for %s", age.Truncate(time.Second))) } } // transition пишет новое состояние и логирует переход. func (w *Worker) transition(ctx context.Context, d store.Download, state store.State, code, msg string) { if err := w.store.SetDownloadState(ctx, d.ID, state, code, msg); err != nil { w.log.Error("state transition failed", "download_id", d.ID, "from", d.State, "to", state, "err", err) return } w.log.Info("state transition", "download_id", d.ID, "from", d.State, "to", state, "code", code) } // Cancel отклоняет задачу. Торрент в qBittorrent не трогаем — он продолжает // раздачу (источник неприкосновенен). func (w *Worker) Cancel(ctx context.Context, id int64) error { w.mu.Lock() defer w.mu.Unlock() d, err := w.store.GetDownload(ctx, id) if err != nil { return fmt.Errorf("cancel: %w", err) } if d.State.IsTerminal() { return fmt.Errorf("cancel: download %d is already terminal (%s)", id, d.State) } if err := w.store.SetDownloadState(ctx, id, store.StateCancelled, "", ""); err != nil { return fmt.Errorf("cancel: %w", err) } w.log.Info("download cancelled", "download_id", id, "from", d.State) return nil } // Retry повторяет застрявшую/упавшую задачу: заново отдаёт источник в // qBittorrent и возвращает в downloading. func (w *Worker) Retry(ctx context.Context, id int64) error { w.mu.Lock() defer w.mu.Unlock() d, err := w.store.GetDownload(ctx, id) if err != nil { return fmt.Errorf("retry: %w", err) } if d.State != store.StateFailed && d.State != store.StateStuck { return fmt.Errorf("retry: download %d is %s, only failed/stuck are retriable", id, d.State) } if d.SourceType == store.SourceMagnet { if err := w.qbt.Add(ctx, qbt.AddRequest{ URLs: []string{d.SourceRef}, Category: w.cfg.Category, SavePath: w.cfg.SavePath, }); err != nil { return fmt.Errorf("retry: add to qbittorrent: %w", err) } } if err := w.store.SetDownloadState(ctx, id, store.StateDownloading, "", ""); err != nil { return fmt.Errorf("retry: %w", err) } w.log.Info("download retried", "download_id", id, "from", d.State) return nil } // class — класс состояния торрента qBittorrent. type class int const ( classDownloading class = iota // ещё качается classReady // готов к раскладке classErrored // ошибка classBusy // moving/checking — переходный момент, ждём ) // classify относит состояние qBittorrent к классу (см. architecture.md, // «Завершение в qBittorrent»). Учитываем и v5-имена (stopped* вместо // paused*). func classify(state string) class { switch state { case "uploading", "stalledUP", "pausedUP", "stoppedUP", "queuedUP", "forcedUP": return classReady case "error", "missingFiles": return classErrored case "moving", "checkingUP", "checkingResumeData", "allocating": return classBusy default: // downloading, stalledDL, metaDL, forcedMetaDL, queuedDL, checkingDL, // forcedDL, pausedDL, stoppedDL, unknown — считаем «ещё качается». return classDownloading } } func isMeta(state string) bool { return state == "metaDL" || state == "forcedMetaDL" } func isStalledDL(state string) bool { return state == "stalledDL" }