markfailed, findalltitles

This commit is contained in:
Erik Winter 2024-01-20 13:24:42 +01:00
parent afb1cea172
commit 7ad29cbb46
5 changed files with 66 additions and 11 deletions

View File

@ -52,7 +52,7 @@ func (jobAPI *JobAPI) Add(w http.ResponseWriter, r *http.Request) {
return return
} }
if err := jobAPI.jq.Add(j.MovieID, j.Action); err != nil { if err := jobAPI.jq.Add(j.ActionID, j.Action); err != nil {
Error(w, http.StatusInternalServerError, "could not add job", err, logger) Error(w, http.StatusInternalServerError, "could not add job", err, logger)
return return
} }

View File

@ -34,12 +34,12 @@ var (
) )
type Job struct { type Job struct {
ID int ID int
MovieID string ActionID string
Action string Action string
Status Status Status Status
Created time.Time Created time.Time
Updated time.Time Updated time.Time
} }
func Valid(action string) bool { func Valid(action string) bool {

View File

@ -50,7 +50,7 @@ ORDER BY id ASC
LIMIT 1`, actionsStr) LIMIT 1`, actionsStr)
row := jq.db.QueryRow(query) row := jq.db.QueryRow(query)
var job Job var job Job
err := row.Scan(&job.ID, &job.MovieID, &job.Action) err := row.Scan(&job.ID, &job.ActionID, &job.Action)
if err != nil { if err != nil {
if !errors.Is(err, sql.ErrNoRows) { if !errors.Is(err, sql.ErrNoRows) {
logger.Error("could not fetch next job", "error", err) logger.Error("could not fetch next job", "error", err)
@ -80,6 +80,17 @@ WHERE id=?`, id); err != nil {
return return
} }
func (jq *JobQueue) MarkFailed(id int) {
logger := jq.logger.With("method", "markfailed")
if _, err := jq.db.Exec(`
UPDATE job_queue
SET status='failed'
WHERE id=?`, id); err != nil {
logger.Error("could not mark job failed", "error", err)
}
return
}
func (jq *JobQueue) List() ([]Job, error) { func (jq *JobQueue) List() ([]Job, error) {
rows, err := jq.db.Query(` rows, err := jq.db.Query(`
SELECT id, movie_id, action, status, created_at, updated_at SELECT id, movie_id, action, status, created_at, updated_at
@ -93,7 +104,7 @@ ORDER BY id DESC`)
var jobs []Job var jobs []Job
for rows.Next() { for rows.Next() {
var j Job var j Job
if err := rows.Scan(&j.ID, &j.MovieID, &j.Action, &j.Status, &j.Created, &j.Updated); err != nil { if err := rows.Scan(&j.ID, &j.ActionID, &j.Action, &j.Status, &j.Created, &j.Updated); err != nil {
return nil, err return nil, err
} }
jobs = append(jobs, j) jobs = append(jobs, j)

View File

@ -43,12 +43,14 @@ func (w *Worker) Run() {
continue continue
} }
logger.Info("got a new job", "jobID", j.ID, "movieID", j.MovieID, "action", j.Action) logger.Info("got a new job", "jobID", j.ID, "movieID", j.ActionID, "action", j.Action)
switch j.Action { switch j.Action {
case ActionRefreshIMDBReviews: case ActionRefreshIMDBReviews:
w.RefreshReviews(j.ID, j.MovieID) w.RefreshReviews(j.ID, j.ActionID)
case ActionRefreshAllIMDBReviews: case ActionRefreshAllIMDBReviews:
w.RefreshAllReviews(j.ID) w.RefreshAllReviews(j.ID)
case ActionFindAllTitles:
w.FindAllTitles(j.ID)
default: default:
logger.Error("unknown job action", "action", j.Action) logger.Error("unknown job action", "action", j.Action)
} }
@ -65,6 +67,7 @@ func (w *Worker) RefreshAllReviews(jobID int) {
} }
for _, m := range movies { for _, m := range movies {
time.Sleep(1 * time.Second)
if err := w.jq.Add(m.ID, ActionRefreshIMDBReviews); err != nil { if err := w.jq.Add(m.ID, ActionRefreshIMDBReviews); err != nil {
logger.Error("could not add job", "error", err) logger.Error("could not add job", "error", err)
return return
@ -75,29 +78,56 @@ func (w *Worker) RefreshAllReviews(jobID int) {
w.jq.MarkDone(jobID) w.jq.MarkDone(jobID)
} }
func (w *Worker) FindAllTitles(jobID int) {
logger := w.logger.With("method", "findTitles", "jobID", jobID)
reviews, err := w.reviewRepo.FindAll()
if err != nil {
logger.Error("could not get reviews", "error", err)
w.jq.MarkFailed(jobID)
return
}
for _, r := range reviews {
time.Sleep(1 * time.Second)
if err := w.jq.Add(r.ID, ActionFindTitles); err != nil {
logger.Error("could not add job", "error", err)
w.jq.MarkFailed(jobID)
return
}
}
logger.Info("find all titles", "count", len(reviews))
w.jq.MarkDone(jobID)
}
func (w *Worker) RefreshReviews(jobID int, movieID string) { func (w *Worker) RefreshReviews(jobID int, movieID string) {
logger := w.logger.With("method", "fetchReviews", "jobID", jobID, "movieID", movieID) logger := w.logger.With("method", "fetchReviews", "jobID", jobID, "movieID", movieID)
m, err := w.movieRepo.FindOne(movieID) m, err := w.movieRepo.FindOne(movieID)
if err != nil { if err != nil {
logger.Error("could not get movie", "error", err) logger.Error("could not get movie", "error", err)
w.jq.MarkFailed(jobID)
return return
} }
if err := w.reviewRepo.DeleteByMovieID(m.ID); err != nil { if err := w.reviewRepo.DeleteByMovieID(m.ID); err != nil {
logger.Error("could not delete reviews", "error", err) logger.Error("could not delete reviews", "error", err)
w.jq.MarkFailed(jobID)
return return
} }
reviews, err := w.imdb.GetReviews(m) reviews, err := w.imdb.GetReviews(m)
if err != nil { if err != nil {
logger.Error("could not get reviews", "error", err) logger.Error("could not get reviews", "error", err)
w.jq.MarkFailed(jobID)
return return
} }
for _, review := range reviews { for _, review := range reviews {
if err := w.reviewRepo.Store(review); err != nil { if err := w.reviewRepo.Store(review); err != nil {
logger.Error("could not store review", "error", err) logger.Error("could not store review", "error", err)
w.jq.MarkFailed(jobID)
return return
} }
} }

View File

@ -96,6 +96,20 @@ var sqliteMigrations = []sqliteMigration{
`ALTER TABLE review DROP COLUMN "mentions"`, `ALTER TABLE review DROP COLUMN "mentions"`,
`ALTER TABLE review DROP COLUMN "mentioned_titles"`, `ALTER TABLE review DROP COLUMN "mentioned_titles"`,
`ALTER TABLE review ADD COLUMN "mentioned_titles" JSON NOT NULL Default '{}'`, `ALTER TABLE review ADD COLUMN "mentioned_titles" JSON NOT NULL Default '{}'`,
`BEGIN TRANSACTION;
CREATE TABLE job_queue_new (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"action_id" TEXT NOT NULL,
"action" TEXT NOT NULL DEFAULT "",
"status" TEXT NOT NULL DEFAULT "",
"created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
"updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO job_queue_new (id, action_id, action, status)
SELECT id, movie_id, action, status FROM job_queue;
DROP TABLE job_queue;
ALTER TABLE job_queue_new RENAME TO job_queue;
COMMIT`,
} }
var ( var (