integrate workers
This commit is contained in:
parent
f82841fc34
commit
002702745d
2
Makefile
2
Makefile
|
@ -11,5 +11,5 @@ md-export:
|
||||||
go run ./markdown-export/main.go
|
go run ./markdown-export/main.go
|
||||||
|
|
||||||
worker:
|
worker:
|
||||||
go run ./worker/main.go
|
go run ./worker-client/main.go
|
||||||
|
|
||||||
|
|
|
@ -9,8 +9,6 @@ type JobStatus string
|
||||||
type JobType string
|
type JobType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
interval = 20 * time.Second
|
|
||||||
|
|
||||||
TypeSimple JobType = "simple"
|
TypeSimple JobType = "simple"
|
||||||
TypeAI JobType = "ai"
|
TypeAI JobType = "ai"
|
||||||
|
|
||||||
|
|
142
job/worker.go
142
job/worker.go
|
@ -1,142 +0,0 @@
|
||||||
package job
|
|
||||||
|
|
||||||
import (
|
|
||||||
"database/sql"
|
|
||||||
"errors"
|
|
||||||
"log/slog"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"code.ewintr.nl/emdb/client"
|
|
||||||
"code.ewintr.nl/emdb/storage"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Worker struct {
|
|
||||||
jq *JobQueue
|
|
||||||
movieRepo *storage.MovieRepository
|
|
||||||
reviewRepo *storage.ReviewRepository
|
|
||||||
imdb *client.IMDB
|
|
||||||
logger *slog.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWorker(jq *JobQueue, movieRepo *storage.MovieRepository, reviewRepo *storage.ReviewRepository, imdb *client.IMDB, logger *slog.Logger) *Worker {
|
|
||||||
return &Worker{
|
|
||||||
jq: jq,
|
|
||||||
movieRepo: movieRepo,
|
|
||||||
reviewRepo: reviewRepo,
|
|
||||||
imdb: imdb,
|
|
||||||
logger: logger.With("service", "worker"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) Run() {
|
|
||||||
logger := w.logger.With("method", "run")
|
|
||||||
logger.Info("starting worker")
|
|
||||||
for {
|
|
||||||
time.Sleep(interval)
|
|
||||||
j, err := w.jq.Next()
|
|
||||||
switch {
|
|
||||||
case errors.Is(err, sql.ErrNoRows):
|
|
||||||
logger.Info("no simple jobs found")
|
|
||||||
continue
|
|
||||||
case err != nil:
|
|
||||||
logger.Error("could not get next job", "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("got a new job", "jobID", j.ID, "movieID", j.ActionID, "action", j.Action)
|
|
||||||
switch j.Action {
|
|
||||||
case ActionRefreshIMDBReviews:
|
|
||||||
w.RefreshReviews(j.ID, j.ActionID)
|
|
||||||
case ActionRefreshAllIMDBReviews:
|
|
||||||
w.RefreshAllReviews(j.ID)
|
|
||||||
case ActionFindAllTitles:
|
|
||||||
w.FindAllTitles(j.ID)
|
|
||||||
default:
|
|
||||||
logger.Error("unknown job action", "action", j.Action)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) RefreshAllReviews(jobID int) {
|
|
||||||
logger := w.logger.With("method", "fetchReviews", "jobID", jobID)
|
|
||||||
|
|
||||||
movies, err := w.movieRepo.FindAll()
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("could not get movies", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, m := range movies {
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
if err := w.jq.Add(m.ID, ActionRefreshIMDBReviews); err != nil {
|
|
||||||
logger.Error("could not add job", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("refresh all reviews", "count", len(movies))
|
|
||||||
w.jq.MarkDone(jobID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) FindAllTitles(jobID int) {
|
|
||||||
logger := w.logger.With("method", "findTitles", "jobID", jobID)
|
|
||||||
|
|
||||||
reviews, err := w.reviewRepo.FindAll()
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("could not get reviews", "error", err)
|
|
||||||
w.jq.MarkFailed(jobID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, r := range reviews {
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
if err := w.jq.Add(r.ID, ActionFindTitles); err != nil {
|
|
||||||
logger.Error("could not add job", "error", err)
|
|
||||||
w.jq.MarkFailed(jobID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("find all titles", "count", len(reviews))
|
|
||||||
w.jq.MarkDone(jobID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) RefreshReviews(jobID int, movieID string) {
|
|
||||||
logger := w.logger.With("method", "fetchReviews", "jobID", jobID, "movieID", movieID)
|
|
||||||
|
|
||||||
m, err := w.movieRepo.FindOne(movieID)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("could not get movie", "error", err)
|
|
||||||
w.jq.MarkFailed(jobID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := w.reviewRepo.DeleteByMovieID(m.ID); err != nil {
|
|
||||||
logger.Error("could not delete reviews", "error", err)
|
|
||||||
w.jq.MarkFailed(jobID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
reviews, err := w.imdb.GetReviews(m)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("could not get reviews", "error", err)
|
|
||||||
w.jq.MarkFailed(jobID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, review := range reviews {
|
|
||||||
if err := w.reviewRepo.Store(review); err != nil {
|
|
||||||
logger.Error("could not store review", "error", err)
|
|
||||||
w.jq.MarkFailed(jobID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := w.jq.Add(review.ID, ActionFindTitles); err != nil {
|
|
||||||
logger.Error("could not add job", "error", err)
|
|
||||||
w.jq.MarkFailed(jobID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("refresh reviews", "count", len(reviews))
|
|
||||||
w.jq.MarkDone(jobID)
|
|
||||||
}
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"github.com/charmbracelet/bubbles/list"
|
"github.com/charmbracelet/bubbles/list"
|
||||||
"github.com/charmbracelet/bubbles/textinput"
|
"github.com/charmbracelet/bubbles/textinput"
|
||||||
tea "github.com/charmbracelet/bubbletea"
|
tea "github.com/charmbracelet/bubbletea"
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
type tabTMDB struct {
|
type tabTMDB struct {
|
||||||
|
@ -74,6 +75,7 @@ func (m tabTMDB) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
||||||
case "i":
|
case "i":
|
||||||
if m.focused == "result" {
|
if m.focused == "result" {
|
||||||
movie := m.searchResults.SelectedItem().(Movie)
|
movie := m.searchResults.SelectedItem().(Movie)
|
||||||
|
movie.m.ID = uuid.New().String()
|
||||||
cmds = append(cmds, m.ImportMovieCmd(movie), m.ResetCmd())
|
cmds = append(cmds, m.ImportMovieCmd(movie), m.ResetCmd())
|
||||||
m.Log(fmt.Sprintf("imported movie %s", movie.Title()))
|
m.Log(fmt.Sprintf("imported movie %s", movie.Title()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"code.ewintr.nl/emdb/client"
|
||||||
|
"code.ewintr.nl/emdb/job"
|
||||||
|
"code.ewintr.nl/emdb/storage"
|
||||||
|
"code.ewintr.nl/emdb/worker-client/worker"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
mentionsTemplate = `The following text is a user comment about the movie {{.title}}. In it, the user may have referenced other movie titles. List them if you see any.
|
||||||
|
|
||||||
|
----
|
||||||
|
{{.review}}
|
||||||
|
----
|
||||||
|
|
||||||
|
If you found any movie titles other than {{.title}}, list them below in a JSON array. If there are other titles, like TV shows, books or games, ignore them. The format is as follows:
|
||||||
|
|
||||||
|
["movie title 1", "movie title 2"]
|
||||||
|
|
||||||
|
Just answer with the JSON and nothing else. If you don't see any other movie titles, just answer with an empty array.`
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
||||||
|
dbHost := os.Getenv("EMDB_DB_HOST")
|
||||||
|
dbName := os.Getenv("EMDB_DB_NAME")
|
||||||
|
dbUser := os.Getenv("EMDB_DB_USER")
|
||||||
|
dbPassword := os.Getenv("EMDB_DB_PASSWORD")
|
||||||
|
pgConnStr := fmt.Sprintf("host=%s user=%s password=%s dbname=%s sslmode=disable", dbHost, dbUser, dbPassword, dbName)
|
||||||
|
dbPostgres, err := storage.NewPostgres(pgConnStr)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("could not create new postgres repo: %s", err.Error())
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
movieRepo := storage.NewMovieRepository(dbPostgres)
|
||||||
|
reviewRepo := storage.NewReviewRepository(dbPostgres)
|
||||||
|
jobQueue := job.NewJobQueue(dbPostgres, logger)
|
||||||
|
|
||||||
|
w := worker.NewWorker(jobQueue, movieRepo, reviewRepo, client.NewIMDB(), logger)
|
||||||
|
|
||||||
|
go w.Run()
|
||||||
|
|
||||||
|
c := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
<-c
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
package worker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"code.ewintr.nl/emdb/job"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (w *Worker) FindAllTitles(jobID int) {
|
||||||
|
logger := w.logger.With("method", "findAllTitles", "jobID", jobID)
|
||||||
|
|
||||||
|
reviews, err := w.reviewRepo.FindAll()
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("could not get reviews", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, r := range reviews {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
if err := w.jq.Add(r.ID, job.ActionFindTitles); err != nil {
|
||||||
|
logger.Error("could not add job", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("find all titles", "count", len(reviews))
|
||||||
|
w.jq.MarkDone(jobID)
|
||||||
|
}
|
|
@ -0,0 +1,102 @@
|
||||||
|
package worker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/tmc/langchaingo/chains"
|
||||||
|
"github.com/tmc/langchaingo/llms/ollama"
|
||||||
|
"github.com/tmc/langchaingo/prompts"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
mentionsTemplate = `The following text is a user comment about the movie {{.title}}. In it, the user may have referenced other movie titles. List them if you see any.
|
||||||
|
|
||||||
|
----
|
||||||
|
{{.review}}
|
||||||
|
----
|
||||||
|
|
||||||
|
If you found any movie titles other than {{.title}}, list them below in a JSON array. If there are other titles, like TV shows, books or games, ignore them. The format is as follows:
|
||||||
|
|
||||||
|
["movie title 1", "movie title 2"]
|
||||||
|
|
||||||
|
Just answer with the JSON and nothing else. If you don't see any other movie titles, just answer with an empty array.`
|
||||||
|
)
|
||||||
|
|
||||||
|
func (w *Worker) FindTitles(jobID int, reviewID string) {
|
||||||
|
logger := w.logger.With("method", "findTitles", "jobID", jobID)
|
||||||
|
|
||||||
|
review, err := w.reviewRepo.FindOne(reviewID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("could not get review", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
movie, err := w.movieRepo.FindOne(review.MovieID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("could not get movie", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
llm, err := ollama.New(ollama.WithModel("mistral"))
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("could not create llm", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
prompt := prompts.NewPromptTemplate(
|
||||||
|
mentionsTemplate,
|
||||||
|
[]string{"title", "review"},
|
||||||
|
)
|
||||||
|
llmChain := chains.NewLLMChain(llm, prompt)
|
||||||
|
|
||||||
|
movieTitle := movie.Title
|
||||||
|
if movie.EnglishTitle != "" && movie.EnglishTitle != movie.Title {
|
||||||
|
movieTitle = fmt.Sprintf("%s (English title: %s)", movieTitle, movie.EnglishTitle)
|
||||||
|
}
|
||||||
|
fmt.Printf("Processing review for movie: %s\n", movieTitle)
|
||||||
|
fmt.Printf("Review: %s\n", review.Review)
|
||||||
|
|
||||||
|
outputValues, err := chains.Call(ctx, llmChain, map[string]any{
|
||||||
|
"title": movieTitle,
|
||||||
|
"review": review.Review,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("could not call chain", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
out, ok := outputValues[llmChain.OutputKey].(string)
|
||||||
|
if !ok {
|
||||||
|
logger.Error("chain output is not valid")
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
//fmt.Println(out)
|
||||||
|
resp := struct {
|
||||||
|
Movies []string `json:"movies"`
|
||||||
|
TVShows []string `json:"tvShows"`
|
||||||
|
Games []string `json:"games"`
|
||||||
|
Books []string `json:"books"`
|
||||||
|
}{}
|
||||||
|
|
||||||
|
if err := json.Unmarshal([]byte(out), &resp); err != nil {
|
||||||
|
logger.Error("could not unmarshal llm response", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
review.Titles = resp
|
||||||
|
|
||||||
|
if err := w.reviewRepo.Store(review); err != nil {
|
||||||
|
logger.Error("could not update review", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
package worker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"code.ewintr.nl/emdb/job"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (w *Worker) RefreshAllReviews(jobID int) {
|
||||||
|
logger := w.logger.With("method", "fetchReviews", "jobID", jobID)
|
||||||
|
|
||||||
|
movies, err := w.movieRepo.FindAll()
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("could not get movies", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range movies {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
if err := w.jq.Add(m.ID, job.ActionRefreshIMDBReviews); err != nil {
|
||||||
|
logger.Error("could not add job", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("refresh all reviews", "count", len(movies))
|
||||||
|
w.jq.MarkDone(jobID)
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
package worker
|
||||||
|
|
||||||
|
import "code.ewintr.nl/emdb/job"
|
||||||
|
|
||||||
|
func (w *Worker) RefreshReviews(jobID int, movieID string) {
|
||||||
|
logger := w.logger.With("method", "fetchReviews", "jobID", jobID, "movieID", movieID)
|
||||||
|
|
||||||
|
m, err := w.movieRepo.FindOne(movieID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("could not get movie", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.reviewRepo.DeleteByMovieID(m.ID); err != nil {
|
||||||
|
logger.Error("could not delete reviews", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
reviews, err := w.imdb.GetReviews(m)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("could not get reviews", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, review := range reviews {
|
||||||
|
if err := w.reviewRepo.Store(review); err != nil {
|
||||||
|
logger.Error("could not store review", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := w.jq.Add(review.ID, job.ActionFindTitles); err != nil {
|
||||||
|
logger.Error("could not add job", "error", err)
|
||||||
|
w.jq.MarkFailed(jobID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("refresh reviews", "count", len(reviews))
|
||||||
|
w.jq.MarkDone(jobID)
|
||||||
|
}
|
|
@ -0,0 +1,67 @@
|
||||||
|
package worker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"code.ewintr.nl/emdb/client"
|
||||||
|
"code.ewintr.nl/emdb/job"
|
||||||
|
"code.ewintr.nl/emdb/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
interval = 5 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
type Worker struct {
|
||||||
|
jq *job.JobQueue
|
||||||
|
movieRepo *storage.MovieRepository
|
||||||
|
reviewRepo *storage.ReviewRepository
|
||||||
|
imdb *client.IMDB
|
||||||
|
logger *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWorker(jq *job.JobQueue, movieRepo *storage.MovieRepository, reviewRepo *storage.ReviewRepository, imdb *client.IMDB, logger *slog.Logger) *Worker {
|
||||||
|
return &Worker{
|
||||||
|
jq: jq,
|
||||||
|
movieRepo: movieRepo,
|
||||||
|
reviewRepo: reviewRepo,
|
||||||
|
imdb: imdb,
|
||||||
|
logger: logger.With("service", "worker"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) Run() {
|
||||||
|
logger := w.logger.With("method", "run")
|
||||||
|
logger.Info("starting worker")
|
||||||
|
|
||||||
|
for {
|
||||||
|
time.Sleep(interval)
|
||||||
|
|
||||||
|
j, err := w.jq.Next()
|
||||||
|
switch {
|
||||||
|
case errors.Is(err, sql.ErrNoRows):
|
||||||
|
logger.Info("no jobs found")
|
||||||
|
continue
|
||||||
|
case err != nil:
|
||||||
|
logger.Error("could not get next job", "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("got a new job", "jobID", j.ID, "movieID", j.ActionID, "action", j.Action)
|
||||||
|
switch j.Action {
|
||||||
|
case job.ActionRefreshIMDBReviews:
|
||||||
|
w.RefreshReviews(j.ID, j.ActionID)
|
||||||
|
case job.ActionRefreshAllIMDBReviews:
|
||||||
|
w.RefreshAllReviews(j.ID)
|
||||||
|
case job.ActionFindTitles:
|
||||||
|
//w.FindTitles(j.ID, j.ActionID)
|
||||||
|
case job.ActionFindAllTitles:
|
||||||
|
w.FindAllTitles(j.ID)
|
||||||
|
default:
|
||||||
|
logger.Error("unknown job action", "action", j.Action)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
128
worker/main.go
128
worker/main.go
|
@ -1,128 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"log/slog"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
|
|
||||||
"code.ewintr.nl/emdb/job"
|
|
||||||
"code.ewintr.nl/emdb/storage"
|
|
||||||
"github.com/tmc/langchaingo/chains"
|
|
||||||
"github.com/tmc/langchaingo/llms/ollama"
|
|
||||||
"github.com/tmc/langchaingo/prompts"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
mentionsTemplate = `The following text is a user comment about the movie {{.title}}. In it, the user may have referenced other movie titles. List them if you see any.
|
|
||||||
|
|
||||||
----
|
|
||||||
{{.review}}
|
|
||||||
----
|
|
||||||
|
|
||||||
If you found any movie titles other than {{.title}}, list them below in a JSON array. If there are other titles, like TV shows, books or games, ignore them. The format is as follows:
|
|
||||||
|
|
||||||
["movie title 1", "movie title 2"]
|
|
||||||
|
|
||||||
Just answer with the JSON and nothing else. If you don't see any other movie titles, just answer with an empty array.`
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
|
|
||||||
dbHost := os.Getenv("EMDB_DB_HOST")
|
|
||||||
dbName := os.Getenv("EMDB_DB_NAME")
|
|
||||||
dbUser := os.Getenv("EMDB_DB_USER")
|
|
||||||
dbPassword := os.Getenv("EMDB_DB_PASSWORD")
|
|
||||||
pgConnStr := fmt.Sprintf("host=%s user=%s password=%s dbname=%s sslmode=disable", dbHost, dbUser, dbPassword, dbName)
|
|
||||||
dbPostgres, err := storage.NewPostgres(pgConnStr)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("could not create new postgres repo: %s", err.Error())
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
movieRepo := storage.NewMovieRepository(dbPostgres)
|
|
||||||
reviewRepo := storage.NewReviewRepository(dbPostgres)
|
|
||||||
jobQueue := job.NewJobQueue(dbPostgres, logger)
|
|
||||||
|
|
||||||
go Work(movieRepo, reviewRepo, jobQueue)
|
|
||||||
|
|
||||||
c := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
<-c
|
|
||||||
}
|
|
||||||
|
|
||||||
func Work(movieRepo *storage.MovieRepository, reviewRepo *storage.ReviewRepository, jobQueue *job.JobQueue) {
|
|
||||||
for {
|
|
||||||
j, err := jobQueue.Next()
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
review, err := reviewRepo.FindOne(j.ActionID)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
movie, err := movieRepo.FindOne(review.MovieID)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
llm, err := ollama.New(ollama.WithModel("mistral"))
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
prompt := prompts.NewPromptTemplate(
|
|
||||||
mentionsTemplate,
|
|
||||||
[]string{"title", "review"},
|
|
||||||
)
|
|
||||||
llmChain := chains.NewLLMChain(llm, prompt)
|
|
||||||
|
|
||||||
movieTitle := movie.Title
|
|
||||||
if movie.EnglishTitle != "" && movie.EnglishTitle != movie.Title {
|
|
||||||
movieTitle = fmt.Sprintf("%s (English title: %s)", movieTitle, movie.EnglishTitle)
|
|
||||||
}
|
|
||||||
fmt.Printf("Processing review for movie: %s\n", movieTitle)
|
|
||||||
fmt.Printf("Review: %s\n", review.Review)
|
|
||||||
|
|
||||||
outputValues, err := chains.Call(ctx, llmChain, map[string]any{
|
|
||||||
"title": movieTitle,
|
|
||||||
"review": review.Review,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
out, ok := outputValues[llmChain.OutputKey].(string)
|
|
||||||
if !ok {
|
|
||||||
fmt.Println("invalid chain return")
|
|
||||||
}
|
|
||||||
fmt.Println(out)
|
|
||||||
resp := struct {
|
|
||||||
Movies []string `json:"movies"`
|
|
||||||
TVShows []string `json:"tvShows"`
|
|
||||||
Games []string `json:"games"`
|
|
||||||
Books []string `json:"books"`
|
|
||||||
}{}
|
|
||||||
|
|
||||||
if err := json.Unmarshal([]byte(out), &resp); err != nil {
|
|
||||||
fmt.Printf("could not unmarshal llm response, skipping this one: %s", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
review.Titles = resp
|
|
||||||
|
|
||||||
if err := reviewRepo.Store(review); err != nil {
|
|
||||||
fmt.Printf("could not update review: %s\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue