more fluent sync

This commit is contained in:
Erik Winter 2022-10-25 15:59:09 +02:00
parent 3fbab7f9c7
commit 064adad7cb
8 changed files with 115 additions and 49 deletions

View File

@ -71,18 +71,18 @@ func (t *Tasks) Sync() (int, int, error) {
return 0, 0, err return 0, 0, err
} }
latestFetch, err := t.local.LatestSync() latestFetch, latestDisp, err := t.local.LatestSyncs()
if err != nil { if err != nil {
return 0, 0, err return 0, 0, err
} }
// use unix timestamp for time comparison, because time.Before and // use unix timestamp for time comparison, because time.Before and
// time.After depend on a monotonic clock and in Android the // time.After depend on a monotonic clock and on my phone the
// monotonic clock stops ticking if the phone is in suspended sleep // monotonic clock stops ticking when it goes to suspended sleep
if latestFetch.Add(15*time.Minute).Unix() > time.Now().Unix() { if latestFetch.Add(15*time.Minute).Unix() > time.Now().Unix() || latestDisp.Add(2*time.Minute).Unix() > time.Now().Unix() {
return countDisp, 0, nil return countDisp, 0, nil
} }
res, err := process.NewFetch(t.remote, t.local).Process() res, err := process.NewFetch(t.remote, t.local, task.FOLDER_PLANNED).Process()
if err != nil { if err != nil {
return countDisp, 0, err return countDisp, 0, err
} }

View File

@ -17,6 +17,8 @@ type Sync struct {
sender *process.Send sender *process.Send
fetchInterval time.Duration fetchInterval time.Duration
fetchLatest time.Time fetchLatest time.Time
dispInterval time.Duration
dispLatest time.Time
} }
func NewSync(conf *configuration.Configuration) (*Sync, error) { func NewSync(conf *configuration.Configuration) (*Sync, error) {
@ -27,17 +29,20 @@ func NewSync(conf *configuration.Configuration) (*Sync, error) {
remote := storage.NewRemoteRepository(mstore.NewIMAP(conf.IMAP())) remote := storage.NewRemoteRepository(mstore.NewIMAP(conf.IMAP()))
disp := storage.NewDispatcher(msend.NewSSLSMTP(conf.SMTP())) disp := storage.NewDispatcher(msend.NewSSLSMTP(conf.SMTP()))
fetchLatest, err := local.LatestSync() fetchLatest, dispLatest, err := local.LatestSyncs()
if err != nil { if err != nil {
return &Sync{}, err return &Sync{}, err
} }
fetchInterval := 15 * time.Minute // not yet configurable fetchInterval := 15 * time.Minute // not yet configurable
dispInterval := 2 * time.Minute
return &Sync{ return &Sync{
fetcher: process.NewFetch(remote, local), fetcher: process.NewFetch(remote, local),
sender: process.NewSend(local, disp), sender: process.NewSend(local, disp),
fetchInterval: fetchInterval, fetchInterval: fetchInterval,
fetchLatest: fetchLatest, fetchLatest: fetchLatest,
dispInterval: dispInterval,
dispLatest: dispLatest,
}, nil }, nil
} }
@ -46,9 +51,16 @@ func (s *Sync) Do() string {
if err != nil { if err != nil {
return format.FormatError(err) return format.FormatError(err)
} }
if countSend > 0 {
return fmt.Sprintf("sent %d tasks, not fetching yet\n", countSend)
}
if time.Now().Before(s.dispLatest.Add(s.dispInterval)) {
return "sent 0 tasks, send interval has not passed yet\n"
}
if time.Now().Before(s.fetchLatest.Add(s.fetchInterval)) { if time.Now().Before(s.fetchLatest.Add(s.fetchInterval)) {
return fmt.Sprintf("sent %d tasks, not time to fetch yet\n", countSend) return "sent 0 tasks, fetch interval has not passed yet\n"
} }
fResult, err := s.fetcher.Process() fResult, err := s.fetcher.Process()
@ -56,5 +68,5 @@ func (s *Sync) Do() string {
return format.FormatError(err) return format.FormatError(err)
} }
return fmt.Sprintf("sent %d, fetched %d tasks\n", countSend, fResult.Count) return fmt.Sprintf("fetched %d tasks\n", fResult.Count)
} }

