job queue endpoints and timstamps on jobs

This commit is contained in:
Erik Winter 2024-01-18 07:38:32 +01:00
parent 14b34f6d0d
commit faf5f45d9b
7 changed files with 152 additions and 58 deletions

View File

@ -1,53 +0,0 @@
package handler
import (
"encoding/json"
"log/slog"
"net/http"
"ewintr.nl/emdb/cmd/api-service/job"
)
type AdminAPI struct {
jq *job.JobQueue
logger *slog.Logger
}
func NewAdminAPI(jq *job.JobQueue, logger *slog.Logger) *AdminAPI {
return &AdminAPI{
jq: jq,
logger: logger.With("api", "admin"),
}
}
func (adminAPI *AdminAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := adminAPI.logger.With("method", "serveHTTP")
subPath, _ := ShiftPath(r.URL.Path)
switch {
case r.Method == http.MethodPost && subPath == "":
adminAPI.Add(w, r)
default:
Error(w, http.StatusNotFound, "unregistered path", nil, logger)
}
}
func (adminAPI *AdminAPI) Add(w http.ResponseWriter, r *http.Request) {
logger := adminAPI.logger.With("method", "add")
var job job.Job
if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
Error(w, http.StatusBadRequest, "could not decode job", err, logger)
return
}
if err := adminAPI.jq.Add(job.MovieID, job.Action); err != nil {
Error(w, http.StatusInternalServerError, "could not add job", err, logger)
return
}
if err := json.NewEncoder(w).Encode(job); err != nil {
Error(w, http.StatusInternalServerError, "could not encode job", err, logger)
return
}
}

View File

@ -0,0 +1,83 @@
package handler
import (
"encoding/json"
"log/slog"
"net/http"
"ewintr.nl/emdb/cmd/api-service/job"
)
type JobAPI struct {
jq *job.JobQueue
logger *slog.Logger
}
func NewJobAPI(jq *job.JobQueue, logger *slog.Logger) *JobAPI {
return &JobAPI{
jq: jq,
logger: logger.With("api", "admin"),
}
}
func (jobAPI *JobAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := jobAPI.logger.With("method", "serveHTTP")
subPath, _ := ShiftPath(r.URL.Path)
switch {
case r.Method == http.MethodPost && subPath == "":
jobAPI.Add(w, r)
case r.Method == http.MethodGet && subPath == "":
jobAPI.List(w, r)
case r.Method == http.MethodDelete && subPath != "":
jobAPI.Delete(w, r, subPath)
default:
Error(w, http.StatusNotFound, "unregistered path", nil, logger)
}
}
func (jobAPI *JobAPI) Add(w http.ResponseWriter, r *http.Request) {
logger := jobAPI.logger.With("method", "add")
var j job.Job
if err := json.NewDecoder(r.Body).Decode(&j); err != nil {
Error(w, http.StatusBadRequest, "could not decode job", err, logger)
return
}
if err := jobAPI.jq.Add(j.MovieID, j.Action); err != nil {
Error(w, http.StatusInternalServerError, "could not add job", err, logger)
return
}
if err := json.NewEncoder(w).Encode(j); err != nil {
Error(w, http.StatusInternalServerError, "could not encode job", err, logger)
return
}
}
func (jobAPI *JobAPI) List(w http.ResponseWriter, r *http.Request) {
logger := jobAPI.logger.With("method", "list")
jobs, err := jobAPI.jq.List()
if err != nil {
Error(w, http.StatusInternalServerError, "could not list jobs", err, logger)
return
}
if err := json.NewEncoder(w).Encode(jobs); err != nil {
Error(w, http.StatusInternalServerError, "could not encode jobs", err, logger)
return
}
}
func (jobAPI *JobAPI) Delete(w http.ResponseWriter, r *http.Request, id string) {
logger := jobAPI.logger.With("method", "delete")
if err := jobAPI.jq.Delete(id); err != nil {
Error(w, http.StatusInternalServerError, "could not delete job", err, logger)
return
}
w.WriteHeader(http.StatusNoContent)
}

View File

@ -15,9 +15,18 @@ const (
ActionRefreshAllIMDBReviews Action = "refresh-all-imdb-reviews" ActionRefreshAllIMDBReviews Action = "refresh-all-imdb-reviews"
) )
var (
validActions = []Action{
ActionRefreshIMDBReviews,
ActionRefreshAllIMDBReviews,
}
)
type Job struct { type Job struct {
ID int ID int
MovieID string MovieID string
Action Action Action Action
Status JobStatus Status JobStatus
Created time.Time
Updated time.Time
} }

