emdb/job/worker.go

143 lines
3.3 KiB
Go
Raw Normal View History

2023-12-30 09:19:53 +01:00
package job
import (
2024-01-20 12:30:06 +01:00
"database/sql"
"errors"
2023-12-30 09:19:53 +01:00
"log/slog"
2024-01-20 12:30:06 +01:00
"time"
2023-12-30 09:19:53 +01:00
2024-03-08 09:25:02 +01:00
"code.ewintr.nl/emdb/client"
2024-03-09 13:18:51 +01:00
"code.ewintr.nl/emdb/storage"
2023-12-30 09:19:53 +01:00
)
type Worker struct {
jq *JobQueue
2024-03-09 13:18:51 +01:00
movieRepo *storage.MovieRepository
reviewRepo *storage.ReviewRepository
2023-12-30 09:19:53 +01:00
imdb *client.IMDB
logger *slog.Logger
}
2024-03-09 13:18:51 +01:00
func NewWorker(jq *JobQueue, movieRepo *storage.MovieRepository, reviewRepo *storage.ReviewRepository, imdb *client.IMDB, logger *slog.Logger) *Worker {
2023-12-30 09:19:53 +01:00
return &Worker{
jq: jq,
movieRepo: movieRepo,
reviewRepo: reviewRepo,
imdb: imdb,
logger: logger.With("service", "worker"),
}
}
func (w *Worker) Run() {
2024-01-20 12:30:06 +01:00
logger := w.logger.With("method", "run")
logger.Info("starting worker")
for {
time.Sleep(interval)
2024-03-09 13:41:57 +01:00
j, err := w.jq.Next()
2024-01-20 12:30:06 +01:00
switch {
case errors.Is(err, sql.ErrNoRows):
logger.Info("no simple jobs found")
continue
case err != nil:
logger.Error("could not get next job", "error", err)
continue
}
2024-01-20 13:24:42 +01:00
logger.Info("got a new job", "jobID", j.ID, "movieID", j.ActionID, "action", j.Action)
2023-12-30 09:19:53 +01:00
switch j.Action {
2024-03-09 13:08:25 +01:00
case ActionRefreshIMDBReviews:
2024-01-20 13:24:42 +01:00
w.RefreshReviews(j.ID, j.ActionID)
2024-03-09 13:08:25 +01:00
case ActionRefreshAllIMDBReviews:
2023-12-30 09:19:53 +01:00
w.RefreshAllReviews(j.ID)
2024-03-09 13:08:25 +01:00
case ActionFindAllTitles:
2024-01-20 13:24:42 +01:00
w.FindAllTitles(j.ID)
2023-12-30 09:19:53 +01:00
default:
2024-01-20 12:30:06 +01:00
logger.Error("unknown job action", "action", j.Action)
2023-12-30 09:19:53 +01:00
}
}
}
func (w *Worker) RefreshAllReviews(jobID int) {
logger := w.logger.With("method", "fetchReviews", "jobID", jobID)
movies, err := w.movieRepo.FindAll()
if err != nil {
logger.Error("could not get movies", "error", err)
return
}
for _, m := range movies {
2024-01-20 13:24:42 +01:00
time.Sleep(1 * time.Second)
2024-03-09 13:08:25 +01:00
if err := w.jq.Add(m.ID, ActionRefreshIMDBReviews); err != nil {
2024-01-06 14:32:32 +01:00
logger.Error("could not add job", "error", err)
return
}
2023-12-30 09:19:53 +01:00
}
2024-01-18 07:47:36 +01:00
logger.Info("refresh all reviews", "count", len(movies))
w.jq.MarkDone(jobID)
2023-12-30 09:19:53 +01:00
}
2024-01-20 13:24:42 +01:00
func (w *Worker) FindAllTitles(jobID int) {
logger := w.logger.With("method", "findTitles", "jobID", jobID)
reviews, err := w.reviewRepo.FindAll()
if err != nil {
logger.Error("could not get reviews", "error", err)
w.jq.MarkFailed(jobID)
return
}
for _, r := range reviews {
time.Sleep(1 * time.Second)
2024-03-09 13:08:25 +01:00
if err := w.jq.Add(r.ID, ActionFindTitles); err != nil {
2024-01-20 13:24:42 +01:00
logger.Error("could not add job", "error", err)
w.jq.MarkFailed(jobID)
return
}
}
logger.Info("find all titles", "count", len(reviews))
w.jq.MarkDone(jobID)
}
2023-12-30 09:19:53 +01:00
func (w *Worker) RefreshReviews(jobID int, movieID string) {
logger := w.logger.With("method", "fetchReviews", "jobID", jobID, "movieID", movieID)
m, err := w.movieRepo.FindOne(movieID)
if err != nil {
logger.Error("could not get movie", "error", err)
2024-01-20 13:24:42 +01:00
w.jq.MarkFailed(jobID)
2023-12-30 09:19:53 +01:00
return
}
if err := w.reviewRepo.DeleteByMovieID(m.ID); err != nil {
logger.Error("could not delete reviews", "error", err)
2024-01-20 13:24:42 +01:00
w.jq.MarkFailed(jobID)
2023-12-30 09:19:53 +01:00
return
}
2024-01-17 07:57:52 +01:00
reviews, err := w.imdb.GetReviews(m)
2023-12-30 09:19:53 +01:00
if err != nil {
logger.Error("could not get reviews", "error", err)
2024-01-20 13:24:42 +01:00
w.jq.MarkFailed(jobID)
2023-12-30 09:19:53 +01:00
return
}
2024-01-17 07:57:52 +01:00
for _, review := range reviews {
if err := w.reviewRepo.Store(review); err != nil {
2023-12-30 09:19:53 +01:00
logger.Error("could not store review", "error", err)
2024-01-20 13:24:42 +01:00
w.jq.MarkFailed(jobID)
2023-12-30 09:19:53 +01:00
return
}
2024-03-09 13:08:25 +01:00
if err := w.jq.Add(review.ID, ActionFindTitles); err != nil {
2024-01-20 14:04:55 +01:00
logger.Error("could not add job", "error", err)
w.jq.MarkFailed(jobID)
return
}
2023-12-30 09:19:53 +01:00
}
logger.Info("refresh reviews", "count", len(reviews))
2024-01-18 07:47:36 +01:00
w.jq.MarkDone(jobID)
2023-12-30 09:19:53 +01:00
}