From 16ca03e746675e1933dd4f7d1e0b3ec587ae05f9 Mon Sep 17 00:00:00 2001 From: Erik Winter Date: Sat, 15 May 2021 11:46:03 +0200 Subject: [PATCH] mutex for processes --- cmd/daemon/service.go | 10 +++++++--- cmd/generate-recurring/main.go | 2 +- cmd/process-inbox/main.go | 2 +- internal/process/inbox.go | 15 +++++++++++++-- internal/process/inbox_test.go | 23 ++++++++--------------- internal/process/recur.go | 15 +++++++++++++-- internal/process/recur_test.go | 13 +++++-------- 7 files changed, 48 insertions(+), 32 deletions(-) diff --git a/cmd/daemon/service.go b/cmd/daemon/service.go index a80045d..5cd968a 100644 --- a/cmd/daemon/service.go +++ b/cmd/daemon/service.go @@ -51,7 +51,7 @@ func main() { func Run(inboxProc *process.Inbox, recurProc *process.Recur, logger log.Logger) { logger = logger.WithField("func", "run") - inboxTicker := time.NewTicker(30 * time.Second) + inboxTicker := time.NewTicker(10 * time.Second) recurTicker := time.NewTicker(time.Hour) oldToday := task.Today @@ -64,7 +64,9 @@ func Run(inboxProc *process.Inbox, recurProc *process.Recur, logger log.Logger) continue } - logger.WithField("count", result.Count).Info("finished processing inbox") + if result.Count > 0 { + logger.WithField("result", result).Info("finished processing inbox") + } case <-recurTicker.C: year, month, day := time.Now().Date() newToday := task.NewDate(year, int(month), day) @@ -80,7 +82,9 @@ func Run(inboxProc *process.Inbox, recurProc *process.Recur, logger log.Logger) continue } - logger.WithField("count", result.Count).Info("finished generating recurring tasks") + if result.Count > 0 { + logger.WithField("result", result).Info("finished generating recurring tasks") + } } } } diff --git a/cmd/generate-recurring/main.go b/cmd/generate-recurring/main.go index 58bfae9..7aaaf74 100644 --- a/cmd/generate-recurring/main.go +++ b/cmd/generate-recurring/main.go @@ -38,5 +38,5 @@ func main() { os.Exit(1) } - logger.WithField("count", result.Count).Info("finished generating recurring tasks") + logger.WithField("result", result).Info("finished generating recurring tasks") } diff --git a/cmd/process-inbox/main.go b/cmd/process-inbox/main.go index 71fa25d..7b00331 100644 --- a/cmd/process-inbox/main.go +++ b/cmd/process-inbox/main.go @@ -31,5 +31,5 @@ func main() { logger.WithErr(err).Error("unable to process inbox") os.Exit(1) } - logger.WithField("count", result.Count).Info("finished processing inbox") + logger.WithField("result", result).Info("finished processing inbox") } diff --git a/internal/process/inbox.go b/internal/process/inbox.go index 39ca635..ff3418e 100644 --- a/internal/process/inbox.go +++ b/internal/process/inbox.go @@ -3,12 +3,16 @@ package process import ( "errors" "fmt" + "sync" + "time" "git.ewintr.nl/gte/internal/task" ) var ( ErrInboxProcess = errors.New("could not process inbox") + + inboxLock sync.Mutex ) type Inbox struct { @@ -16,7 +20,8 @@ type Inbox struct { } type InboxResult struct { - Count int + Duration string `json:"duration"` + Count int `json:"count"` } func NewInbox(repo *task.TaskRepo) *Inbox { @@ -26,6 +31,11 @@ func NewInbox(repo *task.TaskRepo) *Inbox { } func (inbox *Inbox) Process() (*InboxResult, error) { + inboxLock.Lock() + defer inboxLock.Unlock() + + start := time.Now() + tasks, err := inbox.taskRepo.FindAll(task.FOLDER_INBOX) if err != nil { return &InboxResult{}, fmt.Errorf("%w: %v", ErrInboxProcess, err) @@ -47,6 +57,7 @@ func (inbox *Inbox) Process() (*InboxResult, error) { } return &InboxResult{ - Count: len(tasks), + Duration: time.Since(start).String(), + Count: len(tasks), }, nil } diff --git a/internal/process/inbox_test.go b/internal/process/inbox_test.go index 9507c55..8d85c69 100644 --- a/internal/process/inbox_test.go +++ b/internal/process/inbox_test.go @@ -11,17 +11,16 @@ import ( func TestInboxProcess(t *testing.T) { for _, tc := range []struct { - name string - messages map[string][]*mstore.Message - expResult *process.InboxResult - expMsgs map[string][]*mstore.Message + name string + messages map[string][]*mstore.Message + expCount int + expMsgs map[string][]*mstore.Message }{ { name: "empty", messages: map[string][]*mstore.Message{ task.FOLDER_INBOX: {}, }, - expResult: &process.InboxResult{}, expMsgs: map[string][]*mstore.Message{ task.FOLDER_INBOX: {}, }, @@ -47,9 +46,7 @@ func TestInboxProcess(t *testing.T) { }, }, }, - expResult: &process.InboxResult{ - Count: 4, - }, + expCount: 4, expMsgs: map[string][]*mstore.Message{ task.FOLDER_INBOX: {}, task.FOLDER_NEW: {{Subject: "to new"}}, @@ -70,9 +67,7 @@ func TestInboxProcess(t *testing.T) { Body: "id: xxx-xxx\nversion: 3", }}, }, - expResult: &process.InboxResult{ - Count: 1, - }, + expCount: 1, expMsgs: map[string][]*mstore.Message{ task.FOLDER_INBOX: {}, task.FOLDER_UNPLANNED: {{Subject: "new version"}}, @@ -90,9 +85,7 @@ func TestInboxProcess(t *testing.T) { Body: "id: xxx-xxx\nversion: 5", }}, }, - expResult: &process.InboxResult{ - Count: 1, - }, + expCount: 1, expMsgs: map[string][]*mstore.Message{ task.FOLDER_INBOX: {}, task.FOLDER_UNPLANNED: {{Subject: "not really old version"}}, @@ -118,7 +111,7 @@ func TestInboxProcess(t *testing.T) { actResult, err := inboxProc.Process() test.OK(t, err) - test.Equals(t, tc.expResult, actResult) + test.Equals(t, tc.expCount, actResult.Count) for folder, expMessages := range tc.expMsgs { actMessages, err := mstorer.Messages(folder) test.OK(t, err) diff --git a/internal/process/recur.go b/internal/process/recur.go index 7347b94..b253af5 100644 --- a/internal/process/recur.go +++ b/internal/process/recur.go @@ -3,12 +3,16 @@ package process import ( "errors" "fmt" + "sync" + "time" "git.ewintr.nl/gte/internal/task" ) var ( ErrRecurProcess = errors.New("could not generate tasks from recurrer") + + recurLock sync.Mutex ) type Recur struct { @@ -18,7 +22,8 @@ type Recur struct { } type RecurResult struct { - Count int + Duration string `json:"duration"` + Count int `json:"count"` } func NewRecur(repo *task.TaskRepo, disp *task.Dispatcher, daysAhead int) *Recur { @@ -30,6 +35,11 @@ func NewRecur(repo *task.TaskRepo, disp *task.Dispatcher, daysAhead int) *Recur } func (recur *Recur) Process() (*RecurResult, error) { + recurLock.Lock() + defer recurLock.Unlock() + + start := time.Now() + tasks, err := recur.taskRepo.FindAll(task.FOLDER_RECURRING) if err != nil { return &RecurResult{}, fmt.Errorf("%w: %v", ErrRecurProcess, err) @@ -51,6 +61,7 @@ func (recur *Recur) Process() (*RecurResult, error) { } return &RecurResult{ - Count: count, + Duration: time.Since(start).String(), + Count: count, }, nil } diff --git a/internal/process/recur_test.go b/internal/process/recur_test.go index 511a3b1..6c5d82c 100644 --- a/internal/process/recur_test.go +++ b/internal/process/recur_test.go @@ -15,13 +15,12 @@ func TestRecurProcess(t *testing.T) { for _, tc := range []struct { name string recurMsgs []*mstore.Message - expResult *process.RecurResult + expCount int expMsgs []*msend.Message }{ { - name: "empty", - expResult: &process.RecurResult{}, - expMsgs: []*msend.Message{}, + name: "empty", + expMsgs: []*msend.Message{}, }, { name: "one of two recurring", @@ -35,9 +34,7 @@ func TestRecurProcess(t *testing.T) { Body: "recur: 2021-05-10, daily\nid: xxx-xxx\nversion: 1", }, }, - expResult: &process.RecurResult{ - Count: 1, - }, + expCount: 1, expMsgs: []*msend.Message{ {Subject: "2021-05-15 (saturday) - recurring"}, }, @@ -60,7 +57,7 @@ func TestRecurProcess(t *testing.T) { recurProc := process.NewRecur(task.NewRepository(mstorer), task.NewDispatcher(msender), 1) actResult, err := recurProc.Process() test.OK(t, err) - test.Equals(t, tc.expResult, actResult) + test.Equals(t, tc.expCount, actResult.Count) for i, expMsg := range tc.expMsgs { test.Equals(t, expMsg.Subject, msender.Messages[i].Subject) }