From 4d8d8d1a3df518d26314578e601416585e084131 Mon Sep 17 00:00:00 2001 From: Erik Winter Date: Wed, 10 May 2023 16:28:45 +0200 Subject: [PATCH] start processing pipeline --- fetch/fetch.go | 56 ----------------------- fetch/miniflux.go | 65 -------------------------- fetcher/feedreader.go | 12 +++++ fetcher/fetcher.go | 104 ++++++++++++++++++++++++++++++++++++++++++ fetcher/miniflux.go | 63 +++++++++++++++++++++++++ model/video.go | 9 ++-- service.go | 6 +-- storage/postgres.go | 10 ++-- 8 files changed, 191 insertions(+), 134 deletions(-) delete mode 100644 fetch/fetch.go delete mode 100644 fetch/miniflux.go create mode 100644 fetcher/feedreader.go create mode 100644 fetcher/fetcher.go create mode 100644 fetcher/miniflux.go diff --git a/fetch/fetch.go b/fetch/fetch.go deleted file mode 100644 index 2f413d1..0000000 --- a/fetch/fetch.go +++ /dev/null @@ -1,56 +0,0 @@ -package fetch - -import ( - "ewintr.nl/yogai/model" - "ewintr.nl/yogai/storage" - "log" - "time" -) - -type FeedReader interface { - Unread() ([]*model.Video, error) - MarkRead(feedID string) error -} - -type Fetch struct { - interval time.Duration - videoRepo storage.VideoRepository - feedReader FeedReader - out chan<- model.Video -} - -func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval time.Duration) *Fetch { - return &Fetch{ - interval: interval, - videoRepo: videoRepo, - feedReader: feedReader, - } -} - -func (v *Fetch) Run() { - ticker := time.NewTicker(v.interval) - for { - select { - case <-ticker.C: - newVideos, err := v.feedReader.Unread() - if err != nil { - log.Println(err) - } - for _, video := range newVideos { - if err := v.videoRepo.Save(video); err != nil { - log.Println(err) - continue - } - //v.out <- video - if err := v.feedReader.MarkRead(video.FeedID); err != nil { - log.Println(err) - } - } - - } - } -} - -func (v *Fetch) Out() chan<- model.Video { - return v.out -} diff --git a/fetch/miniflux.go b/fetch/miniflux.go deleted file mode 100644 index add2c2a..0000000 --- a/fetch/miniflux.go +++ /dev/null @@ -1,65 +0,0 @@ -package fetch - -import ( - "ewintr.nl/yogai/model" - "github.com/google/uuid" - "miniflux.app/client" - "strconv" -) - -type Entry struct { - MinifluxEntryID string - MinifluxFeedID string - MinifluxURL string - Title string - Description string -} - -type MinifluxInfo struct { - Endpoint string - ApiKey string -} - -type Miniflux struct { - client *client.Client -} - -func NewMiniflux(mflInfo MinifluxInfo) *Miniflux { - return &Miniflux{ - client: client.New(mflInfo.Endpoint, mflInfo.ApiKey), - } -} - -func (m *Miniflux) Unread() ([]*model.Video, error) { - result, err := m.client.Entries(&client.Filter{Status: "unread"}) - if err != nil { - return []*model.Video{}, err - } - - videos := []*model.Video{} - for _, entry := range result.Entries { - videos = append(videos, &model.Video{ - ID: uuid.New(), - Status: model.STATUS_NEW, - YoutubeURL: entry.URL, - FeedID: strconv.Itoa(int(entry.ID)), - Title: entry.Title, - Description: entry.Content, - }) - } - - return videos, nil -} - -func (m *Miniflux) MarkRead(entryID string) error { - id, err := strconv.ParseInt(entryID, 10, 64) - if err != nil { - return err - } - - if err := m.client.UpdateEntries([]int64{id}, "read"); err != nil { - return err - } - - return nil -} diff --git a/fetcher/feedreader.go b/fetcher/feedreader.go new file mode 100644 index 0000000..3a383f2 --- /dev/null +++ b/fetcher/feedreader.go @@ -0,0 +1,12 @@ +package fetcher + +type FeedEntry struct { + EntryID int64 + FeedID int64 + YouTubeID string +} + +type FeedReader interface { + Unread() ([]FeedEntry, error) + MarkRead(feedID int64) error +} diff --git a/fetcher/fetcher.go b/fetcher/fetcher.go new file mode 100644 index 0000000..d08093d --- /dev/null +++ b/fetcher/fetcher.go @@ -0,0 +1,104 @@ +package fetcher + +import ( + "ewintr.nl/yogai/model" + "ewintr.nl/yogai/storage" + "fmt" + "github.com/google/uuid" + "log" + "time" +) + +type Fetcher struct { + interval time.Duration + videoRepo storage.VideoRepository + feedReader FeedReader + pipeline chan *model.Video + needsMetadata chan *model.Video +} + +func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval time.Duration) *Fetcher { + return &Fetcher{ + interval: interval, + videoRepo: videoRepo, + feedReader: feedReader, + pipeline: make(chan *model.Video), + needsMetadata: make(chan *model.Video), + } +} + +func (f *Fetcher) Run() { + go f.ReadFeeds() + go f.MetadataFetcher() + + for { + select { + case video := <-f.pipeline: + switch video.Status { + case model.STATUS_NEW: + f.needsMetadata <- video + } + } + } +} + +func (f *Fetcher) ReadFeeds() { + ticker := time.NewTicker(f.interval) + for range ticker.C { + entries, err := f.feedReader.Unread() + if err != nil { + log.Println(err) + } + for _, entry := range entries { + video := &model.Video{ + ID: uuid.New(), + Status: model.STATUS_NEW, + YoutubeID: entry.YouTubeID, + // feed id + } + if err := f.videoRepo.Save(video); err != nil { + log.Println(err) + continue + } + f.pipeline <- video + if err := f.feedReader.MarkRead(entry.EntryID); err != nil { + log.Println(err) + } + } + } +} + +func (f *Fetcher) MetadataFetcher() { + buffer := []*model.Video{} + timeout := time.NewTimer(10 * time.Second) + fetch := make(chan []*model.Video) + + go func() { + for videos := range fetch { + fmt.Println("MD Fetching metadata") + fmt.Printf("%d videos to fetch\n", len(videos)) + } + }() + + for { + select { + case video := <-f.needsMetadata: + timeout.Reset(10 * time.Second) + buffer = append(buffer, video) + if len(buffer) >= 10 { + batch := make([]*model.Video, len(buffer)) + copy(batch, buffer) + fetch <- batch + buffer = []*model.Video{} + } + case <-timeout.C: + if len(buffer) == 0 { + continue + } + batch := make([]*model.Video, len(buffer)) + copy(batch, buffer) + fetch <- batch + buffer = []*model.Video{} + } + } +} diff --git a/fetcher/miniflux.go b/fetcher/miniflux.go new file mode 100644 index 0000000..afadf0a --- /dev/null +++ b/fetcher/miniflux.go @@ -0,0 +1,63 @@ +package fetcher + +import ( + "miniflux.app/client" + "strings" +) + +type Entry struct { + MinifluxEntryID string + MinifluxFeedID string + MinifluxURL string + Title string + Description string +} + +type MinifluxInfo struct { + Endpoint string + ApiKey string +} + +type Miniflux struct { + client *client.Client +} + +func NewMiniflux(mflInfo MinifluxInfo) *Miniflux { + return &Miniflux{ + client: client.New(mflInfo.Endpoint, mflInfo.ApiKey), + } +} + +func (m *Miniflux) Unread() ([]FeedEntry, error) { + result, err := m.client.Entries(&client.Filter{Status: "unread"}) + if err != nil { + return []FeedEntry{}, err + } + + entries := []FeedEntry{} + for _, entry := range result.Entries { + entries = append(entries, FeedEntry{ + EntryID: entry.ID, + FeedID: entry.FeedID, + YouTubeID: strings.TrimPrefix(entry.URL, "https://www.youtube.com/watch?v="), + }) + + // ID: uuid.New(), + // Status: model.STATUS_NEW, + // YoutubeURL: entry.URL, + // FeedID: strconv.Itoa(int(entry.ID)), + // Title: entry.Title, + // Description: entry.Content, + //}) + } + + return entries, nil +} + +func (m *Miniflux) MarkRead(entryID int64) error { + if err := m.client.UpdateEntries([]int64{entryID}, "read"); err != nil { + return err + } + + return nil +} diff --git a/model/video.go b/model/video.go index 5fd9284..227a7e5 100644 --- a/model/video.go +++ b/model/video.go @@ -5,16 +5,15 @@ import "github.com/google/uuid" type Status string const ( - STATUS_NEW Status = "new" - STATUS_NEEDS_SUMMARY Status = "needs_summary" - STATUS_READY Status = "ready" + STATUS_NEW Status = "new" + STATUS_READY Status = "ready" ) type Video struct { ID uuid.UUID Status Status - YoutubeURL string - FeedID string + YoutubeID string + FeedID uuid.UUID Title string Description string Summary string diff --git a/service.go b/service.go index 352957e..1d6f96a 100644 --- a/service.go +++ b/service.go @@ -1,7 +1,7 @@ package main import ( - "ewintr.nl/yogai/fetch" + "ewintr.nl/yogai/fetcher" "ewintr.nl/yogai/storage" "fmt" "os" @@ -23,7 +23,7 @@ func main() { } videoRepo := storage.NewPostgresVideoRepository(postgres) - mflx := fetch.NewMiniflux(fetch.MinifluxInfo{ + mflx := fetcher.NewMiniflux(fetcher.MinifluxInfo{ Endpoint: getParam("MINIFLUX_ENDPOINT", "http://localhost/v1"), ApiKey: getParam("MINIFLUX_APIKEY", ""), }) @@ -34,7 +34,7 @@ func main() { os.Exit(1) } - fetcher := fetch.NewFetch(videoRepo, mflx, fetchInterval) + fetcher := fetcher.NewFetch(videoRepo, mflx, fetchInterval) go fetcher.Run() done := make(chan os.Signal) diff --git a/storage/postgres.go b/storage/postgres.go index a5a5a88..4bb1c9b 100644 --- a/storage/postgres.go +++ b/storage/postgres.go @@ -42,27 +42,27 @@ func NewPostgresVideoRepository(postgres *Postgres) *PostgresVideoRepository { } func (p *PostgresVideoRepository) Save(v *model.Video) error { - query := `INSERT INTO video (id, status, youtube_url, feed_id, title, description) + query := `INSERT INTO video (id, status, youtube_id, feed_id, title, description) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id, status = EXCLUDED.status, - youtube_url = EXCLUDED.youtube_url, + youtube_id = EXCLUDED.youtube_id, feed_id = EXCLUDED.feed_id, title = EXCLUDED.title, description = EXCLUDED.description;` - _, err := p.db.Exec(query, v.ID, v.Status, v.YoutubeURL, v.FeedID, v.Title, v.Description) + _, err := p.db.Exec(query, v.ID, v.Status, v.YoutubeID, v.FeedID, v.Title, v.Description) return err } var pgMigration = []string{ - `CREATE TYPE video_status AS ENUM ('new', 'needs_summary', 'ready')`, + `CREATE TYPE video_status AS ENUM ('new', 'ready')`, `CREATE TABLE video ( id uuid PRIMARY KEY, status video_status NOT NULL, - youtube_url VARCHAR(255) NOT NULL, + youtube_id VARCHAR(255) NOT NULL, title VARCHAR(255) NOT NULL, feed_id VARCHAR(255) NOT NULL, description TEXT,