emdb/worker-client/worker/worker.go

76 lines
1.7 KiB
Go
Raw Normal View History

2024-03-09 14:59:18 +01:00
package worker
import (
"database/sql"
"errors"
"log/slog"
"time"
"code.ewintr.nl/emdb/client"
"code.ewintr.nl/emdb/job"
"code.ewintr.nl/emdb/storage"
)
// interval is how long Run sleeps before each poll of the job queue.
const interval = 5 * time.Second
type Worker struct {
jq *job.JobQueue
movieRepo *storage.MovieRepository
reviewRepo *storage.ReviewRepository
imdb *client.IMDB
2024-03-09 21:13:32 +01:00
ollama *client.Ollama
2024-03-09 14:59:18 +01:00
logger *slog.Logger
}
2024-03-09 21:13:32 +01:00
func NewWorker(jq *job.JobQueue, movieRepo *storage.MovieRepository, reviewRepo *storage.ReviewRepository, imdb *client.IMDB, ollama *client.Ollama, logger *slog.Logger) *Worker {
2024-03-09 14:59:18 +01:00
return &Worker{
jq: jq,
movieRepo: movieRepo,
reviewRepo: reviewRepo,
imdb: imdb,
2024-03-09 21:13:32 +01:00
ollama: ollama,
2024-03-09 14:59:18 +01:00
logger: logger.With("service", "worker"),
}
}
func (w *Worker) Run() {
logger := w.logger.With("method", "run")
logger.Info("starting worker")
2024-03-10 15:04:23 +01:00
logger.Info("setting al existing jobs to todo")
if err := w.jq.ResetAll(); err != nil {
logger.Error("could not set all jobs to todo", "error", err)
return
}
2024-03-09 14:59:18 +01:00
for {
time.Sleep(interval)
j, err := w.jq.Next()
switch {
case errors.Is(err, sql.ErrNoRows):
2024-03-10 10:21:01 +01:00
//logger.Info("no jobs found")
2024-03-09 14:59:18 +01:00
continue
case err != nil:
logger.Error("could not get next job", "error", err)
continue
}
logger.Info("got a new job", "jobID", j.ID, "movieID", j.ActionID, "action", j.Action)
switch j.Action {
case job.ActionRefreshIMDBReviews:
w.RefreshReviews(j.ID, j.ActionID)
case job.ActionRefreshAllIMDBReviews:
w.RefreshAllReviews(j.ID)
case job.ActionFindTitles:
2024-03-09 21:13:32 +01:00
w.FindTitles(j.ID, j.ActionID)
2024-03-09 14:59:18 +01:00
case job.ActionFindAllTitles:
w.FindAllTitles(j.ID)
default:
logger.Error("unknown job action", "action", j.Action)
}
}
}