start processing pipeline

This commit is contained in:
Erik Winter 2023-05-10 16:28:45 +02:00
parent fd7e78b018
commit 4d8d8d1a3d
8 changed files with 191 additions and 134 deletions

View File

@ -1,56 +0,0 @@
package fetch
import (
"ewintr.nl/yogai/model"
"ewintr.nl/yogai/storage"
"log"
"time"
)
type FeedReader interface {
Unread() ([]*model.Video, error)
MarkRead(feedID string) error
}
type Fetch struct {
interval time.Duration
videoRepo storage.VideoRepository
feedReader FeedReader
out chan<- model.Video
}
func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval time.Duration) *Fetch {
return &Fetch{
interval: interval,
videoRepo: videoRepo,
feedReader: feedReader,
}
}
func (v *Fetch) Run() {
ticker := time.NewTicker(v.interval)
for {
select {
case <-ticker.C:
newVideos, err := v.feedReader.Unread()
if err != nil {
log.Println(err)
}
for _, video := range newVideos {
if err := v.videoRepo.Save(video); err != nil {
log.Println(err)
continue
}
//v.out <- video
if err := v.feedReader.MarkRead(video.FeedID); err != nil {
log.Println(err)
}
}
}
}
}
func (v *Fetch) Out() chan<- model.Video {
return v.out
}

View File

@ -1,65 +0,0 @@
package fetch
import (
"ewintr.nl/yogai/model"
"github.com/google/uuid"
"miniflux.app/client"
"strconv"
)
type Entry struct {
MinifluxEntryID string
MinifluxFeedID string
MinifluxURL string
Title string
Description string
}
type MinifluxInfo struct {
Endpoint string
ApiKey string
}
type Miniflux struct {
client *client.Client
}
func NewMiniflux(mflInfo MinifluxInfo) *Miniflux {
return &Miniflux{
client: client.New(mflInfo.Endpoint, mflInfo.ApiKey),
}
}
func (m *Miniflux) Unread() ([]*model.Video, error) {
result, err := m.client.Entries(&client.Filter{Status: "unread"})
if err != nil {
return []*model.Video{}, err
}
videos := []*model.Video{}
for _, entry := range result.Entries {
videos = append(videos, &model.Video{
ID: uuid.New(),
Status: model.STATUS_NEW,
YoutubeURL: entry.URL,
FeedID: strconv.Itoa(int(entry.ID)),
Title: entry.Title,
Description: entry.Content,
})
}
return videos, nil
}
func (m *Miniflux) MarkRead(entryID string) error {
id, err := strconv.ParseInt(entryID, 10, 64)
if err != nil {
return err
}
if err := m.client.UpdateEntries([]int64{id}, "read"); err != nil {
return err
}
return nil
}

12
fetcher/feedreader.go Normal file
View File

@ -0,0 +1,12 @@
package fetcher
type FeedEntry struct {
EntryID int64
FeedID int64
YouTubeID string
}
type FeedReader interface {
Unread() ([]FeedEntry, error)
MarkRead(feedID int64) error
}

104
fetcher/fetcher.go Normal file
View File

@ -0,0 +1,104 @@
package fetcher
import (
"ewintr.nl/yogai/model"
"ewintr.nl/yogai/storage"
"fmt"
"github.com/google/uuid"
"log"
"time"
)
type Fetcher struct {
interval time.Duration
videoRepo storage.VideoRepository
feedReader FeedReader
pipeline chan *model.Video
needsMetadata chan *model.Video
}
func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval time.Duration) *Fetcher {
return &Fetcher{
interval: interval,
videoRepo: videoRepo,
feedReader: feedReader,
pipeline: make(chan *model.Video),
needsMetadata: make(chan *model.Video),
}
}
func (f *Fetcher) Run() {
go f.ReadFeeds()
go f.MetadataFetcher()
for {
select {
case video := <-f.pipeline:
switch video.Status {
case model.STATUS_NEW:
f.needsMetadata <- video
}
}
}
}
func (f *Fetcher) ReadFeeds() {
ticker := time.NewTicker(f.interval)
for range ticker.C {
entries, err := f.feedReader.Unread()
if err != nil {
log.Println(err)
}
for _, entry := range entries {
video := &model.Video{
ID: uuid.New(),
Status: model.STATUS_NEW,
YoutubeID: entry.YouTubeID,
// feed id
}
if err := f.videoRepo.Save(video); err != nil {
log.Println(err)
continue
}
f.pipeline <- video
if err := f.feedReader.MarkRead(entry.EntryID); err != nil {
log.Println(err)
}
}
}
}
func (f *Fetcher) MetadataFetcher() {
buffer := []*model.Video{}
timeout := time.NewTimer(10 * time.Second)
fetch := make(chan []*model.Video)
go func() {
for videos := range fetch {
fmt.Println("MD Fetching metadata")
fmt.Printf("%d videos to fetch\n", len(videos))
}
}()
for {
select {
case video := <-f.needsMetadata:
timeout.Reset(10 * time.Second)
buffer = append(buffer, video)
if len(buffer) >= 10 {
batch := make([]*model.Video, len(buffer))
copy(batch, buffer)
fetch <- batch
buffer = []*model.Video{}
}
case <-timeout.C:
if len(buffer) == 0 {
continue
}
batch := make([]*model.Video, len(buffer))
copy(batch, buffer)
fetch <- batch
buffer = []*model.Video{}
}
}
}

