gte/internal/process/inbox.go

97 lines
1.9 KiB
Go
Raw Normal View History

2021-05-13 08:15:14 +02:00
package process
import (
"errors"
"fmt"
2021-05-15 11:46:03 +02:00
"sync"
"time"
2021-05-13 08:15:14 +02:00
2021-06-25 09:14:27 +02:00
"git.ewintr.nl/gte/internal/storage"
2021-05-13 08:15:14 +02:00
"git.ewintr.nl/gte/internal/task"
)
var (
ErrInboxProcess = errors.New("could not process inbox")
2021-05-15 11:46:03 +02:00
inboxLock sync.Mutex
2021-05-13 08:15:14 +02:00
)
2021-06-25 09:14:27 +02:00
// Inbox processes all messages in INBOX in a remote repository
2021-05-13 08:15:14 +02:00
type Inbox struct {
2021-06-25 09:14:27 +02:00
taskRepo *storage.RemoteRepository
2021-05-13 08:15:14 +02:00
}
type InboxResult struct {
2021-05-15 11:46:03 +02:00
Duration string `json:"duration"`
Count int `json:"count"`
2021-05-13 08:15:14 +02:00
}
2021-06-25 09:14:27 +02:00
func NewInbox(repo *storage.RemoteRepository) *Inbox {
2021-05-13 08:15:14 +02:00
return &Inbox{
taskRepo: repo,
}
}
func (inbox *Inbox) Process() (*InboxResult, error) {
2021-05-15 11:46:03 +02:00
inboxLock.Lock()
defer inboxLock.Unlock()
start := time.Now()
2021-07-10 11:29:48 +02:00
// find tasks to be processed
2021-05-13 08:15:14 +02:00
tasks, err := inbox.taskRepo.FindAll(task.FOLDER_INBOX)
if err != nil {
return &InboxResult{}, fmt.Errorf("%w: %v", ErrInboxProcess, err)
}
2021-08-20 07:38:57 +02:00
// deduplicate
taskKeys := map[string]*task.Task{}
for _, newT := range tasks {
existingT, ok := taskKeys[newT.Id]
switch {
case !ok:
taskKeys[newT.Id] = newT
case newT.Version >= existingT.Version:
taskKeys[newT.Id] = newT
}
}
tasks = []*task.Task{}
for _, t := range taskKeys {
tasks = append(tasks, t)
}
2021-07-10 11:29:48 +02:00
// split them
doneTasks, updateTasks := []*task.Task{}, []*task.Task{}
2021-05-13 08:15:14 +02:00
for _, t := range tasks {
2021-07-10 11:29:48 +02:00
if t.Done {
doneTasks = append(doneTasks, t)
continue
}
updateTasks = append(updateTasks, t)
}
// remove
if err := inbox.taskRepo.Remove(doneTasks); err != nil {
return &InboxResult{}, fmt.Errorf("%w: %v", ErrInboxProcess, err)
}
// update
var cleanupNeeded bool
for _, t := range updateTasks {
2021-06-04 10:45:56 +02:00
if err := inbox.taskRepo.Update(t); err != nil {
return &InboxResult{}, fmt.Errorf("%w: %v", ErrInboxProcess, err)
2021-05-13 08:15:14 +02:00
}
2021-06-04 10:45:56 +02:00
cleanupNeeded = true
2021-05-13 08:15:14 +02:00
}
if cleanupNeeded {
if err := inbox.taskRepo.CleanUp(); err != nil {
return &InboxResult{}, fmt.Errorf("%w: %v", ErrInboxProcess, err)
}
}
return &InboxResult{
2021-05-15 11:46:03 +02:00
Duration: time.Since(start).String(),
Count: len(tasks),
2021-05-13 08:15:14 +02:00
}, nil
}