View File

@ -4,6 +4,7 @@ import (
"database/sql" "database/sql"
"errors" "errors"
"log/slog" "log/slog"
"slices"
"time" "time"
"ewintr.nl/emdb/cmd/api-service/moviestore" "ewintr.nl/emdb/cmd/api-service/moviestore"
@ -24,6 +25,10 @@ func NewJobQueue(db *moviestore.SQLite, logger *slog.Logger) *JobQueue {
} }
func (jq *JobQueue) Add(movieID string, action Action) error { func (jq *JobQueue) Add(movieID string, action Action) error {
if !slices.Contains(validActions, action) {
return errors.New("unknown action")
}
_, err := jq.db.Exec(`INSERT INTO job_queue (movie_id, action, status) _, err := jq.db.Exec(`INSERT INTO job_queue (movie_id, action, status)
VALUES (?, ?, 'todo')`, movieID, action) VALUES (?, ?, 'todo')`, movieID, action)
@ -82,3 +87,33 @@ WHERE id=?`, id); err != nil {
} }
return return
} }
func (jq *JobQueue) List() ([]Job, error) {
rows, err := jq.db.Query(`
SELECT id, movie_id, action, status, created_at, updated_at
FROM job_queue
ORDER BY id DESC`)
if err != nil {
return nil, err
}
defer rows.Close()
var jobs []Job
for rows.Next() {
var j Job
if err := rows.Scan(&j.ID, &j.MovieID, &j.Action, &j.Status, &j.Created, &j.Updated); err != nil {
return nil, err
}
jobs = append(jobs, j)
}
return jobs, nil
}
func (jq *JobQueue) Delete(id string) error {
if _, err := jq.db.Exec(`
DELETE FROM job_queue
WHERE id=?`, id); err != nil {
return err
}
return nil
}

View File

@ -40,10 +40,6 @@ func (w *Worker) Run() {
} }
} }
func (w *Worker) FindNewJobs() {
}
func (w *Worker) RefreshAllReviews(jobID int) { func (w *Worker) RefreshAllReviews(jobID int) {
logger := w.logger.With("method", "fetchReviews", "jobID", jobID) logger := w.logger.With("method", "fetchReviews", "jobID", jobID)

View File

@ -68,6 +68,30 @@ var sqliteMigrations = []sqliteMigration{
`ALTER TABLE review DROP COLUMN "references"`, `ALTER TABLE review DROP COLUMN "references"`,
`ALTER TABLE review ADD COLUMN "mentions" TEXT NOT NULL DEFAULT ""`, `ALTER TABLE review ADD COLUMN "mentions" TEXT NOT NULL DEFAULT ""`,
`ALTER TABLE review ADD COLUMN "movie_rating" INTEGER NOT NULL DEFAULT 0`, `ALTER TABLE review ADD COLUMN "movie_rating" INTEGER NOT NULL DEFAULT 0`,
`BEGIN TRANSACTION;
CREATE TABLE job_queue_new (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"movie_id" TEXT NOT NULL,
"action" TEXT NOT NULL DEFAULT "",
"status" TEXT NOT NULL DEFAULT "",
"created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
"updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO job_queue_new (id, movie_id, action, status)
SELECT id, movie_id, action, status FROM job_queue;
DROP TABLE job_queue;
ALTER TABLE job_queue_new RENAME TO job_queue;
COMMIT`,
`CREATE TRIGGER set_timestamp_after_insert
AFTER INSERT ON job_queue
BEGIN
UPDATE job_queue SET created_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE rowid = new.rowid;
END;
CREATE TRIGGER set_timestamp_after_update
AFTER UPDATE ON job_queue
BEGIN
UPDATE job_queue SET updated_at = CURRENT_TIMESTAMP WHERE rowid = old.rowid;
END;`,
} }
var ( var (

View File

@ -38,7 +38,7 @@ func main() {
go worker.Run() go worker.Run()
apis := handler.APIIndex{ apis := handler.APIIndex{
"admin": handler.NewAdminAPI(jobQueue, logger), "job": handler.NewJobAPI(jobQueue, logger),
"movie": handler.NewMovieAPI(handler.APIIndex{ "movie": handler.NewMovieAPI(handler.APIIndex{
"review": handler.NewMovieReviewAPI(moviestore.NewReviewRepository(db), logger), "review": handler.NewMovieReviewAPI(moviestore.NewReviewRepository(db), logger),
}, moviestore.NewMovieRepository(db), jobQueue, logger), }, moviestore.NewMovieRepository(db), jobQueue, logger),