added logger
This commit is contained in:
parent
4d8d8d1a3d
commit
c0b84948ba
|
@ -3,9 +3,8 @@ package fetcher
|
||||||
import (
|
import (
|
||||||
"ewintr.nl/yogai/model"
|
"ewintr.nl/yogai/model"
|
||||||
"ewintr.nl/yogai/storage"
|
"ewintr.nl/yogai/storage"
|
||||||
"fmt"
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"log"
|
"golang.org/x/exp/slog"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -15,15 +14,17 @@ type Fetcher struct {
|
||||||
feedReader FeedReader
|
feedReader FeedReader
|
||||||
pipeline chan *model.Video
|
pipeline chan *model.Video
|
||||||
needsMetadata chan *model.Video
|
needsMetadata chan *model.Video
|
||||||
|
logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval time.Duration) *Fetcher {
|
func NewFetch(videoRepo storage.VideoRepository, feedReader FeedReader, interval time.Duration, logger *slog.Logger) *Fetcher {
|
||||||
return &Fetcher{
|
return &Fetcher{
|
||||||
interval: interval,
|
interval: interval,
|
||||||
videoRepo: videoRepo,
|
videoRepo: videoRepo,
|
||||||
feedReader: feedReader,
|
feedReader: feedReader,
|
||||||
pipeline: make(chan *model.Video),
|
pipeline: make(chan *model.Video),
|
||||||
needsMetadata: make(chan *model.Video),
|
needsMetadata: make(chan *model.Video),
|
||||||
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,6 +32,7 @@ func (f *Fetcher) Run() {
|
||||||
go f.ReadFeeds()
|
go f.ReadFeeds()
|
||||||
go f.MetadataFetcher()
|
go f.MetadataFetcher()
|
||||||
|
|
||||||
|
f.logger.Info("started pipeline")
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case video := <-f.pipeline:
|
case video := <-f.pipeline:
|
||||||
|
@ -43,12 +45,19 @@ func (f *Fetcher) Run() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Fetcher) ReadFeeds() {
|
func (f *Fetcher) ReadFeeds() {
|
||||||
|
f.logger.Info("started feed reader")
|
||||||
ticker := time.NewTicker(f.interval)
|
ticker := time.NewTicker(f.interval)
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
entries, err := f.feedReader.Unread()
|
entries, err := f.feedReader.Unread()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
f.logger.Error("failed to fetch unread entries", err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
f.logger.Info("fetched unread entries", slog.Int("count", len(entries)))
|
||||||
|
if len(entries) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
video := &model.Video{
|
video := &model.Video{
|
||||||
ID: uuid.New(),
|
ID: uuid.New(),
|
||||||
|
@ -57,26 +66,29 @@ func (f *Fetcher) ReadFeeds() {
|
||||||
// feed id
|
// feed id
|
||||||
}
|
}
|
||||||
if err := f.videoRepo.Save(video); err != nil {
|
if err := f.videoRepo.Save(video); err != nil {
|
||||||
log.Println(err)
|
f.logger.Error("failed to save video", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
f.pipeline <- video
|
f.pipeline <- video
|
||||||
if err := f.feedReader.MarkRead(entry.EntryID); err != nil {
|
if err := f.feedReader.MarkRead(entry.EntryID); err != nil {
|
||||||
log.Println(err)
|
f.logger.Error("failed to mark entry as read", err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Fetcher) MetadataFetcher() {
|
func (f *Fetcher) MetadataFetcher() {
|
||||||
|
f.logger.Info("started metadata fetcher")
|
||||||
|
|
||||||
buffer := []*model.Video{}
|
buffer := []*model.Video{}
|
||||||
timeout := time.NewTimer(10 * time.Second)
|
timeout := time.NewTimer(10 * time.Second)
|
||||||
fetch := make(chan []*model.Video)
|
fetch := make(chan []*model.Video)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for videos := range fetch {
|
for videos := range fetch {
|
||||||
fmt.Println("MD Fetching metadata")
|
f.logger.Info("fetching metadata", slog.Int("count", len(videos)))
|
||||||
fmt.Printf("%d videos to fetch\n", len(videos))
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -5,5 +5,6 @@ go 1.20
|
||||||
require (
|
require (
|
||||||
github.com/google/uuid v1.3.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/lib/pq v1.10.9
|
github.com/lib/pq v1.10.9
|
||||||
|
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
|
||||||
miniflux.app v0.0.0-20230505000442-88062ab9f959
|
miniflux.app v0.0.0-20230505000442-88062ab9f959
|
||||||
)
|
)
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -2,5 +2,7 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
|
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 h1:5llv2sWeaMSnA3w2kS57ouQQ4pudlXrR0dCgw51QK9o=
|
||||||
|
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
|
||||||
miniflux.app v0.0.0-20230505000442-88062ab9f959 h1:YzOQqdFtI6HYRmz8+xVZbqcrVoaC3X4x/pB2DDID//c=
|
miniflux.app v0.0.0-20230505000442-88062ab9f959 h1:YzOQqdFtI6HYRmz8+xVZbqcrVoaC3X4x/pB2DDID//c=
|
||||||
miniflux.app v0.0.0-20230505000442-88062ab9f959/go.mod h1:VWCACdWTG2GzFFifM2i9hVK91mZxmPvZCdHzYGheNOE=
|
miniflux.app v0.0.0-20230505000442-88062ab9f959/go.mod h1:VWCACdWTG2GzFFifM2i9hVK91mZxmPvZCdHzYGheNOE=
|
||||||
|
|
12
service.go
12
service.go
|
@ -3,13 +3,14 @@ package main
|
||||||
import (
|
import (
|
||||||
"ewintr.nl/yogai/fetcher"
|
"ewintr.nl/yogai/fetcher"
|
||||||
"ewintr.nl/yogai/storage"
|
"ewintr.nl/yogai/storage"
|
||||||
"fmt"
|
"golang.org/x/exp/slog"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
logger := slog.New(slog.NewTextHandler(os.Stderr))
|
||||||
postgres, err := storage.NewPostgres(storage.PostgresInfo{
|
postgres, err := storage.NewPostgres(storage.PostgresInfo{
|
||||||
Host: getParam("POSTGRES_HOST", "localhost"),
|
Host: getParam("POSTGRES_HOST", "localhost"),
|
||||||
Port: getParam("POSTGRES_PORT", "5432"),
|
Port: getParam("POSTGRES_PORT", "5432"),
|
||||||
|
@ -18,7 +19,7 @@ func main() {
|
||||||
Database: getParam("POSTGRES_DB", "yogai"),
|
Database: getParam("POSTGRES_DB", "yogai"),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
logger.Error("unable to connect to postgres", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
videoRepo := storage.NewPostgresVideoRepository(postgres)
|
videoRepo := storage.NewPostgresVideoRepository(postgres)
|
||||||
|
@ -30,18 +31,19 @@ func main() {
|
||||||
|
|
||||||
fetchInterval, err := time.ParseDuration(getParam("FETCH_INTERVAL", "1m"))
|
fetchInterval, err := time.ParseDuration(getParam("FETCH_INTERVAL", "1m"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
logger.Error("unable to parse fetch interval", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
fetcher := fetcher.NewFetch(videoRepo, mflx, fetchInterval)
|
fetcher := fetcher.NewFetch(videoRepo, mflx, fetchInterval, logger)
|
||||||
go fetcher.Run()
|
go fetcher.Run()
|
||||||
|
logger.Info("service started")
|
||||||
|
|
||||||
done := make(chan os.Signal)
|
done := make(chan os.Signal)
|
||||||
signal.Notify(done, os.Interrupt)
|
signal.Notify(done, os.Interrupt)
|
||||||
<-done
|
<-done
|
||||||
|
|
||||||
//fmt.Println(err)
|
logger.Info("service stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
func getParam(param, def string) string {
|
func getParam(param, def string) string {
|
||||||
|
|
Loading…
Reference in New Issue