diff --git a/Makefile b/Makefile index d8c983a..a1d7606 100644 --- a/Makefile +++ b/Makefile @@ -11,5 +11,5 @@ md-export: go run ./markdown-export/main.go worker: - go run ./worker/main.go + go run ./worker-client/main.go diff --git a/job/job.go b/job/job.go index 62b0738..3046fe5 100644 --- a/job/job.go +++ b/job/job.go @@ -9,8 +9,6 @@ type JobStatus string type JobType string const ( - interval = 20 * time.Second - TypeSimple JobType = "simple" TypeAI JobType = "ai" diff --git a/job/worker.go b/job/worker.go deleted file mode 100644 index b5f7bc6..0000000 --- a/job/worker.go +++ /dev/null @@ -1,142 +0,0 @@ -package job - -import ( - "database/sql" - "errors" - "log/slog" - "time" - - "code.ewintr.nl/emdb/client" - "code.ewintr.nl/emdb/storage" -) - -type Worker struct { - jq *JobQueue - movieRepo *storage.MovieRepository - reviewRepo *storage.ReviewRepository - imdb *client.IMDB - logger *slog.Logger -} - -func NewWorker(jq *JobQueue, movieRepo *storage.MovieRepository, reviewRepo *storage.ReviewRepository, imdb *client.IMDB, logger *slog.Logger) *Worker { - return &Worker{ - jq: jq, - movieRepo: movieRepo, - reviewRepo: reviewRepo, - imdb: imdb, - logger: logger.With("service", "worker"), - } -} - -func (w *Worker) Run() { - logger := w.logger.With("method", "run") - logger.Info("starting worker") - for { - time.Sleep(interval) - j, err := w.jq.Next() - switch { - case errors.Is(err, sql.ErrNoRows): - logger.Info("no simple jobs found") - continue - case err != nil: - logger.Error("could not get next job", "error", err) - continue - } - - logger.Info("got a new job", "jobID", j.ID, "movieID", j.ActionID, "action", j.Action) - switch j.Action { - case ActionRefreshIMDBReviews: - w.RefreshReviews(j.ID, j.ActionID) - case ActionRefreshAllIMDBReviews: - w.RefreshAllReviews(j.ID) - case ActionFindAllTitles: - w.FindAllTitles(j.ID) - default: - logger.Error("unknown job action", "action", j.Action) - } - } -} - -func (w *Worker) RefreshAllReviews(jobID int) { - logger := w.logger.With("method", "fetchReviews", "jobID", jobID) - - movies, err := w.movieRepo.FindAll() - if err != nil { - logger.Error("could not get movies", "error", err) - return - } - - for _, m := range movies { - time.Sleep(1 * time.Second) - if err := w.jq.Add(m.ID, ActionRefreshIMDBReviews); err != nil { - logger.Error("could not add job", "error", err) - return - } - } - - logger.Info("refresh all reviews", "count", len(movies)) - w.jq.MarkDone(jobID) -} - -func (w *Worker) FindAllTitles(jobID int) { - logger := w.logger.With("method", "findTitles", "jobID", jobID) - - reviews, err := w.reviewRepo.FindAll() - if err != nil { - logger.Error("could not get reviews", "error", err) - w.jq.MarkFailed(jobID) - return - } - - for _, r := range reviews { - time.Sleep(1 * time.Second) - if err := w.jq.Add(r.ID, ActionFindTitles); err != nil { - logger.Error("could not add job", "error", err) - w.jq.MarkFailed(jobID) - return - } - } - - logger.Info("find all titles", "count", len(reviews)) - w.jq.MarkDone(jobID) -} - -func (w *Worker) RefreshReviews(jobID int, movieID string) { - logger := w.logger.With("method", "fetchReviews", "jobID", jobID, "movieID", movieID) - - m, err := w.movieRepo.FindOne(movieID) - if err != nil { - logger.Error("could not get movie", "error", err) - w.jq.MarkFailed(jobID) - return - } - - if err := w.reviewRepo.DeleteByMovieID(m.ID); err != nil { - logger.Error("could not delete reviews", "error", err) - w.jq.MarkFailed(jobID) - return - } - - reviews, err := w.imdb.GetReviews(m) - if err != nil { - logger.Error("could not get reviews", "error", err) - w.jq.MarkFailed(jobID) - return - } - - for _, review := range reviews { - if err := w.reviewRepo.Store(review); err != nil { - logger.Error("could not store review", "error", err) - w.jq.MarkFailed(jobID) - return - } - if err := w.jq.Add(review.ID, ActionFindTitles); err != nil { - logger.Error("could not add job", "error", err) - w.jq.MarkFailed(jobID) - return - } - } - - logger.Info("refresh reviews", "count", len(reviews)) - w.jq.MarkDone(jobID) -} diff --git a/terminal-client/tui/tabtmdb.go b/terminal-client/tui/tabtmdb.go index 0e5a8eb..71b2559 100644 --- a/terminal-client/tui/tabtmdb.go +++ b/terminal-client/tui/tabtmdb.go @@ -9,6 +9,7 @@ import ( "github.com/charmbracelet/bubbles/list" "github.com/charmbracelet/bubbles/textinput" tea "github.com/charmbracelet/bubbletea" + "github.com/google/uuid" ) type tabTMDB struct { @@ -74,6 +75,7 @@ func (m tabTMDB) Update(msg tea.Msg) (tea.Model, tea.Cmd) { case "i": if m.focused == "result" { movie := m.searchResults.SelectedItem().(Movie) + movie.m.ID = uuid.New().String() cmds = append(cmds, m.ImportMovieCmd(movie), m.ResetCmd()) m.Log(fmt.Sprintf("imported movie %s", movie.Title())) } diff --git a/worker-client/main.go b/worker-client/main.go new file mode 100644 index 0000000..b296837 --- /dev/null +++ b/worker-client/main.go @@ -0,0 +1,53 @@ +package main + +import ( + "fmt" + "log/slog" + "os" + "os/signal" + "syscall" + + "code.ewintr.nl/emdb/client" + "code.ewintr.nl/emdb/job" + "code.ewintr.nl/emdb/storage" + "code.ewintr.nl/emdb/worker-client/worker" +) + +const ( + mentionsTemplate = `The following text is a user comment about the movie {{.title}}. In it, the user may have referenced other movie titles. List them if you see any. + +---- +{{.review}} +---- + +If you found any movie titles other than {{.title}}, list them below in a JSON array. If there are other titles, like TV shows, books or games, ignore them. The format is as follows: + +["movie title 1", "movie title 2"] + +Just answer with the JSON and nothing else. If you don't see any other movie titles, just answer with an empty array.` +) + +func main() { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + dbHost := os.Getenv("EMDB_DB_HOST") + dbName := os.Getenv("EMDB_DB_NAME") + dbUser := os.Getenv("EMDB_DB_USER") + dbPassword := os.Getenv("EMDB_DB_PASSWORD") + pgConnStr := fmt.Sprintf("host=%s user=%s password=%s dbname=%s sslmode=disable", dbHost, dbUser, dbPassword, dbName) + dbPostgres, err := storage.NewPostgres(pgConnStr) + if err != nil { + fmt.Printf("could not create new postgres repo: %s", err.Error()) + os.Exit(1) + } + movieRepo := storage.NewMovieRepository(dbPostgres) + reviewRepo := storage.NewReviewRepository(dbPostgres) + jobQueue := job.NewJobQueue(dbPostgres, logger) + + w := worker.NewWorker(jobQueue, movieRepo, reviewRepo, client.NewIMDB(), logger) + + go w.Run() + + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + <-c +} diff --git a/worker-client/worker/findalltitles.go b/worker-client/worker/findalltitles.go new file mode 100644 index 0000000..a136881 --- /dev/null +++ b/worker-client/worker/findalltitles.go @@ -0,0 +1,30 @@ +package worker + +import ( + "time" + + "code.ewintr.nl/emdb/job" +) + +func (w *Worker) FindAllTitles(jobID int) { + logger := w.logger.With("method", "findAllTitles", "jobID", jobID) + + reviews, err := w.reviewRepo.FindAll() + if err != nil { + logger.Error("could not get reviews", "error", err) + w.jq.MarkFailed(jobID) + return + } + + for _, r := range reviews { + time.Sleep(1 * time.Second) + if err := w.jq.Add(r.ID, job.ActionFindTitles); err != nil { + logger.Error("could not add job", "error", err) + w.jq.MarkFailed(jobID) + return + } + } + + logger.Info("find all titles", "count", len(reviews)) + w.jq.MarkDone(jobID) +} diff --git a/worker-client/worker/findtitles.go b/worker-client/worker/findtitles.go new file mode 100644 index 0000000..cce564b --- /dev/null +++ b/worker-client/worker/findtitles.go @@ -0,0 +1,102 @@ +package worker + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/tmc/langchaingo/chains" + "github.com/tmc/langchaingo/llms/ollama" + "github.com/tmc/langchaingo/prompts" +) + +const ( + mentionsTemplate = `The following text is a user comment about the movie {{.title}}. In it, the user may have referenced other movie titles. List them if you see any. + +---- +{{.review}} +---- + +If you found any movie titles other than {{.title}}, list them below in a JSON array. If there are other titles, like TV shows, books or games, ignore them. The format is as follows: + +["movie title 1", "movie title 2"] + +Just answer with the JSON and nothing else. If you don't see any other movie titles, just answer with an empty array.` +) + +func (w *Worker) FindTitles(jobID int, reviewID string) { + logger := w.logger.With("method", "findTitles", "jobID", jobID) + + review, err := w.reviewRepo.FindOne(reviewID) + if err != nil { + logger.Error("could not get review", "error", err) + w.jq.MarkFailed(jobID) + return + } + + movie, err := w.movieRepo.FindOne(review.MovieID) + if err != nil { + logger.Error("could not get movie", "error", err) + w.jq.MarkFailed(jobID) + return + } + llm, err := ollama.New(ollama.WithModel("mistral")) + if err != nil { + logger.Error("could not create llm", "error", err) + w.jq.MarkFailed(jobID) + return + } + + ctx := context.Background() + + prompt := prompts.NewPromptTemplate( + mentionsTemplate, + []string{"title", "review"}, + ) + llmChain := chains.NewLLMChain(llm, prompt) + + movieTitle := movie.Title + if movie.EnglishTitle != "" && movie.EnglishTitle != movie.Title { + movieTitle = fmt.Sprintf("%s (English title: %s)", movieTitle, movie.EnglishTitle) + } + fmt.Printf("Processing review for movie: %s\n", movieTitle) + fmt.Printf("Review: %s\n", review.Review) + + outputValues, err := chains.Call(ctx, llmChain, map[string]any{ + "title": movieTitle, + "review": review.Review, + }) + if err != nil { + logger.Error("could not call chain", "error", err) + w.jq.MarkFailed(jobID) + return + } + + out, ok := outputValues[llmChain.OutputKey].(string) + if !ok { + logger.Error("chain output is not valid") + w.jq.MarkFailed(jobID) + return + } + //fmt.Println(out) + resp := struct { + Movies []string `json:"movies"` + TVShows []string `json:"tvShows"` + Games []string `json:"games"` + Books []string `json:"books"` + }{} + + if err := json.Unmarshal([]byte(out), &resp); err != nil { + logger.Error("could not unmarshal llm response", "error", err) + w.jq.MarkFailed(jobID) + return + } + + review.Titles = resp + + if err := w.reviewRepo.Store(review); err != nil { + logger.Error("could not update review", "error", err) + w.jq.MarkFailed(jobID) + return + } +} diff --git a/worker-client/worker/refreshallreviews.go b/worker-client/worker/refreshallreviews.go new file mode 100644 index 0000000..da2c803 --- /dev/null +++ b/worker-client/worker/refreshallreviews.go @@ -0,0 +1,28 @@ +package worker + +import ( + "time" + + "code.ewintr.nl/emdb/job" +) + +func (w *Worker) RefreshAllReviews(jobID int) { + logger := w.logger.With("method", "fetchReviews", "jobID", jobID) + + movies, err := w.movieRepo.FindAll() + if err != nil { + logger.Error("could not get movies", "error", err) + return + } + + for _, m := range movies { + time.Sleep(1 * time.Second) + if err := w.jq.Add(m.ID, job.ActionRefreshIMDBReviews); err != nil { + logger.Error("could not add job", "error", err) + return + } + } + + logger.Info("refresh all reviews", "count", len(movies)) + w.jq.MarkDone(jobID) +} diff --git a/worker-client/worker/refreshreviews.go b/worker-client/worker/refreshreviews.go new file mode 100644 index 0000000..6125d5c --- /dev/null +++ b/worker-client/worker/refreshreviews.go @@ -0,0 +1,43 @@ +package worker + +import "code.ewintr.nl/emdb/job" + +func (w *Worker) RefreshReviews(jobID int, movieID string) { + logger := w.logger.With("method", "fetchReviews", "jobID", jobID, "movieID", movieID) + + m, err := w.movieRepo.FindOne(movieID) + if err != nil { + logger.Error("could not get movie", "error", err) + w.jq.MarkFailed(jobID) + return + } + + if err := w.reviewRepo.DeleteByMovieID(m.ID); err != nil { + logger.Error("could not delete reviews", "error", err) + w.jq.MarkFailed(jobID) + return + } + + reviews, err := w.imdb.GetReviews(m) + if err != nil { + logger.Error("could not get reviews", "error", err) + w.jq.MarkFailed(jobID) + return + } + + for _, review := range reviews { + if err := w.reviewRepo.Store(review); err != nil { + logger.Error("could not store review", "error", err) + w.jq.MarkFailed(jobID) + return + } + if err := w.jq.Add(review.ID, job.ActionFindTitles); err != nil { + logger.Error("could not add job", "error", err) + w.jq.MarkFailed(jobID) + return + } + } + + logger.Info("refresh reviews", "count", len(reviews)) + w.jq.MarkDone(jobID) +} diff --git a/worker-client/worker/worker.go b/worker-client/worker/worker.go new file mode 100644 index 0000000..331a445 --- /dev/null +++ b/worker-client/worker/worker.go @@ -0,0 +1,67 @@ +package worker + +import ( + "database/sql" + "errors" + "log/slog" + "time" + + "code.ewintr.nl/emdb/client" + "code.ewintr.nl/emdb/job" + "code.ewintr.nl/emdb/storage" +) + +const ( + interval = 5 * time.Second +) + +type Worker struct { + jq *job.JobQueue + movieRepo *storage.MovieRepository + reviewRepo *storage.ReviewRepository + imdb *client.IMDB + logger *slog.Logger +} + +func NewWorker(jq *job.JobQueue, movieRepo *storage.MovieRepository, reviewRepo *storage.ReviewRepository, imdb *client.IMDB, logger *slog.Logger) *Worker { + return &Worker{ + jq: jq, + movieRepo: movieRepo, + reviewRepo: reviewRepo, + imdb: imdb, + logger: logger.With("service", "worker"), + } +} + +func (w *Worker) Run() { + logger := w.logger.With("method", "run") + logger.Info("starting worker") + + for { + time.Sleep(interval) + + j, err := w.jq.Next() + switch { + case errors.Is(err, sql.ErrNoRows): + logger.Info("no jobs found") + continue + case err != nil: + logger.Error("could not get next job", "error", err) + continue + } + + logger.Info("got a new job", "jobID", j.ID, "movieID", j.ActionID, "action", j.Action) + switch j.Action { + case job.ActionRefreshIMDBReviews: + w.RefreshReviews(j.ID, j.ActionID) + case job.ActionRefreshAllIMDBReviews: + w.RefreshAllReviews(j.ID) + case job.ActionFindTitles: + //w.FindTitles(j.ID, j.ActionID) + case job.ActionFindAllTitles: + w.FindAllTitles(j.ID) + default: + logger.Error("unknown job action", "action", j.Action) + } + } +} diff --git a/worker/main.go b/worker/main.go deleted file mode 100644 index ac89787..0000000 --- a/worker/main.go +++ /dev/null @@ -1,128 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "log/slog" - "os" - "os/signal" - "syscall" - - "code.ewintr.nl/emdb/job" - "code.ewintr.nl/emdb/storage" - "github.com/tmc/langchaingo/chains" - "github.com/tmc/langchaingo/llms/ollama" - "github.com/tmc/langchaingo/prompts" -) - -const ( - mentionsTemplate = `The following text is a user comment about the movie {{.title}}. In it, the user may have referenced other movie titles. List them if you see any. - ----- -{{.review}} ----- - -If you found any movie titles other than {{.title}}, list them below in a JSON array. If there are other titles, like TV shows, books or games, ignore them. The format is as follows: - -["movie title 1", "movie title 2"] - -Just answer with the JSON and nothing else. If you don't see any other movie titles, just answer with an empty array.` -) - -func main() { - logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) - dbHost := os.Getenv("EMDB_DB_HOST") - dbName := os.Getenv("EMDB_DB_NAME") - dbUser := os.Getenv("EMDB_DB_USER") - dbPassword := os.Getenv("EMDB_DB_PASSWORD") - pgConnStr := fmt.Sprintf("host=%s user=%s password=%s dbname=%s sslmode=disable", dbHost, dbUser, dbPassword, dbName) - dbPostgres, err := storage.NewPostgres(pgConnStr) - if err != nil { - fmt.Printf("could not create new postgres repo: %s", err.Error()) - os.Exit(1) - } - movieRepo := storage.NewMovieRepository(dbPostgres) - reviewRepo := storage.NewReviewRepository(dbPostgres) - jobQueue := job.NewJobQueue(dbPostgres, logger) - - go Work(movieRepo, reviewRepo, jobQueue) - - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - <-c -} - -func Work(movieRepo *storage.MovieRepository, reviewRepo *storage.ReviewRepository, jobQueue *job.JobQueue) { - for { - j, err := jobQueue.Next() - if err != nil { - fmt.Println(err) - os.Exit(1) - } - review, err := reviewRepo.FindOne(j.ActionID) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - movie, err := movieRepo.FindOne(review.MovieID) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - llm, err := ollama.New(ollama.WithModel("mistral")) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - ctx := context.Background() - - prompt := prompts.NewPromptTemplate( - mentionsTemplate, - []string{"title", "review"}, - ) - llmChain := chains.NewLLMChain(llm, prompt) - - movieTitle := movie.Title - if movie.EnglishTitle != "" && movie.EnglishTitle != movie.Title { - movieTitle = fmt.Sprintf("%s (English title: %s)", movieTitle, movie.EnglishTitle) - } - fmt.Printf("Processing review for movie: %s\n", movieTitle) - fmt.Printf("Review: %s\n", review.Review) - - outputValues, err := chains.Call(ctx, llmChain, map[string]any{ - "title": movieTitle, - "review": review.Review, - }) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - out, ok := outputValues[llmChain.OutputKey].(string) - if !ok { - fmt.Println("invalid chain return") - } - fmt.Println(out) - resp := struct { - Movies []string `json:"movies"` - TVShows []string `json:"tvShows"` - Games []string `json:"games"` - Books []string `json:"books"` - }{} - - if err := json.Unmarshal([]byte(out), &resp); err != nil { - fmt.Printf("could not unmarshal llm response, skipping this one: %s", err) - os.Exit(1) - } - - review.Titles = resp - - if err := reviewRepo.Store(review); err != nil { - fmt.Printf("could not update review: %s\n", err) - os.Exit(1) - } - } -}