fetch historical videos from channel

This commit is contained in:
Erik Winter 2023-05-27 14:36:22 +02:00
parent f58afaf70b
commit b892066abf
12 changed files with 329 additions and 72 deletions

View File

@ -1,9 +1,16 @@
package fetcher
import "ewintr.nl/yogai/model"
type FeedEntry struct {
EntryID int64
FeedID int64
YouTubeID string
EntryID int64
FeedID int64
YoutubeChannelID string
YoutubeID string
}
// ChannelReader lists the videos of a YouTube channel one page at a time.
//
// Search takes the channel id and a page token and returns the video ids on
// that page, a token for the following page, and an error. Callers in this
// package pass an empty token for the first page and stop when the returned
// token is empty.
type ChannelReader interface {
Search(channelID model.YoutubeChannelID, pageToken string) ([]model.YoutubeVideoID, string, error)
}
type FeedReader interface {

View File

@ -1,33 +1,40 @@
package fetcher
import (
"time"
"ewintr.nl/yogai/model"
"ewintr.nl/yogai/storage"
"github.com/google/uuid"
"golang.org/x/exp/slog"
"time"
)
type Fetcher struct {
interval time.Duration
feedRepo storage.FeedRepository
videoRepo storage.VideoRepository
feedReader FeedReader
channelReader ChannelReader
metadataFetcher MetadataFetcher
summaryFetcher SummaryFetcher
pipeline chan *model.Video
feedPipeline chan *model.Feed
videoPipeline chan *model.Video
needsMetadata chan *model.Video
needsSummary chan *model.Video
logger *slog.Logger
}
func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval time.Duration, metadataFetcher MetadataFetcher, summaryFetcher SummaryFetcher, logger *slog.Logger) *Fetcher {
func NewFetch(feedRepo storage.FeedRepository, videoRepo storage.VideoRepository, channelReader ChannelReader, feedReader FeedReader, interval time.Duration, metadataFetcher MetadataFetcher, summaryFetcher SummaryFetcher, logger *slog.Logger) *Fetcher {
return &Fetcher{
interval: interval,
feedRepo: feedRepo,
videoRepo: videoRepo,
channelReader: channelReader,
feedReader: feedReader,
metadataFetcher: metadataFetcher,
summaryFetcher: summaryFetcher,
pipeline: make(chan *model.Video, 10),
feedPipeline: make(chan *model.Feed, 10),
videoPipeline: make(chan *model.Video, 10),
needsMetadata: make(chan *model.Video, 10),
needsSummary: make(chan *model.Video, 10),
logger: logger,
@ -35,15 +42,18 @@ func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval
}
func (f *Fetcher) Run() {
go f.FetchHistoricalVideos()
go f.FindNewFeeds()
go f.ReadFeeds()
go f.MetadataFetcher()
go f.SummaryFetcher()
go f.FindUnprocessed()
f.logger.Info("started pipeline")
f.logger.Info("started videoPipeline")
for {
select {
case video := <-f.pipeline:
case video := <-f.videoPipeline:
switch video.Status {
case model.StatusNew:
f.needsMetadata <- video
@ -63,6 +73,63 @@ func (f *Fetcher) Run() {
}
}
// FindNewFeeds loads every feed with status new from the feed repository and
// queues each one on feedPipeline. A repository error is logged and ends the
// call without queueing anything.
func (f *Fetcher) FindNewFeeds() {
	f.logger.Info("looking for new feeds")

	newFeeds, err := f.feedRepo.FindByStatus(model.FeedStatusNew)
	if err != nil {
		f.logger.Error("failed to fetch feeds", err)
		return
	}

	// Sends block once the pipeline buffer is full, until the consumer catches up.
	for i := range newFeeds {
		f.feedPipeline <- newFeeds[i]
	}
}
// FetchHistoricalVideos consumes feeds from feedPipeline. For each feed it
// walks the channel's video pages until FetchHistoricalVideoPage returns an
// empty token, then sets the feed status to ready and saves the feed.
//
// NOTE(review): FetchHistoricalVideoPage also returns an empty token when the
// page request fails, so a feed is marked ready after a failed fetch as well —
// confirm whether that is intended.
func (f *Fetcher) FetchHistoricalVideos() {
	f.logger.Info("started historical video fetcher")

	for feed := range f.feedPipeline {
		channelID := feed.YoutubeChannelID
		f.logger.Info("fetching historical videos", slog.String("channelid", string(channelID)))

		// The first request uses an empty token; every later request uses
		// the token handed back by the previous page.
		next := f.FetchHistoricalVideoPage(channelID, "")
		for next != "" {
			next = f.FetchHistoricalVideoPage(channelID, next)
		}

		feed.Status = model.FeedStatusReady
		if err := f.feedRepo.Save(feed); err != nil {
			f.logger.Error("failed to save feed", err)
		}
	}
}
// FetchHistoricalVideoPage requests one page of video ids for channelID,
// saves a new video record for every id and queues it on videoPipeline.
//
// It returns the token for the next page as reported by the channel reader.
// If the search fails, the error is logged and an empty token is returned.
// A video that cannot be saved is logged and skipped; it is not queued.
func (f *Fetcher) FetchHistoricalVideoPage(channelID model.YoutubeChannelID, pageToken string) string {
	channelAttr := slog.String("channelid", string(channelID))
	f.logger.Info("fetching historical video page", channelAttr, slog.String("pagetoken", pageToken))

	ids, next, err := f.channelReader.Search(channelID, pageToken)
	if err != nil {
		f.logger.Error("failed to fetch channel", err)
		return ""
	}

	for i := range ids {
		video := &model.Video{
			ID:               uuid.New(),
			Status:           model.StatusNew,
			YoutubeID:        ids[i],
			YoutubeChannelID: channelID,
		}
		if saveErr := f.videoRepo.Save(video); saveErr != nil {
			f.logger.Error("failed to save video", saveErr)
			continue
		}
		f.videoPipeline <- video
	}

	// The token logged here is the one for the next page, not the one requested.
	f.logger.Info("fetched historical video page", channelAttr, slog.String("pagetoken", next), slog.Int("count", len(ids)))

	return next
}
func (f *Fetcher) FindUnprocessed() {
f.logger.Info("looking for unprocessed videos")
videos, err := f.videoRepo.FindByStatus(model.StatusNew, model.StatusHasMetadata)
@ -72,7 +139,7 @@ func (f *Fetcher) FindUnprocessed() {
}
f.logger.Info("found unprocessed videos", slog.Int("count", len(videos)))
for _, video := range videos {
f.pipeline <- video
f.videoPipeline <- video
}
}
@ -92,16 +159,16 @@ func (f *Fetcher) ReadFeeds() {
for _, entry := range entries {
video := &model.Video{
ID: uuid.New(),
Status: model.StatusNew,
YoutubeID: entry.YouTubeID,
// feed id
ID: uuid.New(),
Status: model.StatusNew,
YoutubeID: model.YoutubeVideoID(entry.YoutubeID),
YoutubeChannelID: model.YoutubeChannelID(entry.YoutubeChannelID),
}
if err := f.videoRepo.Save(video); err != nil {
f.logger.Error("failed to save video", err)
continue
}
f.pipeline <- video
f.videoPipeline <- video
if err := f.feedReader.MarkRead(entry.EntryID); err != nil {
f.logger.Error("failed to mark entry as read", err)
continue
@ -120,7 +187,7 @@ func (f *Fetcher) MetadataFetcher() {
go func() {
for videos := range fetch {
f.logger.Info("fetching metadata", slog.Int("count", len(videos)))
ids := make([]string, 0, len(videos))
ids := make([]model.YoutubeVideoID, 0, len(videos))
for _, video := range videos {
ids = append(ids, video.YoutubeID)
}
@ -148,7 +215,7 @@ func (f *Fetcher) MetadataFetcher() {
case video := <-f.needsMetadata:
timeout.Reset(10 * time.Second)
buffer = append(buffer, video)
if len(buffer) >= 10 {
if len(buffer) >= 50 {
batch := make([]*model.Video, len(buffer))
copy(batch, buffer)
fetch <- batch
@ -177,7 +244,7 @@ func (f *Fetcher) SummaryFetcher() {
}
video.Status = model.StatusHasSummary
f.logger.Info("fetched summary", slog.String("id", video.ID.String()))
f.pipeline <- video
f.videoPipeline <- video
}
}
}

View File

@ -1,10 +1,12 @@
package fetcher
import "ewintr.nl/yogai/model"
// Metadata holds the descriptive fields fetched for a single video.
type Metadata struct {
// Title is the video title.
Title string
// Description is the video description text.
Description string
}
type MetadataFetcher interface {
FetchMetadata([]string) (map[string]Metadata, error)
FetchMetadata([]model.YoutubeVideoID) (map[model.YoutubeVideoID]Metadata, error)
}

View File

@ -1,8 +1,9 @@
package fetcher
import (
"miniflux.app/client"
"strings"
"miniflux.app/client"
)
type Entry struct {
@ -31,24 +32,17 @@ func NewMiniflux(mflInfo MinifluxInfo) *Miniflux {
func (m *Miniflux) Unread() ([]FeedEntry, error) {
result, err := m.client.Entries(&client.Filter{Status: "unread"})
if err != nil {
return []FeedEntry{}, err
return nil, err
}
entries := []FeedEntry{}
entries := make([]FeedEntry, 0, len(result.Entries))
for _, entry := range result.Entries {
entries = append(entries, FeedEntry{
EntryID: entry.ID,
FeedID: entry.FeedID,
YouTubeID: strings.TrimPrefix(entry.URL, "https://www.youtube.com/watch?v="),
EntryID: entry.ID,
FeedID: entry.FeedID,
YoutubeChannelID: strings.TrimPrefix(entry.Feed.FeedURL, "https://www.youtube.com/feeds/videos.xml?channel_id="),
YoutubeID: strings.TrimPrefix(entry.URL, "https://www.youtube.com/watch?v="),
})
// ID: uuid.New(),
// Status: model.STATUS_NEW,
// YoutubeURL: entry.URL,
// FeedID: strconv.Itoa(int(entry.ID)),
// Title: entry.Title,
// Description: entry.Content,
//})
}
return entries, nil

View File

@ -1,8 +1,10 @@
package fetcher
import (
"google.golang.org/api/youtube/v3"
"strings"
"ewintr.nl/yogai/model"
"google.golang.org/api/youtube/v3"
)
type Youtube struct {
@ -13,19 +15,48 @@ func NewYoutube(client *youtube.Service) *Youtube {
return &Youtube{Client: client}
}
func (y *Youtube) FetchMetadata(ytIDs []string) (map[string]Metadata, error) {
call := y.Client.Videos.
List([]string{"snippet"}).
Id(strings.Join(ytIDs, ","))
func (y *Youtube) Search(channelID model.YoutubeChannelID, pageToken string) ([]model.YoutubeVideoID, string, error) {
call := y.Client.Search.
List([]string{"id"}).
MaxResults(50).
Type("video").
Order("date").
ChannelId(string(channelID))
if pageToken != "" {
call.PageToken(pageToken)
}
response, err := call.Do()
if err != nil {
return map[string]Metadata{}, err
return []model.YoutubeVideoID{}, "", err
}
mds := make(map[string]Metadata, len(response.Items))
ids := make([]model.YoutubeVideoID, len(response.Items))
for i, item := range response.Items {
ids[i] = model.YoutubeVideoID(item.Id.VideoId)
}
return ids, response.NextPageToken, nil
}
func (y *Youtube) FetchMetadata(ytIDs []model.YoutubeVideoID) (map[model.YoutubeVideoID]Metadata, error) {
strIDs := make([]string, len(ytIDs))
for i, id := range ytIDs {
strIDs[i] = string(id)
}
call := y.Client.Videos.
List([]string{"snippet"}).
Id(strings.Join(strIDs, ","))
response, err := call.Do()
if err != nil {
return map[model.YoutubeVideoID]Metadata{}, err
}
mds := make(map[model.YoutubeVideoID]Metadata, len(response.Items))
for _, item := range response.Items {
mds[item.Id] = Metadata{
mds[model.YoutubeVideoID(item.Id)] = Metadata{
Title: item.Snippet.Title,
Description: item.Snippet.Description,
}

View File

@ -1,6 +1,7 @@
package handler
import (
"ewintr.nl/yogai/storage"
"fmt"
"golang.org/x/exp/slog"
"miniflux.app/logger"
@ -15,9 +16,11 @@ type Server struct {
logger *slog.Logger
}
func NewServer(logger *slog.Logger) *Server {
func NewServer(videoRepo storage.VideoRepository, logger *slog.Logger) *Server {
return &Server{
apis: map[string]http.Handler{},
apis: map[string]http.Handler{
"video": NewVideoAPI(videoRepo, logger),
},
logger: logger,
}
}

71
handler/video.go Normal file
View File

@ -0,0 +1,71 @@
package handler
import (
"context"
"encoding/json"
"fmt"
"net/http"
"ewintr.nl/yogai/model"
"ewintr.nl/yogai/storage"
"golang.org/x/exp/slog"
)
// VideoAPI is the HTTP handler for the video endpoints. It reads videos from
// videoRepo and reports failures through logger.
type VideoAPI struct {
videoRepo storage.VideoRepository
logger *slog.Logger
}
// NewVideoAPI returns a VideoAPI that uses the given repository and logger.
func NewVideoAPI(videoRepo storage.VideoRepository, logger *slog.Logger) *VideoAPI {
	api := new(VideoAPI)
	api.videoRepo = videoRepo
	api.logger = logger
	return api
}
// ServeHTTP dispatches a request to the matching video handler. A GET with an
// empty first path segment lists videos; every other request gets a 404.
func (v *VideoAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	head, _ := ShiftPath(r.URL.Path)

	if r.Method == http.MethodGet && head == "" {
		v.List(w, r)
		return
	}

	Error(w, http.StatusNotFound, "not found", fmt.Errorf("method %s with subpath %q was not registered in the repository api", r.Method, head))
}
// List writes all videos with status ready to the response as a JSON array.
//
// Each element carries the video's YouTube id (under the "youtube_url" key,
// which is kept as-is for existing clients), its title and its summary. When
// there are no ready videos the body is an empty array, not null. A failure
// to query the repository or to marshal the result is reported through
// returnErr with status 500.
func (v *VideoAPI) List(w http.ResponseWriter, r *http.Request) {
	videos, err := v.videoRepo.FindByStatus(model.StatusReady)
	if err != nil {
		v.returnErr(r.Context(), w, http.StatusInternalServerError, "could not list videos", err)
		return
	}

	type respVideo struct {
		YoutubeID string `json:"youtube_url"`
		Title     string `json:"title"`
		Summary   string `json:"summary"`
	}

	// A non-nil slice makes an empty result encode as [] instead of null.
	resp := make([]respVideo, 0, len(videos))
	for _, video := range videos {
		resp = append(resp, respVideo{
			YoutubeID: string(video.YoutubeID),
			Title:     video.Title,
			Summary:   video.Summary,
		})
	}

	jsonBody, err := json.Marshal(resp)
	if err != nil {
		v.returnErr(r.Context(), w, http.StatusInternalServerError, "could not marshal response", err)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	// Write the bytes verbatim. Passing the payload to Fprintf as a format
	// string would corrupt any '%' that occurs in a title or summary.
	if _, err := w.Write(jsonBody); err != nil {
		v.logger.Error("could not write response", slog.String("err", err.Error()))
	}
}
// returnErr logs the failure with its error text and details, then hands the
// same information to Error to produce the HTTP error response. The context
// argument is currently unused.
func (v *VideoAPI) returnErr(_ context.Context, w http.ResponseWriter, status int, message string, err error, details ...any) {
	errAttr := slog.String("err", err.Error())
	detailsAttr := slog.String("details", fmt.Sprintf("%+v", details))
	v.logger.Error(message, errAttr, detailsAttr)

	Error(w, status, message, err, details...)
}

17
model/feed.go Normal file
View File

@ -0,0 +1,17 @@
package model
import "github.com/google/uuid"
// FeedStatus is the processing state of a feed.
type FeedStatus string
// Known feed states.
const (
// FeedStatusNew is the state the fetcher queries to find feeds whose
// historical videos still have to be fetched.
FeedStatusNew FeedStatus = "new"
// FeedStatusReady is the state the fetcher assigns after it has walked
// the channel's historical video pages.
FeedStatusReady FeedStatus = "ready"
)
// Feed describes a YouTube channel that is followed.
type Feed struct {
// ID is the feed's own identifier.
ID uuid.UUID
// Status is the current processing state of the feed.
Status FeedStatus
// Title is a display title for the feed.
Title string
// YoutubeChannelID identifies the channel on YouTube.
YoutubeChannelID YoutubeChannelID
}

View File

@ -2,21 +2,25 @@ package model
import "github.com/google/uuid"
type Status string
type VideoStatus string
const (
StatusNew Status = "new"
StatusHasMetadata Status = "has_metadata"
StatusHasSummary Status = "has_summary"
StatusReady Status = "ready"
StatusNew VideoStatus = "new"
StatusHasMetadata VideoStatus = "has_metadata"
StatusHasSummary VideoStatus = "has_summary"
StatusReady VideoStatus = "ready"
)
type YoutubeVideoID string
type YoutubeChannelID string
type Video struct {
ID uuid.UUID
Status Status
YoutubeID string
FeedID uuid.UUID
Title string
Description string
Summary string
ID uuid.UUID
Status VideoStatus
YoutubeID YoutubeVideoID
YoutubeChannelID YoutubeChannelID
Title string
Description string
Summary string
}

View File

@ -2,18 +2,19 @@ package main
import (
"context"
"ewintr.nl/yogai/fetcher"
"ewintr.nl/yogai/handler"
"ewintr.nl/yogai/storage"
"fmt"
"golang.org/x/exp/slog"
"google.golang.org/api/option"
"google.golang.org/api/youtube/v3"
"net/http"
"os"
"os/signal"
"strconv"
"time"
"ewintr.nl/yogai/fetcher"
"ewintr.nl/yogai/handler"
"ewintr.nl/yogai/storage"
"golang.org/x/exp/slog"
"google.golang.org/api/option"
"google.golang.org/api/youtube/v3"
)
func main() {
@ -33,6 +34,7 @@ func main() {
os.Exit(1)
}
videoRepo := storage.NewPostgresVideoRepository(postgres)
feedRepo := storage.NewPostgresFeedRepository(postgres)
mflx := fetcher.NewMiniflux(fetcher.MinifluxInfo{
Endpoint: getParam("MINIFLUX_ENDPOINT", "http://localhost/v1"),
@ -54,8 +56,7 @@ func main() {
openAIClient := fetcher.NewOpenAI(getParam("OPENAI_API_KEY", ""))
fetcher := fetcher.NewFetch(videoRepo, mflx, fetchInterval, yt, openAIClient, logger)
go fetcher.Run()
go fetcher.NewFetch(feedRepo, videoRepo, yt, mflx, fetchInterval, yt, openAIClient, logger).Run()
logger.Info("fetch service started")
port, err := strconv.Atoi(getParam("API_PORT", "8080"))
@ -63,7 +64,7 @@ func main() {
logger.Error("invalid port", err)
os.Exit(1)
}
go http.ListenAndServe(fmt.Sprintf(":%d", port), handler.NewServer(logger))
go http.ListenAndServe(fmt.Sprintf(":%d", port), handler.NewServer(videoRepo, logger))
logger.Info("http server started")
done := make(chan os.Signal)

View File

@ -2,8 +2,9 @@ package storage
import (
"database/sql"
"ewintr.nl/yogai/model"
"fmt"
"ewintr.nl/yogai/model"
"github.com/lib/pq"
_ "github.com/lib/pq"
)
@ -43,24 +44,24 @@ func NewPostgresVideoRepository(postgres *Postgres) *PostgresVideoRepository {
}
func (p *PostgresVideoRepository) Save(v *model.Video) error {
query := `INSERT INTO video (id, status, youtube_id, feed_id, title, description, summary)
query := `INSERT INTO video (id, status, youtube_id, youtube_channel_id, title, description, summary)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (id)
DO UPDATE SET
id = EXCLUDED.id,
status = EXCLUDED.status,
youtube_id = EXCLUDED.youtube_id,
feed_id = EXCLUDED.feed_id,
youtube_channel_id = EXCLUDED.youtube_channel_id,
title = EXCLUDED.title,
description = EXCLUDED.description,
summary = EXCLUDED.summary;`
_, err := p.db.Exec(query, v.ID, v.Status, v.YoutubeID, v.FeedID, v.Title, v.Description, v.Summary)
_, err := p.db.Exec(query, v.ID, v.Status, v.YoutubeID, v.YoutubeChannelID, v.Title, v.Description, v.Summary)
return err
}
func (p *PostgresVideoRepository) FindByStatus(statuses ...model.Status) ([]*model.Video, error) {
query := `SELECT id, status, youtube_id, feed_id, title, description, summary
func (p *PostgresVideoRepository) FindByStatus(statuses ...model.VideoStatus) ([]*model.Video, error) {
query := `SELECT id, status, youtube_channel_id, youtube_id, title, description, summary
FROM video
WHERE status = ANY($1)`
rows, err := p.db.Query(query, pq.Array(statuses))
@ -71,7 +72,7 @@ WHERE status = ANY($1)`
videos := []*model.Video{}
for rows.Next() {
v := &model.Video{}
if err := rows.Scan(&v.ID, &v.Status, &v.YoutubeID, &v.FeedID, &v.Title, &v.Description, &v.Summary); err != nil {
if err := rows.Scan(&v.ID, &v.Status, &v.YoutubeChannelID, &v.YoutubeID, &v.Title, &v.Description, &v.Summary); err != nil {
return nil, err
}
videos = append(videos, v)
@ -81,6 +82,50 @@ WHERE status = ANY($1)`
return videos, nil
}
// PostgresFeedRepository stores feeds in the feed table. It embeds *Postgres
// and runs its queries through the embedded db handle.
type PostgresFeedRepository struct {
*Postgres
}
// NewPostgresFeedRepository wraps the given Postgres connection in a feed
// repository.
func NewPostgresFeedRepository(postgres *Postgres) *PostgresFeedRepository {
	repo := &PostgresFeedRepository{Postgres: postgres}
	return repo
}
// Save inserts the feed, or overwrites the stored row when a feed with the
// same id already exists. It returns the error from executing the statement.
func (p *PostgresFeedRepository) Save(f *model.Feed) error {
	const upsert = `INSERT INTO feed (id, status, youtube_channel_id, title)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id)
DO UPDATE SET
id = EXCLUDED.id,
status = EXCLUDED.status,
youtube_channel_id = EXCLUDED.youtube_channel_id,
title = EXCLUDED.title;`

	if _, err := p.db.Exec(upsert, f.ID, f.Status, f.YoutubeChannelID, f.Title); err != nil {
		return err
	}
	return nil
}
// FindByStatus returns all feeds whose status is one of the given statuses.
//
// The result is an empty, non-nil slice when nothing matches. An error from
// running the query, scanning a row or iterating the result set is returned
// with a nil slice.
func (p *PostgresFeedRepository) FindByStatus(statuses ...model.FeedStatus) ([]*model.Feed, error) {
	query := `SELECT id, status, youtube_channel_id, title
FROM feed
WHERE status = ANY($1)`
	rows, err := p.db.Query(query, pq.Array(statuses))
	if err != nil {
		return nil, err
	}
	// Deferred so the rows are released on the early error returns as well.
	defer rows.Close()

	feeds := []*model.Feed{}
	for rows.Next() {
		f := &model.Feed{}
		if err := rows.Scan(&f.ID, &f.Status, &f.YoutubeChannelID, &f.Title); err != nil {
			return nil, err
		}
		feeds = append(feeds, f)
	}
	// Next returns false both at the end of the result set and on failure;
	// Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return feeds, nil
}
var pgMigration = []string{
`CREATE TYPE video_status AS ENUM ('new', 'ready')`,
`CREATE TABLE video (
@ -105,6 +150,16 @@ ALTER COLUMN summary SET DEFAULT '',
ALTER COLUMN summary SET NOT NULL,
ALTER COLUMN description SET DEFAULT '',
ALTER COLUMN description SET NOT NULL`,
`CREATE TYPE feed_status AS ENUM ('new', 'ready')`,
`CREATE TABLE feed (
id uuid PRIMARY KEY,
status feed_status NOT NULL,
youtube_channel_id VARCHAR(255) NOT NULL UNIQUE,
title VARCHAR(255) NOT NULL
)`,
`ALTER TABLE video
DROP COLUMN feed_id,
ADD COLUMN youtube_channel_id VARCHAR(255) NOT NULL REFERENCES feed(youtube_channel_id)`,
}
func (p *Postgres) migrate(wanted []string) error {

View File

@ -4,7 +4,12 @@ import (
"ewintr.nl/yogai/model"
)
// FeedRepository is the storage interface for feeds.
type FeedRepository interface {
// Save stores the given feed.
Save(feed *model.Feed) error
// FindByStatus returns the feeds that have one of the given statuses.
FindByStatus(statuses ...model.FeedStatus) ([]*model.Feed, error)
}
type VideoRepository interface {
Save(video *model.Video) error
FindByStatus(statuses ...model.Status) ([]*model.Video, error)
FindByStatus(statuses ...model.VideoStatus) ([]*model.Video, error)
}