mutex for processes

This commit is contained in:
Erik Winter 2021-05-15 11:46:03 +02:00
parent d8054059f7
commit 16ca03e746
7 changed files with 48 additions and 32 deletions

View File

@ -51,7 +51,7 @@ func main() {
func Run(inboxProc *process.Inbox, recurProc *process.Recur, logger log.Logger) { func Run(inboxProc *process.Inbox, recurProc *process.Recur, logger log.Logger) {
logger = logger.WithField("func", "run") logger = logger.WithField("func", "run")
inboxTicker := time.NewTicker(30 * time.Second) inboxTicker := time.NewTicker(10 * time.Second)
recurTicker := time.NewTicker(time.Hour) recurTicker := time.NewTicker(time.Hour)
oldToday := task.Today oldToday := task.Today
@ -64,7 +64,9 @@ func Run(inboxProc *process.Inbox, recurProc *process.Recur, logger log.Logger)
continue continue
} }
logger.WithField("count", result.Count).Info("finished processing inbox") if result.Count > 0 {
logger.WithField("result", result).Info("finished processing inbox")
}
case <-recurTicker.C: case <-recurTicker.C:
year, month, day := time.Now().Date() year, month, day := time.Now().Date()
newToday := task.NewDate(year, int(month), day) newToday := task.NewDate(year, int(month), day)
@ -80,7 +82,9 @@ func Run(inboxProc *process.Inbox, recurProc *process.Recur, logger log.Logger)
continue continue
} }
logger.WithField("count", result.Count).Info("finished generating recurring tasks") if result.Count > 0 {
logger.WithField("result", result).Info("finished generating recurring tasks")
}
} }
} }
} }

View File

@ -38,5 +38,5 @@ func main() {
os.Exit(1) os.Exit(1)
} }
logger.WithField("count", result.Count).Info("finished generating recurring tasks") logger.WithField("result", result).Info("finished generating recurring tasks")
} }

View File

@ -31,5 +31,5 @@ func main() {
logger.WithErr(err).Error("unable to process inbox") logger.WithErr(err).Error("unable to process inbox")
os.Exit(1) os.Exit(1)
} }
logger.WithField("count", result.Count).Info("finished processing inbox") logger.WithField("result", result).Info("finished processing inbox")
} }

View File

