yogai/fetcher/fetcher.go

117 lines
2.5 KiB
Go
Raw Normal View History

2023-05-10 16:28:45 +02:00
package fetcher
import (
	"time"

	"ewintr.nl/yogai/model"
	"ewintr.nl/yogai/storage"
	"github.com/google/uuid"
	"golang.org/x/exp/slog"
)
// Fetcher polls a feed reader for unread entries and routes the resulting
// videos through an internal pipeline toward per-status processing stages.
type Fetcher struct {
	interval      time.Duration           // polling period used by ReadFeeds
	videoRepo     storage.VideoRepository // persistent store for discovered videos
	feedReader    FeedReader              // source of unread feed entries
	pipeline      chan *model.Video       // central channel Run dispatches from
	needsMetadata chan *model.Video       // videos with STATUS_NEW awaiting metadata
	logger        *slog.Logger
}
2023-05-10 19:27:31 +02:00
func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval time.Duration, logger *slog.Logger) *Fetcher {
2023-05-10 16:28:45 +02:00
return &Fetcher{
interval: interval,
videoRepo: videoRepo,
feedReader: feedReader,
pipeline: make(chan *model.Video),
needsMetadata: make(chan *model.Video),
2023-05-10 19:27:31 +02:00
logger: logger,
2023-05-10 16:28:45 +02:00
}
}
func (f *Fetcher) Run() {
go f.ReadFeeds()
go f.MetadataFetcher()
2023-05-10 19:27:31 +02:00
f.logger.Info("started pipeline")
2023-05-10 16:28:45 +02:00
for {
select {
case video := <-f.pipeline:
switch video.Status {
case model.STATUS_NEW:
f.needsMetadata <- video
}
}
}
}
func (f *Fetcher) ReadFeeds() {
2023-05-10 19:27:31 +02:00
f.logger.Info("started feed reader")
2023-05-10 16:28:45 +02:00
ticker := time.NewTicker(f.interval)
for range ticker.C {
entries, err := f.feedReader.Unread()
if err != nil {
2023-05-10 19:27:31 +02:00
f.logger.Error("failed to fetch unread entries", err)
continue
2023-05-10 16:28:45 +02:00
}
2023-05-10 19:27:31 +02:00
f.logger.Info("fetched unread entries", slog.Int("count", len(entries)))
if len(entries) == 0 {
continue
}
2023-05-10 16:28:45 +02:00
for _, entry := range entries {
video := &model.Video{
ID: uuid.New(),
Status: model.STATUS_NEW,
YoutubeID: entry.YouTubeID,
// feed id
}
if err := f.videoRepo.Save(video); err != nil {
2023-05-10 19:27:31 +02:00
f.logger.Error("failed to save video", err)
2023-05-10 16:28:45 +02:00
continue
}
f.pipeline <- video
if err := f.feedReader.MarkRead(entry.EntryID); err != nil {
2023-05-10 19:27:31 +02:00
f.logger.Error("failed to mark entry as read", err)
continue
2023-05-10 16:28:45 +02:00
}
}
}
}
func (f *Fetcher) MetadataFetcher() {
2023-05-10 19:27:31 +02:00
f.logger.Info("started metadata fetcher")
2023-05-10 16:28:45 +02:00
buffer := []*model.Video{}
timeout := time.NewTimer(10 * time.Second)
fetch := make(chan []*model.Video)
go func() {
for videos := range fetch {
2023-05-10 19:27:31 +02:00
f.logger.Info("fetching metadata", slog.Int("count", len(videos)))
2023-05-10 16:28:45 +02:00
}
}()
for {
select {
case video := <-f.needsMetadata:
timeout.Reset(10 * time.Second)
buffer = append(buffer, video)
if len(buffer) >= 10 {
batch := make([]*model.Video, len(buffer))
copy(batch, buffer)
fetch <- batch
buffer = []*model.Video{}
}
case <-timeout.C:
if len(buffer) == 0 {
continue
}
batch := make([]*model.Video, len(buffer))
copy(batch, buffer)
fetch <- batch
buffer = []*model.Video{}
}
}
}