diff --git a/item/item.go b/item/item.go index 3c62061..fba8090 100644 --- a/item/item.go +++ b/item/item.go @@ -22,6 +22,7 @@ type Item struct { Kind Kind `json:"kind"` Updated time.Time `json:"updated"` Deleted bool `json:"deleted"` + Date Date `json:"date"` Recurrer Recurrer `json:"recurrer"` RecurNext Date `json:"recurNext"` Body string `json:"body"` diff --git a/item/recur.go b/item/recur.go index dea7aa7..a8e4735 100644 --- a/item/recur.go +++ b/item/recur.go @@ -4,11 +4,11 @@ import ( "fmt" "strconv" "strings" - "time" ) type Recurrer interface { RecursOn(date Date) bool + First() Date String() string } @@ -40,6 +40,16 @@ func NewRecurrer(recurStr string) Recurrer { return nil } +func FirstRecurAfter(r Recurrer, d Date) Date { + lim := NewDate(2050, 1, 1) + for { + if r.RecursOn(d) || d.Equal(lim) { + return d + } + d = d.Add(1) + } +} + type Daily struct { Start Date } @@ -63,6 +73,8 @@ func (d Daily) RecursOn(date Date) bool { return date.Equal(d.Start) || date.After(d.Start) } +func (d Daily) First() Date { return FirstRecurAfter(d, d.Start) } + func (d Daily) String() string { return fmt.Sprintf("%s, daily", d.Start.String()) } @@ -112,6 +124,8 @@ func (nd EveryNDays) RecursOn(date Date) bool { } } +func (nd EveryNDays) First() Date { return FirstRecurAfter(nd, nd.Start) } + func (nd EveryNDays) String() string { return fmt.Sprintf("%s, every %d days", nd.Start.String(), nd.N) } @@ -163,6 +177,8 @@ func (w Weekly) RecursOn(date Date) bool { return false } +func (w Weekly) First() Date { return FirstRecurAfter(w, w.Start) } + func (w Weekly) String() string { weekdayStrs := []string{} for _, wd := range w.Weekdays { @@ -173,11 +189,6 @@ func (w Weekly) String() string { return fmt.Sprintf("%s, weekly, %s", w.Start.String(), strings.ToLower(weekdayStr)) } -type Biweekly struct { - Start Date - Weekday time.Weekday -} - type EveryNWeeks struct { Start Date N int @@ -216,6 +227,8 @@ func (enw EveryNWeeks) RecursOn(date Date) bool { return enw.Start.DaysBetween(date)%intervalDays == 0 } +func (enw EveryNWeeks) First() Date { return FirstRecurAfter(enw, enw.Start) } + func (enw EveryNWeeks) String() string { return fmt.Sprintf("%s, every %d weeks", enw.Start.String(), enw.N) } @@ -264,6 +277,8 @@ func (enm EveryNMonths) RecursOn(date Date) bool { } +func (enm EveryNMonths) First() Date { return FirstRecurAfter(enm, enm.Start) } + func (enm EveryNMonths) String() string { return fmt.Sprintf("%s, every %d months", enm.Start.String(), enm.N) } diff --git a/sync/service/postgres.go b/sync/service/postgres.go index 0a3660b..1b51759 100644 --- a/sync/service/postgres.go +++ b/sync/service/postgres.go @@ -2,7 +2,6 @@ package main import ( "database/sql" - "encoding/json" "errors" "fmt" "strings" @@ -21,6 +20,13 @@ var migrations = []string{ `CREATE INDEX idx_items_updated ON items(updated)`, `CREATE INDEX idx_items_kind ON items(kind)`, `ALTER TABLE items ADD COLUMN recurrer JSONB, ADD COLUMN recur_next TIMESTAMP`, + `ALTER TABLE items ALTER COLUMN recurrer TYPE TEXT USING recurrer::TEXT, + ALTER COLUMN recurrer SET NOT NULL, + ALTER COLUMN recurrer SET DEFAULT ''`, + `ALTER TABLE items ALTER COLUMN recur_next TYPE TEXT USING TO_CHAR(recur_next, 'YYYY-MM-DD'), + ALTER COLUMN recur_next SET NOT NULL, + ALTER COLUMN recur_next SET DEFAULT ''`, + `ALTER TABLE items ADD COLUMN date TEXT NOT NULL DEFAULT ''`, } var ( @@ -36,13 +42,10 @@ type Postgres struct { func NewPostgres(host, port, dbname, user, password string) (*Postgres, error) { connStr := fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", host, port, dbname, user, password) - db, err := sql.Open("postgres", connStr) if err != nil { return nil, fmt.Errorf("%w: %v", ErrInvalidConfiguration, err) } - - // Test the connection if err := db.Ping(); err != nil { return nil, fmt.Errorf("%w: %v", ErrInvalidConfiguration, err) } @@ -59,30 +62,21 @@ func NewPostgres(host, port, dbname, user, password string) (*Postgres, error) { } func (p *Postgres) Update(i item.Item, ts time.Time) error { - var recurrerJSON any - if i.Recurrer != nil { - var err error - recurrerJSON, err = json.Marshal(i.Recurrer) - if err != nil { - return fmt.Errorf("%w: %v", ErrPostgresFailure, err) - } - i.RecurNext = i.Recurrer.Start - } else { - recurrerJSON = nil + if i.Recurrer != nil && i.RecurNext.IsZero() { + i.RecurNext = i.Recurrer.First() } - - _, err := p.db.Exec(` - INSERT INTO items (id, kind, updated, deleted, body, recurrer, recur_next) + if _, err := p.db.Exec(` + INSERT INTO items (id, kind, updated, deleted, date, recurrer, recur_next, body) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO UPDATE SET kind = EXCLUDED.kind, updated = EXCLUDED.updated, deleted = EXCLUDED.deleted, - body = EXCLUDED.body, + date = EXCLUDED.date recurrer = EXCLUDED.recurrer, - recur_next = EXCLUDED.recur_next`, - i.ID, i.Kind, ts, i.Deleted, i.Body, recurrerJSON, i.RecurNext) - if err != nil { + recur_next = EXCLUDED.recur_next, + body = EXCLUDED.bodyi`, + i.ID, i.Kind, ts, i.Deleted, i.Date.String(), i.Recurrer.String(), i.RecurNext.String(), i.Body); err != nil { return fmt.Errorf("%w: %v", ErrPostgresFailure, err) } return nil @@ -90,11 +84,10 @@ func (p *Postgres) Update(i item.Item, ts time.Time) error { func (p *Postgres) Updated(ks []item.Kind, t time.Time) ([]item.Item, error) { query := ` - SELECT id, kind, updated, deleted, body, recurrer, recur_next + SELECT id, kind, updated, deleted, date, recurrer, recur_next, body FROM items WHERE updated > $1` args := []interface{}{t} - if len(ks) > 0 { placeholder := make([]string, len(ks)) for i := range ks { @@ -113,34 +106,25 @@ func (p *Postgres) Updated(ks []item.Kind, t time.Time) ([]item.Item, error) { result := make([]item.Item, 0) for rows.Next() { var i item.Item - var recurNext sql.NullTime - var recurrerJSON sql.NullString - if err := rows.Scan(&i.ID, &i.Kind, &i.Updated, &i.Deleted, &i.Body, &recurrerJSON, &recurNext); err != nil { + var date, recurrer, recurNext string + if err := rows.Scan(&i.ID, &i.Kind, &i.Updated, &i.Deleted, &date, &recurrer, &recurNext, &i.Body); err != nil { return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err) } - if recurrerJSON.Valid && recurrerJSON.String != "" { - var recurrer item.Recur - if err := json.Unmarshal([]byte(recurrerJSON.String), &recurrer); err != nil { - return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err) - } - i.Recurrer = &recurrer - } - if recurNext.Valid { - i.RecurNext = recurNext.Time - } + i.Date = item.NewDateFromString(date) + i.Recurrer = item.NewRecurrer(recurrer) + i.RecurNext = item.NewDateFromString(recurNext) result = append(result, i) } return result, nil } -func (p *Postgres) RecursBefore(date time.Time) ([]item.Item, error) { +func (p *Postgres) ShouldRecur(date item.Date) ([]item.Item, error) { query := ` - SELECT id, kind, updated, deleted, body, recurrer, recur_next + SELECT id, kind, updated, deleted, date, recurrer, recur_next, body FROM items - WHERE recur_next <= $1 AND recurrer IS NOT NULL` - - rows, err := p.db.Query(query, date) + WHERE recurrer <> '' AND recur_next <= $1 ` + rows, err := p.db.Query(query, date.String()) if err != nil { return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err) } @@ -149,61 +133,19 @@ func (p *Postgres) RecursBefore(date time.Time) ([]item.Item, error) { result := make([]item.Item, 0) for rows.Next() { var i item.Item - var recurNext sql.NullTime - var recurrerJSON sql.NullString - if err := rows.Scan(&i.ID, &i.Kind, &i.Updated, &i.Deleted, &i.Body, &recurrerJSON, &recurNext); err != nil { + var date, recurrer, recurNext string + if err := rows.Scan(&i.ID, &i.Kind, &i.Updated, &i.Deleted, &i.Body, &recurrer, &recurNext); err != nil { return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err) } - if recurrerJSON.Valid && recurrerJSON.String != "" { - var recurrer item.Recur - if err := json.Unmarshal([]byte(recurrerJSON.String), &recurrer); err != nil { - return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err) - } - i.Recurrer = &recurrer - } - if recurNext.Valid { - i.RecurNext = recurNext.Time - } + i.Date = item.NewDateFromString(date) + i.Recurrer = item.NewRecurrer(recurrer) + i.RecurNext = item.NewDateFromString(recurNext) result = append(result, i) } return result, nil } -func (p *Postgres) RecursNext(id string, date time.Time, ts time.Time) error { - var recurrer *item.Recur - err := p.db.QueryRow(` - SELECT recurrer - FROM items - WHERE id = $1`, id).Scan(&recurrer) - if err != nil { - if err == sql.ErrNoRows { - return ErrNotFound - } - return fmt.Errorf("%w: %v", ErrPostgresFailure, err) - } - - if recurrer == nil { - return ErrNotARecurrer - } - - // Verify that the new date is actually a valid recurrence - if !recurrer.On(date) { - return fmt.Errorf("%w: date %v is not a valid recurrence", ErrPostgresFailure, date) - } - - _, err = p.db.Exec(` - UPDATE items - SET recur_next = $1, - updated = $2 - WHERE id = $3`, date, ts, id) - if err != nil { - return fmt.Errorf("%w: %v", ErrPostgresFailure, err) - } - - return nil -} - func (p *Postgres) migrate(wanted []string) error { // Create migration table if not exists _, err := p.db.Exec(` diff --git a/sync/service/recur.go b/sync/service/recur.go index c0e3ee3..c48d9d1 100644 --- a/sync/service/recur.go +++ b/sync/service/recur.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "log/slog" "time" @@ -37,41 +36,30 @@ func (r *Recur) Run(interval time.Duration) { func (r *Recur) Recur() error { r.logger.Info("start looking for recurring items") - items, err := r.repoRecur.RecursBefore(time.Now()) + today := item.NewDateFromString(time.Now().Format(item.DateFormat)) + items, err := r.repoRecur.ShouldRecur(today) if err != nil { return err } r.logger.Info("found recurring items", "count", len(items)) for _, i := range items { - r.logger.Info("processing recurring item", "item", fmt.Sprintf("%+v", i)) + r.logger.Info("processing recurring item", "id", i.ID) // spawn instance - ne, err := item.NewEvent(i) - if err != nil { + newItem := i + newItem.ID = uuid.New().String() + newItem.Date = i.RecurNext + newItem.Recurrer = nil + newItem.RecurNext = item.Date{} + if err := r.repoSync.Update(newItem, time.Now()); err != nil { return err } - r.logger.Info("processing recurring event", "event", fmt.Sprintf("%+v", ne)) - y, m, d := i.RecurNext.Date() - ne.ID = uuid.New().String() - ne.Recurrer = nil - ne.RecurNext = time.Time{} - ne.Start = time.Date(y, m, d, ne.Start.Hour(), ne.Start.Minute(), 0, 0, time.UTC) - r.logger.Info("created instance of recurring event", "event", fmt.Sprintf("%+v", ne)) - ni, err := ne.Item() - if err != nil { + // update recurrer + i.RecurNext = item.FirstRecurAfter(i.Recurrer, i.RecurNext) + if err := r.repoSync.Update(i, time.Now()); err != nil { return err } - if err := r.repoSync.Update(ni, time.Now()); err != nil { - return err - } - r.logger.Info("storen instance of recurring event", "recEventID", ne.ID, "instanceID", ni.ID) - - // set next - next := i.Recurrer.NextAfter(i.RecurNext) - if err := r.repoRecur.RecursNext(i.ID, next, time.Now()); err != nil { - return err - } - r.logger.Info("updated recur date", "recEventID", ne.ID, "next", next) + r.logger.Info("recurring item processed", "id", i.ID, "recurNext", i.RecurNext.String()) } r.logger.Info("processed recurring items", "count", len(items)) diff --git a/sync/service/storage.go b/sync/service/storage.go index 7240bbb..d05dc74 100644 --- a/sync/service/storage.go +++ b/sync/service/storage.go @@ -18,6 +18,5 @@ type Syncer interface { } type Recurrer interface { - RecursBefore(date time.Time) ([]item.Item, error) - RecursNext(id string, date time.Time, t time.Time) error + ShouldRecur(date item.Date) ([]item.Item, error) }