yogai/service.go

107 lines
2.9 KiB
Go
Raw Normal View History

2023-05-06 12:08:34 +02:00
package main
import (
2023-05-10 20:08:45 +02:00
"context"
2023-05-13 13:03:48 +02:00
"fmt"
"net/http"
2023-05-06 12:08:34 +02:00
"os"
2023-05-08 15:53:06 +02:00
"os/signal"
2023-05-13 13:03:48 +02:00
"strconv"
2023-05-08 15:53:06 +02:00
"time"
2023-05-27 14:36:22 +02:00
2023-07-06 13:25:51 +02:00
"ewintr.nl/yogai/fetch"
2023-05-27 14:36:22 +02:00
"ewintr.nl/yogai/handler"
2023-07-06 13:25:51 +02:00
"ewintr.nl/yogai/process"
2023-05-27 14:36:22 +02:00
"ewintr.nl/yogai/storage"
2023-07-06 13:25:51 +02:00
"github.com/sashabaranov/go-openai"
2023-05-27 14:36:22 +02:00
"golang.org/x/exp/slog"
"google.golang.org/api/option"
"google.golang.org/api/youtube/v3"
2023-05-06 12:08:34 +02:00
)
func main() {
2023-05-25 12:27:47 +02:00
2023-05-10 20:08:45 +02:00
ctx := context.Background()
2023-07-06 13:58:12 +02:00
logger := slog.New(slog.NewTextHandler(os.Stdout))
2023-05-13 13:03:48 +02:00
2023-05-08 15:53:06 +02:00
postgres, err := storage.NewPostgres(storage.PostgresInfo{
2023-05-06 12:08:34 +02:00
Host: getParam("POSTGRES_HOST", "localhost"),
Port: getParam("POSTGRES_PORT", "5432"),
User: getParam("POSTGRES_USER", "yogai"),
Password: getParam("POSTGRES_PASSWORD", "yogai"),
Database: getParam("POSTGRES_DB", "yogai"),
2023-05-08 15:53:06 +02:00
})
2023-05-06 12:08:34 +02:00
if err != nil {
2023-05-10 19:27:31 +02:00
logger.Error("unable to connect to postgres", err)
2023-05-06 12:08:34 +02:00
os.Exit(1)
}
2023-07-06 13:25:51 +02:00
videoRelRepo := storage.NewPostgresVideoRepository(postgres)
feedRelRepo := storage.NewPostgresFeedRepository(postgres)
2023-05-08 15:53:06 +02:00
2023-07-06 13:25:51 +02:00
mflxClient := fetch.NewMiniflux(fetch.MinifluxInfo{
2023-05-08 15:53:06 +02:00
Endpoint: getParam("MINIFLUX_ENDPOINT", "http://localhost/v1"),
ApiKey: getParam("MINIFLUX_APIKEY", ""),
})
fetchInterval, err := time.ParseDuration(getParam("FETCH_INTERVAL", "1m"))
2023-05-06 12:08:34 +02:00
if err != nil {
2023-05-10 19:27:31 +02:00
logger.Error("unable to parse fetch interval", err)
2023-05-06 12:08:34 +02:00
os.Exit(1)
}
2023-05-06 14:03:14 +02:00
2023-07-06 13:25:51 +02:00
yt, err := youtube.NewService(ctx, option.WithAPIKey(getParam("YOUTUBE_API_KEY", "")))
2023-05-10 20:08:45 +02:00
if err != nil {
logger.Error("unable to create youtube service", err)
os.Exit(1)
}
2023-07-06 13:25:51 +02:00
ytClient := fetch.NewYoutube(yt)
2023-05-10 20:08:45 +02:00
2023-07-04 19:56:44 +02:00
openaiKey := getParam("OPENAI_API_KEY", "")
2023-07-06 13:25:51 +02:00
openAIClient := openai.NewClient(openaiKey)
2023-05-13 12:53:37 +02:00
2023-07-04 19:56:44 +02:00
wvResetSchema := getParam("WEAVIATE_RESET_SCHEMA", "false") == "true"
2023-07-06 13:25:51 +02:00
wvClient, err := storage.NewWeaviate(getParam("WEAVIATE_HOST", ""), getParam("WEAVIATE_API_KEY", ""), openaiKey)
2023-07-04 19:56:44 +02:00
if err != nil {
logger.Error("unable to create weaviate client", err)
os.Exit(1)
}
if wvResetSchema {
logger.Info("resetting weaviate schema")
2023-07-06 13:25:51 +02:00
if err := wvClient.ResetSchema(); err != nil {
2023-07-04 19:56:44 +02:00
logger.Error("unable to reset weaviate schema", err)
os.Exit(1)
}
}
2023-07-06 13:25:51 +02:00
fetcher := fetch.NewFetch(feedRelRepo, videoRelRepo, ytClient, mflxClient, fetchInterval, ytClient, logger)
go fetcher.Run()
2023-05-13 13:03:48 +02:00
logger.Info("fetch service started")
2023-07-06 13:25:51 +02:00
procs := process.NewProcessors(openAIClient)
2023-07-06 13:37:55 +02:00
for i := 0; i < 4; i++ {
go process.NewPipeline(fetcher.Out(), procs, videoRelRepo, wvClient, logger.With(slog.Int("pipeline", i))).Run()
}
2023-07-06 13:25:51 +02:00
logger.Info("processing service started")
2023-05-13 13:03:48 +02:00
port, err := strconv.Atoi(getParam("API_PORT", "8080"))
if err != nil {
logger.Error("invalid port", err)
os.Exit(1)
}
2023-07-06 13:25:51 +02:00
go http.ListenAndServe(fmt.Sprintf(":%d", port), handler.NewServer(videoRelRepo, logger))
2023-05-13 13:03:48 +02:00
logger.Info("http server started")
2023-05-08 15:53:06 +02:00
done := make(chan os.Signal)
signal.Notify(done, os.Interrupt)
<-done
2023-05-10 19:27:31 +02:00
logger.Info("service stopped")
2023-05-06 12:08:34 +02:00
}
func getParam(param, def string) string {
if val, ok := os.LookupEnv(param); ok {
return val
}
return def
}