From 5d125c0ac18358211402ed9d5c9b60261000ea8e Mon Sep 17 00:00:00 2001 From: Erik Winter Date: Thu, 6 Jul 2023 13:25:51 +0200 Subject: [PATCH] split fetch and process --- {fetcher => fetch}/feedreader.go | 2 +- {fetcher => fetch}/fetcher.go | 67 ++++++++----------------- {fetcher => fetch}/metadata.go | 2 +- {fetcher => fetch}/miniflux.go | 2 +- {fetcher => fetch}/summary.go | 2 +- {fetcher => fetch}/youtube.go | 2 +- handler/server.go | 9 ++-- handler/video.go | 4 +- model/video.go | 7 ++- {fetcher => process}/openai.go | 28 ++++++----- process/processor.go | 85 ++++++++++++++++++++++++++++++++ service.go | 30 ++++++----- storage/migrations.go | 10 ++++ storage/storage.go | 10 +++- 14 files changed, 173 insertions(+), 87 deletions(-) rename {fetcher => fetch}/feedreader.go (95%) rename {fetcher => fetch}/fetcher.go (77%) rename {fetcher => fetch}/metadata.go (94%) rename {fetcher => fetch}/miniflux.go (98%) rename {fetcher => fetch}/summary.go (86%) rename {fetcher => fetch}/youtube.go (99%) rename {fetcher => process}/openai.go (58%) create mode 100644 process/processor.go diff --git a/fetcher/feedreader.go b/fetch/feedreader.go similarity index 95% rename from fetcher/feedreader.go rename to fetch/feedreader.go index e587278..93eed89 100644 --- a/fetcher/feedreader.go +++ b/fetch/feedreader.go @@ -1,4 +1,4 @@ -package fetcher +package fetch import "ewintr.nl/yogai/model" diff --git a/fetcher/fetcher.go b/fetch/fetcher.go similarity index 77% rename from fetcher/fetcher.go rename to fetch/fetcher.go index 8413a3b..c71b444 100644 --- a/fetcher/fetcher.go +++ b/fetch/fetcher.go @@ -1,7 +1,6 @@ -package fetcher +package fetch import ( - "context" "time" "ewintr.nl/yogai/model" @@ -12,34 +11,30 @@ import ( type Fetcher struct { interval time.Duration - feedRepo storage.FeedRepository - videoRepo storage.VideoRepository - vectorClient *storage.Weaviate + feedRepo storage.FeedRelRepository + videoRepo storage.VideoRelRepository feedReader FeedReader channelReader ChannelReader metadataFetcher MetadataFetcher - summaryFetcher SummaryFetcher feedPipeline chan *model.Feed videoPipeline chan *model.Video needsMetadata chan *model.Video - needsSummary chan *model.Video + out chan *model.Video logger *slog.Logger } -func NewFetch(feedRepo storage.FeedRepository, videoRepo storage.VideoRepository, channelReader ChannelReader, feedReader FeedReader, wvClient *storage.Weaviate, interval time.Duration, metadataFetcher MetadataFetcher, summaryFetcher SummaryFetcher, logger *slog.Logger) *Fetcher { +func NewFetch(feedRepo storage.FeedRelRepository, videoRepo storage.VideoRelRepository, channelReader ChannelReader, feedReader FeedReader, interval time.Duration, metadataFetcher MetadataFetcher, logger *slog.Logger) *Fetcher { return &Fetcher{ interval: interval, feedRepo: feedRepo, videoRepo: videoRepo, - vectorClient: wvClient, channelReader: channelReader, feedReader: feedReader, metadataFetcher: metadataFetcher, - summaryFetcher: summaryFetcher, feedPipeline: make(chan *model.Feed, 10), videoPipeline: make(chan *model.Video, 10), needsMetadata: make(chan *model.Video, 10), - needsSummary: make(chan *model.Video, 10), + out: make(chan *model.Video), logger: logger, } } @@ -50,36 +45,30 @@ func (f *Fetcher) Run() { go f.ReadFeeds() go f.MetadataFetcher() - go f.SummaryFetcher() go f.FindUnprocessed() f.logger.Info("started videoPipeline") for { select { case video := <-f.videoPipeline: - switch video.Status { - case model.StatusNew: - f.needsMetadata <- video - case model.StatusHasMetadata: - // f.needsSummary <- video - //case model.StatusHasSummary: - video.Status = model.StatusReady - f.logger.Info("video is ready", slog.String("id", video.ID.String())) - if err := f.vectorClient.Save(context.Background(), video); err != nil { - f.logger.Error("failed to save video in vector db", err) - continue - } - - } if err := f.videoRepo.Save(video); err != nil { f.logger.Error("failed to save video in normal db", err) continue } + switch video.Status { + case model.StatusNew: + f.needsMetadata <- video + case model.StatusFetched: + f.out <- video + } } - } } +func (f *Fetcher) Out() chan *model.Video { + return f.out +} + func (f *Fetcher) FindNewFeeds() { f.logger.Info("looking for new feeds") feeds, err := f.feedRepo.FindByStatus(model.FeedStatusNew) @@ -93,7 +82,7 @@ func (f *Fetcher) FindNewFeeds() { } func (f *Fetcher) FetchHistoricalVideos() { - f.logger.Info("started historical video fetcher") + f.logger.Info("started historical video fetch") for feed := range f.feedPipeline { f.logger.Info("fetching historical videos", slog.String("channelid", string(feed.YoutubeChannelID))) @@ -139,7 +128,7 @@ func (f *Fetcher) FetchHistoricalVideoPage(channelID model.YoutubeChannelID, pag func (f *Fetcher) FindUnprocessed() { f.logger.Info("looking for unprocessed videos") - videos, err := f.videoRepo.FindByStatus(model.StatusNew, model.StatusHasMetadata) + videos, err := f.videoRepo.FindByStatus(model.StatusNew, model.StatusFetched) if err != nil { f.logger.Error("failed to fetch unprocessed videos", err) return @@ -185,7 +174,7 @@ func (f *Fetcher) ReadFeeds() { } func (f *Fetcher) MetadataFetcher() { - f.logger.Info("started metadata fetcher") + f.logger.Info("started metadata fetch") buffer := []*model.Video{} timeout := time.NewTimer(10 * time.Second) @@ -209,7 +198,7 @@ func (f *Fetcher) MetadataFetcher() { video.YoutubeDescription = md.Description video.YoutubeDuration = md.Duration video.YoutubePublishedAt = md.PublishedAt - video.Status = model.StatusHasMetadata + video.Status = model.StatusFetched if err := f.videoRepo.Save(video); err != nil { f.logger.Error("failed to save video", err) @@ -242,19 +231,3 @@ func (f *Fetcher) MetadataFetcher() { } } } - -func (f *Fetcher) SummaryFetcher() { - for { - select { - case video := <-f.needsSummary: - f.logger.Info("fetching summary", slog.String("id", video.ID.String())) - if err := f.summaryFetcher.FetchSummary(video); err != nil { - f.logger.Error("failed to fetch summary", err) - continue - } - video.Status = model.StatusHasSummary - f.logger.Info("fetched summary", slog.String("id", video.ID.String())) - f.videoPipeline <- video - } - } -} diff --git a/fetcher/metadata.go b/fetch/metadata.go similarity index 94% rename from fetcher/metadata.go rename to fetch/metadata.go index 187926a..fb8a908 100644 --- a/fetcher/metadata.go +++ b/fetch/metadata.go @@ -1,4 +1,4 @@ -package fetcher +package fetch import "ewintr.nl/yogai/model" diff --git a/fetcher/miniflux.go b/fetch/miniflux.go similarity index 98% rename from fetcher/miniflux.go rename to fetch/miniflux.go index 91c714d..8dc654e 100644 --- a/fetcher/miniflux.go +++ b/fetch/miniflux.go @@ -1,4 +1,4 @@ -package fetcher +package fetch import ( "strings" diff --git a/fetcher/summary.go b/fetch/summary.go similarity index 86% rename from fetcher/summary.go rename to fetch/summary.go index 1fae394..ccc7b04 100644 --- a/fetcher/summary.go +++ b/fetch/summary.go @@ -1,4 +1,4 @@ -package fetcher +package fetch import "ewintr.nl/yogai/model" diff --git a/fetcher/youtube.go b/fetch/youtube.go similarity index 99% rename from fetcher/youtube.go rename to fetch/youtube.go index 83bb399..2823837 100644 --- a/fetcher/youtube.go +++ b/fetch/youtube.go @@ -1,4 +1,4 @@ -package fetcher +package fetch import ( "strings" diff --git a/handler/server.go b/handler/server.go index 1f8caa9..9f25424 100644 --- a/handler/server.go +++ b/handler/server.go @@ -1,14 +1,15 @@ package handler import ( - "ewintr.nl/yogai/storage" "fmt" - "golang.org/x/exp/slog" - "miniflux.app/logger" "net/http" "net/http/httptest" "path" "strings" + + "ewintr.nl/yogai/storage" + "golang.org/x/exp/slog" + "miniflux.app/logger" ) type Server struct { @@ -16,7 +17,7 @@ type Server struct { logger *slog.Logger } -func NewServer(videoRepo storage.VideoRepository, logger *slog.Logger) *Server { +func NewServer(videoRepo storage.VideoRelRepository, logger *slog.Logger) *Server { return &Server{ apis: map[string]http.Handler{ "video": NewVideoAPI(videoRepo, logger), diff --git a/handler/video.go b/handler/video.go index 932e5b6..9f50fe4 100644 --- a/handler/video.go +++ b/handler/video.go @@ -12,11 +12,11 @@ import ( ) type VideoAPI struct { - videoRepo storage.VideoRepository + videoRepo storage.VideoRelRepository logger *slog.Logger } -func NewVideoAPI(videoRepo storage.VideoRepository, logger *slog.Logger) *VideoAPI { +func NewVideoAPI(videoRepo storage.VideoRelRepository, logger *slog.Logger) *VideoAPI { return &VideoAPI{ videoRepo: videoRepo, logger: logger, diff --git a/model/video.go b/model/video.go index ab17c2c..e70273c 100644 --- a/model/video.go +++ b/model/video.go @@ -5,10 +5,9 @@ import "github.com/google/uuid" type VideoStatus string const ( - StatusNew VideoStatus = "new" - StatusHasMetadata VideoStatus = "has_metadata" - StatusHasSummary VideoStatus = "has_summary" - StatusReady VideoStatus = "ready" + StatusNew VideoStatus = "new" + StatusFetched VideoStatus = "fetched" + StatusReady VideoStatus = "ready" ) type YoutubeVideoID string diff --git a/fetcher/openai.go b/process/openai.go similarity index 58% rename from fetcher/openai.go rename to process/openai.go index 39219e9..32f1cbf 100644 --- a/fetcher/openai.go +++ b/process/openai.go @@ -1,4 +1,4 @@ -package fetcher +package process import ( "context" @@ -8,23 +8,27 @@ import ( "github.com/sashabaranov/go-openai" ) -const summarizePrompt = `You are an helpful assistant. Your task is to extract all text that refers to the content of a yoga workout video from the description a user gives you. -You will not add introductory sentences like "This text is about", or "Summary of...". Just give the words verbatim. Trim any white space back to a simple space -` - -type OpenAI struct { +type OpenAISummarizer struct { client *openai.Client } -func NewOpenAI(apiKey string) *OpenAI { - return &OpenAI{ - client: openai.NewClient(apiKey), +func NewOpenAISummarizer(client *openai.Client) *OpenAISummarizer { + return &OpenAISummarizer{ + client: client, } } -func (o *OpenAI) FetchSummary(video *model.Video) error { - resp, err := o.client.CreateChatCompletion( - context.Background(), +func (sum *OpenAISummarizer) Name() string { + return "openai summarizer" +} + +func (sum *OpenAISummarizer) Do(ctx context.Context, video *model.Video) error { + const summarizePrompt = `You are an helpful assistant. Your task is to extract all text that refers to the content of a yoga workout video from the description a user gives you. +You will not add introductory sentences like "This text is about", or "Summary of...". Just give the words verbatim. Trim any white space back to a simple space +` + + resp, err := sum.client.CreateChatCompletion( + ctx, openai.ChatCompletionRequest{ Model: openai.GPT4, Messages: []openai.ChatCompletionMessage{ diff --git a/process/processor.go b/process/processor.go new file mode 100644 index 0000000..d2db65a --- /dev/null +++ b/process/processor.go @@ -0,0 +1,85 @@ +package process + +import ( + "context" + + "ewintr.nl/yogai/model" + "ewintr.nl/yogai/storage" + "github.com/sashabaranov/go-openai" + "golang.org/x/exp/slog" +) + +type VideoProcessor interface { + Name() string + Do(ctx context.Context, video *model.Video) error +} + +type Processors struct { + procs map[string]VideoProcessor +} + +func NewProcessors(openAIClient *openai.Client) *Processors { + return &Processors{ + procs: map[string]VideoProcessor{ + "summarizer": NewOpenAISummarizer(openAIClient), + }, + } +} + +func (p *Processors) Next(video *model.Video) VideoProcessor { + if video.Summary == "" { + return p.procs["summarizer"] + } + + return nil +} + +type Pipeline struct { + in chan *model.Video + procs *Processors + logger *slog.Logger + relStorage storage.VideoRelRepository + vecStorage storage.VideoVecRepository +} + +func NewPipeline(in chan *model.Video, processors *Processors, relDB storage.VideoRelRepository, vecDB storage.VideoVecRepository, logger *slog.Logger) *Pipeline { + return &Pipeline{ + in: in, + procs: processors, + relStorage: relDB, + vecStorage: vecDB, + logger: logger, + } +} + +func (p *Pipeline) Run() { + ctx := context.Background() + for video := range p.in { + p.Process(ctx, video) + } +} + +func (p *Pipeline) Process(ctx context.Context, video *model.Video) { + p.logger.Info("processing video", slog.String("video", string(video.YoutubeID))) + for { + next := p.procs.Next(video) + if next == nil { + p.logger.Info("no more processors for video", slog.String("video", string(video.YoutubeID))) + return + } + + p.logger.Info("processing video", slog.String("video", string(video.YoutubeID)), slog.String("processor", next.Name())) + if err := next.Do(context.Background(), video); err != nil { + p.logger.Error("failed to process video", slog.String("video", string(video.YoutubeID)), slog.String("processor", next.Name()), slog.String("error", err.Error())) + return + } + if err := p.relStorage.Save(video); err != nil { + p.logger.Error("failed to save video in rel db", slog.String("video", string(video.YoutubeID)), slog.String("error", err.Error())) + return + } + if err := p.vecStorage.Save(ctx, video); err != nil { + p.logger.Error("failed to save video in rel db", slog.String("video", string(video.YoutubeID)), slog.String("error", err.Error())) + return + } + } +} diff --git a/service.go b/service.go index ec0a508..838c61d 100644 --- a/service.go +++ b/service.go @@ -9,9 +9,11 @@ import ( "strconv" "time" - "ewintr.nl/yogai/fetcher" + "ewintr.nl/yogai/fetch" "ewintr.nl/yogai/handler" + "ewintr.nl/yogai/process" "ewintr.nl/yogai/storage" + "github.com/sashabaranov/go-openai" "golang.org/x/exp/slog" "google.golang.org/api/option" "google.golang.org/api/youtube/v3" @@ -33,10 +35,10 @@ func main() { logger.Error("unable to connect to postgres", err) os.Exit(1) } - videoRepo := storage.NewPostgresVideoRepository(postgres) - feedRepo := storage.NewPostgresFeedRepository(postgres) + videoRelRepo := storage.NewPostgresVideoRepository(postgres) + feedRelRepo := storage.NewPostgresFeedRepository(postgres) - mflx := fetcher.NewMiniflux(fetcher.MinifluxInfo{ + mflxClient := fetch.NewMiniflux(fetch.MinifluxInfo{ Endpoint: getParam("MINIFLUX_ENDPOINT", "http://localhost/v1"), ApiKey: getParam("MINIFLUX_APIKEY", ""), }) @@ -47,39 +49,45 @@ func main() { os.Exit(1) } - ytClient, err := youtube.NewService(ctx, option.WithAPIKey(getParam("YOUTUBE_API_KEY", ""))) + yt, err := youtube.NewService(ctx, option.WithAPIKey(getParam("YOUTUBE_API_KEY", ""))) if err != nil { logger.Error("unable to create youtube service", err) os.Exit(1) } - yt := fetcher.NewYoutube(ytClient) + ytClient := fetch.NewYoutube(yt) openaiKey := getParam("OPENAI_API_KEY", "") - openAIClient := fetcher.NewOpenAI(openaiKey) + openAIClient := openai.NewClient(openaiKey) wvResetSchema := getParam("WEAVIATE_RESET_SCHEMA", "false") == "true" - wv, err := storage.NewWeaviate(getParam("WEAVIATE_HOST", ""), getParam("WEAVIATE_API_KEY", ""), openaiKey) + wvClient, err := storage.NewWeaviate(getParam("WEAVIATE_HOST", ""), getParam("WEAVIATE_API_KEY", ""), openaiKey) if err != nil { logger.Error("unable to create weaviate client", err) os.Exit(1) } if wvResetSchema { logger.Info("resetting weaviate schema") - if err := wv.ResetSchema(); err != nil { + if err := wvClient.ResetSchema(); err != nil { logger.Error("unable to reset weaviate schema", err) os.Exit(1) } } - go fetcher.NewFetch(feedRepo, videoRepo, yt, mflx, wv, fetchInterval, yt, openAIClient, logger).Run() + fetcher := fetch.NewFetch(feedRelRepo, videoRelRepo, ytClient, mflxClient, fetchInterval, ytClient, logger) + go fetcher.Run() logger.Info("fetch service started") + procs := process.NewProcessors(openAIClient) + pipeline := process.NewPipeline(fetcher.Out(), procs, videoRelRepo, wvClient, logger) + go pipeline.Run() + logger.Info("processing service started") + port, err := strconv.Atoi(getParam("API_PORT", "8080")) if err != nil { logger.Error("invalid port", err) os.Exit(1) } - go http.ListenAndServe(fmt.Sprintf(":%d", port), handler.NewServer(videoRepo, logger)) + go http.ListenAndServe(fmt.Sprintf(":%d", port), handler.NewServer(videoRelRepo, logger)) logger.Info("http server started") done := make(chan os.Signal) diff --git a/storage/migrations.go b/storage/migrations.go index 9607cd8..f4143e4 100644 --- a/storage/migrations.go +++ b/storage/migrations.go @@ -42,4 +42,14 @@ ADD COLUMN published_at VARCHAR(255)`, `ALTER TABLE video RENAME COLUMN title TO youtube_title`, `ALTER TABLE video RENAME COLUMN description TO youtube_description`, `ALTER TABLE video RENAME COLUMN youtube_published_id TO youtube_published_at`, + `UPDATE video SET status = 'new'`, + `CREATE TYPE video_status_new AS ENUM ('new', 'fetched', 'ready')`, + `BEGIN; +ALTER TABLE video ADD COLUMN status_new video_status_new; +UPDATE video SET status_new = status::text::video_status_new; +ALTER TABLE video DROP COLUMN status; +ALTER TABLE video RENAME COLUMN status_new TO status; +COMMIT;`, + `DROP TYPE video_status`, + `ALTER TYPE video_status_new RENAME TO video_status`, } diff --git a/storage/storage.go b/storage/storage.go index 9edd027..dfe2607 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1,15 +1,21 @@ package storage import ( + "context" + "ewintr.nl/yogai/model" ) -type FeedRepository interface { +type FeedRelRepository interface { Save(feed *model.Feed) error FindByStatus(statuses ...model.FeedStatus) ([]*model.Feed, error) } -type VideoRepository interface { +type VideoRelRepository interface { Save(video *model.Video) error FindByStatus(statuses ...model.VideoStatus) ([]*model.Video, error) } + +type VideoVecRepository interface { + Save(ctx context.Context, video *model.Video) error +}