4e077d878e
категории
212 lines
7.8 KiB
Go
212 lines
7.8 KiB
Go
package store
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"errors"
|
||
"fmt"
|
||
"strings"
|
||
"time"
|
||
)
|
||
|
||
// State — состояние загрузки в машине состояний (см. architecture.md).
|
||
// В Ф1 используется подмножество: downloading → completed, плюс stuck,
|
||
// failed, cancelled. Остальные состояния заведены под будущие фазы.
|
||
type State string
|
||
|
||
const (
|
||
StateDownloading State = "downloading"
|
||
StateCompleted State = "completed"
|
||
StateRecognizing State = "recognizing" // Ф2
|
||
StateReview State = "review" // Ф3
|
||
StateLinking State = "linking" // Ф3
|
||
StateDone State = "done" // Ф3
|
||
StateDeferred State = "deferred" // Ф3
|
||
StateStuck State = "stuck"
|
||
StateFailed State = "failed"
|
||
StateCancelled State = "cancelled"
|
||
StateReverted State = "reverted" // Ф3
|
||
)
|
||
|
||
// IsTerminal сообщает, завершена ли задача окончательно. Для терминальных
|
||
// состояний снимается ключ идемпотентности — тот же infohash можно завести
|
||
// заново новой задачей (см. architecture.md, «повторное добавление»).
|
||
// stuck терминальным не считается: задача восстановима (retry).
|
||
func (s State) IsTerminal() bool {
|
||
switch s {
|
||
case StateDone, StateCancelled, StateFailed, StateReverted:
|
||
return true
|
||
default:
|
||
return false
|
||
}
|
||
}
|
||
|
||
// SourceType — вид источника загрузки.
|
||
type SourceType string
|
||
|
||
const (
|
||
SourceMagnet SourceType = "magnet"
|
||
SourceTorrent SourceType = "torrent"
|
||
SourceURL SourceType = "url"
|
||
)
|
||
|
||
// Download — строка таблицы download.
|
||
type Download struct {
|
||
ID int64 `db:"id"`
|
||
SourceType SourceType `db:"source_type"`
|
||
SourceRef string `db:"source_ref"`
|
||
Context string `db:"context"`
|
||
Infohash sql.NullString `db:"infohash"`
|
||
IdempotencyKey sql.NullString `db:"idempotency_key"`
|
||
State State `db:"state"`
|
||
ErrorCode sql.NullString `db:"error_code"`
|
||
ErrorMsg sql.NullString `db:"error_msg"`
|
||
CreatedAt string `db:"created_at"`
|
||
UpdatedAt string `db:"updated_at"`
|
||
}
|
||
|
||
// sqliteTimeLayout — формат меток datetime('now') в SQLite (UTC).
|
||
const sqliteTimeLayout = "2006-01-02 15:04:05"
|
||
|
||
// ParseTime разбирает временную метку SQLite (datetime('now'), всегда UTC).
|
||
func ParseTime(s string) (time.Time, error) {
|
||
return time.ParseInLocation(sqliteTimeLayout, s, time.UTC)
|
||
}
|
||
|
||
// CreatedTime возвращает время создания загрузки как time.Time (UTC).
|
||
func (d Download) CreatedTime() (time.Time, error) { return ParseTime(d.CreatedAt) }
|
||
|
||
// NullString строит sql.NullString: пустая строка → NULL.
|
||
func NullString(s string) sql.NullString {
|
||
return sql.NullString{String: s, Valid: s != ""}
|
||
}
|
||
|
||
// CreateDownload вставляет загрузку и возвращает её id.
|
||
func (s *Store) CreateDownload(ctx context.Context, d *Download) (int64, error) {
|
||
const q = `
|
||
INSERT INTO download (source_type, source_ref, context, infohash, idempotency_key, state)
|
||
VALUES (?, ?, ?, ?, ?, ?)`
|
||
res, err := s.DB.ExecContext(ctx, q,
|
||
d.SourceType, d.SourceRef, d.Context, d.Infohash, d.IdempotencyKey, d.State)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("insert download: %w", err)
|
||
}
|
||
id, err := res.LastInsertId()
|
||
if err != nil {
|
||
return 0, fmt.Errorf("download last insert id: %w", err)
|
||
}
|
||
return id, nil
|
||
}
|
||
|
||
// GetDownload возвращает загрузку по id.
|
||
func (s *Store) GetDownload(ctx context.Context, id int64) (*Download, error) {
|
||
var d Download
|
||
if err := s.DB.GetContext(ctx, &d, `SELECT * FROM download WHERE id = ?`, id); err != nil {
|
||
return nil, fmt.Errorf("get download %d: %w", id, err)
|
||
}
|
||
return &d, nil
|
||
}
|
||
|
||
// ListDownloads возвращает все загрузки, новые сверху.
|
||
func (s *Store) ListDownloads(ctx context.Context) ([]Download, error) {
|
||
var out []Download
|
||
if err := s.DB.SelectContext(ctx, &out, `SELECT * FROM download ORDER BY id DESC`); err != nil {
|
||
return nil, fmt.Errorf("list downloads: %w", err)
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
// ListDownloadsByState возвращает загрузки в одном из указанных состояний.
|
||
func (s *Store) ListDownloadsByState(ctx context.Context, states ...State) ([]Download, error) {
|
||
if len(states) == 0 {
|
||
return nil, nil
|
||
}
|
||
ph := make([]string, len(states))
|
||
args := make([]any, len(states))
|
||
for i, st := range states {
|
||
ph[i] = "?"
|
||
args[i] = string(st)
|
||
}
|
||
q := `SELECT * FROM download WHERE state IN (` + strings.Join(ph, ",") + `) ORDER BY id DESC`
|
||
var out []Download
|
||
if err := s.DB.SelectContext(ctx, &out, q, args...); err != nil {
|
||
return nil, fmt.Errorf("list downloads by state: %w", err)
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
// FindActiveByInfohash возвращает незавершённую задачу для infohash либо
|
||
// (nil, nil), если её нет. Основа идемпотентного приёма.
|
||
func (s *Store) FindActiveByInfohash(ctx context.Context, infohash string) (*Download, error) {
|
||
term := []State{StateDone, StateCancelled, StateFailed, StateReverted}
|
||
ph := make([]string, len(term))
|
||
args := make([]any, 0, len(term)+1)
|
||
args = append(args, infohash)
|
||
for i, st := range term {
|
||
ph[i] = "?"
|
||
args = append(args, string(st))
|
||
}
|
||
q := `SELECT * FROM download WHERE infohash = ? AND state NOT IN (` +
|
||
strings.Join(ph, ",") + `) ORDER BY id DESC LIMIT 1`
|
||
var d Download
|
||
err := s.DB.GetContext(ctx, &d, q, args...)
|
||
if errors.Is(err, sql.ErrNoRows) {
|
||
return nil, nil
|
||
}
|
||
if err != nil {
|
||
return nil, fmt.Errorf("find active by infohash: %w", err)
|
||
}
|
||
return &d, nil
|
||
}
|
||
|
||
// ExistsByInfohash сообщает, есть ли хоть одна загрузка (в любом состоянии)
|
||
// с данным infohash. Discovery усыновляет раздачу только если её ещё не
|
||
// видели — так готовые задачи не переобрабатываются на каждом тике.
|
||
func (s *Store) ExistsByInfohash(ctx context.Context, infohash string) (bool, error) {
|
||
var n int
|
||
if err := s.DB.GetContext(ctx, &n,
|
||
`SELECT COUNT(1) FROM download WHERE infohash = ?`, infohash); err != nil {
|
||
return false, fmt.Errorf("exists by infohash: %w", err)
|
||
}
|
||
return n > 0, nil
|
||
}
|
||
|
||
// SetDownloadState переводит загрузку в новое состояние. Ключ
|
||
// идемпотентности пересчитывается из текущего infohash: для терминального
|
||
// состояния снимается (NULL), иначе равен infohash — так partial unique
|
||
// index гарантирует не более одной активной задачи на infohash.
|
||
func (s *Store) SetDownloadState(ctx context.Context, id int64, state State, errCode, errMsg string) error {
|
||
const q = `
|
||
UPDATE download
|
||
SET state = ?,
|
||
error_code = ?,
|
||
error_msg = ?,
|
||
idempotency_key = CASE WHEN ? = 1 THEN NULL ELSE infohash END,
|
||
updated_at = datetime('now')
|
||
WHERE id = ?`
|
||
terminal := 0
|
||
if state.IsTerminal() {
|
||
terminal = 1
|
||
}
|
||
res, err := s.DB.ExecContext(ctx, q, string(state), nullArg(errCode), nullArg(errMsg), terminal, id)
|
||
if err != nil {
|
||
return fmt.Errorf("set download %d state %q: %w", id, state, err)
|
||
}
|
||
n, err := res.RowsAffected()
|
||
if err != nil {
|
||
return fmt.Errorf("set download %d state %q: %w", id, state, err)
|
||
}
|
||
if n == 0 {
|
||
return fmt.Errorf("set download %d state %q: not found", id, state)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// nullArg возвращает nil для пустой строки (чтобы писать NULL, не "").
|
||
func nullArg(s string) any {
|
||
if s == "" {
|
||
return nil
|
||
}
|
||
return s
|
||
}
|