251 lines
8.3 KiB
Go
251 lines
8.3 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"flag"
|
||
"fmt"
|
||
"log/slog"
|
||
"net/http"
|
||
"net/url"
|
||
"os/signal"
|
||
"syscall"
|
||
"time"
|
||
|
||
tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
|
||
|
||
"git.vakhrushev.me/av/jellybit/internal/config"
|
||
"git.vakhrushev.me/av/jellybit/internal/httpapi"
|
||
"git.vakhrushev.me/av/jellybit/internal/ingest"
|
||
"git.vakhrushev.me/av/jellybit/internal/layout"
|
||
"git.vakhrushev.me/av/jellybit/internal/llm"
|
||
"git.vakhrushev.me/av/jellybit/internal/logging"
|
||
"git.vakhrushev.me/av/jellybit/internal/metadata"
|
||
"git.vakhrushev.me/av/jellybit/internal/qbt"
|
||
"git.vakhrushev.me/av/jellybit/internal/recognize"
|
||
"git.vakhrushev.me/av/jellybit/internal/store"
|
||
"git.vakhrushev.me/av/jellybit/internal/tgbot"
|
||
"git.vakhrushev.me/av/jellybit/internal/worker"
|
||
)
|
||
|
||
// runServe запускает сервис: конфиг → хранилище → клиент qBittorrent →
|
||
// воркер (фоном) → HTTP-сервер; останавливается по SIGINT/SIGTERM.
|
||
func runServe(args []string) error {
|
||
fs := flag.NewFlagSet("serve", flag.ContinueOnError)
|
||
configPath := fs.String("config", "/data/config.toml", "путь к config.toml")
|
||
if err := fs.Parse(args); err != nil {
|
||
return err
|
||
}
|
||
|
||
cfg, err := config.Load(*configPath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
logger := logging.New(cfg.Log.Level, cfg.Log.Format)
|
||
logger.Info("starting jellybit", "config", *configPath)
|
||
|
||
st, err := store.Open(cfg.Storage.DBPath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer func() { _ = st.Close() }()
|
||
logger.Info("database ready", "path", cfg.Storage.DBPath)
|
||
|
||
qb, err := qbt.New(qbt.Config{
|
||
URL: cfg.QBittorrent.URL,
|
||
Username: cfg.QBittorrent.Username,
|
||
Password: cfg.QBittorrent.Password,
|
||
}, logger)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
ingestor := ingest.New(st, qb, ingest.Config{
|
||
Category: cfg.QBittorrent.Category,
|
||
SavePath: cfg.QBittorrent.SavePath,
|
||
}, logger)
|
||
|
||
// Ф4: базы метаданных (опц.). Без них авто-раскладки нет — всё в review.
|
||
providers, err := metadataProviders(cfg, logger)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for _, p := range providers {
|
||
logger.Info("metadata provider enabled", "provider", p.Name())
|
||
}
|
||
|
||
// Ф2/Ф3: распознаватель и раскладчик. Если LLM не сконфигурирован,
|
||
// сервис работает как в Ф1 (completed-задачи дальше не двигаются).
|
||
var recognizer worker.Recognizer
|
||
if cfg.LLM.Type != "" && cfg.LLM.BaseURL != "" {
|
||
provider, perr := llm.New(llm.Config{
|
||
Type: cfg.LLM.Type,
|
||
BaseURL: cfg.LLM.BaseURL,
|
||
APIKey: cfg.LLM.APIKey,
|
||
Model: cfg.LLM.Model,
|
||
Proxy: cfg.LLM.Proxy,
|
||
Timeout: cfg.LLM.Timeout.Std(),
|
||
}, logger)
|
||
if perr != nil {
|
||
return fmt.Errorf("llm provider: %w", perr)
|
||
}
|
||
recognizer = recognize.New(provider, providers, recognize.Config{
|
||
MaxRetries: cfg.LLM.MaxRetries,
|
||
AutoThreshold: cfg.Recognition.AutoConfidenceThreshold,
|
||
}, logger)
|
||
logger.Info("recognizer ready", "model", cfg.LLM.Model, "providers", len(providers))
|
||
} else {
|
||
logger.Warn("llm not configured, recognition disabled")
|
||
}
|
||
|
||
layouter, err := layout.New(layout.Config{
|
||
MoviesDir: cfg.Paths.Movies,
|
||
SeriesDir: cfg.Paths.Series,
|
||
}, logger)
|
||
if err != nil {
|
||
return fmt.Errorf("layouter: %w", err)
|
||
}
|
||
|
||
wrk := worker.New(st, qb, recognizer, layouter, worker.Config{
|
||
Category: cfg.QBittorrent.Category,
|
||
Tag: cfg.QBittorrent.Tag,
|
||
SavePath: cfg.QBittorrent.SavePath,
|
||
PathMap: cfg.QBittorrent.PathMap,
|
||
PollInterval: cfg.Worker.PollInterval.Std(),
|
||
StuckAfter: cfg.Worker.StuckAfter.Std(),
|
||
MagnetTimeout: cfg.Worker.MagnetTimeout.Std(),
|
||
}, logger)
|
||
|
||
router, err := httpapi.NewRouter(httpapi.Deps{
|
||
Logger: logger,
|
||
Ingestor: ingestor,
|
||
Commander: wrk,
|
||
Reader: st,
|
||
Reviewer: wrk,
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||
defer stop()
|
||
|
||
// Ф5: Telegram-транспорт + пинги. Доступ — по allowed_user_ids
|
||
// (пусто = запрет всем, fail-closed). Недоступность Telegram на старте не
|
||
// валит сервис — бот просто отключается.
|
||
if cfg.Telegram.Enabled {
|
||
if cfg.Telegram.Token == "" {
|
||
return fmt.Errorf("telegram enabled, but token is empty")
|
||
}
|
||
tgClient, perr := telegramHTTPClient(cfg.Telegram.Proxy)
|
||
if perr != nil {
|
||
return perr
|
||
}
|
||
api, terr := tgbotapi.NewBotAPIWithClient(cfg.Telegram.Token, tgbotapi.APIEndpoint, tgClient)
|
||
if terr != nil {
|
||
logger.Error("telegram bot disabled: cannot connect", "err", terr)
|
||
} else {
|
||
bot := tgbot.New(api, ingestor, wrk, tgbot.Config{
|
||
AllowedUserIDs: cfg.Telegram.AllowedUserIDs,
|
||
WebBaseURL: cfg.Telegram.WebBaseURL,
|
||
}, logger)
|
||
wrk.SetNotifier(bot)
|
||
go bot.Run(ctx)
|
||
logger.Info("telegram bot enabled",
|
||
"bot", api.Self.UserName, "allowed_users", len(cfg.Telegram.AllowedUserIDs))
|
||
}
|
||
}
|
||
|
||
go wrk.Run(ctx)
|
||
|
||
srv := &http.Server{
|
||
Addr: cfg.HTTP.Listen,
|
||
Handler: router,
|
||
ReadHeaderTimeout: 10 * time.Second,
|
||
}
|
||
|
||
errCh := make(chan error, 1)
|
||
go func() {
|
||
logger.Info("http server listening", "addr", cfg.HTTP.Listen)
|
||
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||
errCh <- err
|
||
}
|
||
}()
|
||
|
||
select {
|
||
case err := <-errCh:
|
||
return err
|
||
case <-ctx.Done():
|
||
logger.Info("shutdown signal received")
|
||
}
|
||
|
||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
defer cancel()
|
||
if err := srv.Shutdown(shutdownCtx); err != nil {
|
||
return fmt.Errorf("http shutdown: %w", err)
|
||
}
|
||
logger.Info("stopped")
|
||
return nil
|
||
}
|
||
|
||
// telegramHTTPClient собирает HTTP-клиент бота с опц. прокси. Таймаута уровня
|
||
// клиента нет намеренно — он порвал бы long-poll; вместо этого ограничиваем
|
||
// установление соединения (dial/TLS из DefaultTransport) и ожидание заголовков
|
||
// ответа с запасом над long-poll (30с в tgbot). Так мёртвый прокси не подвешивает
|
||
// ни отправку уведомлений, ни приёмный цикл навсегда — клиент переподключится.
|
||
func telegramHTTPClient(proxy string) (*http.Client, error) {
|
||
transport := http.DefaultTransport.(*http.Transport).Clone()
|
||
if proxy != "" {
|
||
proxyURL, err := url.Parse(proxy)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("telegram: parse proxy %q: %w", proxy, err)
|
||
}
|
||
transport.Proxy = http.ProxyURL(proxyURL)
|
||
}
|
||
transport.ResponseHeaderTimeout = 45 * time.Second
|
||
return &http.Client{Transport: transport}, nil
|
||
}
|
||
|
||
// metadataProviders собирает включённые конфигом базы метаданных. Для
|
||
// сериалов Jellyfin привычнее tvdbid, поэтому TVDB идёт первым.
|
||
func metadataProviders(cfg *config.Config, logger *slog.Logger) ([]metadata.Provider, error) {
|
||
var out []metadata.Provider
|
||
// TVMaze без ключа и покрывает сериалы — ставим первым.
|
||
if cfg.Metadata.TVMaze.Enabled {
|
||
p, err := metadata.NewTVMaze(metadata.TVMazeConfig{
|
||
Proxy: cfg.Metadata.TVMaze.Proxy,
|
||
Timeout: cfg.Metadata.TVMaze.Timeout.Std(),
|
||
}, logger)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("tvmaze provider: %w", err)
|
||
}
|
||
out = append(out, p)
|
||
}
|
||
// TVDB/TMDB включаются ключом: если enabled, но ключ пуст — тихо
|
||
// пропускаем (сервис стартует), а не падаем.
|
||
if cfg.Metadata.TVDB.Enabled && cfg.Metadata.TVDB.APIKey != "" {
|
||
p, err := metadata.NewTVDB(metadata.TVDBConfig{
|
||
APIKey: cfg.Metadata.TVDB.APIKey,
|
||
Proxy: cfg.Metadata.TVDB.Proxy,
|
||
Timeout: cfg.Metadata.TVDB.Timeout.Std(),
|
||
}, logger)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("tvdb provider: %w", err)
|
||
}
|
||
out = append(out, p)
|
||
}
|
||
if cfg.Metadata.TMDB.Enabled && cfg.Metadata.TMDB.APIKey != "" {
|
||
p, err := metadata.NewTMDB(metadata.TMDBConfig{
|
||
APIKey: cfg.Metadata.TMDB.APIKey,
|
||
Proxy: cfg.Metadata.TMDB.Proxy,
|
||
Timeout: cfg.Metadata.TMDB.Timeout.Std(),
|
||
}, logger)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("tmdb provider: %w", err)
|
||
}
|
||
out = append(out, p)
|
||
}
|
||
return out, nil
|
||
}
|