View File

@ -17,6 +17,7 @@ var (
type Fetch struct { type Fetch struct {
remote *storage.RemoteRepository remote *storage.RemoteRepository
local storage.LocalRepository local storage.LocalRepository
folders []string
} }
type FetchResult struct { type FetchResult struct {
@ -24,17 +25,22 @@ type FetchResult struct {
Count int `json:"count"` Count int `json:"count"`
} }
func NewFetch(remote *storage.RemoteRepository, local storage.LocalRepository) *Fetch { func NewFetch(remote *storage.RemoteRepository, local storage.LocalRepository, folders ...string) *Fetch {
if len(folders) == 0 {
folders = task.KnownFolders
}
return &Fetch{ return &Fetch{
remote: remote, remote: remote,
local: local, local: local,
folders: folders,
} }
} }
func (s *Fetch) Process() (*FetchResult, error) { func (s *Fetch) Process() (*FetchResult, error) {
start := time.Now() start := time.Now()
tasks := []*task.Task{} tasks := []*task.Task{}
for _, folder := range task.KnownFolders { for _, folder := range s.folders {
if folder == task.FOLDER_INBOX { if folder == task.FOLDER_INBOX {
continue continue
} }

View File

@ -24,31 +24,61 @@ func TestFetchProcess(t *testing.T) {
Action: "action2", Action: "action2",
Folder: task.FOLDER_UNPLANNED, Folder: task.FOLDER_UNPLANNED,
} }
task3 := &task.Task{
Id: "id3",
Version: 1,
Action: "action3",
Folder: task.FOLDER_PLANNED,
}
localTask1 := &task.LocalTask{Task: *task1, LocalUpdate: &task.LocalUpdate{}, LocalStatus: task.STATUS_FETCHED} localTask1 := &task.LocalTask{Task: *task1, LocalUpdate: &task.LocalUpdate{}, LocalStatus: task.STATUS_FETCHED}
localTask2 := &task.LocalTask{Task: *task2, LocalUpdate: &task.LocalUpdate{}, LocalStatus: task.STATUS_FETCHED} localTask2 := &task.LocalTask{Task: *task2, LocalUpdate: &task.LocalUpdate{}, LocalStatus: task.STATUS_FETCHED}
localTask3 := &task.LocalTask{Task: *task3, LocalUpdate: &task.LocalUpdate{}, LocalStatus: task.STATUS_FETCHED}
mstorer, err := mstore.NewMemory(task.KnownFolders) mstorer, err := mstore.NewMemory(task.KnownFolders)
test.OK(t, err) test.OK(t, err)
test.OK(t, mstorer.Add(task1.Folder, task1.FormatSubject(), task1.FormatBody())) test.OK(t, mstorer.Add(task1.Folder, task1.FormatSubject(), task1.FormatBody()))
test.OK(t, mstorer.Add(task2.Folder, task2.FormatSubject(), task2.FormatBody())) test.OK(t, mstorer.Add(task2.Folder, task2.FormatSubject(), task2.FormatBody()))
test.OK(t, mstorer.Add(task3.Folder, task3.FormatSubject(), task3.FormatBody()))
remote := storage.NewRemoteRepository(mstorer) remote := storage.NewRemoteRepository(mstorer)
local := storage.NewMemory() local := storage.NewMemory()
t.Run("all", func(t *testing.T) {
syncer := process.NewFetch(remote, local) syncer := process.NewFetch(remote, local)
actResult, err := syncer.Process() actResult, err := syncer.Process()
test.OK(t, err) test.OK(t, err)
test.Equals(t, 2, actResult.Count) test.Equals(t, 3, actResult.Count)
actTasks, err := local.FindAll() actTasks, err := local.FindAll()
test.OK(t, err) test.OK(t, err)
for _, a := range actTasks { for _, a := range actTasks {
a.LocalId = 0 a.LocalId = 0
a.Message = nil a.Message = nil
} }
exp := task.ById([]*task.LocalTask{localTask1, localTask2}) exp := task.ById([]*task.LocalTask{localTask1, localTask2, localTask3})
sExp := task.ById(exp) sExp := task.ById(exp)
sAct := task.ById(actTasks) sAct := task.ById(actTasks)
sort.Sort(sAct) sort.Sort(sAct)
sort.Sort(sExp) sort.Sort(sExp)
test.Equals(t, sExp, sAct) test.Equals(t, sExp, sAct)
})
t.Run("planned", func(t *testing.T) {
syncer := process.NewFetch(remote, local, task.FOLDER_PLANNED)
actResult, err := syncer.Process()
test.OK(t, err)
test.Equals(t, 1, actResult.Count)
actTasks, err := local.FindAll()
test.OK(t, err)
for _, a := range actTasks {
a.LocalId = 0
a.Message = nil
}
exp := task.ById([]*task.LocalTask{localTask3})
sExp := task.ById(exp)
sAct := task.ById(actTasks)
sort.Sort(sAct)
sort.Sort(sExp)
test.Equals(t, sExp, sAct)
})
} }

View File

@ -13,7 +13,7 @@ var (
) )
type LocalRepository interface { type LocalRepository interface {
LatestSync() (time.Time, error) LatestSyncs() (time.Time, time.Time, error) // last fetch, last dispatch, err
SetTasks(tasks []*task.Task) error SetTasks(tasks []*task.Task) error
FindAll() ([]*task.LocalTask, error) FindAll() ([]*task.LocalTask, error)
FindById(id string) (*task.LocalTask, error) FindById(id string) (*task.LocalTask, error)

View File

@ -10,16 +10,20 @@ import (
// Memory is an in memory implementation of LocalRepository // Memory is an in memory implementation of LocalRepository
type Memory struct { type Memory struct {
tasks map[string]*task.LocalTask tasks map[string]*task.LocalTask
latestSync time.Time latestFetch time.Time
latestDispatch time.Time
} }
func NewMemory(initTasks ...*task.Task) *Memory { func NewMemory(initTasks ...*task.Task) *Memory {
tasks := map[string]*task.LocalTask{} tasks := map[string]*task.LocalTask{}
id := 1
for _, t := range initTasks { for _, t := range initTasks {
tasks[t.Id] = &task.LocalTask{ tasks[t.Id] = &task.LocalTask{
Task: *t, Task: *t,
LocalUpdate: &task.LocalUpdate{}, LocalUpdate: &task.LocalUpdate{},
LocalId: id,
} }
id++
} }
return &Memory{ return &Memory{
@ -27,8 +31,8 @@ func NewMemory(initTasks ...*task.Task) *Memory {
} }
} }
func (m *Memory) LatestSync() (time.Time, error) { func (m *Memory) LatestSyncs() (time.Time, time.Time, error) {
return m.latestSync, nil return m.latestFetch, m.latestDispatch, nil
} }
func (m *Memory) SetTasks(tasks []*task.Task) error { func (m *Memory) SetTasks(tasks []*task.Task) error {
@ -43,7 +47,7 @@ func (m *Memory) SetTasks(tasks []*task.Task) error {
for _, nt := range newTasks { for _, nt := range newTasks {
m.tasks[nt.Id] = nt m.tasks[nt.Id] = nt
} }
m.latestSync = time.Now() m.latestFetch = time.Now()
return nil return nil
} }
@ -87,6 +91,7 @@ func (m *Memory) SetLocalUpdate(id string, update *task.LocalUpdate) error {
func (m *Memory) MarkDispatched(localId int) error { func (m *Memory) MarkDispatched(localId int) error {
t, _ := m.FindByLocalId(localId) t, _ := m.FindByLocalId(localId)
m.tasks[t.Id].LocalStatus = task.STATUS_DISPATCHED m.tasks[t.Id].LocalStatus = task.STATUS_DISPATCHED
m.latestDispatch = time.Now()
return nil return nil
} }

View File

@ -48,16 +48,19 @@ func TestMemory(t *testing.T) {
localTask3 := &task.LocalTask{Task: *task3, LocalUpdate: emptyUpdate, LocalStatus: task.STATUS_FETCHED} localTask3 := &task.LocalTask{Task: *task3, LocalUpdate: emptyUpdate, LocalStatus: task.STATUS_FETCHED}
t.Run("sync", func(t *testing.T) { t.Run("sync", func(t *testing.T) {
mem := storage.NewMemory() mem := storage.NewMemory(task1)
latest, err := mem.LatestSync() latestFetch, latestDisp, err := mem.LatestSyncs()
test.OK(t, err) test.OK(t, err)
test.Assert(t, latest.IsZero(), "lastest was not zero") test.Assert(t, latestFetch.IsZero(), "latestfetch was not zero")
test.Assert(t, latestDisp.IsZero(), "latestdisp was not zero")
start := time.Now() start := time.Now()
test.OK(t, mem.SetTasks(tasks)) test.OK(t, mem.SetTasks(tasks))
latest, err = mem.LatestSync() test.OK(t, mem.MarkDispatched(1))
latestFetch, latestDisp, err = mem.LatestSyncs()
test.OK(t, err) test.OK(t, err)
test.Assert(t, latest.After(start), "latest was not after start") test.Assert(t, latestFetch.After(start), "latestfetch was not after start")
test.Assert(t, latestDisp.After(start), "latestdisp was not after start")
}) })
t.Run("findallin", func(t *testing.T) { t.Run("findallin", func(t *testing.T) {

View File

@ -27,6 +27,9 @@ var sqliteMigrations = []sqliteMigration{
`DROP TABLE local_task`, `DROP TABLE local_task`,
`ALTER TABLE task ADD COLUMN local_status TEXT`, `ALTER TABLE task ADD COLUMN local_status TEXT`,
`UPDATE task SET local_status = "fetched"`, `UPDATE task SET local_status = "fetched"`,
`DROP TABLE system`,
`CREATE TABLE system ("latest_fetch" INTEGER, "latest_dispatch" INTEGER)`,
`INSERT INTO system (latest_fetch, latest_dispatch) VALUES (0, 0)`,
} }
var ( var (
@ -62,20 +65,20 @@ func NewSqlite(conf *SqliteConfig) (*Sqlite, error) {
return s, nil return s, nil
} }
func (s *Sqlite) LatestSync() (time.Time, error) { func (s *Sqlite) LatestSyncs() (time.Time, time.Time, error) {
rows, err := s.db.Query(`SELECT strftime('%s', latest_sync) FROM system`) rows, err := s.db.Query(`SELECT strftime('%s', latest_fetch), strftime('%s', latest_dispatch) FROM system`)
if err != nil { if err != nil {
return time.Time{}, fmt.Errorf("%w: %v", ErrSqliteFailure, err) return time.Time{}, time.Time{}, fmt.Errorf("%w: %v", ErrSqliteFailure, err)
} }
defer rows.Close() defer rows.Close()
rows.Next() rows.Next()
var latest int64 var latest_fetch, latest_dispatch int64
if err := rows.Scan(&latest); err != nil { if err := rows.Scan(&latest_fetch, &latest_dispatch); err != nil {
return time.Time{}, fmt.Errorf("%w: %v", ErrSqliteFailure, err) return time.Time{}, time.Time{}, fmt.Errorf("%w: %v", ErrSqliteFailure, err)
} }
return time.Unix(latest, 0), nil return time.Unix(latest_fetch, 0), time.Unix(latest_dispatch, 0), nil
} }
func (s *Sqlite) SetTasks(tasks []*task.Task) error { func (s *Sqlite) SetTasks(tasks []*task.Task) error {
@ -112,7 +115,7 @@ VALUES
} }
} }
if _, err := s.db.Exec(`UPDATE system SET latest_sync=DATETIME('now')`); err != nil { if _, err := s.db.Exec(`UPDATE system SET latest_fetch=DATETIME('now')`); err != nil {
return fmt.Errorf("%w: %v", ErrSqliteFailure, err) return fmt.Errorf("%w: %v", ErrSqliteFailure, err)
} }
@ -221,6 +224,13 @@ SET local_status = ?
WHERE local_id = ?`, task.STATUS_DISPATCHED, localId); err != nil { WHERE local_id = ?`, task.STATUS_DISPATCHED, localId); err != nil {
return fmt.Errorf("%w: %v", ErrSqliteFailure, err) return fmt.Errorf("%w: %v", ErrSqliteFailure, err)
} }
if _, err := s.db.Exec(`
UPDATE system
SET latest_dispatch=DATETIME('now')`); err != nil {
return fmt.Errorf("%w: %v", ErrSqliteFailure, err)
}
return nil return nil
} }