send process

This commit is contained in:
Erik Winter 2021-09-03 09:19:36 +02:00
parent 99b8476cdf
commit a296c257ad
8 changed files with 145 additions and 3 deletions

49
internal/process/send.go Normal file
View File

@ -0,0 +1,49 @@
package process
import (
"errors"
"fmt"
"git.ewintr.nl/gte/internal/storage"
"git.ewintr.nl/gte/internal/task"
)
var (
ErrSendTasks = errors.New("could not send tasks")
)
// Send sends local tasks that need to be dispatched
type Send struct {
local storage.LocalRepository
disp *storage.Dispatcher
}
func NewSend(local storage.LocalRepository, disp *storage.Dispatcher) *Send {
return &Send{
local: local,
disp: disp,
}
}
func (s *Send) Process() error {
tasks, err := s.local.FindAll()
if err != nil {
return fmt.Errorf("%w: %v", ErrSendTasks, err)
}
for _, t := range tasks {
if t.LocalStatus != task.STATUS_UPDATED {
continue
}
t.ApplyUpdate()
if err := s.disp.Dispatch(&t.Task); err != nil {
return fmt.Errorf("%w: %v", ErrSendTasks, err)
}
if err := s.local.MarkDispatched(t.LocalId); err != nil {
return fmt.Errorf("%w: %v", ErrSendTasks, err)
}
}
return nil
}

View File

@ -0,0 +1,61 @@
package process_test
import (
"testing"
"git.ewintr.nl/go-kit/test"
"git.ewintr.nl/gte/internal/process"
"git.ewintr.nl/gte/internal/storage"
"git.ewintr.nl/gte/internal/task"
"git.ewintr.nl/gte/pkg/msend"
)
func TestSend(t *testing.T) {
task1 := &task.Task{
Id: "id-1",
Version: 2,
Project: "project1",
Action: "action1",
Due: task.NewDate(2021, 7, 29),
Folder: task.FOLDER_PLANNED,
}
task2 := &task.Task{
Id: "id-2",
Version: 2,
Project: "project1",
Action: "action2",
Folder: task.FOLDER_UNPLANNED,
}
local := storage.NewMemory()
allTasks := []*task.Task{task1, task2}
test.OK(t, local.SetTasks(allTasks))
t.Run("no updates", func(t *testing.T) {
out := msend.NewMemory()
disp := storage.NewDispatcher(out)
send := process.NewSend(local, disp)
test.OK(t, send.Process())
test.Assert(t, len(out.Messages) == 0, "amount of messages was not 0")
})
t.Run("update", func(t *testing.T) {
lu := &task.LocalUpdate{
ForVersion: task2.Version,
Fields: []string{task.FIELD_ACTION},
Action: "updated",
}
lt, err := local.FindById(task2.Id)
test.OK(t, err)
lt.AddUpdate(lu)
test.OK(t, local.SetLocalUpdate(lt))
out := msend.NewMemory()
disp := storage.NewDispatcher(out)
send := process.NewSend(local, disp)
test.OK(t, send.Process())
test.Assert(t, len(out.Messages) == 1, "amount of messages was not 1")
expSubject := "project1 - updated"
test.Equals(t, expSubject, out.Messages[0].Subject)
})
}

View File

@ -64,7 +64,7 @@ func TestUpdate(t *testing.T) {
}, },
} { } {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
local.SetTasks(allTasks) test.OK(t, local.SetTasks(allTasks))
out := msend.NewMemory() out := msend.NewMemory()
disp := storage.NewDispatcher(out) disp := storage.NewDispatcher(out)

View File

@ -19,6 +19,7 @@ type LocalRepository interface {
FindById(id string) (*task.LocalTask, error) FindById(id string) (*task.LocalTask, error)
FindByLocalId(id int) (*task.LocalTask, error) FindByLocalId(id int) (*task.LocalTask, error)
SetLocalUpdate(tsk *task.LocalTask) error SetLocalUpdate(tsk *task.LocalTask) error
MarkDispatched(id int) error
} }
// NextLocalId finds a new local id by incrememting to a variable limit. // NextLocalId finds a new local id by incrememting to a variable limit.

View File

@ -69,7 +69,15 @@ func (m *Memory) FindByLocalId(localId int) (*task.LocalTask, error) {
} }
func (m *Memory) SetLocalUpdate(tsk *task.LocalTask) error { func (m *Memory) SetLocalUpdate(tsk *task.LocalTask) error {
tsk.LocalStatus = task.STATUS_UPDATED
m.tasks[tsk.Id] = tsk m.tasks[tsk.Id] = tsk
return nil return nil
} }
func (m *Memory) MarkDispatched(localId int) error {
t, _ := m.FindByLocalId(localId)
m.tasks[t.Id].LocalStatus = task.STATUS_DISPATCHED
return nil
}

View File

@ -114,5 +114,17 @@ func TestMemory(t *testing.T) {
actTask, err := mem.FindByLocalId(2) actTask, err := mem.FindByLocalId(2)
test.OK(t, err) test.OK(t, err)
test.Equals(t, expUpdate, actTask.LocalUpdate) test.Equals(t, expUpdate, actTask.LocalUpdate)
test.Equals(t, task.STATUS_UPDATED, actTask.LocalStatus)
})
t.Run("markdispatched", func(t *testing.T) {
mem := storage.NewMemory()
test.OK(t, mem.SetTasks(tasks))
lt, err := mem.FindById(task2.Id)
test.OK(t, err)
test.OK(t, mem.MarkDispatched(lt.LocalId))
act, err := mem.FindById(task2.Id)
test.OK(t, err)
test.Equals(t, task.STATUS_DISPATCHED, act.LocalStatus)
}) })
} }

View File

@ -199,6 +199,16 @@ WHERE local_id = ?`, tsk.LocalUpdate, task.STATUS_UPDATED, tsk.LocalId); err !=
return nil return nil
} }
func (s *Sqlite) MarkDispatched(localId int) error {
if _, err := s.db.Exec(`
UPDATE task
SET local_status = ?
WHERE local_id = ?`, task.STATUS_DISPATCHED, localId); err != nil {
return fmt.Errorf("%w: %v", ErrSqliteFailure, err)
}
return nil
}
func (s *Sqlite) migrate(wanted []sqliteMigration) error { func (s *Sqlite) migrate(wanted []sqliteMigration) error {
// admin table // admin table
if _, err := s.db.Exec(` if _, err := s.db.Exec(`

View File

@ -8,8 +8,9 @@ import (
) )
const ( const (
STATUS_FETCHED = "fetched" STATUS_FETCHED = "fetched"
STATUS_UPDATED = "updated" STATUS_UPDATED = "updated"
STATUS_DISPATCHED = "dispatched"
) )
type LocalTask struct { type LocalTask struct {