Files
jellybit/internal/worker/worker.go
T

251 lines
8.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package worker — владелец машины состояний. Поллит qBittorrent по
// категории, переводит задачи между состояниями и сериализует команды
// транспортов (cancel/retry), чтобы два транспорта не гонялись за одно
// состояние.
//
// Ф1 ведёт задачу downloading → completed, плюс stuck/failed по таймаутам и
// ошибкам qBittorrent. Распознавание и раскладка (completed →) — Ф2+.
package worker
import (
"context"
"fmt"
"log/slog"
"strings"
"sync"
"time"
"git.vakhrushev.me/av/jellybit/internal/qbt"
"git.vakhrushev.me/av/jellybit/internal/store"
)
// Store — нужная worker часть хранилища.
type Store interface {
ListDownloadsByState(ctx context.Context, states ...store.State) ([]store.Download, error)
GetDownload(ctx context.Context, id int64) (*store.Download, error)
SetDownloadState(ctx context.Context, id int64, state store.State, errCode, errMsg string) error
}
// QBittorrent — нужная worker часть клиента qBittorrent.
type QBittorrent interface {
Torrents(ctx context.Context, category string) ([]qbt.Torrent, error)
Add(ctx context.Context, ar qbt.AddRequest) error
}
// Config — параметры воркера.
type Config struct {
Category string
SavePath string
PollInterval time.Duration
StuckAfter time.Duration // stalledDL дольше → stuck
MagnetTimeout time.Duration // metaDL дольше → failed
}
// Worker — поллер и владелец переходов.
type Worker struct {
store Store
qbt QBittorrent
cfg Config
log *slog.Logger
mu sync.Mutex // сериализует переходы (поллинг + команды)
now func() time.Time // подменяется в тестах
}
// New собирает воркер.
func New(st Store, qb QBittorrent, cfg Config, log *slog.Logger) *Worker {
return &Worker{store: st, qbt: qb, cfg: cfg, log: log, now: time.Now}
}
// Run крутит цикл поллинга до отмены ctx.
func (w *Worker) Run(ctx context.Context) {
w.log.Info("worker started", "poll_interval", w.cfg.PollInterval, "category", w.cfg.Category)
t := time.NewTicker(w.cfg.PollInterval)
defer t.Stop()
w.pollOnce(ctx)
for {
select {
case <-ctx.Done():
w.log.Info("worker stopped")
return
case <-t.C:
w.pollOnce(ctx)
}
}
}
func (w *Worker) pollOnce(ctx context.Context) {
if err := w.Poll(ctx); err != nil {
w.log.Warn("poll failed", "err", err)
}
}
// Poll сверяет активные задачи с состоянием qBittorrent и двигает их.
func (w *Worker) Poll(ctx context.Context) error {
torrents, err := w.qbt.Torrents(ctx, w.cfg.Category)
if err != nil {
return fmt.Errorf("poll: list torrents: %w", err)
}
byHash := make(map[string]qbt.Torrent, len(torrents)*2)
for _, t := range torrents {
for _, h := range []string{t.Hash, t.InfohashV1, t.InfohashV2} {
if h != "" {
byHash[strings.ToLower(h)] = t
}
}
}
w.mu.Lock()
defer w.mu.Unlock()
active, err := w.store.ListDownloadsByState(ctx, store.StateDownloading)
if err != nil {
return fmt.Errorf("poll: list active: %w", err)
}
for _, d := range active {
if !d.Infohash.Valid {
continue // нечем сопоставить (в Ф1 не случается: magnet всегда с infohash)
}
t, ok := byHash[strings.ToLower(d.Infohash.String)]
if !ok {
w.log.Warn("active download not found in qbittorrent",
"download_id", d.ID, "infohash", d.Infohash.String)
continue
}
w.reconcile(ctx, d, t)
}
return nil
}
// reconcile двигает одну задачу по состоянию её торрента. Вызывается под
// w.mu.
func (w *Worker) reconcile(ctx context.Context, d store.Download, t qbt.Torrent) {
switch classify(t.State) {
case classReady:
w.transition(ctx, d, store.StateCompleted, "", "")
case classErrored:
w.transition(ctx, d, store.StateFailed, "qbit_error", "qBittorrent state: "+t.State)
case classDownloading:
w.checkTimeouts(ctx, d, t)
case classBusy:
// moving/checking — ждём, файлы ещё не на финальном месте.
}
}
// checkTimeouts помечает зависшие задачи. Возраст считаем от created_at:
// для metaDL это время с момента добавления (огрублённо, но достаточно).
func (w *Worker) checkTimeouts(ctx context.Context, d store.Download, t qbt.Torrent) {
created, err := d.CreatedTime()
if err != nil {
w.log.Warn("cannot parse created_at", "download_id", d.ID, "value", d.CreatedAt, "err", err)
return
}
age := w.now().Sub(created)
switch {
case isMeta(t.State) && w.cfg.MagnetTimeout > 0 && age > w.cfg.MagnetTimeout:
w.transition(ctx, d, store.StateFailed, "magnet_timeout",
fmt.Sprintf("no metadata after %s", age.Truncate(time.Second)))
case isStalledDL(t.State) && w.cfg.StuckAfter > 0 && age > w.cfg.StuckAfter:
w.transition(ctx, d, store.StateStuck, "stalled",
fmt.Sprintf("stalled for %s", age.Truncate(time.Second)))
}
}
// transition пишет новое состояние и логирует переход.
func (w *Worker) transition(ctx context.Context, d store.Download, state store.State, code, msg string) {
if err := w.store.SetDownloadState(ctx, d.ID, state, code, msg); err != nil {
w.log.Error("state transition failed",
"download_id", d.ID, "from", d.State, "to", state, "err", err)
return
}
w.log.Info("state transition",
"download_id", d.ID, "from", d.State, "to", state, "code", code)
}
// Cancel отклоняет задачу. Торрент в qBittorrent не трогаем — он продолжает
// раздачу (источник неприкосновенен).
func (w *Worker) Cancel(ctx context.Context, id int64) error {
w.mu.Lock()
defer w.mu.Unlock()
d, err := w.store.GetDownload(ctx, id)
if err != nil {
return fmt.Errorf("cancel: %w", err)
}
if d.State.IsTerminal() {
return fmt.Errorf("cancel: download %d is already terminal (%s)", id, d.State)
}
if err := w.store.SetDownloadState(ctx, id, store.StateCancelled, "", ""); err != nil {
return fmt.Errorf("cancel: %w", err)
}
w.log.Info("download cancelled", "download_id", id, "from", d.State)
return nil
}
// Retry повторяет застрявшую/упавшую задачу: заново отдаёт источник в
// qBittorrent и возвращает в downloading.
func (w *Worker) Retry(ctx context.Context, id int64) error {
w.mu.Lock()
defer w.mu.Unlock()
d, err := w.store.GetDownload(ctx, id)
if err != nil {
return fmt.Errorf("retry: %w", err)
}
if d.State != store.StateFailed && d.State != store.StateStuck {
return fmt.Errorf("retry: download %d is %s, only failed/stuck are retriable", id, d.State)
}
if d.SourceType == store.SourceMagnet {
if err := w.qbt.Add(ctx, qbt.AddRequest{
URLs: []string{d.SourceRef},
Category: w.cfg.Category,
SavePath: w.cfg.SavePath,
}); err != nil {
return fmt.Errorf("retry: add to qbittorrent: %w", err)
}
}
if err := w.store.SetDownloadState(ctx, id, store.StateDownloading, "", ""); err != nil {
return fmt.Errorf("retry: %w", err)
}
w.log.Info("download retried", "download_id", id, "from", d.State)
return nil
}
// class — класс состояния торрента qBittorrent.
type class int
const (
classDownloading class = iota // ещё качается
classReady // готов к раскладке
classErrored // ошибка
classBusy // moving/checking — переходный момент, ждём
)
// classify относит состояние qBittorrent к классу (см. architecture.md,
// «Завершение в qBittorrent»). Учитываем и v5-имена (stopped* вместо
// paused*).
func classify(state string) class {
switch state {
case "uploading", "stalledUP", "pausedUP", "stoppedUP", "queuedUP", "forcedUP":
return classReady
case "error", "missingFiles":
return classErrored
case "moving", "checkingUP", "checkingResumeData", "allocating":
return classBusy
default:
// downloading, stalledDL, metaDL, forcedMetaDL, queuedDL, checkingDL,
// forcedDL, pausedDL, stoppedDL, unknown — считаем «ещё качается».
return classDownloading
}
}
func isMeta(state string) bool {
return state == "metaDL" || state == "forcedMetaDL"
}
func isStalledDL(state string) bool {
return state == "stalledDL"
}