From 064adad7cb756dfe449f7f734a9ee9bd2f222b9e Mon Sep 17 00:00:00 2001 From: Erik Winter Date: Tue, 25 Oct 2022 15:59:09 +0200 Subject: [PATCH] more fluent sync --- cmd/android-app/component/tasks.go | 10 ++--- cmd/cli/command/sync.go | 18 +++++++-- internal/process/fetch.go | 18 ++++++--- internal/process/fetch_test.go | 62 ++++++++++++++++++++++-------- internal/storage/local.go | 2 +- internal/storage/memory.go | 15 +++++--- internal/storage/memory_test.go | 13 ++++--- internal/storage/sqlite.go | 26 +++++++++---- 8 files changed, 115 insertions(+), 49 deletions(-) diff --git a/cmd/android-app/component/tasks.go b/cmd/android-app/component/tasks.go index c8e8ae8..f5cb7ad 100644 --- a/cmd/android-app/component/tasks.go +++ b/cmd/android-app/component/tasks.go @@ -71,18 +71,18 @@ func (t *Tasks) Sync() (int, int, error) { return 0, 0, err } - latestFetch, err := t.local.LatestSync() + latestFetch, latestDisp, err := t.local.LatestSyncs() if err != nil { return 0, 0, err } // use unix timestamp for time comparison, because time.Before and - // time.After depend on a monotonic clock and in Android the - // monotonic clock stops ticking if the phone is in suspended sleep - if latestFetch.Add(15*time.Minute).Unix() > time.Now().Unix() { + // time.After depend on a monotonic clock and on my phone the + // monotonic clock stops ticking when it goes to suspended sleep + if latestFetch.Add(15*time.Minute).Unix() > time.Now().Unix() || latestDisp.Add(2*time.Minute).Unix() > time.Now().Unix() { return countDisp, 0, nil } - res, err := process.NewFetch(t.remote, t.local).Process() + res, err := process.NewFetch(t.remote, t.local, task.FOLDER_PLANNED).Process() if err != nil { return countDisp, 0, err } diff --git a/cmd/cli/command/sync.go b/cmd/cli/command/sync.go index b0281a7..6f5f6c0 100644 --- a/cmd/cli/command/sync.go +++ b/cmd/cli/command/sync.go @@ -17,6 +17,8 @@ type Sync struct { sender *process.Send fetchInterval time.Duration fetchLatest time.Time + dispInterval time.Duration + dispLatest time.Time } func NewSync(conf *configuration.Configuration) (*Sync, error) { @@ -27,17 +29,20 @@ func NewSync(conf *configuration.Configuration) (*Sync, error) { remote := storage.NewRemoteRepository(mstore.NewIMAP(conf.IMAP())) disp := storage.NewDispatcher(msend.NewSSLSMTP(conf.SMTP())) - fetchLatest, err := local.LatestSync() + fetchLatest, dispLatest, err := local.LatestSyncs() if err != nil { return &Sync{}, err } fetchInterval := 15 * time.Minute // not yet configurable + dispInterval := 2 * time.Minute return &Sync{ fetcher: process.NewFetch(remote, local), sender: process.NewSend(local, disp), fetchInterval: fetchInterval, fetchLatest: fetchLatest, + dispInterval: dispInterval, + dispLatest: dispLatest, }, nil } @@ -46,9 +51,16 @@ func (s *Sync) Do() string { if err != nil { return format.FormatError(err) } + if countSend > 0 { + return fmt.Sprintf("sent %d tasks, not fetching yet\n", countSend) + } + + if time.Now().Before(s.dispLatest.Add(s.dispInterval)) { + return "sent 0 tasks, send interval has not passed yet\n" + } if time.Now().Before(s.fetchLatest.Add(s.fetchInterval)) { - return fmt.Sprintf("sent %d tasks, not time to fetch yet\n", countSend) + return "sent 0 tasks, fetch interval has not passed yet\n" } fResult, err := s.fetcher.Process() @@ -56,5 +68,5 @@ func (s *Sync) Do() string { return format.FormatError(err) } - return fmt.Sprintf("sent %d, fetched %d tasks\n", countSend, fResult.Count) + return fmt.Sprintf("fetched %d tasks\n", fResult.Count) } diff --git a/internal/process/fetch.go b/internal/process/fetch.go index 00b92db..7cb7492 100644 --- a/internal/process/fetch.go +++ b/internal/process/fetch.go @@ -15,8 +15,9 @@ var ( // Fetch fetches all tasks in regular folders from the remote repository and overwrites what is stored locally type Fetch struct { - remote *storage.RemoteRepository - local storage.LocalRepository + remote *storage.RemoteRepository + local storage.LocalRepository + folders []string } type FetchResult struct { @@ -24,17 +25,22 @@ type FetchResult struct { Count int `json:"count"` } -func NewFetch(remote *storage.RemoteRepository, local storage.LocalRepository) *Fetch { +func NewFetch(remote *storage.RemoteRepository, local storage.LocalRepository, folders ...string) *Fetch { + if len(folders) == 0 { + folders = task.KnownFolders + } + return &Fetch{ - remote: remote, - local: local, + remote: remote, + local: local, + folders: folders, } } func (s *Fetch) Process() (*FetchResult, error) { start := time.Now() tasks := []*task.Task{} - for _, folder := range task.KnownFolders { + for _, folder := range s.folders { if folder == task.FOLDER_INBOX { continue } diff --git a/internal/process/fetch_test.go b/internal/process/fetch_test.go index cde0f3e..7fb10c1 100644 --- a/internal/process/fetch_test.go +++ b/internal/process/fetch_test.go @@ -24,31 +24,61 @@ func TestFetchProcess(t *testing.T) { Action: "action2", Folder: task.FOLDER_UNPLANNED, } + task3 := &task.Task{ + Id: "id3", + Version: 1, + Action: "action3", + Folder: task.FOLDER_PLANNED, + } localTask1 := &task.LocalTask{Task: *task1, LocalUpdate: &task.LocalUpdate{}, LocalStatus: task.STATUS_FETCHED} localTask2 := &task.LocalTask{Task: *task2, LocalUpdate: &task.LocalUpdate{}, LocalStatus: task.STATUS_FETCHED} + localTask3 := &task.LocalTask{Task: *task3, LocalUpdate: &task.LocalUpdate{}, LocalStatus: task.STATUS_FETCHED} mstorer, err := mstore.NewMemory(task.KnownFolders) test.OK(t, err) test.OK(t, mstorer.Add(task1.Folder, task1.FormatSubject(), task1.FormatBody())) test.OK(t, mstorer.Add(task2.Folder, task2.FormatSubject(), task2.FormatBody())) + test.OK(t, mstorer.Add(task3.Folder, task3.FormatSubject(), task3.FormatBody())) remote := storage.NewRemoteRepository(mstorer) local := storage.NewMemory() - syncer := process.NewFetch(remote, local) - actResult, err := syncer.Process() - test.OK(t, err) - test.Equals(t, 2, actResult.Count) - actTasks, err := local.FindAll() - test.OK(t, err) - for _, a := range actTasks { - a.LocalId = 0 - a.Message = nil - } - exp := task.ById([]*task.LocalTask{localTask1, localTask2}) - sExp := task.ById(exp) - sAct := task.ById(actTasks) - sort.Sort(sAct) - sort.Sort(sExp) - test.Equals(t, sExp, sAct) + t.Run("all", func(t *testing.T) { + syncer := process.NewFetch(remote, local) + actResult, err := syncer.Process() + test.OK(t, err) + test.Equals(t, 3, actResult.Count) + actTasks, err := local.FindAll() + test.OK(t, err) + for _, a := range actTasks { + a.LocalId = 0 + a.Message = nil + } + exp := task.ById([]*task.LocalTask{localTask1, localTask2, localTask3}) + sExp := task.ById(exp) + sAct := task.ById(actTasks) + sort.Sort(sAct) + sort.Sort(sExp) + test.Equals(t, sExp, sAct) + }) + + t.Run("planned", func(t *testing.T) { + syncer := process.NewFetch(remote, local, task.FOLDER_PLANNED) + actResult, err := syncer.Process() + test.OK(t, err) + test.Equals(t, 1, actResult.Count) + actTasks, err := local.FindAll() + test.OK(t, err) + for _, a := range actTasks { + a.LocalId = 0 + a.Message = nil + } + exp := task.ById([]*task.LocalTask{localTask3}) + sExp := task.ById(exp) + sAct := task.ById(actTasks) + sort.Sort(sAct) + sort.Sort(sExp) + test.Equals(t, sExp, sAct) + }) + } diff --git a/internal/storage/local.go b/internal/storage/local.go index 2068bde..b0a62f6 100644 --- a/internal/storage/local.go +++ b/internal/storage/local.go @@ -13,7 +13,7 @@ var ( ) type LocalRepository interface { - LatestSync() (time.Time, error) + LatestSyncs() (time.Time, time.Time, error) // last fetch, last dispatch, err SetTasks(tasks []*task.Task) error FindAll() ([]*task.LocalTask, error) FindById(id string) (*task.LocalTask, error) diff --git a/internal/storage/memory.go b/internal/storage/memory.go index 32df2fa..ac18e7c 100644 --- a/internal/storage/memory.go +++ b/internal/storage/memory.go @@ -9,17 +9,21 @@ import ( // Memory is an in memory implementation of LocalRepository type Memory struct { - tasks map[string]*task.LocalTask - latestSync time.Time + tasks map[string]*task.LocalTask + latestFetch time.Time + latestDispatch time.Time } func NewMemory(initTasks ...*task.Task) *Memory { tasks := map[string]*task.LocalTask{} + id := 1 for _, t := range initTasks { tasks[t.Id] = &task.LocalTask{ Task: *t, LocalUpdate: &task.LocalUpdate{}, + LocalId: id, } + id++ } return &Memory{ @@ -27,8 +31,8 @@ func NewMemory(initTasks ...*task.Task) *Memory { } } -func (m *Memory) LatestSync() (time.Time, error) { - return m.latestSync, nil +func (m *Memory) LatestSyncs() (time.Time, time.Time, error) { + return m.latestFetch, m.latestDispatch, nil } func (m *Memory) SetTasks(tasks []*task.Task) error { @@ -43,7 +47,7 @@ func (m *Memory) SetTasks(tasks []*task.Task) error { for _, nt := range newTasks { m.tasks[nt.Id] = nt } - m.latestSync = time.Now() + m.latestFetch = time.Now() return nil } @@ -87,6 +91,7 @@ func (m *Memory) SetLocalUpdate(id string, update *task.LocalUpdate) error { func (m *Memory) MarkDispatched(localId int) error { t, _ := m.FindByLocalId(localId) m.tasks[t.Id].LocalStatus = task.STATUS_DISPATCHED + m.latestDispatch = time.Now() return nil } diff --git a/internal/storage/memory_test.go b/internal/storage/memory_test.go index 54c5e6c..4420f42 100644 --- a/internal/storage/memory_test.go +++ b/internal/storage/memory_test.go @@ -48,16 +48,19 @@ func TestMemory(t *testing.T) { localTask3 := &task.LocalTask{Task: *task3, LocalUpdate: emptyUpdate, LocalStatus: task.STATUS_FETCHED} t.Run("sync", func(t *testing.T) { - mem := storage.NewMemory() - latest, err := mem.LatestSync() + mem := storage.NewMemory(task1) + latestFetch, latestDisp, err := mem.LatestSyncs() test.OK(t, err) - test.Assert(t, latest.IsZero(), "lastest was not zero") + test.Assert(t, latestFetch.IsZero(), "latestfetch was not zero") + test.Assert(t, latestDisp.IsZero(), "latestdisp was not zero") start := time.Now() test.OK(t, mem.SetTasks(tasks)) - latest, err = mem.LatestSync() + test.OK(t, mem.MarkDispatched(1)) + latestFetch, latestDisp, err = mem.LatestSyncs() test.OK(t, err) - test.Assert(t, latest.After(start), "latest was not after start") + test.Assert(t, latestFetch.After(start), "latestfetch was not after start") + test.Assert(t, latestDisp.After(start), "latestdisp was not after start") }) t.Run("findallin", func(t *testing.T) { diff --git a/internal/storage/sqlite.go b/internal/storage/sqlite.go index 7589808..2428047 100644 --- a/internal/storage/sqlite.go +++ b/internal/storage/sqlite.go @@ -27,6 +27,9 @@ var sqliteMigrations = []sqliteMigration{ `DROP TABLE local_task`, `ALTER TABLE task ADD COLUMN local_status TEXT`, `UPDATE task SET local_status = "fetched"`, + `DROP TABLE system`, + `CREATE TABLE system ("latest_fetch" INTEGER, "latest_dispatch" INTEGER)`, + `INSERT INTO system (latest_fetch, latest_dispatch) VALUES (0, 0)`, } var ( @@ -62,20 +65,20 @@ func NewSqlite(conf *SqliteConfig) (*Sqlite, error) { return s, nil } -func (s *Sqlite) LatestSync() (time.Time, error) { - rows, err := s.db.Query(`SELECT strftime('%s', latest_sync) FROM system`) +func (s *Sqlite) LatestSyncs() (time.Time, time.Time, error) { + rows, err := s.db.Query(`SELECT strftime('%s', latest_fetch), strftime('%s', latest_dispatch) FROM system`) if err != nil { - return time.Time{}, fmt.Errorf("%w: %v", ErrSqliteFailure, err) + return time.Time{}, time.Time{}, fmt.Errorf("%w: %v", ErrSqliteFailure, err) } defer rows.Close() rows.Next() - var latest int64 - if err := rows.Scan(&latest); err != nil { - return time.Time{}, fmt.Errorf("%w: %v", ErrSqliteFailure, err) + var latest_fetch, latest_dispatch int64 + if err := rows.Scan(&latest_fetch, &latest_dispatch); err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("%w: %v", ErrSqliteFailure, err) } - return time.Unix(latest, 0), nil + return time.Unix(latest_fetch, 0), time.Unix(latest_dispatch, 0), nil } func (s *Sqlite) SetTasks(tasks []*task.Task) error { @@ -112,7 +115,7 @@ VALUES } } - if _, err := s.db.Exec(`UPDATE system SET latest_sync=DATETIME('now')`); err != nil { + if _, err := s.db.Exec(`UPDATE system SET latest_fetch=DATETIME('now')`); err != nil { return fmt.Errorf("%w: %v", ErrSqliteFailure, err) } @@ -221,6 +224,13 @@ SET local_status = ? WHERE local_id = ?`, task.STATUS_DISPATCHED, localId); err != nil { return fmt.Errorf("%w: %v", ErrSqliteFailure, err) } + + if _, err := s.db.Exec(` +UPDATE system +SET latest_dispatch=DATETIME('now')`); err != nil { + return fmt.Errorf("%w: %v", ErrSqliteFailure, err) + } + return nil }