diff --git a/.gitignore b/.gitignore index 42e84fc..8e1eece 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,9 @@ *.db-wal *.db-shm +# Временные файлы (smoke-тесты, мок-сервисы, песочница) +/tmp/ + # IDE /.idea/ /.vscode/ diff --git a/README.md b/README.md index 3af776c..5f185ea 100644 --- a/README.md +++ b/README.md @@ -32,9 +32,11 @@ Arr-стек (prowlarr/radarr/sonarr) плохо ложится на русск ## Статус -Ранняя разработка. Готов каркас (Ф0): загрузка TOML-конфига, SQLite + -миграции, slog-логи, HTTP-сервер с `/healthz`. Дальше — приём загрузок и -трекинг (Ф1). См. [дорожную карту](docs/drafts/roadmap.md). +Ранняя разработка. Готовы каркас (Ф0) и приём + трекинг (Ф1): добавление +magnet в qBittorrent, идемпотентность по infohash, поллинг завершения и +машина состояний (`downloading → completed`, плюс stuck/failed); наружу — +REST API, веб-UI и `jellybit add`. Источники кроме magnet (.torrent/url) и +распознавание (Ф2) — дальше. См. [дорожную карту](docs/drafts/roadmap.md). ## Документация diff --git a/cmd/jellybit/add.go b/cmd/jellybit/add.go new file mode 100644 index 0000000..7137cf8 --- /dev/null +++ b/cmd/jellybit/add.go @@ -0,0 +1,67 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// runAdd — тонкий CLI-клиент REST API запущенного сервиса (для отладки): +// +// jellybit add --context "..." --server http://localhost:8080 +func runAdd(args []string) error { + fs := flag.NewFlagSet("add", flag.ContinueOnError) + server := fs.String("server", "http://localhost:8080", "адрес запущенного jellybit") + contextStr := fs.String("context", "", "контекст для распознавания") + + // stdlib flag прекращает разбор на первом позиционном аргументе, поэтому + // magnet (если он идёт первым) вынимаем до Parse — так работают оба + // порядка: `add --context ...` и `add --context ... `. + var source string + if len(args) > 0 && !strings.HasPrefix(args[0], "-") { + source, args = args[0], args[1:] + } + if err := fs.Parse(args); err != nil { + return err + } + if source == "" { + if fs.NArg() < 1 { + return fmt.Errorf("usage: jellybit add [--context ...] [--server ...]") + } + source = fs.Arg(0) + } + + body, err := json.Marshal(map[string]string{"source": source, "context": *contextStr}) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + endpoint := strings.TrimRight(*server, "/") + "/api/downloads" + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("call %s: %w", endpoint, err) + } + defer func() { _ = resp.Body.Close() }() + + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<16)) + if resp.StatusCode >= 400 { + return fmt.Errorf("server returned %d: %s", resp.StatusCode, strings.TrimSpace(string(respBody))) + } + fmt.Println(strings.TrimSpace(string(respBody))) + return nil +} diff --git a/cmd/jellybit/main.go b/cmd/jellybit/main.go index 69aa963..2657fe8 100644 --- a/cmd/jellybit/main.go +++ b/cmd/jellybit/main.go @@ -1,77 +1,38 @@ // Команда jellybit — связующий сервис qBittorrent ↔ Jellyfin. +// +// Подкоманды: +// +// jellybit [serve] --config запустить сервис (по умолчанию) +// jellybit add [--context] добавить загрузку через REST API сервиса package main import ( - "context" - "errors" - "flag" - "net/http" "os" - "os/signal" - "syscall" - "time" - - "git.vakhrushev.me/av/jellybit/internal/config" - "git.vakhrushev.me/av/jellybit/internal/httpapi" - "git.vakhrushev.me/av/jellybit/internal/logging" - "git.vakhrushev.me/av/jellybit/internal/store" + "strings" ) func main() { - if err := run(); err != nil { + args := os.Args[1:] + + // Первый позиционный аргумент (не флаг) — подкоманда. Без него (и при + // `--config ...`, как в Dockerfile ENTRYPOINT) запускаем сервис. + cmd := "serve" + if len(args) > 0 && !strings.HasPrefix(args[0], "-") { + cmd, args = args[0], args[1:] + } + + var err error + switch cmd { + case "serve": + err = runServe(args) + case "add": + err = runAdd(args) + default: + _, _ = os.Stderr.WriteString("unknown command: " + cmd + "\n") + os.Exit(2) + } + if err != nil { _, _ = os.Stderr.WriteString("fatal: " + err.Error() + "\n") os.Exit(1) } } - -func run() error { - configPath := flag.String("config", "/data/config.toml", "путь к config.toml") - flag.Parse() - - cfg, err := config.Load(*configPath) - if err != nil { - return err - } - - logger := logging.New(cfg.Log.Level, cfg.Log.Format) - logger.Info("starting jellybit", "config", *configPath) - - st, err := store.Open(cfg.Storage.DBPath) - if err != nil { - return err - } - defer func() { _ = st.Close() }() - logger.Info("database ready", "path", cfg.Storage.DBPath) - - srv := &http.Server{ - Addr: cfg.HTTP.Listen, - Handler: httpapi.NewRouter(logger), - ReadHeaderTimeout: 10 * time.Second, - } - - ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - defer stop() - - errCh := make(chan error, 1) - go func() { - logger.Info("http server listening", "addr", cfg.HTTP.Listen) - if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - errCh <- err - } - }() - - select { - case err := <-errCh: - return err - case <-ctx.Done(): - logger.Info("shutdown signal received") - } - - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := srv.Shutdown(shutdownCtx); err != nil { - return err - } - logger.Info("stopped") - return nil -} diff --git a/cmd/jellybit/serve.go b/cmd/jellybit/serve.go new file mode 100644 index 0000000..b28b205 --- /dev/null +++ b/cmd/jellybit/serve.go @@ -0,0 +1,111 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "net/http" + "os/signal" + "syscall" + "time" + + "git.vakhrushev.me/av/jellybit/internal/config" + "git.vakhrushev.me/av/jellybit/internal/httpapi" + "git.vakhrushev.me/av/jellybit/internal/ingest" + "git.vakhrushev.me/av/jellybit/internal/logging" + "git.vakhrushev.me/av/jellybit/internal/qbt" + "git.vakhrushev.me/av/jellybit/internal/store" + "git.vakhrushev.me/av/jellybit/internal/worker" +) + +// runServe запускает сервис: конфиг → хранилище → клиент qBittorrent → +// воркер (фоном) → HTTP-сервер; останавливается по SIGINT/SIGTERM. +func runServe(args []string) error { + fs := flag.NewFlagSet("serve", flag.ContinueOnError) + configPath := fs.String("config", "/data/config.toml", "путь к config.toml") + if err := fs.Parse(args); err != nil { + return err + } + + cfg, err := config.Load(*configPath) + if err != nil { + return err + } + + logger := logging.New(cfg.Log.Level, cfg.Log.Format) + logger.Info("starting jellybit", "config", *configPath) + + st, err := store.Open(cfg.Storage.DBPath) + if err != nil { + return err + } + defer func() { _ = st.Close() }() + logger.Info("database ready", "path", cfg.Storage.DBPath) + + qb, err := qbt.New(qbt.Config{ + URL: cfg.QBittorrent.URL, + Username: cfg.QBittorrent.Username, + Password: cfg.QBittorrent.Password, + }) + if err != nil { + return err + } + + ingestor := ingest.New(st, qb, ingest.Config{ + Category: cfg.QBittorrent.Category, + SavePath: cfg.QBittorrent.SavePath, + }, logger) + + wrk := worker.New(st, qb, worker.Config{ + Category: cfg.QBittorrent.Category, + SavePath: cfg.QBittorrent.SavePath, + PollInterval: cfg.Worker.PollInterval.Std(), + StuckAfter: cfg.Worker.StuckAfter.Std(), + MagnetTimeout: cfg.Worker.MagnetTimeout.Std(), + }, logger) + + router, err := httpapi.NewRouter(httpapi.Deps{ + Logger: logger, + Ingestor: ingestor, + Commander: wrk, + Reader: st, + }) + if err != nil { + return err + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + go wrk.Run(ctx) + + srv := &http.Server{ + Addr: cfg.HTTP.Listen, + Handler: router, + ReadHeaderTimeout: 10 * time.Second, + } + + errCh := make(chan error, 1) + go func() { + logger.Info("http server listening", "addr", cfg.HTTP.Listen) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + errCh <- err + } + }() + + select { + case err := <-errCh: + return err + case <-ctx.Done(): + logger.Info("shutdown signal received") + } + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := srv.Shutdown(shutdownCtx); err != nil { + return fmt.Errorf("http shutdown: %w", err) + } + logger.Info("stopped") + return nil +} diff --git a/internal/httpapi/httpapi.go b/internal/httpapi/httpapi.go index 34a795b..f065abb 100644 --- a/internal/httpapi/httpapi.go +++ b/internal/httpapi/httpapi.go @@ -1,33 +1,314 @@ -// Package httpapi предоставляет HTTP API и веб-UI (server-rendered + htmx). +// Package httpapi предоставляет HTTP API и веб-UI (server-rendered). // -// Сейчас — каркас: только /healthz. Эндпоинты приёма и ревью — в Ф1+. +// Тонкий транспорт над ядром: приём идёт в ingest, команды (cancel/retry) — +// в worker, чтение — в store. В v1 без авторизации (доверенная LAN). package httpapi import ( + "context" "encoding/json" + "errors" + "html/template" "log/slog" "net/http" + "net/url" + "strconv" "time" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" + + "git.vakhrushev.me/av/jellybit/internal/ingest" + "git.vakhrushev.me/av/jellybit/internal/store" + "git.vakhrushev.me/av/jellybit/web" ) +// Ingestor принимает загрузку (ingest.Service). +type Ingestor interface { + Ingest(ctx context.Context, req ingest.Request) (ingest.Result, error) +} + +// Commander исполняет команды над задачей (worker.Worker). +type Commander interface { + Cancel(ctx context.Context, id int64) error + Retry(ctx context.Context, id int64) error +} + +// Reader читает задачи (store.Store). +type Reader interface { + ListDownloads(ctx context.Context) ([]store.Download, error) + GetDownload(ctx context.Context, id int64) (*store.Download, error) +} + +// Deps — зависимости транспорта. +type Deps struct { + Logger *slog.Logger + Ingestor Ingestor + Commander Commander + Reader Reader +} + +type server struct { + deps Deps + index *template.Template +} + // NewRouter собирает HTTP-обработчик сервиса. -func NewRouter(logger *slog.Logger) http.Handler { +func NewRouter(d Deps) (http.Handler, error) { + index, err := template.ParseFS(web.FS, "templates/index.html") + if err != nil { + return nil, err + } + s := &server{deps: d, index: index} + r := chi.NewRouter() r.Use(middleware.RequestID) r.Use(middleware.Recoverer) - r.Use(requestLogger(logger)) + r.Use(requestLogger(d.Logger)) r.Get("/healthz", handleHealthz) - return r + // Веб-UI. + r.Get("/", s.handleIndex) + r.Post("/ui/downloads", s.handleUIAdd) + r.Post("/ui/downloads/{id}/cancel", s.handleUICancel) + + // REST API. + r.Route("/api", func(r chi.Router) { + r.Get("/downloads", s.handleAPIList) + r.Post("/downloads", s.handleAPIAdd) + r.Get("/downloads/{id}", s.handleAPIGet) + r.Post("/downloads/{id}/cancel", s.handleAPICancel) + r.Post("/downloads/{id}/retry", s.handleAPIRetry) + }) + + return r, nil } func handleHealthz(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +// --- Веб-UI --- + +type indexView struct { + Error string + Downloads []downloadView +} + +type downloadView struct { + ID int64 + Source string + Infohash string + Context string + State string + Error string + Terminal bool +} + +func (s *server) handleIndex(w http.ResponseWriter, r *http.Request) { + downloads, err := s.deps.Reader.ListDownloads(r.Context()) + if err != nil { + s.deps.Logger.Error("list downloads", "err", err) + http.Error(w, "internal error", http.StatusInternalServerError) + return + } + view := indexView{Error: r.URL.Query().Get("err")} + for _, d := range downloads { + view.Downloads = append(view.Downloads, toView(d)) + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + if err := s.index.Execute(w, view); err != nil { + s.deps.Logger.Error("render index", "err", err) + } +} + +func (s *server) handleUIAdd(w http.ResponseWriter, r *http.Request) { + if err := r.ParseForm(); err != nil { + redirectErr(w, r, "не удалось разобрать форму") + return + } + _, err := s.deps.Ingestor.Ingest(r.Context(), ingest.Request{ + Source: r.PostForm.Get("source"), + Context: r.PostForm.Get("context"), + }) + if err != nil { + redirectErr(w, r, err.Error()) + return + } + http.Redirect(w, r, "/", http.StatusSeeOther) +} + +func (s *server) handleUICancel(w http.ResponseWriter, r *http.Request) { + id, err := pathID(r) + if err != nil { + redirectErr(w, r, "некорректный id") + return + } + if err := s.deps.Commander.Cancel(r.Context(), id); err != nil { + redirectErr(w, r, err.Error()) + return + } + http.Redirect(w, r, "/", http.StatusSeeOther) +} + +// --- REST API --- + +type downloadDTO struct { + ID int64 `json:"id"` + SourceType string `json:"source_type"` + Infohash string `json:"infohash,omitempty"` + Context string `json:"context,omitempty"` + State string `json:"state"` + ErrorCode string `json:"error_code,omitempty"` + ErrorMsg string `json:"error_msg,omitempty"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +type addRequest struct { + Source string `json:"source"` + Context string `json:"context"` +} + +type addResponse struct { + ID int64 `json:"id"` + Infohash string `json:"infohash"` + State string `json:"state"` + Deduplicated bool `json:"deduplicated"` +} + +func (s *server) handleAPIList(w http.ResponseWriter, r *http.Request) { + downloads, err := s.deps.Reader.ListDownloads(r.Context()) + if err != nil { + writeJSON(w, http.StatusInternalServerError, errJSON(err)) + return + } + out := make([]downloadDTO, 0, len(downloads)) + for _, d := range downloads { + out = append(out, toDTO(d)) + } + writeJSON(w, http.StatusOK, out) +} + +func (s *server) handleAPIGet(w http.ResponseWriter, r *http.Request) { + id, err := pathID(r) + if err != nil { + writeJSON(w, http.StatusBadRequest, errJSON(err)) + return + } + d, err := s.deps.Reader.GetDownload(r.Context(), id) + if err != nil { + writeJSON(w, http.StatusNotFound, errJSON(err)) + return + } + writeJSON(w, http.StatusOK, toDTO(*d)) +} + +func (s *server) handleAPIAdd(w http.ResponseWriter, r *http.Request) { + var req addRequest + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1<<16)).Decode(&req); err != nil { + writeJSON(w, http.StatusBadRequest, errJSON(err)) + return + } + res, err := s.deps.Ingestor.Ingest(r.Context(), ingest.Request{Source: req.Source, Context: req.Context}) + if err != nil { + writeJSON(w, http.StatusBadRequest, errJSON(err)) + return + } + status := http.StatusCreated + if res.Deduplicated { + status = http.StatusOK + } + writeJSON(w, status, addResponse{ + ID: res.DownloadID, + Infohash: res.Infohash, + State: string(res.State), + Deduplicated: res.Deduplicated, + }) +} + +func (s *server) handleAPICancel(w http.ResponseWriter, r *http.Request) { + s.apiCommand(w, r, s.deps.Commander.Cancel) +} + +func (s *server) handleAPIRetry(w http.ResponseWriter, r *http.Request) { + s.apiCommand(w, r, s.deps.Commander.Retry) +} + +func (s *server) apiCommand(w http.ResponseWriter, r *http.Request, cmd func(context.Context, int64) error) { + id, err := pathID(r) + if err != nil { + writeJSON(w, http.StatusBadRequest, errJSON(err)) + return + } + if err := cmd(r.Context(), id); err != nil { + writeJSON(w, http.StatusConflict, errJSON(err)) + return + } + d, err := s.deps.Reader.GetDownload(r.Context(), id) + if err != nil { + writeJSON(w, http.StatusOK, map[string]int64{"id": id}) + return + } + writeJSON(w, http.StatusOK, toDTO(*d)) +} + +// --- helpers --- + +func toDTO(d store.Download) downloadDTO { + return downloadDTO{ + ID: d.ID, + SourceType: string(d.SourceType), + Infohash: d.Infohash.String, + Context: d.Context, + State: string(d.State), + ErrorCode: d.ErrorCode.String, + ErrorMsg: d.ErrorMsg.String, + CreatedAt: d.CreatedAt, + UpdatedAt: d.UpdatedAt, + } +} + +func toView(d store.Download) downloadView { + return downloadView{ + ID: d.ID, + Source: shorten(d.SourceRef, 64), + Infohash: d.Infohash.String, + Context: d.Context, + State: string(d.State), + Error: d.ErrorMsg.String, + Terminal: d.State.IsTerminal(), + } +} + +func shorten(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] + "…" +} + +func pathID(r *http.Request) (int64, error) { + id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64) + if err != nil { + return 0, errors.New("invalid id") + } + return id, nil +} + +func redirectErr(w http.ResponseWriter, r *http.Request, msg string) { + http.Redirect(w, r, "/?err="+url.QueryEscape(msg), http.StatusSeeOther) +} + +func writeJSON(w http.ResponseWriter, status int, v any) { w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(v) +} + +func errJSON(err error) map[string]string { + return map[string]string{"error": err.Error()} } // requestLogger пишет структурированный лог по каждому запросу. diff --git a/internal/httpapi/httpapi_test.go b/internal/httpapi/httpapi_test.go new file mode 100644 index 0000000..ded9f8b --- /dev/null +++ b/internal/httpapi/httpapi_test.go @@ -0,0 +1,177 @@ +package httpapi_test + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "git.vakhrushev.me/av/jellybit/internal/httpapi" + "git.vakhrushev.me/av/jellybit/internal/ingest" + "git.vakhrushev.me/av/jellybit/internal/store" +) + +type fakeIngestor struct { + res ingest.Result + err error + lastReq ingest.Request +} + +func (f *fakeIngestor) Ingest(_ context.Context, req ingest.Request) (ingest.Result, error) { + f.lastReq = req + return f.res, f.err +} + +type fakeCommander struct { + cancelled []int64 + retried []int64 + err error +} + +func (f *fakeCommander) Cancel(_ context.Context, id int64) error { + if f.err != nil { + return f.err + } + f.cancelled = append(f.cancelled, id) + return nil +} + +func (f *fakeCommander) Retry(_ context.Context, id int64) error { + if f.err != nil { + return f.err + } + f.retried = append(f.retried, id) + return nil +} + +type fakeReader struct { + list []store.Download + get *store.Download +} + +func (f *fakeReader) ListDownloads(_ context.Context) ([]store.Download, error) { return f.list, nil } + +func (f *fakeReader) GetDownload(_ context.Context, id int64) (*store.Download, error) { + if f.get != nil { + return f.get, nil + } + return &store.Download{ID: id, State: store.StateCancelled}, nil +} + +func newServer(t *testing.T, d httpapi.Deps) *httptest.Server { + t.Helper() + if d.Logger == nil { + d.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) + } + h, err := httpapi.NewRouter(d) + if err != nil { + t.Fatalf("NewRouter: %v", err) + } + srv := httptest.NewServer(h) + t.Cleanup(srv.Close) + return srv +} + +func TestAPIAdd(t *testing.T) { + ing := &fakeIngestor{res: ingest.Result{DownloadID: 1, Infohash: "abc", State: store.StateDownloading}} + srv := newServer(t, httpapi.Deps{Ingestor: ing, Commander: &fakeCommander{}, Reader: &fakeReader{}}) + + resp, err := http.Post(srv.URL+"/api/downloads", "application/json", + strings.NewReader(`{"source":"magnet:?xt=urn:btih:abc","context":"Дюна"}`)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusCreated { + t.Fatalf("status = %d, want 201", resp.StatusCode) + } + var got map[string]any + _ = json.NewDecoder(resp.Body).Decode(&got) + if got["id"].(float64) != 1 || got["state"] != "downloading" { + t.Errorf("body = %v", got) + } + if ing.lastReq.Context != "Дюна" { + t.Errorf("контекст не проброшен: %q", ing.lastReq.Context) + } +} + +func TestAPIAddBadInput(t *testing.T) { + ing := &fakeIngestor{err: ingestErr("bad magnet")} + srv := newServer(t, httpapi.Deps{Ingestor: ing, Commander: &fakeCommander{}, Reader: &fakeReader{}}) + + resp, err := http.Post(srv.URL+"/api/downloads", "application/json", strings.NewReader(`{"source":"x"}`)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("status = %d, want 400", resp.StatusCode) + } +} + +func TestAPIList(t *testing.T) { + reader := &fakeReader{list: []store.Download{ + {ID: 2, SourceType: store.SourceMagnet, State: store.StateCompleted, Infohash: store.NullString("abc")}, + {ID: 1, SourceType: store.SourceMagnet, State: store.StateDownloading}, + }} + srv := newServer(t, httpapi.Deps{Ingestor: &fakeIngestor{}, Commander: &fakeCommander{}, Reader: reader}) + + resp, err := http.Get(srv.URL + "/api/downloads") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + var got []map[string]any + _ = json.NewDecoder(resp.Body).Decode(&got) + if len(got) != 2 { + t.Fatalf("len = %d, want 2", len(got)) + } + if got[0]["state"] != "completed" || got[0]["infohash"] != "abc" { + t.Errorf("first = %v", got[0]) + } +} + +func TestAPICancel(t *testing.T) { + cmd := &fakeCommander{} + srv := newServer(t, httpapi.Deps{Ingestor: &fakeIngestor{}, Commander: cmd, Reader: &fakeReader{}}) + + resp, err := http.Post(srv.URL+"/api/downloads/5/cancel", "", nil) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("status = %d, want 200", resp.StatusCode) + } + if len(cmd.cancelled) != 1 || cmd.cancelled[0] != 5 { + t.Errorf("cancel вызван неверно: %v", cmd.cancelled) + } +} + +func TestIndexRenders(t *testing.T) { + reader := &fakeReader{list: []store.Download{ + {ID: 1, SourceType: store.SourceMagnet, SourceRef: "magnet:?xt=urn:btih:abc", State: store.StateDownloading}, + }} + srv := newServer(t, httpapi.Deps{Ingestor: &fakeIngestor{}, Commander: &fakeCommander{}, Reader: reader}) + + resp, err := http.Get(srv.URL + "/") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + t.Fatalf("status = %d", resp.StatusCode) + } + if !strings.Contains(string(body), "jellybit") || !strings.Contains(string(body), "downloading") { + t.Error("страница не содержит ожидаемого контента") + } +} + +type ingestErr string + +func (e ingestErr) Error() string { return string(e) } diff --git a/internal/ingest/doc.go b/internal/ingest/doc.go deleted file mode 100644 index e935732..0000000 --- a/internal/ingest/doc.go +++ /dev/null @@ -1,4 +0,0 @@ -// Package ingest — use-case приёма загрузки, общий для всех транспортов. -// -// Заглушка: реализация в фазе Ф1 (см. docs/specs/architecture.md). -package ingest diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go new file mode 100644 index 0000000..5ddbc6c --- /dev/null +++ b/internal/ingest/ingest.go @@ -0,0 +1,120 @@ +// Package ingest — use-case приёма загрузки, общий для всех транспортов +// (HTTP, Telegram, CLI). Принимает источник + контекст, отдаёт источник в +// qBittorrent и заводит/находит задачу в БД. +package ingest + +import ( + "context" + "fmt" + "log/slog" + "strings" + + "git.vakhrushev.me/av/jellybit/internal/magnet" + "git.vakhrushev.me/av/jellybit/internal/qbt" + "git.vakhrushev.me/av/jellybit/internal/store" +) + +// Store — нужная ingest часть хранилища. +type Store interface { + FindActiveByInfohash(ctx context.Context, infohash string) (*store.Download, error) + CreateDownload(ctx context.Context, d *store.Download) (int64, error) + SetDownloadState(ctx context.Context, id int64, state store.State, errCode, errMsg string) error +} + +// QBittorrent — нужная ingest часть клиента qBittorrent. +type QBittorrent interface { + Add(ctx context.Context, ar qbt.AddRequest) error +} + +// Config — параметры добавления в qBittorrent. +type Config struct { + Category string + SavePath string +} + +// Service — реализация приёма. +type Service struct { + store Store + qbt QBittorrent + cfg Config + log *slog.Logger +} + +// New собирает сервис приёма. +func New(st Store, qb QBittorrent, cfg Config, log *slog.Logger) *Service { + return &Service{store: st, qbt: qb, cfg: cfg, log: log} +} + +// Request — входной запрос приёма. +type Request struct { + Source string // пока — magnet-ссылка + Context string // подсказка для распознавания (опц.) +} + +// Result — итог приёма. +type Result struct { + DownloadID int64 + Infohash string + State store.State + Deduplicated bool // присоединились к уже активной задаче, нового добавления не было +} + +// Ingest принимает источник: извлекает infohash, дедуплицирует по активной +// задаче, иначе заводит задачу и отдаёт источник в qBittorrent. +func (s *Service) Ingest(ctx context.Context, req Request) (Result, error) { + source := strings.TrimSpace(req.Source) + info, err := magnet.Parse(source) + if err != nil { + // Ф1: поддержан только magnet. .torrent/url — следующий заход. + return Result{}, fmt.Errorf("ingest: %w", err) + } + + if existing, err := s.store.FindActiveByInfohash(ctx, info.Infohash); err != nil { + return Result{}, fmt.Errorf("ingest: lookup active: %w", err) + } else if existing != nil { + s.log.Info("ingest: attached to active download", + "download_id", existing.ID, "infohash", info.Infohash, "state", existing.State) + return Result{ + DownloadID: existing.ID, + Infohash: info.Infohash, + State: existing.State, + Deduplicated: true, + }, nil + } + + d := &store.Download{ + SourceType: store.SourceMagnet, + SourceRef: source, + Context: req.Context, + Infohash: store.NullString(info.Infohash), + IdempotencyKey: store.NullString(info.Infohash), + State: store.StateDownloading, + } + id, err := s.store.CreateDownload(ctx, d) + if err != nil { + return Result{}, fmt.Errorf("ingest: create download: %w", err) + } + + addErr := s.qbt.Add(ctx, qbt.AddRequest{ + URLs: []string{source}, + Category: s.cfg.Category, + SavePath: s.cfg.SavePath, + }) + if addErr != nil { + // Задача уже в БД — помечаем failed, чтобы worker её не подхватил. + if setErr := s.store.SetDownloadState(ctx, id, store.StateFailed, "qbit_add", addErr.Error()); setErr != nil { + s.log.Error("ingest: failed to mark download failed after qbit error", + "download_id", id, "err", setErr) + } + return Result{DownloadID: id, Infohash: info.Infohash, State: store.StateFailed}, + fmt.Errorf("ingest: add to qbittorrent: %w", addErr) + } + + s.log.Info("ingest: download accepted", + "download_id", id, "infohash", info.Infohash, "category", s.cfg.Category) + return Result{ + DownloadID: id, + Infohash: info.Infohash, + State: store.StateDownloading, + }, nil +} diff --git a/internal/ingest/ingest_test.go b/internal/ingest/ingest_test.go new file mode 100644 index 0000000..ddec56f --- /dev/null +++ b/internal/ingest/ingest_test.go @@ -0,0 +1,140 @@ +package ingest + +import ( + "context" + "errors" + "io" + "log/slog" + "testing" + + "git.vakhrushev.me/av/jellybit/internal/qbt" + "git.vakhrushev.me/av/jellybit/internal/store" +) + +const sampleMagnet = "magnet:?xt=urn:btih:541ADCFF3B6DD5DBA7088EA83317D9D6FAC331D6&dn=Dune" + +const sampleInfohash = "541adcff3b6dd5dba7088ea83317d9d6fac331d6" + +type fakeStore struct { + active *store.Download + created []store.Download + nextID int64 + stateCalls []stateCall +} + +type stateCall struct { + id int64 + state store.State + code string + msg string +} + +func (f *fakeStore) FindActiveByInfohash(_ context.Context, _ string) (*store.Download, error) { + return f.active, nil +} + +func (f *fakeStore) CreateDownload(_ context.Context, d *store.Download) (int64, error) { + f.nextID++ + d.ID = f.nextID + f.created = append(f.created, *d) + return f.nextID, nil +} + +func (f *fakeStore) SetDownloadState(_ context.Context, id int64, st store.State, code, msg string) error { + f.stateCalls = append(f.stateCalls, stateCall{id, st, code, msg}) + return nil +} + +type fakeQbt struct { + added []qbt.AddRequest + err error +} + +func (f *fakeQbt) Add(_ context.Context, ar qbt.AddRequest) error { + if f.err != nil { + return f.err + } + f.added = append(f.added, ar) + return nil +} + +func newService(st Store, qb QBittorrent) *Service { + return New(st, qb, Config{Category: "jellybit", SavePath: "/srv/media/downloads"}, + slog.New(slog.NewTextHandler(io.Discard, nil))) +} + +func TestIngestHappyPath(t *testing.T) { + fs := &fakeStore{} + fq := &fakeQbt{} + res, err := newService(fs, fq).Ingest(context.Background(), Request{Source: sampleMagnet, Context: "Дюна 2"}) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + if res.Infohash != sampleInfohash { + t.Errorf("infohash = %q", res.Infohash) + } + if res.State != store.StateDownloading || res.Deduplicated { + t.Errorf("res = %+v", res) + } + if len(fs.created) != 1 { + t.Fatalf("создано задач: %d, want 1", len(fs.created)) + } + if got := fs.created[0]; got.Context != "Дюна 2" || got.Infohash.String != sampleInfohash { + t.Errorf("сохранённая задача: %+v", got) + } + if len(fq.added) != 1 { + t.Fatalf("вызовов qbt.Add: %d, want 1", len(fq.added)) + } + add := fq.added[0] + if len(add.URLs) != 1 || add.URLs[0] != sampleMagnet { + t.Errorf("URLs = %v", add.URLs) + } + if add.Category != "jellybit" || add.SavePath != "/srv/media/downloads" { + t.Errorf("category/savepath = %q/%q", add.Category, add.SavePath) + } +} + +func TestIngestIdempotent(t *testing.T) { + existing := &store.Download{ID: 7, State: store.StateDownloading} + fs := &fakeStore{active: existing} + fq := &fakeQbt{} + res, err := newService(fs, fq).Ingest(context.Background(), Request{Source: sampleMagnet}) + if err != nil { + t.Fatalf("Ingest: %v", err) + } + if !res.Deduplicated || res.DownloadID != 7 { + t.Errorf("ожидалось присоединение к задаче 7: %+v", res) + } + if len(fs.created) != 0 { + t.Error("не должно создаваться новой задачи") + } + if len(fq.added) != 0 { + t.Error("не должно быть повторного добавления в qBittorrent") + } +} + +func TestIngestQbitErrorMarksFailed(t *testing.T) { + fs := &fakeStore{} + fq := &fakeQbt{err: errors.New("connection refused")} + res, err := newService(fs, fq).Ingest(context.Background(), Request{Source: sampleMagnet}) + if err == nil { + t.Fatal("ожидалась ошибка") + } + if res.State != store.StateFailed { + t.Errorf("state = %q, want failed", res.State) + } + if len(fs.stateCalls) != 1 || fs.stateCalls[0].state != store.StateFailed { + t.Errorf("ожидался перевод в failed: %+v", fs.stateCalls) + } +} + +func TestIngestRejectsNonMagnet(t *testing.T) { + fs := &fakeStore{} + fq := &fakeQbt{} + if _, err := newService(fs, fq).Ingest(context.Background(), Request{Source: "https://example.com/x.torrent"}); err == nil { + t.Fatal("ожидалась ошибка для не-magnet источника") + } + if len(fs.created) != 0 || len(fq.added) != 0 { + t.Error("не должно быть ни записи, ни добавления") + } +} diff --git a/internal/magnet/magnet.go b/internal/magnet/magnet.go new file mode 100644 index 0000000..e845e7e --- /dev/null +++ b/internal/magnet/magnet.go @@ -0,0 +1,101 @@ +// Package magnet разбирает magnet-ссылки: извлекает infohash и метаданные. +// +// infohash нормализуется к нижнему hex — в этом же виде его отдаёт +// qBittorrent (поля hash/infohash_v1/infohash_v2), что позволяет +// сопоставлять задачи с торрентами при поллинге. +package magnet + +import ( + "encoding/base32" + "encoding/hex" + "errors" + "fmt" + "net/url" + "strings" +) + +// Info — разобранная magnet-ссылка. +type Info struct { + Infohash string // нормализованный нижний hex (40 для v1, 64 для v2) + DisplayName string // dn — человекочитаемое имя, если задано + Trackers []string // tr — трекеры +} + +// ErrNotMagnet возвращается, если строка не является magnet-ссылкой. +var ErrNotMagnet = errors.New("not a magnet link") + +// Parse разбирает magnet-ссылку. Поддерживаются btih (v1: 40-hex или +// 32-символьный base32) и btmh (v2: sha256-multihash). При нескольких xt +// предпочитается v1. +func Parse(raw string) (Info, error) { + raw = strings.TrimSpace(raw) + u, err := url.Parse(raw) + if err != nil || !strings.EqualFold(u.Scheme, "magnet") { + return Info{}, ErrNotMagnet + } + vals := u.Query() + + var v1, v2 string + for _, xt := range vals["xt"] { + switch { + case strings.HasPrefix(xt, "urn:btih:"): + if h, err := normalizeBTIH(strings.TrimPrefix(xt, "urn:btih:")); err == nil && v1 == "" { + v1 = h + } + case strings.HasPrefix(xt, "urn:btmh:"): + if h, err := normalizeBTMH(strings.TrimPrefix(xt, "urn:btmh:")); err == nil && v2 == "" { + v2 = h + } + } + } + + infohash := v1 + if infohash == "" { + infohash = v2 + } + if infohash == "" { + return Info{}, fmt.Errorf("magnet without a usable infohash (xt)") + } + + return Info{ + Infohash: infohash, + DisplayName: vals.Get("dn"), + Trackers: vals["tr"], + }, nil +} + +// normalizeBTIH нормализует v1-infohash (SHA-1, 20 байт) к нижнему hex. +func normalizeBTIH(h string) (string, error) { + switch len(h) { + case 40: // hex + if _, err := hex.DecodeString(h); err != nil { + return "", fmt.Errorf("btih hex: %w", err) + } + return strings.ToLower(h), nil + case 32: // base32 (RFC 4648, без паддинга) + b, err := base32.StdEncoding.DecodeString(strings.ToUpper(h)) + if err != nil { + return "", fmt.Errorf("btih base32: %w", err) + } + if len(b) != 20 { + return "", fmt.Errorf("btih base32: got %d bytes, want 20", len(b)) + } + return hex.EncodeToString(b), nil + default: + return "", fmt.Errorf("btih: unexpected length %d", len(h)) + } +} + +// normalizeBTMH нормализует v2-infohash. Multihash sha256 имеет вид +// 1220<64-hex>; возвращаем сами 64-hex (так его отдаёт qBittorrent в +// infohash_v2). +func normalizeBTMH(h string) (string, error) { + h = strings.ToLower(h) + if _, err := hex.DecodeString(h); err != nil { + return "", fmt.Errorf("btmh hex: %w", err) + } + if len(h) != 68 || !strings.HasPrefix(h, "1220") { + return "", fmt.Errorf("btmh: unsupported multihash %q", h) + } + return h[4:], nil +} diff --git a/internal/magnet/magnet_test.go b/internal/magnet/magnet_test.go new file mode 100644 index 0000000..d0e270b --- /dev/null +++ b/internal/magnet/magnet_test.go @@ -0,0 +1,78 @@ +package magnet + +import "testing" + +func TestParse(t *testing.T) { + tests := []struct { + name string + raw string + infohash string + dn string + trackers int + }{ + { + name: "btih hex (из BRIEF)", + raw: "magnet:?xt=urn:btih:541ADCFF3B6DD5DBA7088EA83317D9D6FAC331D6&tr=http%3A%2F%2Fbt.t-ru.org%2Fann%3Fmagnet&dn=rutracker-topic-6514485", + infohash: "541adcff3b6dd5dba7088ea83317d9d6fac331d6", + dn: "rutracker-topic-6514485", + trackers: 1, + }, + { + name: "btih base32 (20 нулевых байт)", + raw: "magnet:?xt=urn:btih:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + infohash: "0000000000000000000000000000000000000000", + }, + { + name: "btmh v2 sha256", + raw: "magnet:?xt=urn:btmh:12200123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", + infohash: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got, err := Parse(tc.raw) + if err != nil { + t.Fatalf("Parse: %v", err) + } + if got.Infohash != tc.infohash { + t.Errorf("infohash = %q, want %q", got.Infohash, tc.infohash) + } + if got.DisplayName != tc.dn { + t.Errorf("dn = %q, want %q", got.DisplayName, tc.dn) + } + if len(got.Trackers) != tc.trackers { + t.Errorf("trackers = %d, want %d", len(got.Trackers), tc.trackers) + } + }) + } +} + +func TestParseErrors(t *testing.T) { + cases := []string{ + "https://example.com/file.torrent", // не magnet + "magnet:?dn=no-infohash", // нет xt + "magnet:?xt=urn:btih:zzzz", // некорректный hash + "", // пусто + } + for _, raw := range cases { + if _, err := Parse(raw); err == nil { + t.Errorf("Parse(%q): ожидалась ошибка", raw) + } + } +} + +func TestParseNormalisesCase(t *testing.T) { + lower := "magnet:?xt=urn:btih:541adcff3b6dd5dba7088ea83317d9d6fac331d6" + upper := "magnet:?xt=urn:btih:541ADCFF3B6DD5DBA7088EA83317D9D6FAC331D6" + a, err := Parse(lower) + if err != nil { + t.Fatal(err) + } + b, err := Parse(upper) + if err != nil { + t.Fatal(err) + } + if a.Infohash != b.Infohash { + t.Errorf("регистр влияет: %q != %q", a.Infohash, b.Infohash) + } +} diff --git a/internal/qbt/doc.go b/internal/qbt/doc.go deleted file mode 100644 index 6916f06..0000000 --- a/internal/qbt/doc.go +++ /dev/null @@ -1,4 +0,0 @@ -// Package qbt — клиент qBittorrent WebUI API (сессия, добавление, опрос). -// -// Заглушка: реализация в фазе Ф1 (см. docs/specs/architecture.md). -package qbt diff --git a/internal/qbt/qbt.go b/internal/qbt/qbt.go new file mode 100644 index 0000000..c15a4a7 --- /dev/null +++ b/internal/qbt/qbt.go @@ -0,0 +1,219 @@ +// Package qbt — клиент qBittorrent WebUI API (v2): сессия, добавление +// торрента, опрос задач по категории. +// +// Логин ленивый: cookie-сессия устанавливается при первом 403 и повторно +// при её протухании. Источник (magnet/.torrent) отдаём qBittorrent — он сам +// качает, jellybit не делает исходящих запросов на пользовательский URL. +package qbt + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "net/http/cookiejar" + "net/url" + "strconv" + "strings" + "sync" + "time" +) + +// Config — параметры подключения к qBittorrent WebUI. +type Config struct { + URL string + Username string + Password string + Timeout time.Duration +} + +// Client — клиент qBittorrent WebUI API. +type Client struct { + base *url.URL + hc *http.Client + user string + pass string + mu sync.Mutex // сериализует логин +} + +// Torrent — подмножество полей /torrents/info, нужное jellybit. +type Torrent struct { + Hash string `json:"hash"` + Name string `json:"name"` + State string `json:"state"` + SavePath string `json:"save_path"` + ContentPath string `json:"content_path"` + Category string `json:"category"` + Progress float64 `json:"progress"` + AmountLeft int64 `json:"amount_left"` + AddedOn int64 `json:"added_on"` + InfohashV1 string `json:"infohash_v1"` + InfohashV2 string `json:"infohash_v2"` +} + +// AddRequest — параметры добавления торрента. +type AddRequest struct { + URLs []string // magnet/URL-ссылки + Torrents [][]byte // .torrent-файлы (Ф1 не использует) + Category string + SavePath string + Paused bool +} + +// New создаёт клиент с собственным cookie-jar. +func New(cfg Config) (*Client, error) { + base, err := url.Parse(strings.TrimRight(cfg.URL, "/")) + if err != nil { + return nil, fmt.Errorf("parse qbittorrent url %q: %w", cfg.URL, err) + } + jar, err := cookiejar.New(nil) + if err != nil { + return nil, fmt.Errorf("cookie jar: %w", err) + } + timeout := cfg.Timeout + if timeout == 0 { + timeout = 30 * time.Second + } + return &Client{ + base: base, + hc: &http.Client{Jar: jar, Timeout: timeout}, + user: cfg.Username, + pass: cfg.Password, + }, nil +} + +func (c *Client) endpoint(path string) string { return c.base.String() + path } + +// login устанавливает cookie-сессию. Сериализован, чтобы параллельные +// вызовы (поллинг + приём) не логинились наперегонки. +func (c *Client) login(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + + form := url.Values{"username": {c.user}, "password": {c.pass}} + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + c.endpoint("/api/v2/auth/login"), strings.NewReader(form.Encode())) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Referer", c.base.String()) // qBit проверяет Referer/Host + resp, err := c.hc.Do(req) + if err != nil { + return fmt.Errorf("qbittorrent login: %w", err) + } + defer func() { _ = resp.Body.Close() }() + body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10)) + if resp.StatusCode != http.StatusOK || strings.TrimSpace(string(body)) != "Ok." { + return fmt.Errorf("qbittorrent login failed: status %d body %q", + resp.StatusCode, strings.TrimSpace(string(body))) + } + return nil +} + +// do выполняет запрос; при 403 один раз перелогинивается и повторяет. +// build вызывается заново для повтора, т.к. тело запроса одноразовое. +func (c *Client) do(ctx context.Context, build func() (*http.Request, error)) (*http.Response, error) { + req, err := build() + if err != nil { + return nil, err + } + resp, err := c.hc.Do(req) + if err != nil { + return nil, err + } + if resp.StatusCode == http.StatusForbidden { + _ = resp.Body.Close() + if err := c.login(ctx); err != nil { + return nil, err + } + req2, err := build() + if err != nil { + return nil, err + } + return c.hc.Do(req2) + } + return resp, nil +} + +// Add добавляет торрент(ы) в qBittorrent. +func (c *Client) Add(ctx context.Context, ar AddRequest) error { + var buf bytes.Buffer + mw := multipart.NewWriter(&buf) + if len(ar.URLs) > 0 { + _ = mw.WriteField("urls", strings.Join(ar.URLs, "\n")) + } + if ar.Category != "" { + _ = mw.WriteField("category", ar.Category) + } + if ar.SavePath != "" { + _ = mw.WriteField("savepath", ar.SavePath) + } + _ = mw.WriteField("paused", strconv.FormatBool(ar.Paused)) + for i, data := range ar.Torrents { + fw, err := mw.CreateFormFile("torrents", fmt.Sprintf("file%d.torrent", i)) + if err != nil { + return fmt.Errorf("qbittorrent add: form file: %w", err) + } + if _, err := fw.Write(data); err != nil { + return fmt.Errorf("qbittorrent add: write file: %w", err) + } + } + if err := mw.Close(); err != nil { + return fmt.Errorf("qbittorrent add: close multipart: %w", err) + } + contentType := mw.FormDataContentType() + payload := buf.Bytes() + + resp, err := c.do(ctx, func() (*http.Request, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + c.endpoint("/api/v2/torrents/add"), bytes.NewReader(payload)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", contentType) + req.Header.Set("Referer", c.base.String()) + return req, nil + }) + if err != nil { + return fmt.Errorf("qbittorrent add: %w", err) + } + defer func() { _ = resp.Body.Close() }() + body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10)) + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("qbittorrent add: status %d body %q", + resp.StatusCode, strings.TrimSpace(string(body))) + } + if strings.TrimSpace(string(body)) == "Fails." { + return fmt.Errorf("qbittorrent add: rejected (Fails.)") + } + return nil +} + +// Torrents возвращает задачи указанной категории (пустая — все). +func (c *Client) Torrents(ctx context.Context, category string) ([]Torrent, error) { + resp, err := c.do(ctx, func() (*http.Request, error) { + u := c.endpoint("/api/v2/torrents/info") + if category != "" { + u += "?category=" + url.QueryEscape(category) + } + return http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + }) + if err != nil { + return nil, fmt.Errorf("qbittorrent info: %w", err) + } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10)) + return nil, fmt.Errorf("qbittorrent info: status %d body %q", + resp.StatusCode, strings.TrimSpace(string(body))) + } + var ts []Torrent + if err := json.NewDecoder(resp.Body).Decode(&ts); err != nil { + return nil, fmt.Errorf("decode qbittorrent info: %w", err) + } + return ts, nil +} diff --git a/internal/qbt/qbt_test.go b/internal/qbt/qbt_test.go new file mode 100644 index 0000000..17ad952 --- /dev/null +++ b/internal/qbt/qbt_test.go @@ -0,0 +1,118 @@ +package qbt + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +// fakeQBittorrent — минимальный стенд WebUI API: требует cookie SID, выдаёт +// его на /auth/login. Так проверяется и ленивый логин по 403. +func fakeQBittorrent(t *testing.T, info string) *httptest.Server { + t.Helper() + mux := http.NewServeMux() + + mux.HandleFunc("/api/v2/auth/login", func(w http.ResponseWriter, r *http.Request) { + _ = r.ParseForm() + if r.PostForm.Get("username") != "admin" || r.PostForm.Get("password") != "secret" { + w.WriteHeader(http.StatusForbidden) + return + } + http.SetCookie(w, &http.Cookie{Name: "SID", Value: "token", Path: "/"}) + _, _ = w.Write([]byte("Ok.")) + }) + + authed := func(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if c, err := r.Cookie("SID"); err != nil || c.Value != "token" { + w.WriteHeader(http.StatusForbidden) + return + } + next(w, r) + } + } + + mux.HandleFunc("/api/v2/torrents/add", authed(func(w http.ResponseWriter, r *http.Request) { + if err := r.ParseMultipartForm(1 << 20); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if r.FormValue("category") != "jellybit" || !strings.Contains(r.FormValue("urls"), "magnet:") { + w.WriteHeader(http.StatusBadRequest) + return + } + _, _ = w.Write([]byte("Ok.")) + })) + + mux.HandleFunc("/api/v2/torrents/info", authed(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Query().Get("category") != "jellybit" { + w.WriteHeader(http.StatusBadRequest) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(info)) + })) + + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + return srv +} + +func newClient(t *testing.T, url string) *Client { + t.Helper() + c, err := New(Config{URL: url, Username: "admin", Password: "secret"}) + if err != nil { + t.Fatalf("New: %v", err) + } + return c +} + +func TestAddPerformsLazyLogin(t *testing.T) { + srv := fakeQBittorrent(t, "[]") + c := newClient(t, srv.URL) + + // Первый вызов без cookie → сервер вернёт 403 → клиент логинится и повторяет. + err := c.Add(context.Background(), AddRequest{ + URLs: []string{"magnet:?xt=urn:btih:541adcff3b6dd5dba7088ea83317d9d6fac331d6"}, + Category: "jellybit", + SavePath: "/srv/media/downloads", + }) + if err != nil { + t.Fatalf("Add: %v", err) + } +} + +func TestTorrents(t *testing.T) { + const body = `[{"hash":"541adcff3b6dd5dba7088ea83317d9d6fac331d6","name":"Dune","state":"uploading","save_path":"/srv/media/downloads","content_path":"/srv/media/downloads/Dune","progress":1.0,"amount_left":0}]` + srv := fakeQBittorrent(t, body) + c := newClient(t, srv.URL) + + ts, err := c.Torrents(context.Background(), "jellybit") + if err != nil { + t.Fatalf("Torrents: %v", err) + } + if len(ts) != 1 { + t.Fatalf("torrents = %d, want 1", len(ts)) + } + got := ts[0] + if got.Hash != "541adcff3b6dd5dba7088ea83317d9d6fac331d6" || got.State != "uploading" { + t.Errorf("torrent = %+v", got) + } + if got.ContentPath != "/srv/media/downloads/Dune" { + t.Errorf("content_path = %q", got.ContentPath) + } +} + +func TestLoginFailure(t *testing.T) { + srv := fakeQBittorrent(t, "[]") + c, err := New(Config{URL: srv.URL, Username: "admin", Password: "wrong"}) + if err != nil { + t.Fatal(err) + } + // 403 → попытка логина с неверным паролем → снова 403 → ошибка. + if _, err := c.Torrents(context.Background(), "jellybit"); err == nil { + t.Error("ожидалась ошибка логина") + } +} diff --git a/internal/store/download.go b/internal/store/download.go new file mode 100644 index 0000000..edbb614 --- /dev/null +++ b/internal/store/download.go @@ -0,0 +1,199 @@ +package store + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + "time" +) + +// State — состояние загрузки в машине состояний (см. architecture.md). +// В Ф1 используется подмножество: downloading → completed, плюс stuck, +// failed, cancelled. Остальные состояния заведены под будущие фазы. +type State string + +const ( + StateDownloading State = "downloading" + StateCompleted State = "completed" + StateRecognizing State = "recognizing" // Ф2 + StateReview State = "review" // Ф3 + StateLinking State = "linking" // Ф3 + StateDone State = "done" // Ф3 + StateDeferred State = "deferred" // Ф3 + StateStuck State = "stuck" + StateFailed State = "failed" + StateCancelled State = "cancelled" + StateReverted State = "reverted" // Ф3 +) + +// IsTerminal сообщает, завершена ли задача окончательно. Для терминальных +// состояний снимается ключ идемпотентности — тот же infohash можно завести +// заново новой задачей (см. architecture.md, «повторное добавление»). +// stuck терминальным не считается: задача восстановима (retry). +func (s State) IsTerminal() bool { + switch s { + case StateDone, StateCancelled, StateFailed, StateReverted: + return true + default: + return false + } +} + +// SourceType — вид источника загрузки. +type SourceType string + +const ( + SourceMagnet SourceType = "magnet" + SourceTorrent SourceType = "torrent" + SourceURL SourceType = "url" +) + +// Download — строка таблицы download. +type Download struct { + ID int64 `db:"id"` + SourceType SourceType `db:"source_type"` + SourceRef string `db:"source_ref"` + Context string `db:"context"` + Infohash sql.NullString `db:"infohash"` + IdempotencyKey sql.NullString `db:"idempotency_key"` + State State `db:"state"` + ErrorCode sql.NullString `db:"error_code"` + ErrorMsg sql.NullString `db:"error_msg"` + CreatedAt string `db:"created_at"` + UpdatedAt string `db:"updated_at"` +} + +// sqliteTimeLayout — формат меток datetime('now') в SQLite (UTC). +const sqliteTimeLayout = "2006-01-02 15:04:05" + +// ParseTime разбирает временную метку SQLite (datetime('now'), всегда UTC). +func ParseTime(s string) (time.Time, error) { + return time.ParseInLocation(sqliteTimeLayout, s, time.UTC) +} + +// CreatedTime возвращает время создания загрузки как time.Time (UTC). +func (d Download) CreatedTime() (time.Time, error) { return ParseTime(d.CreatedAt) } + +// NullString строит sql.NullString: пустая строка → NULL. +func NullString(s string) sql.NullString { + return sql.NullString{String: s, Valid: s != ""} +} + +// CreateDownload вставляет загрузку и возвращает её id. +func (s *Store) CreateDownload(ctx context.Context, d *Download) (int64, error) { + const q = ` +INSERT INTO download (source_type, source_ref, context, infohash, idempotency_key, state) +VALUES (?, ?, ?, ?, ?, ?)` + res, err := s.DB.ExecContext(ctx, q, + d.SourceType, d.SourceRef, d.Context, d.Infohash, d.IdempotencyKey, d.State) + if err != nil { + return 0, fmt.Errorf("insert download: %w", err) + } + id, err := res.LastInsertId() + if err != nil { + return 0, fmt.Errorf("download last insert id: %w", err) + } + return id, nil +} + +// GetDownload возвращает загрузку по id. +func (s *Store) GetDownload(ctx context.Context, id int64) (*Download, error) { + var d Download + if err := s.DB.GetContext(ctx, &d, `SELECT * FROM download WHERE id = ?`, id); err != nil { + return nil, fmt.Errorf("get download %d: %w", id, err) + } + return &d, nil +} + +// ListDownloads возвращает все загрузки, новые сверху. +func (s *Store) ListDownloads(ctx context.Context) ([]Download, error) { + var out []Download + if err := s.DB.SelectContext(ctx, &out, `SELECT * FROM download ORDER BY id DESC`); err != nil { + return nil, fmt.Errorf("list downloads: %w", err) + } + return out, nil +} + +// ListDownloadsByState возвращает загрузки в одном из указанных состояний. +func (s *Store) ListDownloadsByState(ctx context.Context, states ...State) ([]Download, error) { + if len(states) == 0 { + return nil, nil + } + ph := make([]string, len(states)) + args := make([]any, len(states)) + for i, st := range states { + ph[i] = "?" + args[i] = string(st) + } + q := `SELECT * FROM download WHERE state IN (` + strings.Join(ph, ",") + `) ORDER BY id DESC` + var out []Download + if err := s.DB.SelectContext(ctx, &out, q, args...); err != nil { + return nil, fmt.Errorf("list downloads by state: %w", err) + } + return out, nil +} + +// FindActiveByInfohash возвращает незавершённую задачу для infohash либо +// (nil, nil), если её нет. Основа идемпотентного приёма. +func (s *Store) FindActiveByInfohash(ctx context.Context, infohash string) (*Download, error) { + term := []State{StateDone, StateCancelled, StateFailed, StateReverted} + ph := make([]string, len(term)) + args := make([]any, 0, len(term)+1) + args = append(args, infohash) + for i, st := range term { + ph[i] = "?" + args = append(args, string(st)) + } + q := `SELECT * FROM download WHERE infohash = ? AND state NOT IN (` + + strings.Join(ph, ",") + `) ORDER BY id DESC LIMIT 1` + var d Download + err := s.DB.GetContext(ctx, &d, q, args...) + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("find active by infohash: %w", err) + } + return &d, nil +} + +// SetDownloadState переводит загрузку в новое состояние. Ключ +// идемпотентности пересчитывается из текущего infohash: для терминального +// состояния снимается (NULL), иначе равен infohash — так partial unique +// index гарантирует не более одной активной задачи на infohash. +func (s *Store) SetDownloadState(ctx context.Context, id int64, state State, errCode, errMsg string) error { + const q = ` +UPDATE download +SET state = ?, + error_code = ?, + error_msg = ?, + idempotency_key = CASE WHEN ? = 1 THEN NULL ELSE infohash END, + updated_at = datetime('now') +WHERE id = ?` + terminal := 0 + if state.IsTerminal() { + terminal = 1 + } + res, err := s.DB.ExecContext(ctx, q, string(state), nullArg(errCode), nullArg(errMsg), terminal, id) + if err != nil { + return fmt.Errorf("set download %d state %q: %w", id, state, err) + } + n, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("set download %d state %q: %w", id, state, err) + } + if n == 0 { + return fmt.Errorf("set download %d state %q: not found", id, state) + } + return nil +} + +// nullArg возвращает nil для пустой строки (чтобы писать NULL, не ""). +func nullArg(s string) any { + if s == "" { + return nil + } + return s +} diff --git a/internal/store/download_test.go b/internal/store/download_test.go new file mode 100644 index 0000000..3fe0e6d --- /dev/null +++ b/internal/store/download_test.go @@ -0,0 +1,155 @@ +package store + +import ( + "context" + "testing" +) + +func newTestStore(t *testing.T) *Store { + t.Helper() + st, err := Open(t.TempDir() + "/test.db") + if err != nil { + t.Fatalf("open store: %v", err) + } + t.Cleanup(func() { _ = st.Close() }) + return st +} + +func newDownloading(infohash string) *Download { + return &Download{ + SourceType: SourceMagnet, + SourceRef: "magnet:?xt=urn:btih:" + infohash, + Context: "ctx", + Infohash: NullString(infohash), + IdempotencyKey: NullString(infohash), + State: StateDownloading, + } +} + +func TestCreateAndGetDownload(t *testing.T) { + st := newTestStore(t) + ctx := context.Background() + + id, err := st.CreateDownload(ctx, newDownloading("aabbccddeeff00112233445566778899aabbccdd")) + if err != nil { + t.Fatalf("create: %v", err) + } + + got, err := st.GetDownload(ctx, id) + if err != nil { + t.Fatalf("get: %v", err) + } + if got.State != StateDownloading { + t.Errorf("state = %q, want downloading", got.State) + } + if got.Context != "ctx" { + t.Errorf("context = %q", got.Context) + } + if got.CreatedAt == "" { + t.Error("created_at пуст") + } +} + +func TestFindActiveByInfohash(t *testing.T) { + st := newTestStore(t) + ctx := context.Background() + const ih = "1111111111111111111111111111111111111111" + + if d, err := st.FindActiveByInfohash(ctx, ih); err != nil || d != nil { + t.Fatalf("ожидался (nil,nil), получили (%v,%v)", d, err) + } + + id, err := st.CreateDownload(ctx, newDownloading(ih)) + if err != nil { + t.Fatal(err) + } + d, err := st.FindActiveByInfohash(ctx, ih) + if err != nil { + t.Fatal(err) + } + if d == nil || d.ID != id { + t.Fatalf("активная задача не найдена: %v", d) + } +} + +// Терминальное состояние снимает ключ идемпотентности и позволяет завести +// тот же infohash заново (повторная закачка спустя время). +func TestTerminalReleasesInfohash(t *testing.T) { + st := newTestStore(t) + ctx := context.Background() + const ih = "2222222222222222222222222222222222222222" + + id, err := st.CreateDownload(ctx, newDownloading(ih)) + if err != nil { + t.Fatal(err) + } + if err := st.SetDownloadState(ctx, id, StateFailed, "qbit_add", "boom"); err != nil { + t.Fatal(err) + } + + // После терминального состояния активной задачи нет. + if d, err := st.FindActiveByInfohash(ctx, ih); err != nil || d != nil { + t.Fatalf("после failed активная задача не должна находиться: (%v,%v)", d, err) + } + // Ключ идемпотентности снят. + got, err := st.GetDownload(ctx, id) + if err != nil { + t.Fatal(err) + } + if got.IdempotencyKey.Valid { + t.Errorf("idempotency_key должен быть NULL, получили %q", got.IdempotencyKey.String) + } + if got.ErrorCode.String != "qbit_add" { + t.Errorf("error_code = %q", got.ErrorCode.String) + } + + // Тот же infohash заводится заново — unique index не мешает. + id2, err := st.CreateDownload(ctx, newDownloading(ih)) + if err != nil { + t.Fatalf("повторное добавление после терминального должно проходить: %v", err) + } + if id2 == id { + t.Error("ожидалась новая задача") + } +} + +// Две активные задачи с одним ключом идемпотентности недопустимы. +func TestActiveDuplicateRejected(t *testing.T) { + st := newTestStore(t) + ctx := context.Background() + const ih = "3333333333333333333333333333333333333333" + + if _, err := st.CreateDownload(ctx, newDownloading(ih)); err != nil { + t.Fatal(err) + } + if _, err := st.CreateDownload(ctx, newDownloading(ih)); err == nil { + t.Error("ожидалось нарушение уникальности idempotency_key") + } +} + +func TestListAndByState(t *testing.T) { + st := newTestStore(t) + ctx := context.Background() + + id1, _ := st.CreateDownload(ctx, newDownloading("4444444444444444444444444444444444444444")) + id2, _ := st.CreateDownload(ctx, newDownloading("5555555555555555555555555555555555555555")) + if err := st.SetDownloadState(ctx, id2, StateCompleted, "", ""); err != nil { + t.Fatal(err) + } + + all, err := st.ListDownloads(ctx) + if err != nil { + t.Fatal(err) + } + if len(all) != 2 { + t.Fatalf("ListDownloads = %d, want 2", len(all)) + } + + dl, err := st.ListDownloadsByState(ctx, StateDownloading) + if err != nil { + t.Fatal(err) + } + if len(dl) != 1 || dl[0].ID != id1 { + t.Fatalf("ListDownloadsByState(downloading) = %v", dl) + } +} diff --git a/internal/worker/doc.go b/internal/worker/doc.go deleted file mode 100644 index 7938831..0000000 --- a/internal/worker/doc.go +++ /dev/null @@ -1,4 +0,0 @@ -// Package worker — владелец машины состояний и поллинга qBittorrent. -// -// Заглушка: реализация в фазе Ф1 (см. docs/specs/architecture.md). -package worker diff --git a/internal/worker/worker.go b/internal/worker/worker.go new file mode 100644 index 0000000..74f472a --- /dev/null +++ b/internal/worker/worker.go @@ -0,0 +1,250 @@ +// Package worker — владелец машины состояний. Поллит qBittorrent по +// категории, переводит задачи между состояниями и сериализует команды +// транспортов (cancel/retry), чтобы два транспорта не гонялись за одно +// состояние. +// +// Ф1 ведёт задачу downloading → completed, плюс stuck/failed по таймаутам и +// ошибкам qBittorrent. Распознавание и раскладка (completed →) — Ф2+. +package worker + +import ( + "context" + "fmt" + "log/slog" + "strings" + "sync" + "time" + + "git.vakhrushev.me/av/jellybit/internal/qbt" + "git.vakhrushev.me/av/jellybit/internal/store" +) + +// Store — нужная worker часть хранилища. +type Store interface { + ListDownloadsByState(ctx context.Context, states ...store.State) ([]store.Download, error) + GetDownload(ctx context.Context, id int64) (*store.Download, error) + SetDownloadState(ctx context.Context, id int64, state store.State, errCode, errMsg string) error +} + +// QBittorrent — нужная worker часть клиента qBittorrent. +type QBittorrent interface { + Torrents(ctx context.Context, category string) ([]qbt.Torrent, error) + Add(ctx context.Context, ar qbt.AddRequest) error +} + +// Config — параметры воркера. +type Config struct { + Category string + SavePath string + PollInterval time.Duration + StuckAfter time.Duration // stalledDL дольше → stuck + MagnetTimeout time.Duration // metaDL дольше → failed +} + +// Worker — поллер и владелец переходов. +type Worker struct { + store Store + qbt QBittorrent + cfg Config + log *slog.Logger + + mu sync.Mutex // сериализует переходы (поллинг + команды) + now func() time.Time // подменяется в тестах +} + +// New собирает воркер. +func New(st Store, qb QBittorrent, cfg Config, log *slog.Logger) *Worker { + return &Worker{store: st, qbt: qb, cfg: cfg, log: log, now: time.Now} +} + +// Run крутит цикл поллинга до отмены ctx. +func (w *Worker) Run(ctx context.Context) { + w.log.Info("worker started", "poll_interval", w.cfg.PollInterval, "category", w.cfg.Category) + t := time.NewTicker(w.cfg.PollInterval) + defer t.Stop() + + w.pollOnce(ctx) + for { + select { + case <-ctx.Done(): + w.log.Info("worker stopped") + return + case <-t.C: + w.pollOnce(ctx) + } + } +} + +func (w *Worker) pollOnce(ctx context.Context) { + if err := w.Poll(ctx); err != nil { + w.log.Warn("poll failed", "err", err) + } +} + +// Poll сверяет активные задачи с состоянием qBittorrent и двигает их. +func (w *Worker) Poll(ctx context.Context) error { + torrents, err := w.qbt.Torrents(ctx, w.cfg.Category) + if err != nil { + return fmt.Errorf("poll: list torrents: %w", err) + } + byHash := make(map[string]qbt.Torrent, len(torrents)*2) + for _, t := range torrents { + for _, h := range []string{t.Hash, t.InfohashV1, t.InfohashV2} { + if h != "" { + byHash[strings.ToLower(h)] = t + } + } + } + + w.mu.Lock() + defer w.mu.Unlock() + + active, err := w.store.ListDownloadsByState(ctx, store.StateDownloading) + if err != nil { + return fmt.Errorf("poll: list active: %w", err) + } + for _, d := range active { + if !d.Infohash.Valid { + continue // нечем сопоставить (в Ф1 не случается: magnet всегда с infohash) + } + t, ok := byHash[strings.ToLower(d.Infohash.String)] + if !ok { + w.log.Warn("active download not found in qbittorrent", + "download_id", d.ID, "infohash", d.Infohash.String) + continue + } + w.reconcile(ctx, d, t) + } + return nil +} + +// reconcile двигает одну задачу по состоянию её торрента. Вызывается под +// w.mu. +func (w *Worker) reconcile(ctx context.Context, d store.Download, t qbt.Torrent) { + switch classify(t.State) { + case classReady: + w.transition(ctx, d, store.StateCompleted, "", "") + case classErrored: + w.transition(ctx, d, store.StateFailed, "qbit_error", "qBittorrent state: "+t.State) + case classDownloading: + w.checkTimeouts(ctx, d, t) + case classBusy: + // moving/checking — ждём, файлы ещё не на финальном месте. + } +} + +// checkTimeouts помечает зависшие задачи. Возраст считаем от created_at: +// для metaDL это время с момента добавления (огрублённо, но достаточно). +func (w *Worker) checkTimeouts(ctx context.Context, d store.Download, t qbt.Torrent) { + created, err := d.CreatedTime() + if err != nil { + w.log.Warn("cannot parse created_at", "download_id", d.ID, "value", d.CreatedAt, "err", err) + return + } + age := w.now().Sub(created) + + switch { + case isMeta(t.State) && w.cfg.MagnetTimeout > 0 && age > w.cfg.MagnetTimeout: + w.transition(ctx, d, store.StateFailed, "magnet_timeout", + fmt.Sprintf("no metadata after %s", age.Truncate(time.Second))) + case isStalledDL(t.State) && w.cfg.StuckAfter > 0 && age > w.cfg.StuckAfter: + w.transition(ctx, d, store.StateStuck, "stalled", + fmt.Sprintf("stalled for %s", age.Truncate(time.Second))) + } +} + +// transition пишет новое состояние и логирует переход. +func (w *Worker) transition(ctx context.Context, d store.Download, state store.State, code, msg string) { + if err := w.store.SetDownloadState(ctx, d.ID, state, code, msg); err != nil { + w.log.Error("state transition failed", + "download_id", d.ID, "from", d.State, "to", state, "err", err) + return + } + w.log.Info("state transition", + "download_id", d.ID, "from", d.State, "to", state, "code", code) +} + +// Cancel отклоняет задачу. Торрент в qBittorrent не трогаем — он продолжает +// раздачу (источник неприкосновенен). +func (w *Worker) Cancel(ctx context.Context, id int64) error { + w.mu.Lock() + defer w.mu.Unlock() + + d, err := w.store.GetDownload(ctx, id) + if err != nil { + return fmt.Errorf("cancel: %w", err) + } + if d.State.IsTerminal() { + return fmt.Errorf("cancel: download %d is already terminal (%s)", id, d.State) + } + if err := w.store.SetDownloadState(ctx, id, store.StateCancelled, "", ""); err != nil { + return fmt.Errorf("cancel: %w", err) + } + w.log.Info("download cancelled", "download_id", id, "from", d.State) + return nil +} + +// Retry повторяет застрявшую/упавшую задачу: заново отдаёт источник в +// qBittorrent и возвращает в downloading. +func (w *Worker) Retry(ctx context.Context, id int64) error { + w.mu.Lock() + defer w.mu.Unlock() + + d, err := w.store.GetDownload(ctx, id) + if err != nil { + return fmt.Errorf("retry: %w", err) + } + if d.State != store.StateFailed && d.State != store.StateStuck { + return fmt.Errorf("retry: download %d is %s, only failed/stuck are retriable", id, d.State) + } + if d.SourceType == store.SourceMagnet { + if err := w.qbt.Add(ctx, qbt.AddRequest{ + URLs: []string{d.SourceRef}, + Category: w.cfg.Category, + SavePath: w.cfg.SavePath, + }); err != nil { + return fmt.Errorf("retry: add to qbittorrent: %w", err) + } + } + if err := w.store.SetDownloadState(ctx, id, store.StateDownloading, "", ""); err != nil { + return fmt.Errorf("retry: %w", err) + } + w.log.Info("download retried", "download_id", id, "from", d.State) + return nil +} + +// class — класс состояния торрента qBittorrent. +type class int + +const ( + classDownloading class = iota // ещё качается + classReady // готов к раскладке + classErrored // ошибка + classBusy // moving/checking — переходный момент, ждём +) + +// classify относит состояние qBittorrent к классу (см. architecture.md, +// «Завершение в qBittorrent»). Учитываем и v5-имена (stopped* вместо +// paused*). +func classify(state string) class { + switch state { + case "uploading", "stalledUP", "pausedUP", "stoppedUP", "queuedUP", "forcedUP": + return classReady + case "error", "missingFiles": + return classErrored + case "moving", "checkingUP", "checkingResumeData", "allocating": + return classBusy + default: + // downloading, stalledDL, metaDL, forcedMetaDL, queuedDL, checkingDL, + // forcedDL, pausedDL, stoppedDL, unknown — считаем «ещё качается». + return classDownloading + } +} + +func isMeta(state string) bool { + return state == "metaDL" || state == "forcedMetaDL" +} + +func isStalledDL(state string) bool { + return state == "stalledDL" +} diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go new file mode 100644 index 0000000..90e1c04 --- /dev/null +++ b/internal/worker/worker_test.go @@ -0,0 +1,219 @@ +package worker + +import ( + "context" + "fmt" + "io" + "log/slog" + "testing" + "time" + + "git.vakhrushev.me/av/jellybit/internal/qbt" + "git.vakhrushev.me/av/jellybit/internal/store" +) + +// фиксированные метки времени для детерминированных таймаут-тестов. +const ( + timeNow = "2026-06-14 10:00:00" + timeOld = "2026-06-14 08:00:00" // 2 часа назад + timeRecent = "2026-06-14 09:59:00" // 1 минута назад +) + +type fakeStore struct { + downloads map[int64]*store.Download + transitions []transition +} + +type transition struct { + id int64 + state store.State +} + +func (f *fakeStore) ListDownloadsByState(_ context.Context, states ...store.State) ([]store.Download, error) { + var out []store.Download + for _, d := range f.downloads { + for _, s := range states { + if d.State == s { + out = append(out, *d) + break + } + } + } + return out, nil +} + +func (f *fakeStore) GetDownload(_ context.Context, id int64) (*store.Download, error) { + d, ok := f.downloads[id] + if !ok { + return nil, fmt.Errorf("download %d not found", id) + } + cp := *d + return &cp, nil +} + +func (f *fakeStore) SetDownloadState(_ context.Context, id int64, st store.State, code, msg string) error { + d, ok := f.downloads[id] + if !ok { + return fmt.Errorf("download %d not found", id) + } + d.State = st + d.ErrorCode = store.NullString(code) + d.ErrorMsg = store.NullString(msg) + f.transitions = append(f.transitions, transition{id, st}) + return nil +} + +type fakeQbt struct { + torrents []qbt.Torrent + added []qbt.AddRequest +} + +func (f *fakeQbt) Torrents(_ context.Context, _ string) ([]qbt.Torrent, error) { + return f.torrents, nil +} + +func (f *fakeQbt) Add(_ context.Context, ar qbt.AddRequest) error { + f.added = append(f.added, ar) + return nil +} + +func newTestWorker(st *fakeStore, qb *fakeQbt) *Worker { + w := New(st, qb, Config{ + Category: "jellybit", + SavePath: "/srv/media/downloads", + MagnetTimeout: 30 * time.Minute, + StuckAfter: time.Hour, + }, slog.New(slog.NewTextHandler(io.Discard, nil))) + w.now = func() time.Time { return time.Date(2026, 6, 14, 10, 0, 0, 0, time.UTC) } + return w +} + +func oneDownloading(infohash, createdAt string) *fakeStore { + return &fakeStore{downloads: map[int64]*store.Download{ + 1: { + ID: 1, + State: store.StateDownloading, + SourceType: store.SourceMagnet, + SourceRef: "magnet:?xt=urn:btih:" + infohash, + Infohash: store.NullString(infohash), + CreatedAt: createdAt, + }, + }} +} + +func TestPollTransitions(t *testing.T) { + const ih = "541adcff3b6dd5dba7088ea83317d9d6fac331d6" + tests := []struct { + name string + qbitState string + createdAt string + want store.State + }{ + {"готов → completed", "uploading", timeRecent, store.StateCompleted}, + {"stalledUP → completed", "stalledUP", timeRecent, store.StateCompleted}, + {"ошибка → failed", "error", timeRecent, store.StateFailed}, + {"missingFiles → failed", "missingFiles", timeRecent, store.StateFailed}, + {"metaDL долго → failed", "metaDL", timeOld, store.StateFailed}, + {"stalledDL долго → stuck", "stalledDL", timeOld, store.StateStuck}, + {"свежий downloading → остаётся", "downloading", timeRecent, store.StateDownloading}, + {"moving → остаётся", "moving", timeRecent, store.StateDownloading}, + {"свежий metaDL → остаётся", "metaDL", timeRecent, store.StateDownloading}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + st := oneDownloading(ih, tc.createdAt) + qb := &fakeQbt{torrents: []qbt.Torrent{{Hash: ih, State: tc.qbitState}}} + w := newTestWorker(st, qb) + if err := w.Poll(context.Background()); err != nil { + t.Fatalf("Poll: %v", err) + } + if got := st.downloads[1].State; got != tc.want { + t.Errorf("state = %q, want %q", got, tc.want) + } + }) + } +} + +func TestPollMatchesByInfohashV2(t *testing.T) { + const v2 = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" + st := oneDownloading(v2, timeRecent) + qb := &fakeQbt{torrents: []qbt.Torrent{{Hash: "deadbeef", InfohashV2: v2, State: "uploading"}}} + w := newTestWorker(st, qb) + if err := w.Poll(context.Background()); err != nil { + t.Fatal(err) + } + if st.downloads[1].State != store.StateCompleted { + t.Errorf("сопоставление по infohash_v2 не сработало: %q", st.downloads[1].State) + } +} + +func TestPollIgnoresMissingTorrent(t *testing.T) { + st := oneDownloading("541adcff3b6dd5dba7088ea83317d9d6fac331d6", timeRecent) + qb := &fakeQbt{torrents: nil} // торрента в qBittorrent нет + w := newTestWorker(st, qb) + if err := w.Poll(context.Background()); err != nil { + t.Fatal(err) + } + if st.downloads[1].State != store.StateDownloading { + t.Errorf("без торрента состояние не должно меняться, got %q", st.downloads[1].State) + } +} + +func TestCancel(t *testing.T) { + st := oneDownloading("541adcff3b6dd5dba7088ea83317d9d6fac331d6", timeRecent) + w := newTestWorker(st, &fakeQbt{}) + if err := w.Cancel(context.Background(), 1); err != nil { + t.Fatalf("Cancel: %v", err) + } + if st.downloads[1].State != store.StateCancelled { + t.Errorf("state = %q, want cancelled", st.downloads[1].State) + } + // Повторная отмена терминальной задачи — ошибка. + if err := w.Cancel(context.Background(), 1); err == nil { + t.Error("ожидалась ошибка при отмене терминальной задачи") + } +} + +func TestRetry(t *testing.T) { + st := oneDownloading("541adcff3b6dd5dba7088ea83317d9d6fac331d6", timeRecent) + st.downloads[1].State = store.StateStuck + qb := &fakeQbt{} + w := newTestWorker(st, qb) + if err := w.Retry(context.Background(), 1); err != nil { + t.Fatalf("Retry: %v", err) + } + if st.downloads[1].State != store.StateDownloading { + t.Errorf("state = %q, want downloading", st.downloads[1].State) + } + if len(qb.added) != 1 { + t.Errorf("ожидалось повторное добавление в qBittorrent, got %d", len(qb.added)) + } +} + +func TestRetryRejectsActive(t *testing.T) { + st := oneDownloading("541adcff3b6dd5dba7088ea83317d9d6fac331d6", timeRecent) + w := newTestWorker(st, &fakeQbt{}) + if err := w.Retry(context.Background(), 1); err == nil { + t.Error("retry активной (downloading) задачи должен отклоняться") + } +} + +func TestClassify(t *testing.T) { + cases := map[string]class{ + "uploading": classReady, + "stalledUP": classReady, + "stoppedUP": classReady, + "error": classErrored, + "missingFiles": classErrored, + "moving": classBusy, + "checkingUP": classBusy, + "downloading": classDownloading, + "metaDL": classDownloading, + "stalledDL": classDownloading, + } + for state, want := range cases { + if got := classify(state); got != want { + t.Errorf("classify(%q) = %d, want %d", state, got, want) + } + } +} diff --git a/web/templates/index.html b/web/templates/index.html new file mode 100644 index 0000000..5cc8a23 --- /dev/null +++ b/web/templates/index.html @@ -0,0 +1,68 @@ + + + + + + + jellybit + + + +

jellybit

+

обновляется каждые 5 с

+ +
+ + + +
+ + {{if .Error}}

{{.Error}}

{{end}} + + + + + + + {{range .Downloads}} + + + + + + + + {{else}} + + {{end}} + +
#ИсточникКонтекстСостояние
{{.ID}}{{.Source}}{{.Context}} + {{.State}} + {{if .Error}}
{{.Error}}{{end}} +
+ {{if not .Terminal}} +
+ +
+ {{end}} +
пока пусто
+ + diff --git a/web/web.go b/web/web.go new file mode 100644 index 0000000..299bb38 --- /dev/null +++ b/web/web.go @@ -0,0 +1,9 @@ +// Package web несёт встроенные (embed) ресурсы веб-UI: HTML-шаблоны. +package web + +import "embed" + +// FS — встроенные шаблоны (templates/*.html). +// +//go:embed templates/*.html +var FS embed.FS