From f4c856df68094cdaa99a20ffb1ccc2533db3685b Mon Sep 17 00:00:00 2001 From: Erik Winter Date: Thu, 6 Jul 2023 13:37:55 +0200 Subject: [PATCH] multiple pipelines --- process/processor.go | 4 ++++ service.go | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/process/processor.go b/process/processor.go index d2db65a..dd44447 100644 --- a/process/processor.go +++ b/process/processor.go @@ -65,6 +65,10 @@ func (p *Pipeline) Process(ctx context.Context, video *model.Video) { next := p.procs.Next(video) if next == nil { p.logger.Info("no more processors for video", slog.String("video", string(video.YoutubeID))) + video.Status = model.StatusReady + if err := p.relStorage.Save(video); err != nil { + p.logger.Error("failed to save video in rel db", slog.String("video", string(video.YoutubeID)), slog.String("error", err.Error())) + } return } diff --git a/service.go b/service.go index 838c61d..63be285 100644 --- a/service.go +++ b/service.go @@ -78,8 +78,9 @@ func main() { logger.Info("fetch service started") procs := process.NewProcessors(openAIClient) - pipeline := process.NewPipeline(fetcher.Out(), procs, videoRelRepo, wvClient, logger) - go pipeline.Run() + for i := 0; i < 4; i++ { + go process.NewPipeline(fetcher.Out(), procs, videoRelRepo, wvClient, logger.With(slog.Int("pipeline", i))).Run() + } logger.Info("processing service started") port, err := strconv.Atoi(getParam("API_PORT", "8080"))