63
fetcher/miniflux.go Normal file
View File

@ -0,0 +1,63 @@
package fetcher
import (
"miniflux.app/client"
"strings"
)
type Entry struct {
MinifluxEntryID string
MinifluxFeedID string
MinifluxURL string
Title string
Description string
}
type MinifluxInfo struct {
Endpoint string
ApiKey string
}
type Miniflux struct {
client *client.Client
}
func NewMiniflux(mflInfo MinifluxInfo) *Miniflux {
return &Miniflux{
client: client.New(mflInfo.Endpoint, mflInfo.ApiKey),
}
}
func (m *Miniflux) Unread() ([]FeedEntry, error) {
result, err := m.client.Entries(&client.Filter{Status: "unread"})
if err != nil {
return []FeedEntry{}, err
}
entries := []FeedEntry{}
for _, entry := range result.Entries {
entries = append(entries, FeedEntry{
EntryID: entry.ID,
FeedID: entry.FeedID,
YouTubeID: strings.TrimPrefix(entry.URL, "https://www.youtube.com/watch?v="),
})
// ID: uuid.New(),
// Status: model.STATUS_NEW,
// YoutubeURL: entry.URL,
// FeedID: strconv.Itoa(int(entry.ID)),
// Title: entry.Title,
// Description: entry.Content,
//})
}
return entries, nil
}
func (m *Miniflux) MarkRead(entryID int64) error {
if err := m.client.UpdateEntries([]int64{entryID}, "read"); err != nil {
return err
}
return nil
}

View File

@ -6,15 +6,14 @@ type Status string
const (
STATUS_NEW Status = "new"
STATUS_NEEDS_SUMMARY Status = "needs_summary"
STATUS_READY Status = "ready"
)
type Video struct {
ID uuid.UUID
Status Status
YoutubeURL string
FeedID string
YoutubeID string
FeedID uuid.UUID
Title string
Description string
Summary string

View File

@ -1,7 +1,7 @@
package main
import (
"ewintr.nl/yogai/fetch"
"ewintr.nl/yogai/fetcher"
"ewintr.nl/yogai/storage"
"fmt"
"os"
@ -23,7 +23,7 @@ func main() {
}
videoRepo := storage.NewPostgresVideoRepository(postgres)
mflx := fetch.NewMiniflux(fetch.MinifluxInfo{
mflx := fetcher.NewMiniflux(fetcher.MinifluxInfo{
Endpoint: getParam("MINIFLUX_ENDPOINT", "http://localhost/v1"),
ApiKey: getParam("MINIFLUX_APIKEY", ""),
})
@ -34,7 +34,7 @@ func main() {
os.Exit(1)
}
fetcher := fetch.NewFetch(videoRepo, mflx, fetchInterval)
fetcher := fetcher.NewFetch(videoRepo, mflx, fetchInterval)
go fetcher.Run()
done := make(chan os.Signal)

View File

@ -42,27 +42,27 @@ func NewPostgresVideoRepository(postgres *Postgres) *PostgresVideoRepository {
}
func (p *PostgresVideoRepository) Save(v *model.Video) error {
query := `INSERT INTO video (id, status, youtube_url, feed_id, title, description)
query := `INSERT INTO video (id, status, youtube_id, feed_id, title, description)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (id)
DO UPDATE SET
id = EXCLUDED.id,
status = EXCLUDED.status,
youtube_url = EXCLUDED.youtube_url,
youtube_id = EXCLUDED.youtube_id,
feed_id = EXCLUDED.feed_id,
title = EXCLUDED.title,
description = EXCLUDED.description;`
_, err := p.db.Exec(query, v.ID, v.Status, v.YoutubeURL, v.FeedID, v.Title, v.Description)
_, err := p.db.Exec(query, v.ID, v.Status, v.YoutubeID, v.FeedID, v.Title, v.Description)
return err
}
var pgMigration = []string{
`CREATE TYPE video_status AS ENUM ('new', 'needs_summary', 'ready')`,
`CREATE TYPE video_status AS ENUM ('new', 'ready')`,
`CREATE TABLE video (
id uuid PRIMARY KEY,
status video_status NOT NULL,
youtube_url VARCHAR(255) NOT NULL,
youtube_id VARCHAR(255) NOT NULL,
title VARCHAR(255) NOT NULL,
feed_id VARCHAR(255) NOT NULL,
description TEXT,