diff --git a/cmd/api-service/handler/job.go b/cmd/api-service/handler/job.go index 82c5e5e..ed2b44b 100644 --- a/cmd/api-service/handler/job.go +++ b/cmd/api-service/handler/job.go @@ -52,7 +52,7 @@ func (jobAPI *JobAPI) Add(w http.ResponseWriter, r *http.Request) { return } - if err := jobAPI.jq.Add(j.MovieID, j.Action); err != nil { + if err := jobAPI.jq.Add(j.ActionID, j.Action); err != nil { Error(w, http.StatusInternalServerError, "could not add job", err, logger) return } diff --git a/cmd/api-service/job/job.go b/cmd/api-service/job/job.go index d4fddfb..18db9b3 100644 --- a/cmd/api-service/job/job.go +++ b/cmd/api-service/job/job.go @@ -34,12 +34,12 @@ var ( ) type Job struct { - ID int - MovieID string - Action string - Status Status - Created time.Time - Updated time.Time + ID int + ActionID string + Action string + Status Status + Created time.Time + Updated time.Time } func Valid(action string) bool { diff --git a/cmd/api-service/job/queue.go b/cmd/api-service/job/queue.go index b152681..741f7f5 100644 --- a/cmd/api-service/job/queue.go +++ b/cmd/api-service/job/queue.go @@ -50,7 +50,7 @@ ORDER BY id ASC LIMIT 1`, actionsStr) row := jq.db.QueryRow(query) var job Job - err := row.Scan(&job.ID, &job.MovieID, &job.Action) + err := row.Scan(&job.ID, &job.ActionID, &job.Action) if err != nil { if !errors.Is(err, sql.ErrNoRows) { logger.Error("could not fetch next job", "error", err) @@ -80,6 +80,17 @@ WHERE id=?`, id); err != nil { return } +func (jq *JobQueue) MarkFailed(id int) { + logger := jq.logger.With("method", "markfailed") + if _, err := jq.db.Exec(` +UPDATE job_queue +SET status='failed' +WHERE id=?`, id); err != nil { + logger.Error("could not mark job failed", "error", err) + } + return +} + func (jq *JobQueue) List() ([]Job, error) { rows, err := jq.db.Query(` SELECT id, movie_id, action, status, created_at, updated_at @@ -93,7 +104,7 @@ ORDER BY id DESC`) var jobs []Job for rows.Next() { var j Job - if err := rows.Scan(&j.ID, &j.MovieID, &j.Action, &j.Status, &j.Created, &j.Updated); err != nil { + if err := rows.Scan(&j.ID, &j.ActionID, &j.Action, &j.Status, &j.Created, &j.Updated); err != nil { return nil, err } jobs = append(jobs, j) diff --git a/cmd/api-service/job/worker.go b/cmd/api-service/job/worker.go index bb10f42..8e66f75 100644 --- a/cmd/api-service/job/worker.go +++ b/cmd/api-service/job/worker.go @@ -43,12 +43,14 @@ func (w *Worker) Run() { continue } - logger.Info("got a new job", "jobID", j.ID, "movieID", j.MovieID, "action", j.Action) + logger.Info("got a new job", "jobID", j.ID, "movieID", j.ActionID, "action", j.Action) switch j.Action { case ActionRefreshIMDBReviews: - w.RefreshReviews(j.ID, j.MovieID) + w.RefreshReviews(j.ID, j.ActionID) case ActionRefreshAllIMDBReviews: w.RefreshAllReviews(j.ID) + case ActionFindAllTitles: + w.FindAllTitles(j.ID) default: logger.Error("unknown job action", "action", j.Action) } @@ -65,6 +67,7 @@ func (w *Worker) RefreshAllReviews(jobID int) { } for _, m := range movies { + time.Sleep(1 * time.Second) if err := w.jq.Add(m.ID, ActionRefreshIMDBReviews); err != nil { logger.Error("could not add job", "error", err) return @@ -75,29 +78,56 @@ func (w *Worker) RefreshAllReviews(jobID int) { w.jq.MarkDone(jobID) } +func (w *Worker) FindAllTitles(jobID int) { + logger := w.logger.With("method", "findTitles", "jobID", jobID) + + reviews, err := w.reviewRepo.FindAll() + if err != nil { + logger.Error("could not get reviews", "error", err) + w.jq.MarkFailed(jobID) + return + } + + for _, r := range reviews { + time.Sleep(1 * time.Second) + if err := w.jq.Add(r.ID, ActionFindTitles); err != nil { + logger.Error("could not add job", "error", err) + w.jq.MarkFailed(jobID) + return + } + } + + logger.Info("find all titles", "count", len(reviews)) + w.jq.MarkDone(jobID) +} + func (w *Worker) RefreshReviews(jobID int, movieID string) { logger := w.logger.With("method", "fetchReviews", "jobID", jobID, "movieID", movieID) m, err := w.movieRepo.FindOne(movieID) if err != nil { logger.Error("could not get movie", "error", err) + w.jq.MarkFailed(jobID) return } if err := w.reviewRepo.DeleteByMovieID(m.ID); err != nil { logger.Error("could not delete reviews", "error", err) + w.jq.MarkFailed(jobID) return } reviews, err := w.imdb.GetReviews(m) if err != nil { logger.Error("could not get reviews", "error", err) + w.jq.MarkFailed(jobID) return } for _, review := range reviews { if err := w.reviewRepo.Store(review); err != nil { logger.Error("could not store review", "error", err) + w.jq.MarkFailed(jobID) return } } diff --git a/cmd/api-service/moviestore/sqlite.go b/cmd/api-service/moviestore/sqlite.go index acc6de4..9e069bf 100644 --- a/cmd/api-service/moviestore/sqlite.go +++ b/cmd/api-service/moviestore/sqlite.go @@ -96,6 +96,20 @@ var sqliteMigrations = []sqliteMigration{ `ALTER TABLE review DROP COLUMN "mentions"`, `ALTER TABLE review DROP COLUMN "mentioned_titles"`, `ALTER TABLE review ADD COLUMN "mentioned_titles" JSON NOT NULL Default '{}'`, + `BEGIN TRANSACTION; + CREATE TABLE job_queue_new ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "action_id" TEXT NOT NULL, + "action" TEXT NOT NULL DEFAULT "", + "status" TEXT NOT NULL DEFAULT "", + "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP, + "updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP + ); + INSERT INTO job_queue_new (id, action_id, action, status) + SELECT id, movie_id, action, status FROM job_queue; + DROP TABLE job_queue; + ALTER TABLE job_queue_new RENAME TO job_queue; + COMMIT`, } var (