From 72e91d27edfa32ff185d2d183d138a43635d2ea8 Mon Sep 17 00:00:00 2001 From: Erik Winter Date: Sat, 9 Mar 2024 13:08:25 +0100 Subject: [PATCH] move job queue --- client/emdb.go | 323 -------------------- cmd/api-service/handler/{job.go => job2.go} | 5 +- cmd/api-service/handler/movie.go | 1 - cmd/api-service/moviestore/job.go | 51 ---- job/job.go | 46 ++- job/queue.go | 23 +- job/worker.go | 14 +- terminal-client/tui/tabtmdb.go | 3 +- 8 files changed, 66 insertions(+), 400 deletions(-) delete mode 100644 client/emdb.go rename cmd/api-service/handler/{job.go => job2.go} (96%) delete mode 100644 cmd/api-service/moviestore/job.go diff --git a/client/emdb.go b/client/emdb.go deleted file mode 100644 index b57170c..0000000 --- a/client/emdb.go +++ /dev/null @@ -1,323 +0,0 @@ -package client - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net/http" - - "code.ewintr.nl/emdb/cmd/api-service/moviestore" -) - -type EMDB struct { - baseURL string - apiKey string - c *http.Client -} - -func NewEMDB(baseURL string, apiKey string) *EMDB { - return &EMDB{ - baseURL: baseURL, - apiKey: apiKey, - c: &http.Client{}, - } -} - -func (e *EMDB) GetMovies() ([]moviestore.Movie, error) { - url := fmt.Sprintf("%s/movie", e.baseURL) - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return nil, err - } - req.Header.Add("Authorization", e.apiKey) - - resp, err := e.c.Do(req) - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - defer resp.Body.Close() - - var movies []moviestore.Movie - if err := json.Unmarshal(body, &movies); err != nil { - return nil, err - } - - return movies, nil -} - -func (e *EMDB) GetMovie(id string) (moviestore.Movie, error) { - url := fmt.Sprintf("%s/movie/%s", e.baseURL, id) - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return moviestore.Movie{}, err - } - req.Header.Add("Authorization", e.apiKey) - - resp, err := e.c.Do(req) - if err != nil { - return moviestore.Movie{}, err - } - - if resp.StatusCode != http.StatusOK { - return moviestore.Movie{}, fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - defer resp.Body.Close() - - var movie moviestore.Movie - if err := json.Unmarshal(body, &movie); err != nil { - return moviestore.Movie{}, err - } - - return movie, nil -} - -func (e *EMDB) CreateMovie(m moviestore.Movie) (moviestore.Movie, error) { - body, err := json.Marshal(m) - if err != nil { - return moviestore.Movie{}, err - } - - url := fmt.Sprintf("%s/movie", e.baseURL) - req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body)) - if err != nil { - return moviestore.Movie{}, err - } - req.Header.Add("Authorization", e.apiKey) - - resp, err := e.c.Do(req) - if err != nil { - return moviestore.Movie{}, err - } - - if resp.StatusCode != http.StatusOK { - return moviestore.Movie{}, fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - newBody, err := io.ReadAll(resp.Body) - if err != nil { - return moviestore.Movie{}, err - } - defer resp.Body.Close() - - var newMovie moviestore.Movie - if err := json.Unmarshal(newBody, &newMovie); err != nil { - return moviestore.Movie{}, err - } - - return newMovie, nil -} - -func (e *EMDB) UpdateMovie(m moviestore.Movie) (moviestore.Movie, error) { - body, err := json.Marshal(m) - if err != nil { - return moviestore.Movie{}, err - } - - url := fmt.Sprintf("%s/movie/%s", e.baseURL, m.ID) - req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(body)) - if err != nil { - return moviestore.Movie{}, err - } - req.Header.Add("Authorization", e.apiKey) - - resp, err := e.c.Do(req) - if err != nil { - return moviestore.Movie{}, err - } - - if resp.StatusCode != http.StatusOK { - return moviestore.Movie{}, fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - newBody, err := io.ReadAll(resp.Body) - if err != nil { - return moviestore.Movie{}, err - } - defer resp.Body.Close() - - var newMovie moviestore.Movie - if err := json.Unmarshal(newBody, &newMovie); err != nil { - return moviestore.Movie{}, err - } - - return newMovie, nil -} - -func (e *EMDB) GetReviews(movieID string) ([]moviestore.Review, error) { - url := fmt.Sprintf("%s/movie/%s/review", e.baseURL, movieID) - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return nil, err - } - req.Header.Add("Authorization", e.apiKey) - - resp, err := e.c.Do(req) - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - defer resp.Body.Close() - - var reviews []moviestore.Review - if err := json.Unmarshal(body, &reviews); err != nil { - return nil, err - } - - return reviews, nil -} - -func (e *EMDB) GetReview(id string) (moviestore.Review, error) { - url := fmt.Sprintf("%s/review/%s", e.baseURL, id) - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return moviestore.Review{}, err - } - req.Header.Add("Authorization", e.apiKey) - - resp, err := e.c.Do(req) - if err != nil { - return moviestore.Review{}, err - } - - if resp.StatusCode != http.StatusOK { - return moviestore.Review{}, fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - defer resp.Body.Close() - - var review moviestore.Review - if err := json.Unmarshal(body, &review); err != nil { - return moviestore.Review{}, err - } - - return review, nil -} - -func (e *EMDB) GetNextUnratedReview() (moviestore.Review, error) { - url := fmt.Sprintf("%s/review/unrated/next", e.baseURL) - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return moviestore.Review{}, err - } - req.Header.Add("Authorization", e.apiKey) - - resp, err := e.c.Do(req) - if err != nil { - return moviestore.Review{}, err - } - - if resp.StatusCode != http.StatusOK { - return moviestore.Review{}, fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - defer resp.Body.Close() - - var review moviestore.Review - if err := json.Unmarshal(body, &review); err != nil { - return moviestore.Review{}, err - } - - return review, nil -} - -func (e *EMDB) UpdateReview(review moviestore.Review) error { - body, err := json.Marshal(review) - if err != nil { - return err - } - - url := fmt.Sprintf("%s/review/%s", e.baseURL, review.ID) - req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(body)) - if err != nil { - return err - } - req.Header.Add("Authorization", e.apiKey) - - resp, err := e.c.Do(req) - if err != nil { - return err - } - - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - return fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - return nil -} - -func (e *EMDB) CreateJob(movieID, action string) error { - j := struct { - MovieID string - Action string - }{ - MovieID: movieID, - Action: action, - } - - body, err := json.Marshal(j) - if err != nil { - return err - } - - url := fmt.Sprintf("%s/job", e.baseURL) - req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body)) - if err != nil { - return err - } - req.Header.Add("Authorization", e.apiKey) - - resp, err := e.c.Do(req) - if err != nil { - return err - } - - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - return fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - return nil -} - -func (e *EMDB) GetNextAIJob() (moviestore.Job, error) { - url := fmt.Sprintf("%s/job/next-ai", e.baseURL) - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return moviestore.Job{}, err - } - req.Header.Add("Authorization", e.apiKey) - - resp, err := e.c.Do(req) - if err != nil { - return moviestore.Job{}, err - } - - if resp.StatusCode != http.StatusOK { - return moviestore.Job{}, fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - defer resp.Body.Close() - - var j moviestore.Job - if err := json.Unmarshal(body, &j); err != nil { - return moviestore.Job{}, err - } - - return j, nil -} diff --git a/cmd/api-service/handler/job.go b/cmd/api-service/handler/job2.go similarity index 96% rename from cmd/api-service/handler/job.go rename to cmd/api-service/handler/job2.go index c101d39..f4bc66e 100644 --- a/cmd/api-service/handler/job.go +++ b/cmd/api-service/handler/job2.go @@ -8,7 +8,6 @@ import ( "log/slog" "net/http" - "code.ewintr.nl/emdb/cmd/api-service/moviestore" "code.ewintr.nl/emdb/job" ) @@ -47,7 +46,7 @@ func (jobAPI *JobAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (jobAPI *JobAPI) Add(w http.ResponseWriter, r *http.Request) { logger := jobAPI.logger.With("method", "add") - var j moviestore.Job + var j job.Job if err := json.NewDecoder(r.Body).Decode(&j); err != nil { Error(w, http.StatusBadRequest, "could not decode job", err, logger) return @@ -82,7 +81,7 @@ func (jobAPI *JobAPI) List(w http.ResponseWriter, r *http.Request) { func (jobAPI *JobAPI) NextAI(w http.ResponseWriter, r *http.Request) { logger := jobAPI.logger.With("method", "nextai") - j, err := jobAPI.jq.Next(moviestore.TypeAI) + j, err := jobAPI.jq.Next(job.TypeAI) switch { case errors.Is(err, sql.ErrNoRows): logger.Info("no ai jobs found") diff --git a/cmd/api-service/handler/movie.go b/cmd/api-service/handler/movie.go index 08e4f66..75aa4b0 100644 --- a/cmd/api-service/handler/movie.go +++ b/cmd/api-service/handler/movie.go @@ -26,7 +26,6 @@ func NewMovieAPI(apis APIIndex, repo *moviestore.MovieRepository, jq *job.JobQue return &MovieAPI{ apis: apis, repo: repo, - jq: jq, logger: logger.With("api", "movie"), } } diff --git a/cmd/api-service/moviestore/job.go b/cmd/api-service/moviestore/job.go deleted file mode 100644 index 097dccb..0000000 --- a/cmd/api-service/moviestore/job.go +++ /dev/null @@ -1,51 +0,0 @@ -package moviestore - -import ( - "slices" - "time" -) - -type JobStatus string -type JobType string - -const ( - interval = 20 * time.Second - - TypeSimple JobType = "simple" - TypeAI JobType = "ai" - - ActionRefreshIMDBReviews = "refresh-imdb-reviews" - ActionRefreshAllIMDBReviews = "refresh-all-imdb-reviews" - ActionFindTitles = "find-titles" - ActionFindAllTitles = "find-all-titles" -) - -var ( - SimpleActions = []string{ - ActionRefreshIMDBReviews, - ActionRefreshAllIMDBReviews, // just creates a job for each movie - ActionFindAllTitles, // just creates a job for each review - } - AIActions = []string{ - ActionFindTitles, - } - - ValidActions = append(SimpleActions, AIActions...) -) - -type Job struct { - ID int - ActionID string - Action string - Status JobStatus - Created time.Time - Updated time.Time -} - -func Valid(action string) bool { - if slices.Contains(ValidActions, action) { - return true - } - - return false -} diff --git a/job/job.go b/job/job.go index 2189ccf..62b0738 100644 --- a/job/job.go +++ b/job/job.go @@ -1,7 +1,51 @@ package job -import "time" +import ( + "slices" + "time" +) + +type JobStatus string +type JobType string const ( interval = 20 * time.Second + + TypeSimple JobType = "simple" + TypeAI JobType = "ai" + + ActionRefreshIMDBReviews = "refresh-imdb-reviews" + ActionRefreshAllIMDBReviews = "refresh-all-imdb-reviews" + ActionFindTitles = "find-titles" + ActionFindAllTitles = "find-all-titles" ) + +var ( + SimpleActions = []string{ + ActionRefreshIMDBReviews, + ActionRefreshAllIMDBReviews, // just creates a job for each movie + ActionFindAllTitles, // just creates a job for each review + } + AIActions = []string{ + ActionFindTitles, + } + + ValidActions = append(SimpleActions, AIActions...) +) + +type Job struct { + ID int + ActionID string + Action string + Status JobStatus + Created time.Time + Updated time.Time +} + +func Valid(action string) bool { + if slices.Contains(ValidActions, action) { + return true + } + + return false +} diff --git a/job/queue.go b/job/queue.go index 68918e9..60e79eb 100644 --- a/job/queue.go +++ b/job/queue.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "code.ewintr.nl/emdb/cmd/api-service/moviestore" "code.ewintr.nl/emdb/storage" ) @@ -47,7 +46,7 @@ WHERE status = 'doing' } func (jq *JobQueue) Add(movieID, action string) error { - if !moviestore.Valid(action) { + if !Valid(action) { return errors.New("invalid action") } @@ -58,12 +57,12 @@ VALUES ($1, $2, 'todo');`, movieID, action) return err } -func (jq *JobQueue) Next(t moviestore.JobType) (moviestore.Job, error) { +func (jq *JobQueue) Next(t JobType) (Job, error) { logger := jq.logger.With("method", "next") - actions := moviestore.SimpleActions - if t == moviestore.TypeAI { - actions = moviestore.AIActions + actions := SimpleActions + if t == TypeAI { + actions = AIActions } actionsStr := fmt.Sprintf("('%s')", strings.Join(actions, "', '")) query := fmt.Sprintf(` @@ -74,13 +73,13 @@ WHERE status='todo' ORDER BY id ASC LIMIT 1;`, actionsStr) row := jq.db.QueryRow(query) - var job moviestore.Job + var job Job err := row.Scan(&job.ID, &job.ActionID, &job.Action) if err != nil { if !errors.Is(err, sql.ErrNoRows) { logger.Error("could not fetch next job", "error", err) } - return moviestore.Job{}, err + return Job{}, err } logger.Info("found a job", "id", job.ID) @@ -89,7 +88,7 @@ UPDATE job_queue SET status='doing' WHERE id=$1;`, job.ID); err != nil { logger.Error("could not set job to doing", "error") - return moviestore.Job{}, err + return Job{}, err } return job, nil @@ -116,7 +115,7 @@ WHERE id=$1;`, id); err != nil { return } -func (jq *JobQueue) List() ([]moviestore.Job, error) { +func (jq *JobQueue) List() ([]Job, error) { rows, err := jq.db.Query(` SELECT id, action_id, action, status, created_at, updated_at FROM job_queue @@ -126,9 +125,9 @@ ORDER BY id DESC;`) } defer rows.Close() - var jobs []moviestore.Job + var jobs []Job for rows.Next() { - var j moviestore.Job + var j Job if err := rows.Scan(&j.ID, &j.ActionID, &j.Action, &j.Status, &j.Created, &j.Updated); err != nil { return nil, err } diff --git a/job/worker.go b/job/worker.go index be2de86..862823d 100644 --- a/job/worker.go +++ b/job/worker.go @@ -33,7 +33,7 @@ func (w *Worker) Run() { logger.Info("starting worker") for { time.Sleep(interval) - j, err := w.jq.Next(moviestore.TypeSimple) + j, err := w.jq.Next(TypeSimple) switch { case errors.Is(err, sql.ErrNoRows): logger.Info("no simple jobs found") @@ -45,11 +45,11 @@ func (w *Worker) Run() { logger.Info("got a new job", "jobID", j.ID, "movieID", j.ActionID, "action", j.Action) switch j.Action { - case moviestore.ActionRefreshIMDBReviews: + case ActionRefreshIMDBReviews: w.RefreshReviews(j.ID, j.ActionID) - case moviestore.ActionRefreshAllIMDBReviews: + case ActionRefreshAllIMDBReviews: w.RefreshAllReviews(j.ID) - case moviestore.ActionFindAllTitles: + case ActionFindAllTitles: w.FindAllTitles(j.ID) default: logger.Error("unknown job action", "action", j.Action) @@ -68,7 +68,7 @@ func (w *Worker) RefreshAllReviews(jobID int) { for _, m := range movies { time.Sleep(1 * time.Second) - if err := w.jq.Add(m.ID, moviestore.ActionRefreshIMDBReviews); err != nil { + if err := w.jq.Add(m.ID, ActionRefreshIMDBReviews); err != nil { logger.Error("could not add job", "error", err) return } @@ -90,7 +90,7 @@ func (w *Worker) FindAllTitles(jobID int) { for _, r := range reviews { time.Sleep(1 * time.Second) - if err := w.jq.Add(r.ID, moviestore.ActionFindTitles); err != nil { + if err := w.jq.Add(r.ID, ActionFindTitles); err != nil { logger.Error("could not add job", "error", err) w.jq.MarkFailed(jobID) return @@ -130,7 +130,7 @@ func (w *Worker) RefreshReviews(jobID int, movieID string) { w.jq.MarkFailed(jobID) return } - if err := w.jq.Add(review.ID, moviestore.ActionFindTitles); err != nil { + if err := w.jq.Add(review.ID, ActionFindTitles); err != nil { logger.Error("could not add job", "error", err) w.jq.MarkFailed(jobID) return diff --git a/terminal-client/tui/tabtmdb.go b/terminal-client/tui/tabtmdb.go index 3dcde96..050b2c5 100644 --- a/terminal-client/tui/tabtmdb.go +++ b/terminal-client/tui/tabtmdb.go @@ -4,7 +4,6 @@ import ( "fmt" "code.ewintr.nl/emdb/client" - "code.ewintr.nl/emdb/cmd/api-service/moviestore" "code.ewintr.nl/emdb/job" "code.ewintr.nl/emdb/storage" "github.com/charmbracelet/bubbles/list" @@ -134,7 +133,7 @@ func (m *tabTMDB) ImportMovieCmd(movie Movie) tea.Cmd { if err := m.movieRepo.Store(movie.m); err != nil { return err } - if err := m.jobQueue.Add(movie.m.ID, string(moviestore.ActionRefreshIMDBReviews)); err != nil { + if err := m.jobQueue.Add(movie.m.ID, string(job.ActionRefreshIMDBReviews)); err != nil { return err }