From 600cde5279948e0106060f1fafa54ba5d58c831a Mon Sep 17 00:00:00 2001 From: Erik Winter Date: Sat, 13 May 2023 12:53:37 +0200 Subject: [PATCH] summary in pipeline --- fetcher/fetcher.go | 58 ++++++++++++++++++++++++++++++++++++++---- fetcher/openai.go | 49 +++++++++++++++++++++++++++++++++++ fetcher/summary.go | 7 +++++ go.mod | 1 + go.sum | 2 ++ handler/handler.go | 1 + handler/server.go].go | 1 + model/video.go | 6 +++-- service.go | 4 ++- storage/postgres.go | 59 +++++++++++++++++++++++++++++++++++-------- storage/storage.go | 1 + 11 files changed, 170 insertions(+), 19 deletions(-) create mode 100644 fetcher/openai.go create mode 100644 fetcher/summary.go create mode 100644 handler/handler.go create mode 100644 handler/server.go].go diff --git a/fetcher/fetcher.go b/fetcher/fetcher.go index 8abd410..9460ba7 100644 --- a/fetcher/fetcher.go +++ b/fetcher/fetcher.go @@ -13,19 +13,23 @@ type Fetcher struct { videoRepo storage.VideoRepository feedReader FeedReader metadataFetcher MetadataFetcher + summaryFetcher SummaryFetcher pipeline chan *model.Video needsMetadata chan *model.Video + needsSummary chan *model.Video logger *slog.Logger } -func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval time.Duration, metadataFetcher MetadataFetcher, logger *slog.Logger) *Fetcher { +func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval time.Duration, metadataFetcher MetadataFetcher, summaryFetcher SummaryFetcher, logger *slog.Logger) *Fetcher { return &Fetcher{ interval: interval, videoRepo: videoRepo, feedReader: feedReader, metadataFetcher: metadataFetcher, - pipeline: make(chan *model.Video), - needsMetadata: make(chan *model.Video), + summaryFetcher: summaryFetcher, + pipeline: make(chan *model.Video, 10), + needsMetadata: make(chan *model.Video, 10), + needsSummary: make(chan *model.Video, 10), logger: logger, } } @@ -33,16 +37,42 @@ func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval func (f *Fetcher) Run() { go f.ReadFeeds() go f.MetadataFetcher() + go f.SummaryFetcher() + go f.FindUnprocessed() f.logger.Info("started pipeline") for { select { case video := <-f.pipeline: switch video.Status { - case model.STATUS_NEW: + case model.StatusNew: f.needsMetadata <- video + case model.StatusHasMetadata: + f.needsSummary <- video + case model.StatusHasSummary: + video.Status = model.StatusReady + f.logger.Info("video is ready", slog.String("id", video.ID.String())) + + } + if err := f.videoRepo.Save(video); err != nil { + f.logger.Error("failed to save video", err) + continue } } + + } +} + +func (f *Fetcher) FindUnprocessed() { + f.logger.Info("looking for unprocessed videos") + videos, err := f.videoRepo.FindByStatus(model.StatusNew, model.StatusHasMetadata) + if err != nil { + f.logger.Error("failed to fetch unprocessed videos", err) + return + } + f.logger.Info("found unprocessed videos", slog.Int("count", len(videos))) + for _, video := range videos { + f.pipeline <- video } } @@ -63,7 +93,7 @@ func (f *Fetcher) ReadFeeds() { for _, entry := range entries { video := &model.Video{ ID: uuid.New(), - Status: model.STATUS_NEW, + Status: model.StatusNew, YoutubeID: entry.YouTubeID, // feed id } @@ -102,12 +132,14 @@ func (f *Fetcher) MetadataFetcher() { for _, video := range videos { video.Title = mds[video.YoutubeID].Title video.Description = mds[video.YoutubeID].Description + video.Status = model.StatusHasMetadata if err := f.videoRepo.Save(video); err != nil { f.logger.Error("failed to save video", err) continue } } + f.logger.Info("fetched metadata", slog.Int("count", len(videos))) } }() @@ -133,3 +165,19 @@ func (f *Fetcher) MetadataFetcher() { } } } + +func (f *Fetcher) SummaryFetcher() { + for { + select { + case video := <-f.needsSummary: + f.logger.Info("fetching summary", slog.String("id", video.ID.String())) + if err := f.summaryFetcher.FetchSummary(video); err != nil { + f.logger.Error("failed to fetch summary", err) + continue + } + video.Status = model.StatusHasSummary + f.logger.Info("fetched summary", slog.String("id", video.ID.String())) + f.pipeline <- video + } + } +} diff --git a/fetcher/openai.go b/fetcher/openai.go new file mode 100644 index 0000000..c08f114 --- /dev/null +++ b/fetcher/openai.go @@ -0,0 +1,49 @@ +package fetcher + +import ( + "context" + "ewintr.nl/yogai/model" + "fmt" + "github.com/sashabaranov/go-openai" +) + +const summarizePrompt = `You are an helpful assistant. Your task is to extract all text that refers to the content of a yoga workout video from the description a user gives you. +You will not add introductory sentences like "This text is about", or "Summary of...". Just give the words verbatim. Trim any white space back to a simple space +` + +type OpenAI struct { + client *openai.Client +} + +func NewOpenAI(apiKey string) *OpenAI { + return &OpenAI{ + client: openai.NewClient(apiKey), + } +} + +func (o *OpenAI) FetchSummary(video *model.Video) error { + resp, err := o.client.CreateChatCompletion( + context.Background(), + openai.ChatCompletionRequest{ + Model: openai.GPT4, + Messages: []openai.ChatCompletionMessage{ + { + Role: openai.ChatMessageRoleSystem, + Content: summarizePrompt, + }, + + { + Role: openai.ChatMessageRoleUser, + Content: fmt.Sprintf("%s\n\n%s", video.Title, video.Description), + }, + }, + }) + + if err != nil { + return fmt.Errorf("failed to fetch summary: %w", err) + } + + video.Summary = resp.Choices[len(resp.Choices)-1].Message.Content + + return nil +} diff --git a/fetcher/summary.go b/fetcher/summary.go new file mode 100644 index 0000000..1fae394 --- /dev/null +++ b/fetcher/summary.go @@ -0,0 +1,7 @@ +package fetcher + +import "ewintr.nl/yogai/model" + +type SummaryFetcher interface { + FetchSummary(video *model.Video) error +} diff --git a/go.mod b/go.mod index 21eece0..ad14369 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/google/uuid v1.3.0 github.com/lib/pq v1.10.9 + github.com/sashabaranov/go-openai v1.9.4 golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 google.golang.org/api v0.122.0 miniflux.app v0.0.0-20230505000442-88062ab9f959 diff --git a/go.sum b/go.sum index 7015e7e..f791187 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/sashabaranov/go-openai v1.9.4 h1:KanoCEoowAI45jVXlenMCckutSRr39qOmSi9MyPBfZM= +github.com/sashabaranov/go-openai v1.9.4/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adOgerWeFyZLUhAKRg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/handler/handler.go b/handler/handler.go new file mode 100644 index 0000000..abeebd1 --- /dev/null +++ b/handler/handler.go @@ -0,0 +1 @@ +package handler diff --git a/handler/server.go].go b/handler/server.go].go new file mode 100644 index 0000000..abeebd1 --- /dev/null +++ b/handler/server.go].go @@ -0,0 +1 @@ +package handler diff --git a/model/video.go b/model/video.go index 227a7e5..737272d 100644 --- a/model/video.go +++ b/model/video.go @@ -5,8 +5,10 @@ import "github.com/google/uuid" type Status string const ( - STATUS_NEW Status = "new" - STATUS_READY Status = "ready" + StatusNew Status = "new" + StatusHasMetadata Status = "has_metadata" + StatusHasSummary Status = "has_summary" + StatusReady Status = "ready" ) type Video struct { diff --git a/service.go b/service.go index f33012a..aa9ca97 100644 --- a/service.go +++ b/service.go @@ -46,7 +46,9 @@ func main() { } yt := fetcher.NewYoutube(ytClient) - fetcher := fetcher.NewFetch(videoRepo, mflx, fetchInterval, yt, logger) + openAIClient := fetcher.NewOpenAI(getParam("OPENAI_API_KEY", "")) + + fetcher := fetcher.NewFetch(videoRepo, mflx, fetchInterval, yt, openAIClient, logger) go fetcher.Run() logger.Info("service started") diff --git a/storage/postgres.go b/storage/postgres.go index 923c1d2..31ae6fa 100644 --- a/storage/postgres.go +++ b/storage/postgres.go @@ -4,6 +4,7 @@ import ( "database/sql" "ewintr.nl/yogai/model" "fmt" + "github.com/lib/pq" _ "github.com/lib/pq" ) @@ -42,8 +43,8 @@ func NewPostgresVideoRepository(postgres *Postgres) *PostgresVideoRepository { } func (p *PostgresVideoRepository) Save(v *model.Video) error { - query := `INSERT INTO video (id, status, youtube_id, feed_id, title, description) -VALUES ($1, $2, $3, $4, $5, $6) + query := `INSERT INTO video (id, status, youtube_id, feed_id, title, description, summary) +VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id, @@ -51,23 +52,59 @@ DO UPDATE SET youtube_id = EXCLUDED.youtube_id, feed_id = EXCLUDED.feed_id, title = EXCLUDED.title, - description = EXCLUDED.description;` - _, err := p.db.Exec(query, v.ID, v.Status, v.YoutubeID, v.FeedID, v.Title, v.Description) + description = EXCLUDED.description, + summary = EXCLUDED.summary;` + _, err := p.db.Exec(query, v.ID, v.Status, v.YoutubeID, v.FeedID, v.Title, v.Description, v.Summary) return err } +func (p *PostgresVideoRepository) FindByStatus(statuses ...model.Status) ([]*model.Video, error) { + query := `SELECT id, status, youtube_id, feed_id, title, description, summary +FROM video +WHERE status = ANY($1)` + rows, err := p.db.Query(query, pq.Array(statuses)) + if err != nil { + return nil, err + } + + videos := []*model.Video{} + for rows.Next() { + v := &model.Video{} + if err := rows.Scan(&v.ID, &v.Status, &v.YoutubeID, &v.FeedID, &v.Title, &v.Description, &v.Summary); err != nil { + return nil, err + } + videos = append(videos, v) + } + rows.Close() + + return videos, nil +} + var pgMigration = []string{ `CREATE TYPE video_status AS ENUM ('new', 'ready')`, `CREATE TABLE video ( - id uuid PRIMARY KEY, - status video_status NOT NULL, - youtube_id VARCHAR(255) NOT NULL UNIQUE, - title VARCHAR(255) NOT NULL, - feed_id VARCHAR(255) NOT NULL, - description TEXT, - summary TEXT +id uuid PRIMARY KEY, +status video_status NOT NULL, +youtube_id VARCHAR(255) NOT NULL UNIQUE, +title VARCHAR(255) NOT NULL, +feed_id VARCHAR(255) NOT NULL, +description TEXT, +summary TEXT )`, + `CREATE TYPE video_status_new AS ENUM ('new', 'has_metadata', 'has_summary', 'ready')`, + `ALTER TABLE video +ALTER COLUMN status TYPE video_status_new +USING video::text::video_status_new`, + `DROP TYPE video_status`, + `ALTER TYPE video_status_new RENAME TO video_status`, + `UPDATE video SET summary = '' WHERE summary IS NULL `, + `UPDATE video SET description = '' WHERE description IS NULL `, + `ALTER TABLE video +ALTER COLUMN summary SET DEFAULT '', +ALTER COLUMN summary SET NOT NULL, +ALTER COLUMN description SET DEFAULT '', +ALTER COLUMN description SET NOT NULL`, } func (p *Postgres) migrate(wanted []string) error { diff --git a/storage/storage.go b/storage/storage.go index aebca41..0687788 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -6,4 +6,5 @@ import ( type VideoRepository interface { Save(video *model.Video) error + FindByStatus(statuses ...model.Status) ([]*model.Video, error) }