Files

212 lines
7.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package store
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"
)
// State — состояние загрузки в машине состояний (см. architecture.md).
// В Ф1 используется подмножество: downloading → completed, плюс stuck,
// failed, cancelled. Остальные состояния заведены под будущие фазы.
type State string
const (
StateDownloading State = "downloading"
StateCompleted State = "completed"
StateRecognizing State = "recognizing" // Ф2
StateReview State = "review" // Ф3
StateLinking State = "linking" // Ф3
StateDone State = "done" // Ф3
StateDeferred State = "deferred" // Ф3
StateStuck State = "stuck"
StateFailed State = "failed"
StateCancelled State = "cancelled"
StateReverted State = "reverted" // Ф3
)
// IsTerminal сообщает, завершена ли задача окончательно. Для терминальных
// состояний снимается ключ идемпотентности — тот же infohash можно завести
// заново новой задачей (см. architecture.md, «повторное добавление»).
// stuck терминальным не считается: задача восстановима (retry).
func (s State) IsTerminal() bool {
switch s {
case StateDone, StateCancelled, StateFailed, StateReverted:
return true
default:
return false
}
}
// SourceType — вид источника загрузки.
type SourceType string
const (
SourceMagnet SourceType = "magnet"
SourceTorrent SourceType = "torrent"
SourceURL SourceType = "url"
)
// Download — строка таблицы download.
type Download struct {
ID int64 `db:"id"`
SourceType SourceType `db:"source_type"`
SourceRef string `db:"source_ref"`
Context string `db:"context"`
Infohash sql.NullString `db:"infohash"`
IdempotencyKey sql.NullString `db:"idempotency_key"`
State State `db:"state"`
ErrorCode sql.NullString `db:"error_code"`
ErrorMsg sql.NullString `db:"error_msg"`
CreatedAt string `db:"created_at"`
UpdatedAt string `db:"updated_at"`
}
// sqliteTimeLayout — формат меток datetime('now') в SQLite (UTC).
const sqliteTimeLayout = "2006-01-02 15:04:05"
// ParseTime разбирает временную метку SQLite (datetime('now'), всегда UTC).
func ParseTime(s string) (time.Time, error) {
return time.ParseInLocation(sqliteTimeLayout, s, time.UTC)
}
// CreatedTime возвращает время создания загрузки как time.Time (UTC).
func (d Download) CreatedTime() (time.Time, error) { return ParseTime(d.CreatedAt) }
// NullString строит sql.NullString: пустая строка → NULL.
func NullString(s string) sql.NullString {
return sql.NullString{String: s, Valid: s != ""}
}
// CreateDownload вставляет загрузку и возвращает её id.
func (s *Store) CreateDownload(ctx context.Context, d *Download) (int64, error) {
const q = `
INSERT INTO download (source_type, source_ref, context, infohash, idempotency_key, state)
VALUES (?, ?, ?, ?, ?, ?)`
res, err := s.DB.ExecContext(ctx, q,
d.SourceType, d.SourceRef, d.Context, d.Infohash, d.IdempotencyKey, d.State)
if err != nil {
return 0, fmt.Errorf("insert download: %w", err)
}
id, err := res.LastInsertId()
if err != nil {
return 0, fmt.Errorf("download last insert id: %w", err)
}
return id, nil
}
// GetDownload возвращает загрузку по id.
func (s *Store) GetDownload(ctx context.Context, id int64) (*Download, error) {
var d Download
if err := s.DB.GetContext(ctx, &d, `SELECT * FROM download WHERE id = ?`, id); err != nil {
return nil, fmt.Errorf("get download %d: %w", id, err)
}
return &d, nil
}
// ListDownloads возвращает все загрузки, новые сверху.
func (s *Store) ListDownloads(ctx context.Context) ([]Download, error) {
var out []Download
if err := s.DB.SelectContext(ctx, &out, `SELECT * FROM download ORDER BY id DESC`); err != nil {
return nil, fmt.Errorf("list downloads: %w", err)
}
return out, nil
}
// ListDownloadsByState возвращает загрузки в одном из указанных состояний.
func (s *Store) ListDownloadsByState(ctx context.Context, states ...State) ([]Download, error) {
if len(states) == 0 {
return nil, nil
}
ph := make([]string, len(states))
args := make([]any, len(states))
for i, st := range states {
ph[i] = "?"
args[i] = string(st)
}
q := `SELECT * FROM download WHERE state IN (` + strings.Join(ph, ",") + `) ORDER BY id DESC`
var out []Download
if err := s.DB.SelectContext(ctx, &out, q, args...); err != nil {
return nil, fmt.Errorf("list downloads by state: %w", err)
}
return out, nil
}
// FindActiveByInfohash возвращает незавершённую задачу для infohash либо
// (nil, nil), если её нет. Основа идемпотентного приёма.
func (s *Store) FindActiveByInfohash(ctx context.Context, infohash string) (*Download, error) {
term := []State{StateDone, StateCancelled, StateFailed, StateReverted}
ph := make([]string, len(term))
args := make([]any, 0, len(term)+1)
args = append(args, infohash)
for i, st := range term {
ph[i] = "?"
args = append(args, string(st))
}
q := `SELECT * FROM download WHERE infohash = ? AND state NOT IN (` +
strings.Join(ph, ",") + `) ORDER BY id DESC LIMIT 1`
var d Download
err := s.DB.GetContext(ctx, &d, q, args...)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("find active by infohash: %w", err)
}
return &d, nil
}
// ExistsByInfohash сообщает, есть ли хоть одна загрузка (в любом состоянии)
// с данным infohash. Discovery усыновляет раздачу только если её ещё не
// видели — так готовые задачи не переобрабатываются на каждом тике.
func (s *Store) ExistsByInfohash(ctx context.Context, infohash string) (bool, error) {
var n int
if err := s.DB.GetContext(ctx, &n,
`SELECT COUNT(1) FROM download WHERE infohash = ?`, infohash); err != nil {
return false, fmt.Errorf("exists by infohash: %w", err)
}
return n > 0, nil
}
// SetDownloadState переводит загрузку в новое состояние. Ключ
// идемпотентности пересчитывается из текущего infohash: для терминального
// состояния снимается (NULL), иначе равен infohash — так partial unique
// index гарантирует не более одной активной задачи на infohash.
func (s *Store) SetDownloadState(ctx context.Context, id int64, state State, errCode, errMsg string) error {
const q = `
UPDATE download
SET state = ?,
error_code = ?,
error_msg = ?,
idempotency_key = CASE WHEN ? = 1 THEN NULL ELSE infohash END,
updated_at = datetime('now')
WHERE id = ?`
terminal := 0
if state.IsTerminal() {
terminal = 1
}
res, err := s.DB.ExecContext(ctx, q, string(state), nullArg(errCode), nullArg(errMsg), terminal, id)
if err != nil {
return fmt.Errorf("set download %d state %q: %w", id, state, err)
}
n, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("set download %d state %q: %w", id, state, err)
}
if n == 0 {
return fmt.Errorf("set download %d state %q: not found", id, state)
}
return nil
}
// nullArg возвращает nil для пустой строки (чтобы писать NULL, не "").
func nullArg(s string) any {
if s == "" {
return nil
}
return s
}