2023-12-30 09:19:53 +01:00
|
|
|
package job
|
2023-12-29 19:10:31 +01:00
|
|
|
|
|
|
|
import (
|
|
|
|
"database/sql"
|
|
|
|
"errors"
|
2024-01-20 12:30:06 +01:00
|
|
|
"fmt"
|
2023-12-29 19:10:31 +01:00
|
|
|
"log/slog"
|
2024-01-20 12:30:06 +01:00
|
|
|
"strings"
|
2023-12-29 19:10:31 +01:00
|
|
|
|
2023-12-30 09:19:53 +01:00
|
|
|
"ewintr.nl/emdb/cmd/api-service/moviestore"
|
2023-12-29 19:10:31 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
type JobQueue struct {
|
2023-12-30 09:19:53 +01:00
|
|
|
db *moviestore.SQLite
|
2023-12-29 19:10:31 +01:00
|
|
|
logger *slog.Logger
|
|
|
|
}
|
|
|
|
|
2023-12-30 09:19:53 +01:00
|
|
|
func NewJobQueue(db *moviestore.SQLite, logger *slog.Logger) *JobQueue {
|
2023-12-29 19:10:31 +01:00
|
|
|
return &JobQueue{
|
|
|
|
db: db,
|
|
|
|
logger: logger.With("service", "jobqueue"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-20 12:30:06 +01:00
|
|
|
func (jq *JobQueue) Add(movieID, action string) error {
|
2024-01-18 08:57:56 +01:00
|
|
|
if !Valid(action) {
|
|
|
|
return errors.New("invalid action")
|
2024-01-18 07:38:32 +01:00
|
|
|
}
|
|
|
|
|
2024-01-20 13:27:19 +01:00
|
|
|
_, err := jq.db.Exec(`INSERT INTO job_queue (action_id, action, status)
|
2023-12-29 19:10:31 +01:00
|
|
|
VALUES (?, ?, 'todo')`, movieID, action)
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2024-01-20 12:30:06 +01:00
|
|
|
func (jq *JobQueue) Next(t Type) (Job, error) {
|
|
|
|
logger := jq.logger.With("method", "next")
|
2023-12-29 19:10:31 +01:00
|
|
|
|
2024-01-20 12:30:06 +01:00
|
|
|
actions := simpleActions
|
|
|
|
if t == TypeAI {
|
|
|
|
actions = aiActions
|
|
|
|
}
|
|
|
|
actionsStr := fmt.Sprintf("('%s')", strings.Join(actions, "', '"))
|
|
|
|
query := fmt.Sprintf(`
|
2024-01-20 13:27:19 +01:00
|
|
|
SELECT id, action_id, action
|
2023-12-29 19:10:31 +01:00
|
|
|
FROM job_queue
|
|
|
|
WHERE status='todo'
|
2024-01-20 12:30:06 +01:00
|
|
|
AND action IN %s
|
2024-01-18 08:57:56 +01:00
|
|
|
ORDER BY id ASC
|
2024-01-20 12:30:06 +01:00
|
|
|
LIMIT 1`, actionsStr)
|
|
|
|
row := jq.db.QueryRow(query)
|
|
|
|
var job Job
|
2024-01-20 13:24:42 +01:00
|
|
|
err := row.Scan(&job.ID, &job.ActionID, &job.Action)
|
2024-01-20 12:30:06 +01:00
|
|
|
if err != nil {
|
|
|
|
if !errors.Is(err, sql.ErrNoRows) {
|
|
|
|
logger.Error("could not fetch next job", "error", err)
|
2023-12-29 19:10:31 +01:00
|
|
|
}
|
2024-01-20 12:30:06 +01:00
|
|
|
return Job{}, err
|
|
|
|
}
|
2023-12-29 19:10:31 +01:00
|
|
|
|
2024-01-20 12:30:06 +01:00
|
|
|
logger.Info("found a job", "id", job.ID)
|
|
|
|
if _, err := jq.db.Exec(`
|
2023-12-29 19:10:31 +01:00
|
|
|
UPDATE job_queue
|
|
|
|
SET status='doing'
|
|
|
|
WHERE id=?`, job.ID); err != nil {
|
2024-01-20 12:30:06 +01:00
|
|
|
logger.Error("could not set job to doing", "error")
|
|
|
|
return Job{}, err
|
2023-12-29 19:10:31 +01:00
|
|
|
}
|
2024-01-20 12:30:06 +01:00
|
|
|
|
|
|
|
return job, nil
|
2023-12-29 19:10:31 +01:00
|
|
|
}
|
|
|
|
|
2024-01-18 07:47:36 +01:00
|
|
|
func (jq *JobQueue) MarkDone(id int) {
|
2023-12-29 19:10:31 +01:00
|
|
|
logger := jq.logger.With("method", "markdone")
|
|
|
|
if _, err := jq.db.Exec(`
|
2024-01-18 07:56:25 +01:00
|
|
|
DELETE FROM job_queue
|
2023-12-29 19:10:31 +01:00
|
|
|
WHERE id=?`, id); err != nil {
|
|
|
|
logger.Error("could not mark job done", "error", err)
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
2024-01-18 07:38:32 +01:00
|
|
|
|
2024-01-20 13:24:42 +01:00
|
|
|
func (jq *JobQueue) MarkFailed(id int) {
|
|
|
|
logger := jq.logger.With("method", "markfailed")
|
|
|
|
if _, err := jq.db.Exec(`
|
|
|
|
UPDATE job_queue
|
|
|
|
SET status='failed'
|
|
|
|
WHERE id=?`, id); err != nil {
|
|
|
|
logger.Error("could not mark job failed", "error", err)
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-01-18 07:38:32 +01:00
|
|
|
func (jq *JobQueue) List() ([]Job, error) {
|
|
|
|
rows, err := jq.db.Query(`
|
2024-01-20 13:27:19 +01:00
|
|
|
SELECT id, action_id, action, status, created_at, updated_at
|
2024-01-18 07:38:32 +01:00
|
|
|
FROM job_queue
|
|
|
|
ORDER BY id DESC`)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer rows.Close()
|
|
|
|
|
|
|
|
var jobs []Job
|
|
|
|
for rows.Next() {
|
|
|
|
var j Job
|
2024-01-20 13:24:42 +01:00
|
|
|
if err := rows.Scan(&j.ID, &j.ActionID, &j.Action, &j.Status, &j.Created, &j.Updated); err != nil {
|
2024-01-18 07:38:32 +01:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
jobs = append(jobs, j)
|
|
|
|
}
|
|
|
|
return jobs, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (jq *JobQueue) Delete(id string) error {
|
|
|
|
if _, err := jq.db.Exec(`
|
|
|
|
DELETE FROM job_queue
|
|
|
|
WHERE id=?`, id); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
2024-01-18 07:47:36 +01:00
|
|
|
|
|
|
|
func (jq *JobQueue) DeleteAll() error {
|
|
|
|
if _, err := jq.db.Exec(`
|
|
|
|
DELETE FROM job_queue`); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|