worker
This commit is contained in:
parent
2af51cdb81
commit
ff3b41d427
|
@ -180,8 +180,8 @@ func (e *EMDB) GetReviews(movieID string) ([]moviestore.Review, error) {
|
||||||
return reviews, nil
|
return reviews, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EMDB) GetNextUnratedReview() (moviestore.Review, error) {
|
func (e *EMDB) GetReview(id string) (moviestore.Review, error) {
|
||||||
url := fmt.Sprintf("%s/review/unrated/next", e.baseURL)
|
url := fmt.Sprintf("%s/review/%s", e.baseURL, id)
|
||||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return moviestore.Review{}, err
|
return moviestore.Review{}, err
|
||||||
|
@ -208,8 +208,8 @@ func (e *EMDB) GetNextUnratedReview() (moviestore.Review, error) {
|
||||||
return review, nil
|
return review, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EMDB) GetNextNoTitlesReview() (moviestore.Review, error) {
|
func (e *EMDB) GetNextUnratedReview() (moviestore.Review, error) {
|
||||||
url := fmt.Sprintf("%s/review/no-titles/next", e.baseURL)
|
url := fmt.Sprintf("%s/review/unrated/next", e.baseURL)
|
||||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return moviestore.Review{}, err
|
return moviestore.Review{}, err
|
||||||
|
@ -293,3 +293,31 @@ func (e *EMDB) CreateJob(movieID, action string) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *EMDB) GetNextAIJob() (moviestore.Job, error) {
|
||||||
|
url := fmt.Sprintf("%s/job/next-ai", e.baseURL)
|
||||||
|
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return moviestore.Job{}, err
|
||||||
|
}
|
||||||
|
req.Header.Add("Authorization", e.apiKey)
|
||||||
|
|
||||||
|
resp, err := e.c.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return moviestore.Job{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return moviestore.Job{}, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var j moviestore.Job
|
||||||
|
if err := json.Unmarshal(body, &j); err != nil {
|
||||||
|
return moviestore.Job{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return j, nil
|
||||||
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"ewintr.nl/emdb/cmd/api-service/job"
|
"ewintr.nl/emdb/cmd/api-service/job"
|
||||||
|
"ewintr.nl/emdb/cmd/api-service/moviestore"
|
||||||
)
|
)
|
||||||
|
|
||||||
type JobAPI struct {
|
type JobAPI struct {
|
||||||
|
@ -46,7 +47,7 @@ func (jobAPI *JobAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
func (jobAPI *JobAPI) Add(w http.ResponseWriter, r *http.Request) {
|
func (jobAPI *JobAPI) Add(w http.ResponseWriter, r *http.Request) {
|
||||||
logger := jobAPI.logger.With("method", "add")
|
logger := jobAPI.logger.With("method", "add")
|
||||||
|
|
||||||
var j job.Job
|
var j moviestore.Job
|
||||||
if err := json.NewDecoder(r.Body).Decode(&j); err != nil {
|
if err := json.NewDecoder(r.Body).Decode(&j); err != nil {
|
||||||
Error(w, http.StatusBadRequest, "could not decode job", err, logger)
|
Error(w, http.StatusBadRequest, "could not decode job", err, logger)
|
||||||
return
|
return
|
||||||
|
@ -81,7 +82,7 @@ func (jobAPI *JobAPI) List(w http.ResponseWriter, r *http.Request) {
|
||||||
func (jobAPI *JobAPI) NextAI(w http.ResponseWriter, r *http.Request) {
|
func (jobAPI *JobAPI) NextAI(w http.ResponseWriter, r *http.Request) {
|
||||||
logger := jobAPI.logger.With("method", "nextai")
|
logger := jobAPI.logger.With("method", "nextai")
|
||||||
|
|
||||||
j, err := jobAPI.jq.Next(job.TypeAI)
|
j, err := jobAPI.jq.Next(moviestore.TypeAI)
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, sql.ErrNoRows):
|
case errors.Is(err, sql.ErrNoRows):
|
||||||
logger.Info("no ai jobs found")
|
logger.Info("no ai jobs found")
|
||||||
|
|
|
@ -1,51 +1,7 @@
|
||||||
package job
|
package job
|
||||||
|
|
||||||
import (
|
import "time"
|
||||||
"slices"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Status string
|
|
||||||
type Type string
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
interval = 20 * time.Second
|
interval = 20 * time.Second
|
||||||
|
|
||||||
TypeSimple Type = "simple"
|
|
||||||
TypeAI Type = "ai"
|
|
||||||
|
|
||||||
ActionRefreshIMDBReviews = "refresh-imdb-reviews"
|
|
||||||
ActionRefreshAllIMDBReviews = "refresh-all-imdb-reviews"
|
|
||||||
ActionFindTitles = "find-titles"
|
|
||||||
ActionFindAllTitles = "find-all-titles"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
simpleActions = []string{
|
|
||||||
ActionRefreshIMDBReviews,
|
|
||||||
ActionRefreshAllIMDBReviews, // just creates a job for each movie
|
|
||||||
ActionFindAllTitles, // just creates a job for each review
|
|
||||||
}
|
|
||||||
aiActions = []string{
|
|
||||||
ActionFindTitles,
|
|
||||||
}
|
|
||||||
|
|
||||||
validActions = append(simpleActions, aiActions...)
|
|
||||||
)
|
|
||||||
|
|
||||||
type Job struct {
|
|
||||||
ID int
|
|
||||||
ActionID string
|
|
||||||
Action string
|
|
||||||
Status Status
|
|
||||||
Created time.Time
|
|
||||||
Updated time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
func Valid(action string) bool {
|
|
||||||
if slices.Contains(validActions, action) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ func NewJobQueue(db *moviestore.SQLite, logger *slog.Logger) *JobQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (jq *JobQueue) Add(movieID, action string) error {
|
func (jq *JobQueue) Add(movieID, action string) error {
|
||||||
if !Valid(action) {
|
if !moviestore.Valid(action) {
|
||||||
return errors.New("invalid action")
|
return errors.New("invalid action")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,12 +33,12 @@ func (jq *JobQueue) Add(movieID, action string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (jq *JobQueue) Next(t Type) (Job, error) {
|
func (jq *JobQueue) Next(t moviestore.JobType) (moviestore.Job, error) {
|
||||||
logger := jq.logger.With("method", "next")
|
logger := jq.logger.With("method", "next")
|
||||||
|
|
||||||
actions := simpleActions
|
actions := moviestore.SimpleActions
|
||||||
if t == TypeAI {
|
if t == moviestore.TypeAI {
|
||||||
actions = aiActions
|
actions = moviestore.AIActions
|
||||||
}
|
}
|
||||||
actionsStr := fmt.Sprintf("('%s')", strings.Join(actions, "', '"))
|
actionsStr := fmt.Sprintf("('%s')", strings.Join(actions, "', '"))
|
||||||
query := fmt.Sprintf(`
|
query := fmt.Sprintf(`
|
||||||
|
@ -49,13 +49,13 @@ WHERE status='todo'
|
||||||
ORDER BY id ASC
|
ORDER BY id ASC
|
||||||
LIMIT 1`, actionsStr)
|
LIMIT 1`, actionsStr)
|
||||||
row := jq.db.QueryRow(query)
|
row := jq.db.QueryRow(query)
|
||||||
var job Job
|
var job moviestore.Job
|
||||||
err := row.Scan(&job.ID, &job.ActionID, &job.Action)
|
err := row.Scan(&job.ID, &job.ActionID, &job.Action)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, sql.ErrNoRows) {
|
if !errors.Is(err, sql.ErrNoRows) {
|
||||||
logger.Error("could not fetch next job", "error", err)
|
logger.Error("could not fetch next job", "error", err)
|
||||||
}
|
}
|
||||||
return Job{}, err
|
return moviestore.Job{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("found a job", "id", job.ID)
|
logger.Info("found a job", "id", job.ID)
|
||||||
|
@ -64,7 +64,7 @@ UPDATE job_queue
|
||||||
SET status='doing'
|
SET status='doing'
|
||||||
WHERE id=?`, job.ID); err != nil {
|
WHERE id=?`, job.ID); err != nil {
|
||||||
logger.Error("could not set job to doing", "error")
|
logger.Error("could not set job to doing", "error")
|
||||||
return Job{}, err
|
return moviestore.Job{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return job, nil
|
return job, nil
|
||||||
|
@ -91,7 +91,7 @@ WHERE id=?`, id); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (jq *JobQueue) List() ([]Job, error) {
|
func (jq *JobQueue) List() ([]moviestore.Job, error) {
|
||||||
rows, err := jq.db.Query(`
|
rows, err := jq.db.Query(`
|
||||||
SELECT id, action_id, action, status, created_at, updated_at
|
SELECT id, action_id, action, status, created_at, updated_at
|
||||||
FROM job_queue
|
FROM job_queue
|
||||||
|
@ -101,9 +101,9 @@ ORDER BY id DESC`)
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
var jobs []Job
|
var jobs []moviestore.Job
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var j Job
|
var j moviestore.Job
|
||||||
if err := rows.Scan(&j.ID, &j.ActionID, &j.Action, &j.Status, &j.Created, &j.Updated); err != nil {
|
if err := rows.Scan(&j.ID, &j.ActionID, &j.Action, &j.Status, &j.Created, &j.Updated); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,7 @@ func (w *Worker) Run() {
|
||||||
logger.Info("starting worker")
|
logger.Info("starting worker")
|
||||||
for {
|
for {
|
||||||
time.Sleep(interval)
|
time.Sleep(interval)
|
||||||
j, err := w.jq.Next(TypeSimple)
|
j, err := w.jq.Next(moviestore.TypeSimple)
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, sql.ErrNoRows):
|
case errors.Is(err, sql.ErrNoRows):
|
||||||
logger.Info("no simple jobs found")
|
logger.Info("no simple jobs found")
|
||||||
|
@ -45,11 +45,11 @@ func (w *Worker) Run() {
|
||||||
|
|
||||||
logger.Info("got a new job", "jobID", j.ID, "movieID", j.ActionID, "action", j.Action)
|
logger.Info("got a new job", "jobID", j.ID, "movieID", j.ActionID, "action", j.Action)
|
||||||
switch j.Action {
|
switch j.Action {
|
||||||
case ActionRefreshIMDBReviews:
|
case moviestore.ActionRefreshIMDBReviews:
|
||||||
w.RefreshReviews(j.ID, j.ActionID)
|
w.RefreshReviews(j.ID, j.ActionID)
|
||||||
case ActionRefreshAllIMDBReviews:
|
case moviestore.ActionRefreshAllIMDBReviews:
|
||||||
w.RefreshAllReviews(j.ID)
|
w.RefreshAllReviews(j.ID)
|
||||||
case ActionFindAllTitles:
|
case moviestore.ActionFindAllTitles:
|
||||||
w.FindAllTitles(j.ID)
|
w.FindAllTitles(j.ID)
|
||||||
default:
|
default:
|
||||||
logger.Error("unknown job action", "action", j.Action)
|
logger.Error("unknown job action", "action", j.Action)
|
||||||
|
@ -68,7 +68,7 @@ func (w *Worker) RefreshAllReviews(jobID int) {
|
||||||
|
|
||||||
for _, m := range movies {
|
for _, m := range movies {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
if err := w.jq.Add(m.ID, ActionRefreshIMDBReviews); err != nil {
|
if err := w.jq.Add(m.ID, moviestore.ActionRefreshIMDBReviews); err != nil {
|
||||||
logger.Error("could not add job", "error", err)
|
logger.Error("could not add job", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -90,7 +90,7 @@ func (w *Worker) FindAllTitles(jobID int) {
|
||||||
|
|
||||||
for _, r := range reviews {
|
for _, r := range reviews {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
if err := w.jq.Add(r.ID, ActionFindTitles); err != nil {
|
if err := w.jq.Add(r.ID, moviestore.ActionFindTitles); err != nil {
|
||||||
logger.Error("could not add job", "error", err)
|
logger.Error("could not add job", "error", err)
|
||||||
w.jq.MarkFailed(jobID)
|
w.jq.MarkFailed(jobID)
|
||||||
return
|
return
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
package moviestore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"slices"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JobStatus string
|
||||||
|
type JobType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
interval = 20 * time.Second
|
||||||
|
|
||||||
|
TypeSimple JobType = "simple"
|
||||||
|
TypeAI JobType = "ai"
|
||||||
|
|
||||||
|
ActionRefreshIMDBReviews = "refresh-imdb-reviews"
|
||||||
|
ActionRefreshAllIMDBReviews = "refresh-all-imdb-reviews"
|
||||||
|
ActionFindTitles = "find-titles"
|
||||||
|
ActionFindAllTitles = "find-all-titles"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
SimpleActions = []string{
|
||||||
|
ActionRefreshIMDBReviews,
|
||||||
|
ActionRefreshAllIMDBReviews, // just creates a job for each movie
|
||||||
|
ActionFindAllTitles, // just creates a job for each review
|
||||||
|
}
|
||||||
|
AIActions = []string{
|
||||||
|
ActionFindTitles,
|
||||||
|
}
|
||||||
|
|
||||||
|
ValidActions = append(SimpleActions, AIActions...)
|
||||||
|
)
|
||||||
|
|
||||||
|
type Job struct {
|
||||||
|
ID int
|
||||||
|
ActionID string
|
||||||
|
Action string
|
||||||
|
Status JobStatus
|
||||||
|
Created time.Time
|
||||||
|
Updated time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func Valid(action string) bool {
|
||||||
|
if slices.Contains(ValidActions, action) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"ewintr.nl/emdb/client"
|
"ewintr.nl/emdb/client"
|
||||||
"ewintr.nl/emdb/cmd/api-service/job"
|
"ewintr.nl/emdb/cmd/api-service/moviestore"
|
||||||
"github.com/charmbracelet/bubbles/list"
|
"github.com/charmbracelet/bubbles/list"
|
||||||
"github.com/charmbracelet/bubbles/textinput"
|
"github.com/charmbracelet/bubbles/textinput"
|
||||||
tea "github.com/charmbracelet/bubbletea"
|
tea "github.com/charmbracelet/bubbletea"
|
||||||
|
@ -131,7 +131,7 @@ func (m *tabTMDB) ImportMovieCmd(movie Movie) tea.Cmd {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := m.emdb.CreateJob(newMovie.ID, string(job.ActionRefreshIMDBReviews)); err != nil {
|
if err := m.emdb.CreateJob(newMovie.ID, string(moviestore.ActionRefreshIMDBReviews)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,12 @@ func main() {
|
||||||
//movieID := "2fce2f8f-a048-4e39-8ffe-82df09a29d32" // shadows in paradise
|
//movieID := "2fce2f8f-a048-4e39-8ffe-82df09a29d32" // shadows in paradise
|
||||||
|
|
||||||
emdb := client.NewEMDB(os.Getenv("EMDB_BASE_URL"), os.Getenv("EMDB_API_KEY"))
|
emdb := client.NewEMDB(os.Getenv("EMDB_BASE_URL"), os.Getenv("EMDB_API_KEY"))
|
||||||
review, err := emdb.GetNextNoTitlesReview()
|
j, err := emdb.GetNextAIJob()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
review, err := emdb.GetReview(j.ActionID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
|
Loading…
Reference in New Issue