From 6b8ec34b8ec8552c606074caf2c67ca207dfec8a Mon Sep 17 00:00:00 2001 From: Erik Winter Date: Sat, 20 Jan 2024 12:30:06 +0100 Subject: [PATCH] job types --- cmd/api-service/job/job.go | 30 ++++++++++++------ cmd/api-service/job/queue.go | 60 ++++++++++++++++------------------- cmd/api-service/job/worker.go | 23 +++++++++++--- cmd/api-service/service.go | 1 - 4 files changed, 67 insertions(+), 47 deletions(-) diff --git a/cmd/api-service/job/job.go b/cmd/api-service/job/job.go index 8cb5943..d4fddfb 100644 --- a/cmd/api-service/job/job.go +++ b/cmd/api-service/job/job.go @@ -5,34 +5,44 @@ import ( "time" ) -type JobStatus string - -type Action string +type Status string +type Type string const ( interval = 20 * time.Second - ActionRefreshIMDBReviews Action = "refresh-imdb-reviews" - ActionRefreshAllIMDBReviews Action = "refresh-all-imdb-reviews" + TypeSimple Type = "simple" + TypeAI Type = "ai" + + ActionRefreshIMDBReviews = "refresh-imdb-reviews" + ActionRefreshAllIMDBReviews = "refresh-all-imdb-reviews" + ActionFindTitles = "find-titles" + ActionFindAllTitles = "find-all-titles" ) var ( - validActions = []Action{ + simpleActions = []string{ ActionRefreshIMDBReviews, - ActionRefreshAllIMDBReviews, + ActionRefreshAllIMDBReviews, // just creates a job for each movie + ActionFindAllTitles, // just creates a job for each review } + aiActions = []string{ + ActionFindTitles, + } + + validActions = append(simpleActions, aiActions...) ) type Job struct { ID int MovieID string - Action Action - Status JobStatus + Action string + Status Status Created time.Time Updated time.Time } -func Valid(action Action) bool { +func Valid(action string) bool { if slices.Contains(validActions, action) { return true } diff --git a/cmd/api-service/job/queue.go b/cmd/api-service/job/queue.go index 56e5d6b..b152681 100644 --- a/cmd/api-service/job/queue.go +++ b/cmd/api-service/job/queue.go @@ -3,27 +3,26 @@ package job import ( "database/sql" "errors" + "fmt" "log/slog" - "time" + "strings" "ewintr.nl/emdb/cmd/api-service/moviestore" ) type JobQueue struct { db *moviestore.SQLite - out chan Job logger *slog.Logger } func NewJobQueue(db *moviestore.SQLite, logger *slog.Logger) *JobQueue { return &JobQueue{ db: db, - out: make(chan Job), logger: logger.With("service", "jobqueue"), } } -func (jq *JobQueue) Add(movieID string, action Action) error { +func (jq *JobQueue) Add(movieID, action string) error { if !Valid(action) { return errors.New("invalid action") } @@ -34,44 +33,41 @@ func (jq *JobQueue) Add(movieID string, action Action) error { return err } -func (jq *JobQueue) Next() chan Job { - return jq.out -} +func (jq *JobQueue) Next(t Type) (Job, error) { + logger := jq.logger.With("method", "next") -func (jq *JobQueue) Run() { - logger := jq.logger.With("method", "run") - logger.Info("starting job queue") - for { - time.Sleep(interval) - row := jq.db.QueryRow(` -SELECT id, movie_id, action + actions := simpleActions + if t == TypeAI { + actions = aiActions + } + actionsStr := fmt.Sprintf("('%s')", strings.Join(actions, "', '")) + query := fmt.Sprintf(` +SELECT id, movie_id, action FROM job_queue WHERE status='todo' + AND action IN %s ORDER BY id ASC -LIMIT 1`) - - var job Job - err := row.Scan(&job.ID, &job.MovieID, &job.Action) - switch { - case errors.Is(err, sql.ErrNoRows): - logger.Info("nothing to do") - continue - case err != nil: - logger.Error("could not fetch next job", "error", row.Err()) - continue +LIMIT 1`, actionsStr) + row := jq.db.QueryRow(query) + var job Job + err := row.Scan(&job.ID, &job.MovieID, &job.Action) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + logger.Error("could not fetch next job", "error", err) } - logger.Info("found a job", "id", job.ID) + return Job{}, err + } - if _, err := jq.db.Exec(` + logger.Info("found a job", "id", job.ID) + if _, err := jq.db.Exec(` UPDATE job_queue SET status='doing' WHERE id=?`, job.ID); err != nil { - logger.Error("could not set job to doing", "error") - continue - } - - jq.out <- job + logger.Error("could not set job to doing", "error") + return Job{}, err } + + return job, nil } func (jq *JobQueue) MarkDone(id int) { diff --git a/cmd/api-service/job/worker.go b/cmd/api-service/job/worker.go index 8f7c7a9..bb10f42 100644 --- a/cmd/api-service/job/worker.go +++ b/cmd/api-service/job/worker.go @@ -1,7 +1,10 @@ package job import ( + "database/sql" + "errors" "log/slog" + "time" "ewintr.nl/emdb/client" "ewintr.nl/emdb/cmd/api-service/moviestore" @@ -26,16 +29,28 @@ func NewWorker(jq *JobQueue, movieRepo *moviestore.MovieRepository, reviewRepo * } func (w *Worker) Run() { - w.logger.Info("starting worker") - for j := range w.jq.Next() { - w.logger.Info("got a new job", "jobID", j.ID, "movieID", j.MovieID, "action", j.Action) + logger := w.logger.With("method", "run") + logger.Info("starting worker") + for { + time.Sleep(interval) + j, err := w.jq.Next(TypeSimple) + switch { + case errors.Is(err, sql.ErrNoRows): + logger.Info("no simple jobs found") + continue + case err != nil: + logger.Error("could not get next job", "error", err) + continue + } + + logger.Info("got a new job", "jobID", j.ID, "movieID", j.MovieID, "action", j.Action) switch j.Action { case ActionRefreshIMDBReviews: w.RefreshReviews(j.ID, j.MovieID) case ActionRefreshAllIMDBReviews: w.RefreshAllReviews(j.ID) default: - w.logger.Warn("unknown job action", "action", j.Action) + logger.Error("unknown job action", "action", j.Action) } } } diff --git a/cmd/api-service/service.go b/cmd/api-service/service.go index aba6b33..8744a56 100644 --- a/cmd/api-service/service.go +++ b/cmd/api-service/service.go @@ -33,7 +33,6 @@ func main() { } jobQueue := job.NewJobQueue(db, logger) - go jobQueue.Run() worker := job.NewWorker(jobQueue, moviestore.NewMovieRepository(db), moviestore.NewReviewRepository(db), client.NewIMDB(), logger) go worker.Run()