emdb/job/queue.go

130 lines
2.5 KiB
Go
Raw Normal View History

2023-12-30 09:19:53 +01:00
package job
2023-12-29 19:10:31 +01:00
import (
"database/sql"
"errors"
"log/slog"
2024-09-17 07:43:12 +02:00
"go-mod.ewintr.nl/emdb/storage"
2023-12-29 19:10:31 +01:00
)
type JobQueue struct {
db *storage.Postgres
2023-12-29 19:10:31 +01:00
logger *slog.Logger
}
func NewJobQueue(db *storage.Postgres, logger *slog.Logger) *JobQueue {
2024-01-20 13:59:46 +01:00
jq := &JobQueue{
2023-12-29 19:10:31 +01:00
db: db,
logger: logger.With("service", "jobqueue"),
}
2024-01-20 13:59:46 +01:00
return jq
}
2024-03-10 15:04:23 +01:00
func (jq *JobQueue) ResetAll() error {
_, err := jq.db.Exec(`UPDATE job_queue SET status='todo'`)
return err
2023-12-29 19:10:31 +01:00
}
2024-01-20 12:30:06 +01:00
func (jq *JobQueue) Add(movieID, action string) error {
2024-03-09 13:08:25 +01:00
if !Valid(action) {
2024-01-18 08:57:56 +01:00
return errors.New("invalid action")
}
_, err := jq.db.Exec(`
INSERT INTO job_queue (action_id, action, status)
VALUES ($1, $2, 'todo');`, movieID, action)
2023-12-29 19:10:31 +01:00
return err
}
2024-03-09 13:41:57 +01:00
func (jq *JobQueue) Next() (Job, error) {
2024-01-20 12:30:06 +01:00
logger := jq.logger.With("method", "next")
2023-12-29 19:10:31 +01:00
2024-03-09 13:41:57 +01:00
row := jq.db.QueryRow(`
2024-01-20 13:27:19 +01:00
SELECT id, action_id, action
2023-12-29 19:10:31 +01:00
FROM job_queue
WHERE status='todo'
2024-01-18 08:57:56 +01:00
ORDER BY id ASC
2024-03-09 13:41:57 +01:00
LIMIT 1;`)
2024-03-09 13:08:25 +01:00
var job Job
2024-01-20 13:24:42 +01:00
err := row.Scan(&job.ID, &job.ActionID, &job.Action)
2024-01-20 12:30:06 +01:00
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
logger.Error("could not fetch next job", "error", err)
2023-12-29 19:10:31 +01:00
}
2024-03-09 13:08:25 +01:00
return Job{}, err
2024-01-20 12:30:06 +01:00
}
2023-12-29 19:10:31 +01:00
2024-01-20 12:30:06 +01:00
logger.Info("found a job", "id", job.ID)
if _, err := jq.db.Exec(`
2023-12-29 19:10:31 +01:00
UPDATE job_queue
SET status='doing'
WHERE id=$1;`, job.ID); err != nil {
2024-09-17 07:43:12 +02:00
logger.Error("could not set job to doing", "error", err)
2024-03-09 13:08:25 +01:00
return Job{}, err
2023-12-29 19:10:31 +01:00
}
2024-01-20 12:30:06 +01:00
return job, nil
2023-12-29 19:10:31 +01:00
}
2024-01-18 07:47:36 +01:00
func (jq *JobQueue) MarkDone(id int) {
2023-12-29 19:10:31 +01:00
logger := jq.logger.With("method", "markdone")
if _, err := jq.db.Exec(`
2024-01-18 07:56:25 +01:00
DELETE FROM job_queue
WHERE id=$1;`, id); err != nil {
2023-12-29 19:10:31 +01:00
logger.Error("could not mark job done", "error", err)
}
return
}
2024-01-20 13:24:42 +01:00
func (jq *JobQueue) MarkFailed(id int) {
logger := jq.logger.With("method", "markfailed")
if _, err := jq.db.Exec(`
UPDATE job_queue
SET status='failed'
WHERE id=$1;`, id); err != nil {
2024-01-20 13:24:42 +01:00
logger.Error("could not mark job failed", "error", err)
}
return
}
2024-03-09 13:08:25 +01:00
func (jq *JobQueue) List() ([]Job, error) {
rows, err := jq.db.Query(`
2024-01-20 13:27:19 +01:00
SELECT id, action_id, action, status, created_at, updated_at
FROM job_queue
ORDER BY id DESC;`)
if err != nil {
return nil, err
}
defer rows.Close()
2024-03-09 13:08:25 +01:00
var jobs []Job
for rows.Next() {
2024-03-09 13:08:25 +01:00
var j Job
2024-01-20 13:24:42 +01:00
if err := rows.Scan(&j.ID, &j.ActionID, &j.Action, &j.Status, &j.Created, &j.Updated); err != nil {
return nil, err
}
jobs = append(jobs, j)
}
return jobs, nil
}
func (jq *JobQueue) Delete(id string) error {
if _, err := jq.db.Exec(`
DELETE FROM job_queue
WHERE id=$1;`, id); err != nil {
return err
}
return nil
}
2024-01-18 07:47:36 +01:00
func (jq *JobQueue) DeleteAll() error {
if _, err := jq.db.Exec(`DELETE FROM job_queue;`); err != nil {
2024-01-18 07:47:36 +01:00
return err
}
return nil
}