Files
jellybit/internal/worker/review.go
T

538 lines
18 KiB
Go

package worker
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"strings"
"git.vakhrushev.me/av/jellybit/internal/layout"
"git.vakhrushev.me/av/jellybit/internal/qbt"
"git.vakhrushev.me/av/jellybit/internal/recognize"
"git.vakhrushev.me/av/jellybit/internal/store"
)
// Поля override.
const (
ovrMediaType = "media_type"
ovrIgnoredFiles = "ignored_files"
)
// recognizePending распознаёт завершённые загрузки и перезапускает те, что
// помечены к перераспознаванию (recognizing — например, после подсказки или
// после рестарта сервиса). Выполняется последовательно в поллинг-горутине;
// сам вызов LLM идёт вне блокировки, поэтому команды ревью не простаивают.
func (w *Worker) recognizePending(ctx context.Context) {
w.mu.Lock()
pending, err := w.store.ListDownloadsByState(ctx, store.StateCompleted, store.StateRecognizing)
w.mu.Unlock()
if err != nil {
w.log.Warn("recognize: list pending failed", "err", err)
return
}
for _, d := range pending {
w.recognizeOne(ctx, d.ID)
}
}
// recognizeOne проводит одну загрузку через распознавание. Claim-паттерн:
// под блокировкой переводим в recognizing, LLM зовём без блокировки, затем
// под блокировкой фиксируем результат — но только если задачу за это время
// не увели в другое состояние (cancel/defer).
func (w *Worker) recognizeOne(ctx context.Context, id int64) {
w.mu.Lock()
d, err := w.store.GetDownload(ctx, id)
if err != nil {
w.mu.Unlock()
w.log.Warn("recognize: get download", "download_id", id, "err", err)
return
}
if d.State != store.StateCompleted && d.State != store.StateRecognizing {
w.mu.Unlock()
return
}
if d.State == store.StateCompleted {
w.transition(ctx, *d, store.StateRecognizing, "", "")
}
w.mu.Unlock()
result, savePath, err := w.runRecognize(ctx, *d)
if err != nil {
// Не смогли получить сигналы или вызвать LLM — уходим в review с
// причиной, человек перезапустит подсказкой.
result = recognize.Result{Decision: recognize.Decision{
Reasons: []string{"распознавание не удалось: " + err.Error()},
}}
}
w.finishRecognition(ctx, id, result, savePath)
}
// runRecognize собирает сигналы из qBittorrent и накопленные подсказки,
// затем зовёт распознаватель. Возвращает также savePath для маппинга
// относительных путей файлов в абсолютные при раскладке.
func (w *Worker) runRecognize(ctx context.Context, d store.Download) (recognize.Result, string, error) {
if !d.Infohash.Valid {
return recognize.Result{}, "", fmt.Errorf("нет infohash")
}
t, ok, err := w.torrentByInfohash(ctx, d.Infohash.String)
if err != nil {
return recognize.Result{}, "", err
}
if !ok {
return recognize.Result{}, "", fmt.Errorf("торрент не найден в qBittorrent")
}
files, err := w.qbt.Files(ctx, t.Hash)
if err != nil {
return recognize.Result{}, "", err
}
hints, err := w.store.ListHints(ctx, d.ID)
if err != nil {
return recognize.Result{}, "", err
}
in := recognize.Input{
Name: t.Name,
Context: d.Context,
Hints: hints,
Files: make([]recognize.File, len(files)),
}
for i, f := range files {
in.Files[i] = recognize.File{Path: f.Name, Size: f.Size}
}
res, err := w.recognizer.Recognize(ctx, in)
if err != nil {
return recognize.Result{}, t.SavePath, err
}
return res, t.SavePath, nil
}
// finishRecognition сохраняет попытку распознавания и двигает задачу. В Ф3
// метабазы выключены → авто-раскладки не делаем, всегда уходим в review.
func (w *Worker) finishRecognition(ctx context.Context, id int64, res recognize.Result, _ string) {
planJSON, err := json.Marshal(res.Plan)
if err != nil {
w.log.Error("recognize: marshal plan", "download_id", id, "err", err)
planJSON = []byte("{}")
}
rec := &store.Recognition{
DownloadID: id,
MediaType: store.NullString(string(res.Plan.Type)),
Title: store.NullString(res.Plan.Title),
Provider: store.NullString("none"),
Plan: store.NullString(string(planJSON)),
RawLLM: store.NullString(res.Raw),
}
if res.Plan.OriginalTitle != "" {
rec.OriginalTitle = store.NullString(res.Plan.OriginalTitle)
}
if res.Plan.Year != 0 {
rec.Year = sql.NullInt64{Int64: int64(res.Plan.Year), Valid: true}
}
if res.Plan.Confidence != 0 {
rec.Confidence = sql.NullFloat64{Float64: res.Plan.Confidence, Valid: true}
}
w.mu.Lock()
defer w.mu.Unlock()
d, err := w.store.GetDownload(ctx, id)
if err != nil {
w.log.Warn("recognize: reload download", "download_id", id, "err", err)
return
}
if d.State != store.StateRecognizing {
// За время вызова LLM задачу увели (cancel/defer) — результат не нужен.
w.log.Info("recognize: result discarded, state changed",
"download_id", id, "state", d.State)
return
}
if _, err := w.store.CreateRecognition(ctx, rec, res.Decision.Reasons); err != nil {
w.log.Error("recognize: persist", "download_id", id, "err", err)
return
}
w.transition(ctx, *d, store.StateReview, "", "")
}
// --- Команды ревью ---
// Apply создаёт хардлинки по текущему плану (с применёнными правками) и
// переводит задачу в done. Коллизия цели → остаёмся в review с причиной.
func (w *Worker) Apply(ctx context.Context, id int64) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.layouter == nil {
return fmt.Errorf("apply: раскладчик не сконфигурирован")
}
d, err := w.store.GetDownload(ctx, id)
if err != nil {
return fmt.Errorf("apply: %w", err)
}
if d.State != store.StateReview && d.State != store.StateDeferred {
return fmt.Errorf("apply: задача %d в состоянии %s (ожидалось review/deferred)", id, d.State)
}
plan, err := w.effectivePlan(ctx, id)
if err != nil {
return fmt.Errorf("apply: %w", err)
}
t, ok, err := w.torrentByInfohash(ctx, d.Infohash.String)
if err != nil || !ok {
return fmt.Errorf("apply: торрент не найден: %v", err)
}
links, err := w.layouter.BuildLinks(toLayoutPlan(plan, t.SavePath))
if err != nil {
return fmt.Errorf("apply: построение ссылок: %w", err)
}
batch := w.newID()
results, applyErr := w.layouter.Apply(ctx, links)
// Фиксируем то, что успели слинковать (идемпотентность повторного apply).
fl := make([]store.FileLink, 0, len(results))
for _, r := range results {
fl = append(fl, store.FileLink{
DownloadID: id,
ApplyBatchID: batch,
SrcPath: r.Link.Src,
DstPath: r.Link.Dst,
Kind: string(r.Link.Kind),
Status: string(r.Status),
})
}
if len(fl) > 0 {
if err := w.store.CreateFileLinks(ctx, fl); err != nil {
return fmt.Errorf("apply: запись ссылок: %w", err)
}
}
if applyErr != nil {
if errors.Is(applyErr, layout.ErrCollision) {
w.transition(ctx, *d, store.StateReview, "collision", applyErr.Error())
return fmt.Errorf("apply: %w", applyErr)
}
w.transition(ctx, *d, store.StateFailed, "apply", applyErr.Error())
return fmt.Errorf("apply: %w", applyErr)
}
w.transition(ctx, *d, store.StateDone, "", "")
w.log.Info("apply: linked", "download_id", id, "batch", batch, "links", len(fl))
return nil
}
// Refine добавляет подсказку и отправляет задачу на перераспознавание.
func (w *Worker) Refine(ctx context.Context, id int64, hint string) error {
hint = strings.TrimSpace(hint)
if hint == "" {
return fmt.Errorf("refine: пустая подсказка")
}
w.mu.Lock()
defer w.mu.Unlock()
d, err := w.requireReviewable(ctx, id, "refine")
if err != nil {
return err
}
if err := w.store.AddHint(ctx, id, hint); err != nil {
return fmt.Errorf("refine: %w", err)
}
w.transition(ctx, *d, store.StateRecognizing, "", "")
return nil
}
// SetType фиксирует тип (override) и перезапускает распознавание с подсказкой
// — чтобы LLM пересобрал роли файлов под новый тип.
func (w *Worker) SetType(ctx context.Context, id int64, mediaType string) error {
if mediaType != string(recognize.MediaMovie) && mediaType != string(recognize.MediaSeries) {
return fmt.Errorf("set type: недопустимый тип %q", mediaType)
}
w.mu.Lock()
defer w.mu.Unlock()
d, err := w.requireReviewable(ctx, id, "set type")
if err != nil {
return err
}
if err := w.store.SetOverride(ctx, id, ovrMediaType, mediaType); err != nil {
return fmt.Errorf("set type: %w", err)
}
label := "фильм"
if mediaType == string(recognize.MediaSeries) {
label = "сериал"
}
if err := w.store.AddHint(ctx, id, "Тип точно: "+label+"."); err != nil {
return fmt.Errorf("set type: %w", err)
}
w.transition(ctx, *d, store.StateRecognizing, "", "")
return nil
}
// IgnoreFile помечает файл к игнорированию (не линкуем). Остаёмся в review;
// превью пересчитается с учётом правки.
func (w *Worker) IgnoreFile(ctx context.Context, id int64, src string) error {
src = strings.TrimSpace(src)
if src == "" {
return fmt.Errorf("ignore: пустой путь")
}
w.mu.Lock()
defer w.mu.Unlock()
if _, err := w.requireReviewable(ctx, id, "ignore"); err != nil {
return err
}
overrides, err := w.store.ListOverrides(ctx, id)
if err != nil {
return fmt.Errorf("ignore: %w", err)
}
ignored := parseIgnored(overrides[ovrIgnoredFiles])
if !contains(ignored, src) {
ignored = append(ignored, src)
}
b, _ := json.Marshal(ignored)
if err := w.store.SetOverride(ctx, id, ovrIgnoredFiles, string(b)); err != nil {
return fmt.Errorf("ignore: %w", err)
}
return nil
}
// Defer паркует задачу в deferred (вернётся в ревью по действию).
func (w *Worker) Defer(ctx context.Context, id int64) error {
w.mu.Lock()
defer w.mu.Unlock()
d, err := w.store.GetDownload(ctx, id)
if err != nil {
return fmt.Errorf("defer: %w", err)
}
if d.State.IsTerminal() {
return fmt.Errorf("defer: задача %d терминальна (%s)", id, d.State)
}
w.transition(ctx, *d, store.StateDeferred, "", "")
return nil
}
// Undo снимает хардлинки последнего батча и переводит задачу в reverted.
// Источник недосягаем (раскладчик удаляет только пути под библиотекой).
func (w *Worker) Undo(ctx context.Context, id int64) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.layouter == nil {
return fmt.Errorf("undo: раскладчик не сконфигурирован")
}
d, err := w.store.GetDownload(ctx, id)
if err != nil {
return fmt.Errorf("undo: %w", err)
}
if d.State != store.StateDone {
return fmt.Errorf("undo: задача %d в состоянии %s (ожидалось done)", id, d.State)
}
batch, err := w.store.LatestBatchID(ctx, id)
if err != nil {
return fmt.Errorf("undo: %w", err)
}
if batch == "" {
return fmt.Errorf("undo: нечего откатывать")
}
rows, err := w.store.ListFileLinksByBatch(ctx, batch)
if err != nil {
return fmt.Errorf("undo: %w", err)
}
links := make([]layout.Link, len(rows))
for i, r := range rows {
links[i] = layout.Link{Src: r.SrcPath, Dst: r.DstPath, Kind: layout.Kind(r.Kind)}
}
n, err := w.layouter.Undo(ctx, links)
if err != nil {
return fmt.Errorf("undo: %w", err)
}
if err := w.store.DeleteFileLinksByBatch(ctx, batch); err != nil {
return fmt.Errorf("undo: %w", err)
}
w.transition(ctx, *d, store.StateReverted, "", "")
w.log.Info("undo: reverted", "download_id", id, "batch", batch, "removed", n)
return nil
}
// requireReviewable проверяет, что задача в review/deferred. Вызывается под mu.
func (w *Worker) requireReviewable(ctx context.Context, id int64, op string) (*store.Download, error) {
d, err := w.store.GetDownload(ctx, id)
if err != nil {
return nil, fmt.Errorf("%s: %w", op, err)
}
if d.State != store.StateReview && d.State != store.StateDeferred {
return nil, fmt.Errorf("%s: задача %d в состоянии %s (ожидалось review/deferred)", op, id, d.State)
}
return d, nil
}
// --- Данные для экрана ревью ---
// ReviewData — всё, что нужно транспорту для отрисовки ревью.
type ReviewData struct {
Download store.Download
Recognition *store.Recognition
Plan recognize.Plan // эффективный (с применёнными правками)
Preview []layout.Link // целевые пути (Src — относительный, для показа)
Hints []string
Overrides map[string]string
}
// ReviewData собирает данные ревью по загрузке.
func (w *Worker) ReviewData(ctx context.Context, id int64) (*ReviewData, error) {
d, err := w.store.GetDownload(ctx, id)
if err != nil {
return nil, fmt.Errorf("review data: %w", err)
}
rec, err := w.store.GetCurrentRecognition(ctx, id)
if err != nil {
return nil, fmt.Errorf("review data: %w", err)
}
hints, err := w.store.ListHints(ctx, id)
if err != nil {
return nil, fmt.Errorf("review data: %w", err)
}
overrides, err := w.store.ListOverrides(ctx, id)
if err != nil {
return nil, fmt.Errorf("review data: %w", err)
}
rd := &ReviewData{Download: *d, Recognition: rec, Hints: hints, Overrides: overrides}
if rec != nil && rec.Plan.Valid {
var plan recognize.Plan
if err := json.Unmarshal([]byte(rec.Plan.String), &plan); err == nil {
plan = applyOverrides(plan, overrides)
rd.Plan = plan
// Превью строим по относительным путям; ошибку игнорируем —
// просто покажем причины без превью.
if w.layouter != nil {
if links, lerr := w.layouter.BuildLinks(toLayoutPlan(plan, "")); lerr == nil {
rd.Preview = links
}
}
}
}
return rd, nil
}
// effectivePlan загружает текущий план и применяет правки (под mu).
func (w *Worker) effectivePlan(ctx context.Context, id int64) (recognize.Plan, error) {
rec, err := w.store.GetCurrentRecognition(ctx, id)
if err != nil {
return recognize.Plan{}, err
}
if rec == nil || !rec.Plan.Valid {
return recognize.Plan{}, fmt.Errorf("нет плана распознавания")
}
var plan recognize.Plan
if err := json.Unmarshal([]byte(rec.Plan.String), &plan); err != nil {
return recognize.Plan{}, fmt.Errorf("разбор плана: %w", err)
}
overrides, err := w.store.ListOverrides(ctx, id)
if err != nil {
return recognize.Plan{}, err
}
return applyOverrides(plan, overrides), nil
}
// --- Хелперы преобразования ---
// applyOverrides применяет ручные правки к плану: форсит тип и помечает
// игнорируемые файлы ролью ignore (их раскладка пропустит).
func applyOverrides(plan recognize.Plan, overrides map[string]string) recognize.Plan {
if mt := overrides[ovrMediaType]; mt == string(recognize.MediaMovie) || mt == string(recognize.MediaSeries) {
plan.Type = recognize.MediaType(mt)
}
ignored := parseIgnored(overrides[ovrIgnoredFiles])
if len(ignored) > 0 {
for i := range plan.Files {
if contains(ignored, plan.Files[i].Src) {
plan.Files[i].Role = "ignore"
}
}
}
return plan
}
// toLayoutPlan переводит план распознавания в план раскладки. srcPrefix
// (savePath) приклеивается к относительным путям файлов; пустой — оставляет
// относительные (для превью). Роли вне main/episode/subtitle отбрасываются.
func toLayoutPlan(plan recognize.Plan, srcPrefix string) layout.Plan {
lp := layout.Plan{
Type: layout.MediaType(plan.Type),
Title: plan.Title,
Year: plan.Year,
}
for _, f := range plan.Files {
role, ok := mapRole(f.Role)
if !ok {
continue
}
src := f.Src
if srcPrefix != "" {
src = filepath.Join(srcPrefix, f.Src)
}
lp.Files = append(lp.Files, layout.PlanFile{
Src: src,
Role: role,
Season: f.Season,
Episode: f.Episode,
})
}
return lp
}
func mapRole(r recognize.FileRole) (layout.Role, bool) {
switch r {
case recognize.RoleMain:
return layout.RoleMain, true
case recognize.RoleEpisode:
return layout.RoleEpisode, true
case recognize.RoleSubtitle:
return layout.RoleSubtitle, true
default:
return "", false
}
}
// torrentByInfohash ищет торрент категории по infohash (v1/v2/hash).
func (w *Worker) torrentByInfohash(ctx context.Context, infohash string) (qbt.Torrent, bool, error) {
torrents, err := w.qbt.Torrents(ctx, w.cfg.Category)
if err != nil {
return qbt.Torrent{}, false, err
}
want := strings.ToLower(infohash)
for _, t := range torrents {
for _, h := range []string{t.Hash, t.InfohashV1, t.InfohashV2} {
if h != "" && strings.ToLower(h) == want {
return t, true, nil
}
}
}
return qbt.Torrent{}, false, nil
}
func parseIgnored(s string) []string {
if s == "" {
return nil
}
var out []string
_ = json.Unmarshal([]byte(s), &out)
return out
}
func contains(ss []string, s string) bool {
for _, x := range ss {
if x == s {
return true
}
}
return false
}