job types

This commit is contained in:
Erik Winter 2024-01-20 12:30:06 +01:00
parent fb6f9f411d
commit 6b8ec34b8e
4 changed files with 67 additions and 47 deletions

View File

@ -5,34 +5,44 @@ import (
"time" "time"
) )
type JobStatus string type Status string
type Type string
type Action string
const ( const (
interval = 20 * time.Second interval = 20 * time.Second
ActionRefreshIMDBReviews Action = "refresh-imdb-reviews" TypeSimple Type = "simple"
ActionRefreshAllIMDBReviews Action = "refresh-all-imdb-reviews" TypeAI Type = "ai"
ActionRefreshIMDBReviews = "refresh-imdb-reviews"
ActionRefreshAllIMDBReviews = "refresh-all-imdb-reviews"
ActionFindTitles = "find-titles"
ActionFindAllTitles = "find-all-titles"
) )
var ( var (
validActions = []Action{ simpleActions = []string{
ActionRefreshIMDBReviews, ActionRefreshIMDBReviews,
ActionRefreshAllIMDBReviews, ActionRefreshAllIMDBReviews, // just creates a job for each movie
ActionFindAllTitles, // just creates a job for each review
} }
aiActions = []string{
ActionFindTitles,
}
validActions = append(simpleActions, aiActions...)
) )
type Job struct { type Job struct {
ID int ID int
MovieID string MovieID string
Action Action Action string
Status JobStatus Status Status
Created time.Time Created time.Time
Updated time.Time Updated time.Time
} }
func Valid(action Action) bool { func Valid(action string) bool {
if slices.Contains(validActions, action) { if slices.Contains(validActions, action) {
return true return true
} }

View File

@ -3,27 +3,26 @@ package job
import ( import (
"database/sql" "database/sql"
"errors" "errors"
"fmt"
"log/slog" "log/slog"
"time" "strings"
"ewintr.nl/emdb/cmd/api-service/moviestore" "ewintr.nl/emdb/cmd/api-service/moviestore"
) )
type JobQueue struct { type JobQueue struct {
db *moviestore.SQLite db *moviestore.SQLite
out chan Job
logger *slog.Logger logger *slog.Logger
} }
func NewJobQueue(db *moviestore.SQLite, logger *slog.Logger) *JobQueue { func NewJobQueue(db *moviestore.SQLite, logger *slog.Logger) *JobQueue {
return &JobQueue{ return &JobQueue{
db: db, db: db,
out: make(chan Job),
logger: logger.With("service", "jobqueue"), logger: logger.With("service", "jobqueue"),
} }
} }
func (jq *JobQueue) Add(movieID string, action Action) error { func (jq *JobQueue) Add(movieID, action string) error {
if !Valid(action) { if !Valid(action) {
return errors.New("invalid action") return errors.New("invalid action")
} }
@ -34,44 +33,41 @@ func (jq *JobQueue) Add(movieID string, action Action) error {
return err return err
} }
func (jq *JobQueue) Next() chan Job { func (jq *JobQueue) Next(t Type) (Job, error) {
return jq.out logger := jq.logger.With("method", "next")
}
func (jq *JobQueue) Run() { actions := simpleActions
logger := jq.logger.With("method", "run") if t == TypeAI {
logger.Info("starting job queue") actions = aiActions
for { }
time.Sleep(interval) actionsStr := fmt.Sprintf("('%s')", strings.Join(actions, "', '"))
row := jq.db.QueryRow(` query := fmt.Sprintf(`
SELECT id, movie_id, action SELECT id, movie_id, action
FROM job_queue FROM job_queue
WHERE status='todo' WHERE status='todo'
AND action IN %s
ORDER BY id ASC ORDER BY id ASC
LIMIT 1`) LIMIT 1`, actionsStr)
row := jq.db.QueryRow(query)
var job Job var job Job
err := row.Scan(&job.ID, &job.MovieID, &job.Action) err := row.Scan(&job.ID, &job.MovieID, &job.Action)
switch { if err != nil {
case errors.Is(err, sql.ErrNoRows): if !errors.Is(err, sql.ErrNoRows) {
logger.Info("nothing to do") logger.Error("could not fetch next job", "error", err)
continue }
case err != nil: return Job{}, err
logger.Error("could not fetch next job", "error", row.Err())
continue
} }
logger.Info("found a job", "id", job.ID)
logger.Info("found a job", "id", job.ID)
if _, err := jq.db.Exec(` if _, err := jq.db.Exec(`
UPDATE job_queue UPDATE job_queue
SET status='doing' SET status='doing'
WHERE id=?`, job.ID); err != nil { WHERE id=?`, job.ID); err != nil {
logger.Error("could not set job to doing", "error") logger.Error("could not set job to doing", "error")
continue return Job{}, err
} }
jq.out <- job return job, nil
}
} }
func (jq *JobQueue) MarkDone(id int) { func (jq *JobQueue) MarkDone(id int) {

View File

@ -1,7 +1,10 @@
package job package job
import ( import (
"database/sql"
"errors"
"log/slog" "log/slog"
"time"
"ewintr.nl/emdb/client" "ewintr.nl/emdb/client"
"ewintr.nl/emdb/cmd/api-service/moviestore" "ewintr.nl/emdb/cmd/api-service/moviestore"
@ -26,16 +29,28 @@ func NewWorker(jq *JobQueue, movieRepo *moviestore.MovieRepository, reviewRepo *
} }
func (w *Worker) Run() { func (w *Worker) Run() {
w.logger.Info("starting worker") logger := w.logger.With("method", "run")
for j := range w.jq.Next() { logger.Info("starting worker")
w.logger.Info("got a new job", "jobID", j.ID, "movieID", j.MovieID, "action", j.Action) for {
time.Sleep(interval)
j, err := w.jq.Next(TypeSimple)
switch {
case errors.Is(err, sql.ErrNoRows):
logger.Info("no simple jobs found")
continue
case err != nil:
logger.Error("could not get next job", "error", err)
continue
}
logger.Info("got a new job", "jobID", j.ID, "movieID", j.MovieID, "action", j.Action)
switch j.Action { switch j.Action {
case ActionRefreshIMDBReviews: case ActionRefreshIMDBReviews:
w.RefreshReviews(j.ID, j.MovieID) w.RefreshReviews(j.ID, j.MovieID)
case ActionRefreshAllIMDBReviews: case ActionRefreshAllIMDBReviews:
w.RefreshAllReviews(j.ID) w.RefreshAllReviews(j.ID)
default: default:
w.logger.Warn("unknown job action", "action", j.Action) logger.Error("unknown job action", "action", j.Action)
} }
} }
} }

View File

@ -33,7 +33,6 @@ func main() {
} }
jobQueue := job.NewJobQueue(db, logger) jobQueue := job.NewJobQueue(db, logger)
go jobQueue.Run()
worker := job.NewWorker(jobQueue, moviestore.NewMovieRepository(db), moviestore.NewReviewRepository(db), client.NewIMDB(), logger) worker := job.NewWorker(jobQueue, moviestore.NewMovieRepository(db), moviestore.NewReviewRepository(db), client.NewIMDB(), logger)
go worker.Run() go worker.Run()