@ -3,12 +3,16 @@ package process
import ( import (
"errors" "errors"
"fmt" "fmt"
"sync"
"time"
"git.ewintr.nl/gte/internal/task" "git.ewintr.nl/gte/internal/task"
) )
var ( var (
ErrInboxProcess = errors.New("could not process inbox") ErrInboxProcess = errors.New("could not process inbox")
inboxLock sync.Mutex
) )
type Inbox struct { type Inbox struct {
@ -16,7 +20,8 @@ type Inbox struct {
} }
type InboxResult struct { type InboxResult struct {
Count int Duration string `json:"duration"`
Count int `json:"count"`
} }
func NewInbox(repo *task.TaskRepo) *Inbox { func NewInbox(repo *task.TaskRepo) *Inbox {
@ -26,6 +31,11 @@ func NewInbox(repo *task.TaskRepo) *Inbox {
} }
func (inbox *Inbox) Process() (*InboxResult, error) { func (inbox *Inbox) Process() (*InboxResult, error) {
inboxLock.Lock()
defer inboxLock.Unlock()
start := time.Now()
tasks, err := inbox.taskRepo.FindAll(task.FOLDER_INBOX) tasks, err := inbox.taskRepo.FindAll(task.FOLDER_INBOX)
if err != nil { if err != nil {
return &InboxResult{}, fmt.Errorf("%w: %v", ErrInboxProcess, err) return &InboxResult{}, fmt.Errorf("%w: %v", ErrInboxProcess, err)
@ -47,6 +57,7 @@ func (inbox *Inbox) Process() (*InboxResult, error) {
} }
return &InboxResult{ return &InboxResult{
Count: len(tasks), Duration: time.Since(start).String(),
Count: len(tasks),
}, nil }, nil
} }

View File

@ -11,17 +11,16 @@ import (
func TestInboxProcess(t *testing.T) { func TestInboxProcess(t *testing.T) {
for _, tc := range []struct { for _, tc := range []struct {
name string name string
messages map[string][]*mstore.Message messages map[string][]*mstore.Message
expResult *process.InboxResult expCount int
expMsgs map[string][]*mstore.Message expMsgs map[string][]*mstore.Message
}{ }{
{ {
name: "empty", name: "empty",
messages: map[string][]*mstore.Message{ messages: map[string][]*mstore.Message{
task.FOLDER_INBOX: {}, task.FOLDER_INBOX: {},
}, },
expResult: &process.InboxResult{},
expMsgs: map[string][]*mstore.Message{ expMsgs: map[string][]*mstore.Message{
task.FOLDER_INBOX: {}, task.FOLDER_INBOX: {},
}, },
@ -47,9 +46,7 @@ func TestInboxProcess(t *testing.T) {
}, },
}, },
}, },
expResult: &process.InboxResult{ expCount: 4,
Count: 4,
},
expMsgs: map[string][]*mstore.Message{ expMsgs: map[string][]*mstore.Message{
task.FOLDER_INBOX: {}, task.FOLDER_INBOX: {},
task.FOLDER_NEW: {{Subject: "to new"}}, task.FOLDER_NEW: {{Subject: "to new"}},
@ -70,9 +67,7 @@ func TestInboxProcess(t *testing.T) {
Body: "id: xxx-xxx\nversion: 3", Body: "id: xxx-xxx\nversion: 3",
}}, }},
}, },
expResult: &process.InboxResult{ expCount: 1,
Count: 1,
},
expMsgs: map[string][]*mstore.Message{ expMsgs: map[string][]*mstore.Message{
task.FOLDER_INBOX: {}, task.FOLDER_INBOX: {},
task.FOLDER_UNPLANNED: {{Subject: "new version"}}, task.FOLDER_UNPLANNED: {{Subject: "new version"}},
@ -90,9 +85,7 @@ func TestInboxProcess(t *testing.T) {
Body: "id: xxx-xxx\nversion: 5", Body: "id: xxx-xxx\nversion: 5",
}}, }},
}, },
expResult: &process.InboxResult{ expCount: 1,
Count: 1,
},
expMsgs: map[string][]*mstore.Message{ expMsgs: map[string][]*mstore.Message{
task.FOLDER_INBOX: {}, task.FOLDER_INBOX: {},
task.FOLDER_UNPLANNED: {{Subject: "not really old version"}}, task.FOLDER_UNPLANNED: {{Subject: "not really old version"}},
@ -118,7 +111,7 @@ func TestInboxProcess(t *testing.T) {
actResult, err := inboxProc.Process() actResult, err := inboxProc.Process()
test.OK(t, err) test.OK(t, err)
test.Equals(t, tc.expResult, actResult) test.Equals(t, tc.expCount, actResult.Count)
for folder, expMessages := range tc.expMsgs { for folder, expMessages := range tc.expMsgs {
actMessages, err := mstorer.Messages(folder) actMessages, err := mstorer.Messages(folder)
test.OK(t, err) test.OK(t, err)

View File

@ -3,12 +3,16 @@ package process
import ( import (
"errors" "errors"
"fmt" "fmt"
"sync"
"time"
"git.ewintr.nl/gte/internal/task" "git.ewintr.nl/gte/internal/task"
) )
var ( var (
ErrRecurProcess = errors.New("could not generate tasks from recurrer") ErrRecurProcess = errors.New("could not generate tasks from recurrer")
recurLock sync.Mutex
) )
type Recur struct { type Recur struct {
@ -18,7 +22,8 @@ type Recur struct {
} }
type RecurResult struct { type RecurResult struct {
Count int Duration string `json:"duration"`
Count int `json:"count"`
} }
func NewRecur(repo *task.TaskRepo, disp *task.Dispatcher, daysAhead int) *Recur { func NewRecur(repo *task.TaskRepo, disp *task.Dispatcher, daysAhead int) *Recur {
@ -30,6 +35,11 @@ func NewRecur(repo *task.TaskRepo, disp *task.Dispatcher, daysAhead int) *Recur
} }
func (recur *Recur) Process() (*RecurResult, error) { func (recur *Recur) Process() (*RecurResult, error) {
recurLock.Lock()
defer recurLock.Unlock()
start := time.Now()
tasks, err := recur.taskRepo.FindAll(task.FOLDER_RECURRING) tasks, err := recur.taskRepo.FindAll(task.FOLDER_RECURRING)
if err != nil { if err != nil {
return &RecurResult{}, fmt.Errorf("%w: %v", ErrRecurProcess, err) return &RecurResult{}, fmt.Errorf("%w: %v", ErrRecurProcess, err)
@ -51,6 +61,7 @@ func (recur *Recur) Process() (*RecurResult, error) {
} }
return &RecurResult{ return &RecurResult{
Count: count, Duration: time.Since(start).String(),
Count: count,
}, nil }, nil
} }

View File

@ -15,13 +15,12 @@ func TestRecurProcess(t *testing.T) {
for _, tc := range []struct { for _, tc := range []struct {
name string name string
recurMsgs []*mstore.Message recurMsgs []*mstore.Message
expResult *process.RecurResult expCount int
expMsgs []*msend.Message expMsgs []*msend.Message
}{ }{
{ {
name: "empty", name: "empty",
expResult: &process.RecurResult{}, expMsgs: []*msend.Message{},
expMsgs: []*msend.Message{},
}, },
{ {
name: "one of two recurring", name: "one of two recurring",
@ -35,9 +34,7 @@ func TestRecurProcess(t *testing.T) {
Body: "recur: 2021-05-10, daily\nid: xxx-xxx\nversion: 1", Body: "recur: 2021-05-10, daily\nid: xxx-xxx\nversion: 1",
}, },
}, },
expResult: &process.RecurResult{ expCount: 1,
Count: 1,
},
expMsgs: []*msend.Message{ expMsgs: []*msend.Message{
{Subject: "2021-05-15 (saturday) - recurring"}, {Subject: "2021-05-15 (saturday) - recurring"},
}, },
@ -60,7 +57,7 @@ func TestRecurProcess(t *testing.T) {
recurProc := process.NewRecur(task.NewRepository(mstorer), task.NewDispatcher(msender), 1) recurProc := process.NewRecur(task.NewRepository(mstorer), task.NewDispatcher(msender), 1)
actResult, err := recurProc.Process() actResult, err := recurProc.Process()
test.OK(t, err) test.OK(t, err)
test.Equals(t, tc.expResult, actResult) test.Equals(t, tc.expCount, actResult.Count)
for i, expMsg := range tc.expMsgs { for i, expMsg := range tc.expMsgs {
test.Equals(t, expMsg.Subject, msender.Messages[i].Subject) test.Equals(t, expMsg.Subject, msender.Messages[i].Subject)
} }