From b892066abf3c6212830663d193634cbd1627c3a3 Mon Sep 17 00:00:00 2001 From: Erik Winter Date: Sat, 27 May 2023 14:36:22 +0200 Subject: [PATCH] fetch historical videos from channel --- fetcher/feedreader.go | 13 ++++-- fetcher/fetcher.go | 97 ++++++++++++++++++++++++++++++++++++------- fetcher/metadata.go | 4 +- fetcher/miniflux.go | 22 ++++------ fetcher/youtube.go | 47 +++++++++++++++++---- handler/server.go | 7 +++- handler/video.go | 71 +++++++++++++++++++++++++++++++ model/feed.go | 17 ++++++++ model/video.go | 28 +++++++------ service.go | 19 +++++---- storage/postgres.go | 69 ++++++++++++++++++++++++++---- storage/storage.go | 7 +++- 12 files changed, 329 insertions(+), 72 deletions(-) create mode 100644 handler/video.go create mode 100644 model/feed.go diff --git a/fetcher/feedreader.go b/fetcher/feedreader.go index 3a383f2..e587278 100644 --- a/fetcher/feedreader.go +++ b/fetcher/feedreader.go @@ -1,9 +1,16 @@ package fetcher +import "ewintr.nl/yogai/model" + type FeedEntry struct { - EntryID int64 - FeedID int64 - YouTubeID string + EntryID int64 + FeedID int64 + YoutubeChannelID string + YoutubeID string +} + +type ChannelReader interface { + Search(channelID model.YoutubeChannelID, pageToken string) ([]model.YoutubeVideoID, string, error) } type FeedReader interface { diff --git a/fetcher/fetcher.go b/fetcher/fetcher.go index 9460ba7..e9cf6de 100644 --- a/fetcher/fetcher.go +++ b/fetcher/fetcher.go @@ -1,33 +1,40 @@ package fetcher import ( + "time" + "ewintr.nl/yogai/model" "ewintr.nl/yogai/storage" "github.com/google/uuid" "golang.org/x/exp/slog" - "time" ) type Fetcher struct { interval time.Duration + feedRepo storage.FeedRepository videoRepo storage.VideoRepository feedReader FeedReader + channelReader ChannelReader metadataFetcher MetadataFetcher summaryFetcher SummaryFetcher - pipeline chan *model.Video + feedPipeline chan *model.Feed + videoPipeline chan *model.Video needsMetadata chan *model.Video needsSummary chan *model.Video logger *slog.Logger } -func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval time.Duration, metadataFetcher MetadataFetcher, summaryFetcher SummaryFetcher, logger *slog.Logger) *Fetcher { +func NewFetch(feedRepo storage.FeedRepository, videoRepo storage.VideoRepository, channelReader ChannelReader, feedReader FeedReader, interval time.Duration, metadataFetcher MetadataFetcher, summaryFetcher SummaryFetcher, logger *slog.Logger) *Fetcher { return &Fetcher{ interval: interval, + feedRepo: feedRepo, videoRepo: videoRepo, + channelReader: channelReader, feedReader: feedReader, metadataFetcher: metadataFetcher, summaryFetcher: summaryFetcher, - pipeline: make(chan *model.Video, 10), + feedPipeline: make(chan *model.Feed, 10), + videoPipeline: make(chan *model.Video, 10), needsMetadata: make(chan *model.Video, 10), needsSummary: make(chan *model.Video, 10), logger: logger, @@ -35,15 +42,18 @@ func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval } func (f *Fetcher) Run() { + go f.FetchHistoricalVideos() + go f.FindNewFeeds() + go f.ReadFeeds() go f.MetadataFetcher() go f.SummaryFetcher() go f.FindUnprocessed() - f.logger.Info("started pipeline") + f.logger.Info("started videoPipeline") for { select { - case video := <-f.pipeline: + case video := <-f.videoPipeline: switch video.Status { case model.StatusNew: f.needsMetadata <- video @@ -63,6 +73,63 @@ func (f *Fetcher) Run() { } } +func (f *Fetcher) FindNewFeeds() { + f.logger.Info("looking for new feeds") + feeds, err := f.feedRepo.FindByStatus(model.FeedStatusNew) + if err != nil { + f.logger.Error("failed to fetch feeds", err) + return + } + for _, feed := range feeds { + f.feedPipeline <- feed + } +} + +func (f *Fetcher) FetchHistoricalVideos() { + f.logger.Info("started historical video fetcher") + + for feed := range f.feedPipeline { + f.logger.Info("fetching historical videos", slog.String("channelid", string(feed.YoutubeChannelID))) + token := "" + for { + token = f.FetchHistoricalVideoPage(feed.YoutubeChannelID, token) + if token == "" { + break + } + } + feed.Status = model.FeedStatusReady + if err := f.feedRepo.Save(feed); err != nil { + f.logger.Error("failed to save feed", err) + continue + } + } +} + +func (f *Fetcher) FetchHistoricalVideoPage(channelID model.YoutubeChannelID, pageToken string) string { + f.logger.Info("fetching historical video page", slog.String("channelid", string(channelID)), slog.String("pagetoken", pageToken)) + ytIDs, pageToken, err := f.channelReader.Search(channelID, pageToken) + if err != nil { + f.logger.Error("failed to fetch channel", err) + return "" + } + for _, ytID := range ytIDs { + video := &model.Video{ + ID: uuid.New(), + Status: model.StatusNew, + YoutubeID: ytID, + YoutubeChannelID: channelID, + } + if err := f.videoRepo.Save(video); err != nil { + f.logger.Error("failed to save video", err) + continue + } + f.videoPipeline <- video + } + + f.logger.Info("fetched historical video page", slog.String("channelid", string(channelID)), slog.String("pagetoken", pageToken), slog.Int("count", len(ytIDs))) + return pageToken +} + func (f *Fetcher) FindUnprocessed() { f.logger.Info("looking for unprocessed videos") videos, err := f.videoRepo.FindByStatus(model.StatusNew, model.StatusHasMetadata) @@ -72,7 +139,7 @@ func (f *Fetcher) FindUnprocessed() { } f.logger.Info("found unprocessed videos", slog.Int("count", len(videos))) for _, video := range videos { - f.pipeline <- video + f.videoPipeline <- video } } @@ -92,16 +159,16 @@ func (f *Fetcher) ReadFeeds() { for _, entry := range entries { video := &model.Video{ - ID: uuid.New(), - Status: model.StatusNew, - YoutubeID: entry.YouTubeID, - // feed id + ID: uuid.New(), + Status: model.StatusNew, + YoutubeID: model.YoutubeVideoID(entry.YoutubeID), + YoutubeChannelID: model.YoutubeChannelID(entry.YoutubeChannelID), } if err := f.videoRepo.Save(video); err != nil { f.logger.Error("failed to save video", err) continue } - f.pipeline <- video + f.videoPipeline <- video if err := f.feedReader.MarkRead(entry.EntryID); err != nil { f.logger.Error("failed to mark entry as read", err) continue @@ -120,7 +187,7 @@ func (f *Fetcher) MetadataFetcher() { go func() { for videos := range fetch { f.logger.Info("fetching metadata", slog.Int("count", len(videos))) - ids := make([]string, 0, len(videos)) + ids := make([]model.YoutubeVideoID, 0, len(videos)) for _, video := range videos { ids = append(ids, video.YoutubeID) } @@ -148,7 +215,7 @@ func (f *Fetcher) MetadataFetcher() { case video := <-f.needsMetadata: timeout.Reset(10 * time.Second) buffer = append(buffer, video) - if len(buffer) >= 10 { + if len(buffer) >= 50 { batch := make([]*model.Video, len(buffer)) copy(batch, buffer) fetch <- batch @@ -177,7 +244,7 @@ func (f *Fetcher) SummaryFetcher() { } video.Status = model.StatusHasSummary f.logger.Info("fetched summary", slog.String("id", video.ID.String())) - f.pipeline <- video + f.videoPipeline <- video } } } diff --git a/fetcher/metadata.go b/fetcher/metadata.go index 9f21862..182ea9f 100644 --- a/fetcher/metadata.go +++ b/fetcher/metadata.go @@ -1,10 +1,12 @@ package fetcher +import "ewintr.nl/yogai/model" + type Metadata struct { Title string Description string } type MetadataFetcher interface { - FetchMetadata([]string) (map[string]Metadata, error) + FetchMetadata([]model.YoutubeVideoID) (map[model.YoutubeVideoID]Metadata, error) } diff --git a/fetcher/miniflux.go b/fetcher/miniflux.go index afadf0a..91c714d 100644 --- a/fetcher/miniflux.go +++ b/fetcher/miniflux.go @@ -1,8 +1,9 @@ package fetcher import ( - "miniflux.app/client" "strings" + + "miniflux.app/client" ) type Entry struct { @@ -31,24 +32,17 @@ func NewMiniflux(mflInfo MinifluxInfo) *Miniflux { func (m *Miniflux) Unread() ([]FeedEntry, error) { result, err := m.client.Entries(&client.Filter{Status: "unread"}) if err != nil { - return []FeedEntry{}, err + return nil, err } - entries := []FeedEntry{} + entries := make([]FeedEntry, 0, len(result.Entries)) for _, entry := range result.Entries { entries = append(entries, FeedEntry{ - EntryID: entry.ID, - FeedID: entry.FeedID, - YouTubeID: strings.TrimPrefix(entry.URL, "https://www.youtube.com/watch?v="), + EntryID: entry.ID, + FeedID: entry.FeedID, + YoutubeChannelID: strings.TrimPrefix(entry.Feed.FeedURL, "https://www.youtube.com/feeds/videos.xml?channel_id="), + YoutubeID: strings.TrimPrefix(entry.URL, "https://www.youtube.com/watch?v="), }) - - // ID: uuid.New(), - // Status: model.STATUS_NEW, - // YoutubeURL: entry.URL, - // FeedID: strconv.Itoa(int(entry.ID)), - // Title: entry.Title, - // Description: entry.Content, - //}) } return entries, nil diff --git a/fetcher/youtube.go b/fetcher/youtube.go index de2c0e7..c4883d4 100644 --- a/fetcher/youtube.go +++ b/fetcher/youtube.go @@ -1,8 +1,10 @@ package fetcher import ( - "google.golang.org/api/youtube/v3" "strings" + + "ewintr.nl/yogai/model" + "google.golang.org/api/youtube/v3" ) type Youtube struct { @@ -13,19 +15,48 @@ func NewYoutube(client *youtube.Service) *Youtube { return &Youtube{Client: client} } -func (y *Youtube) FetchMetadata(ytIDs []string) (map[string]Metadata, error) { - call := y.Client.Videos. - List([]string{"snippet"}). - Id(strings.Join(ytIDs, ",")) +func (y *Youtube) Search(channelID model.YoutubeChannelID, pageToken string) ([]model.YoutubeVideoID, string, error) { + call := y.Client.Search. + List([]string{"id"}). + MaxResults(50). + Type("video"). + Order("date"). + ChannelId(string(channelID)) + + if pageToken != "" { + call.PageToken(pageToken) + } response, err := call.Do() if err != nil { - return map[string]Metadata{}, err + return []model.YoutubeVideoID{}, "", err } - mds := make(map[string]Metadata, len(response.Items)) + ids := make([]model.YoutubeVideoID, len(response.Items)) + for i, item := range response.Items { + ids[i] = model.YoutubeVideoID(item.Id.VideoId) + } + + return ids, response.NextPageToken, nil +} + +func (y *Youtube) FetchMetadata(ytIDs []model.YoutubeVideoID) (map[model.YoutubeVideoID]Metadata, error) { + strIDs := make([]string, len(ytIDs)) + for i, id := range ytIDs { + strIDs[i] = string(id) + } + call := y.Client.Videos. + List([]string{"snippet"}). + Id(strings.Join(strIDs, ",")) + + response, err := call.Do() + if err != nil { + return map[model.YoutubeVideoID]Metadata{}, err + } + + mds := make(map[model.YoutubeVideoID]Metadata, len(response.Items)) for _, item := range response.Items { - mds[item.Id] = Metadata{ + mds[model.YoutubeVideoID(item.Id)] = Metadata{ Title: item.Snippet.Title, Description: item.Snippet.Description, } diff --git a/handler/server.go b/handler/server.go index b0e583f..1f8caa9 100644 --- a/handler/server.go +++ b/handler/server.go @@ -1,6 +1,7 @@ package handler import ( + "ewintr.nl/yogai/storage" "fmt" "golang.org/x/exp/slog" "miniflux.app/logger" @@ -15,9 +16,11 @@ type Server struct { logger *slog.Logger } -func NewServer(logger *slog.Logger) *Server { +func NewServer(videoRepo storage.VideoRepository, logger *slog.Logger) *Server { return &Server{ - apis: map[string]http.Handler{}, + apis: map[string]http.Handler{ + "video": NewVideoAPI(videoRepo, logger), + }, logger: logger, } } diff --git a/handler/video.go b/handler/video.go new file mode 100644 index 0000000..363ff5c --- /dev/null +++ b/handler/video.go @@ -0,0 +1,71 @@ +package handler + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "ewintr.nl/yogai/model" + "ewintr.nl/yogai/storage" + "golang.org/x/exp/slog" +) + +type VideoAPI struct { + videoRepo storage.VideoRepository + logger *slog.Logger +} + +func NewVideoAPI(videoRepo storage.VideoRepository, logger *slog.Logger) *VideoAPI { + return &VideoAPI{ + videoRepo: videoRepo, + logger: logger, + } +} + +func (v *VideoAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { + videoID, _ := ShiftPath(r.URL.Path) + + switch { + case r.Method == http.MethodGet && videoID == "": + v.List(w, r) + default: + Error(w, http.StatusNotFound, "not found", fmt.Errorf("method %s with subpath %q was not registered in the repository api", r.Method, videoID)) + } +} + +func (v *VideoAPI) List(w http.ResponseWriter, r *http.Request) { + video, err := v.videoRepo.FindByStatus(model.StatusReady) + if err != nil { + v.returnErr(r.Context(), w, http.StatusInternalServerError, "could not list repositories", err) + return + } + + type respVideo struct { + YoutubeID string `json:"youtube_url"` + Title string `json:"title"` + Summary string `json:"summary"` + } + var resp []respVideo + for _, v := range video { + resp = append(resp, respVideo{ + YoutubeID: string(v.YoutubeID), + Title: v.Title, + Summary: v.Summary, + }) + } + + jsonBody, err := json.Marshal(resp) + if err != nil { + v.returnErr(r.Context(), w, http.StatusInternalServerError, "could not marshal response", err) + return + } + + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, string(jsonBody)) +} + +func (v *VideoAPI) returnErr(_ context.Context, w http.ResponseWriter, status int, message string, err error, details ...any) { + v.logger.Error(message, slog.String("err", err.Error()), slog.String("details", fmt.Sprintf("%+v", details))) + Error(w, status, message, err, details...) +} diff --git a/model/feed.go b/model/feed.go new file mode 100644 index 0000000..af01f5e --- /dev/null +++ b/model/feed.go @@ -0,0 +1,17 @@ +package model + +import "github.com/google/uuid" + +type FeedStatus string + +const ( + FeedStatusNew FeedStatus = "new" + FeedStatusReady FeedStatus = "ready" +) + +type Feed struct { + ID uuid.UUID + Status FeedStatus + Title string + YoutubeChannelID YoutubeChannelID +} diff --git a/model/video.go b/model/video.go index 737272d..c10fd61 100644 --- a/model/video.go +++ b/model/video.go @@ -2,21 +2,25 @@ package model import "github.com/google/uuid" -type Status string +type VideoStatus string const ( - StatusNew Status = "new" - StatusHasMetadata Status = "has_metadata" - StatusHasSummary Status = "has_summary" - StatusReady Status = "ready" + StatusNew VideoStatus = "new" + StatusHasMetadata VideoStatus = "has_metadata" + StatusHasSummary VideoStatus = "has_summary" + StatusReady VideoStatus = "ready" ) +type YoutubeVideoID string + +type YoutubeChannelID string + type Video struct { - ID uuid.UUID - Status Status - YoutubeID string - FeedID uuid.UUID - Title string - Description string - Summary string + ID uuid.UUID + Status VideoStatus + YoutubeID YoutubeVideoID + YoutubeChannelID YoutubeChannelID + Title string + Description string + Summary string } diff --git a/service.go b/service.go index 1e428c4..a70eb13 100644 --- a/service.go +++ b/service.go @@ -2,18 +2,19 @@ package main import ( "context" - "ewintr.nl/yogai/fetcher" - "ewintr.nl/yogai/handler" - "ewintr.nl/yogai/storage" "fmt" - "golang.org/x/exp/slog" - "google.golang.org/api/option" - "google.golang.org/api/youtube/v3" "net/http" "os" "os/signal" "strconv" "time" + + "ewintr.nl/yogai/fetcher" + "ewintr.nl/yogai/handler" + "ewintr.nl/yogai/storage" + "golang.org/x/exp/slog" + "google.golang.org/api/option" + "google.golang.org/api/youtube/v3" ) func main() { @@ -33,6 +34,7 @@ func main() { os.Exit(1) } videoRepo := storage.NewPostgresVideoRepository(postgres) + feedRepo := storage.NewPostgresFeedRepository(postgres) mflx := fetcher.NewMiniflux(fetcher.MinifluxInfo{ Endpoint: getParam("MINIFLUX_ENDPOINT", "http://localhost/v1"), @@ -54,8 +56,7 @@ func main() { openAIClient := fetcher.NewOpenAI(getParam("OPENAI_API_KEY", "")) - fetcher := fetcher.NewFetch(videoRepo, mflx, fetchInterval, yt, openAIClient, logger) - go fetcher.Run() + go fetcher.NewFetch(feedRepo, videoRepo, yt, mflx, fetchInterval, yt, openAIClient, logger).Run() logger.Info("fetch service started") port, err := strconv.Atoi(getParam("API_PORT", "8080")) @@ -63,7 +64,7 @@ func main() { logger.Error("invalid port", err) os.Exit(1) } - go http.ListenAndServe(fmt.Sprintf(":%d", port), handler.NewServer(logger)) + go http.ListenAndServe(fmt.Sprintf(":%d", port), handler.NewServer(videoRepo, logger)) logger.Info("http server started") done := make(chan os.Signal) diff --git a/storage/postgres.go b/storage/postgres.go index 31ae6fa..39a0f23 100644 --- a/storage/postgres.go +++ b/storage/postgres.go @@ -2,8 +2,9 @@ package storage import ( "database/sql" - "ewintr.nl/yogai/model" "fmt" + + "ewintr.nl/yogai/model" "github.com/lib/pq" _ "github.com/lib/pq" ) @@ -43,24 +44,24 @@ func NewPostgresVideoRepository(postgres *Postgres) *PostgresVideoRepository { } func (p *PostgresVideoRepository) Save(v *model.Video) error { - query := `INSERT INTO video (id, status, youtube_id, feed_id, title, description, summary) + query := `INSERT INTO video (id, status, youtube_id, youtube_channel_id, title, description, summary) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id, status = EXCLUDED.status, youtube_id = EXCLUDED.youtube_id, - feed_id = EXCLUDED.feed_id, + youtube_channel_id = EXCLUDED.youtube_channel_id, title = EXCLUDED.title, description = EXCLUDED.description, summary = EXCLUDED.summary;` - _, err := p.db.Exec(query, v.ID, v.Status, v.YoutubeID, v.FeedID, v.Title, v.Description, v.Summary) + _, err := p.db.Exec(query, v.ID, v.Status, v.YoutubeID, v.YoutubeChannelID, v.Title, v.Description, v.Summary) return err } -func (p *PostgresVideoRepository) FindByStatus(statuses ...model.Status) ([]*model.Video, error) { - query := `SELECT id, status, youtube_id, feed_id, title, description, summary +func (p *PostgresVideoRepository) FindByStatus(statuses ...model.VideoStatus) ([]*model.Video, error) { + query := `SELECT id, status, youtube_channel_id, youtube_id, title, description, summary FROM video WHERE status = ANY($1)` rows, err := p.db.Query(query, pq.Array(statuses)) @@ -71,7 +72,7 @@ WHERE status = ANY($1)` videos := []*model.Video{} for rows.Next() { v := &model.Video{} - if err := rows.Scan(&v.ID, &v.Status, &v.YoutubeID, &v.FeedID, &v.Title, &v.Description, &v.Summary); err != nil { + if err := rows.Scan(&v.ID, &v.Status, &v.YoutubeChannelID, &v.YoutubeID, &v.Title, &v.Description, &v.Summary); err != nil { return nil, err } videos = append(videos, v) @@ -81,6 +82,50 @@ WHERE status = ANY($1)` return videos, nil } +type PostgresFeedRepository struct { + *Postgres +} + +func NewPostgresFeedRepository(postgres *Postgres) *PostgresFeedRepository { + return &PostgresFeedRepository{postgres} +} + +func (p *PostgresFeedRepository) Save(f *model.Feed) error { + query := `INSERT INTO feed (id, status, youtube_channel_id, title) +VALUES ($1, $2, $3, $4) +ON CONFLICT (id) +DO UPDATE SET + id = EXCLUDED.id, + status = EXCLUDED.status, + youtube_channel_id = EXCLUDED.youtube_channel_id, + title = EXCLUDED.title;` + _, err := p.db.Exec(query, f.ID, f.Status, f.YoutubeChannelID, f.Title) + + return err +} + +func (p *PostgresFeedRepository) FindByStatus(statuses ...model.FeedStatus) ([]*model.Feed, error) { + query := `SELECT id, status, youtube_channel_id, title +FROM feed +WHERE status = ANY($1)` + rows, err := p.db.Query(query, pq.Array(statuses)) + if err != nil { + return nil, err + } + + feeds := []*model.Feed{} + for rows.Next() { + f := &model.Feed{} + if err := rows.Scan(&f.ID, &f.Status, &f.YoutubeChannelID, &f.Title); err != nil { + return nil, err + } + feeds = append(feeds, f) + } + rows.Close() + + return feeds, nil +} + var pgMigration = []string{ `CREATE TYPE video_status AS ENUM ('new', 'ready')`, `CREATE TABLE video ( @@ -105,6 +150,16 @@ ALTER COLUMN summary SET DEFAULT '', ALTER COLUMN summary SET NOT NULL, ALTER COLUMN description SET DEFAULT '', ALTER COLUMN description SET NOT NULL`, + `CREATE TYPE feed_status AS ENUM ('new', 'ready')`, + `CREATE TABLE feed ( +id uuid PRIMARY KEY, +status feed_status NOT NULL, +youtube_channel_id VARCHAR(255) NOT NULL UNIQUE, +title VARCHAR(255) NOT NULL +)`, + `ALTER TABLE video +DROP COLUMN feed_id, +ADD COLUMN youtube_channel_id VARCHAR(255) NOT NULL REFERENCES feed(youtube_channel_id)`, } func (p *Postgres) migrate(wanted []string) error { diff --git a/storage/storage.go b/storage/storage.go index 0687788..9edd027 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -4,7 +4,12 @@ import ( "ewintr.nl/yogai/model" ) +type FeedRepository interface { + Save(feed *model.Feed) error + FindByStatus(statuses ...model.FeedStatus) ([]*model.Feed, error) +} + type VideoRepository interface { Save(video *model.Video) error - FindByStatus(statuses ...model.Status) ([]*model.Video, error) + FindByStatus(statuses ...model.VideoStatus) ([]*model.Video, error) }