recur in sync
This commit is contained in:
parent
f6f4946c91
commit
3c210cb172
|
@ -22,6 +22,7 @@ type Item struct {
|
|||
Kind Kind `json:"kind"`
|
||||
Updated time.Time `json:"updated"`
|
||||
Deleted bool `json:"deleted"`
|
||||
Date Date `json:"date"`
|
||||
Recurrer Recurrer `json:"recurrer"`
|
||||
RecurNext Date `json:"recurNext"`
|
||||
Body string `json:"body"`
|
||||
|
|
|
@ -4,11 +4,11 @@ import (
|
|||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Recurrer interface {
|
||||
RecursOn(date Date) bool
|
||||
First() Date
|
||||
String() string
|
||||
}
|
||||
|
||||
|
@ -40,6 +40,16 @@ func NewRecurrer(recurStr string) Recurrer {
|
|||
return nil
|
||||
}
|
||||
|
||||
func FirstRecurAfter(r Recurrer, d Date) Date {
|
||||
lim := NewDate(2050, 1, 1)
|
||||
for {
|
||||
if r.RecursOn(d) || d.Equal(lim) {
|
||||
return d
|
||||
}
|
||||
d = d.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
type Daily struct {
|
||||
Start Date
|
||||
}
|
||||
|
@ -63,6 +73,8 @@ func (d Daily) RecursOn(date Date) bool {
|
|||
return date.Equal(d.Start) || date.After(d.Start)
|
||||
}
|
||||
|
||||
func (d Daily) First() Date { return FirstRecurAfter(d, d.Start) }
|
||||
|
||||
func (d Daily) String() string {
|
||||
return fmt.Sprintf("%s, daily", d.Start.String())
|
||||
}
|
||||
|
@ -112,6 +124,8 @@ func (nd EveryNDays) RecursOn(date Date) bool {
|
|||
}
|
||||
}
|
||||
|
||||
func (nd EveryNDays) First() Date { return FirstRecurAfter(nd, nd.Start) }
|
||||
|
||||
func (nd EveryNDays) String() string {
|
||||
return fmt.Sprintf("%s, every %d days", nd.Start.String(), nd.N)
|
||||
}
|
||||
|
@ -163,6 +177,8 @@ func (w Weekly) RecursOn(date Date) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (w Weekly) First() Date { return FirstRecurAfter(w, w.Start) }
|
||||
|
||||
func (w Weekly) String() string {
|
||||
weekdayStrs := []string{}
|
||||
for _, wd := range w.Weekdays {
|
||||
|
@ -173,11 +189,6 @@ func (w Weekly) String() string {
|
|||
return fmt.Sprintf("%s, weekly, %s", w.Start.String(), strings.ToLower(weekdayStr))
|
||||
}
|
||||
|
||||
type Biweekly struct {
|
||||
Start Date
|
||||
Weekday time.Weekday
|
||||
}
|
||||
|
||||
type EveryNWeeks struct {
|
||||
Start Date
|
||||
N int
|
||||
|
@ -216,6 +227,8 @@ func (enw EveryNWeeks) RecursOn(date Date) bool {
|
|||
return enw.Start.DaysBetween(date)%intervalDays == 0
|
||||
}
|
||||
|
||||
func (enw EveryNWeeks) First() Date { return FirstRecurAfter(enw, enw.Start) }
|
||||
|
||||
func (enw EveryNWeeks) String() string {
|
||||
return fmt.Sprintf("%s, every %d weeks", enw.Start.String(), enw.N)
|
||||
}
|
||||
|
@ -264,6 +277,8 @@ func (enm EveryNMonths) RecursOn(date Date) bool {
|
|||
|
||||
}
|
||||
|
||||
func (enm EveryNMonths) First() Date { return FirstRecurAfter(enm, enm.Start) }
|
||||
|
||||
func (enm EveryNMonths) String() string {
|
||||
return fmt.Sprintf("%s, every %d months", enm.Start.String(), enm.N)
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package main
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
@ -21,6 +20,13 @@ var migrations = []string{
|
|||
`CREATE INDEX idx_items_updated ON items(updated)`,
|
||||
`CREATE INDEX idx_items_kind ON items(kind)`,
|
||||
`ALTER TABLE items ADD COLUMN recurrer JSONB, ADD COLUMN recur_next TIMESTAMP`,
|
||||
`ALTER TABLE items ALTER COLUMN recurrer TYPE TEXT USING recurrer::TEXT,
|
||||
ALTER COLUMN recurrer SET NOT NULL,
|
||||
ALTER COLUMN recurrer SET DEFAULT ''`,
|
||||
`ALTER TABLE items ALTER COLUMN recur_next TYPE TEXT USING TO_CHAR(recur_next, 'YYYY-MM-DD'),
|
||||
ALTER COLUMN recur_next SET NOT NULL,
|
||||
ALTER COLUMN recur_next SET DEFAULT ''`,
|
||||
`ALTER TABLE items ADD COLUMN date TEXT NOT NULL DEFAULT ''`,
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -36,13 +42,10 @@ type Postgres struct {
|
|||
|
||||
func NewPostgres(host, port, dbname, user, password string) (*Postgres, error) {
|
||||
connStr := fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", host, port, dbname, user, password)
|
||||
|
||||
db, err := sql.Open("postgres", connStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrInvalidConfiguration, err)
|
||||
}
|
||||
|
||||
// Test the connection
|
||||
if err := db.Ping(); err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrInvalidConfiguration, err)
|
||||
}
|
||||
|
@ -59,30 +62,21 @@ func NewPostgres(host, port, dbname, user, password string) (*Postgres, error) {
|
|||
}
|
||||
|
||||
func (p *Postgres) Update(i item.Item, ts time.Time) error {
|
||||
var recurrerJSON any
|
||||
if i.Recurrer != nil {
|
||||
var err error
|
||||
recurrerJSON, err = json.Marshal(i.Recurrer)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
i.RecurNext = i.Recurrer.Start
|
||||
} else {
|
||||
recurrerJSON = nil
|
||||
if i.Recurrer != nil && i.RecurNext.IsZero() {
|
||||
i.RecurNext = i.Recurrer.First()
|
||||
}
|
||||
|
||||
_, err := p.db.Exec(`
|
||||
INSERT INTO items (id, kind, updated, deleted, body, recurrer, recur_next)
|
||||
if _, err := p.db.Exec(`
|
||||
INSERT INTO items (id, kind, updated, deleted, date, recurrer, recur_next, body)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET kind = EXCLUDED.kind,
|
||||
updated = EXCLUDED.updated,
|
||||
deleted = EXCLUDED.deleted,
|
||||
body = EXCLUDED.body,
|
||||
date = EXCLUDED.date
|
||||
recurrer = EXCLUDED.recurrer,
|
||||
recur_next = EXCLUDED.recur_next`,
|
||||
i.ID, i.Kind, ts, i.Deleted, i.Body, recurrerJSON, i.RecurNext)
|
||||
if err != nil {
|
||||
recur_next = EXCLUDED.recur_next,
|
||||
body = EXCLUDED.bodyi`,
|
||||
i.ID, i.Kind, ts, i.Deleted, i.Date.String(), i.Recurrer.String(), i.RecurNext.String(), i.Body); err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
return nil
|
||||
|
@ -90,11 +84,10 @@ func (p *Postgres) Update(i item.Item, ts time.Time) error {
|
|||
|
||||
func (p *Postgres) Updated(ks []item.Kind, t time.Time) ([]item.Item, error) {
|
||||
query := `
|
||||
SELECT id, kind, updated, deleted, body, recurrer, recur_next
|
||||
SELECT id, kind, updated, deleted, date, recurrer, recur_next, body
|
||||
FROM items
|
||||
WHERE updated > $1`
|
||||
args := []interface{}{t}
|
||||
|
||||
if len(ks) > 0 {
|
||||
placeholder := make([]string, len(ks))
|
||||
for i := range ks {
|
||||
|
@ -113,34 +106,25 @@ func (p *Postgres) Updated(ks []item.Kind, t time.Time) ([]item.Item, error) {
|
|||
result := make([]item.Item, 0)
|
||||
for rows.Next() {
|
||||
var i item.Item
|
||||
var recurNext sql.NullTime
|
||||
var recurrerJSON sql.NullString
|
||||
if err := rows.Scan(&i.ID, &i.Kind, &i.Updated, &i.Deleted, &i.Body, &recurrerJSON, &recurNext); err != nil {
|
||||
var date, recurrer, recurNext string
|
||||
if err := rows.Scan(&i.ID, &i.Kind, &i.Updated, &i.Deleted, &date, &recurrer, &recurNext, &i.Body); err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
if recurrerJSON.Valid && recurrerJSON.String != "" {
|
||||
var recurrer item.Recur
|
||||
if err := json.Unmarshal([]byte(recurrerJSON.String), &recurrer); err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
i.Recurrer = &recurrer
|
||||
}
|
||||
if recurNext.Valid {
|
||||
i.RecurNext = recurNext.Time
|
||||
}
|
||||
i.Date = item.NewDateFromString(date)
|
||||
i.Recurrer = item.NewRecurrer(recurrer)
|
||||
i.RecurNext = item.NewDateFromString(recurNext)
|
||||
result = append(result, i)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (p *Postgres) RecursBefore(date time.Time) ([]item.Item, error) {
|
||||
func (p *Postgres) ShouldRecur(date item.Date) ([]item.Item, error) {
|
||||
query := `
|
||||
SELECT id, kind, updated, deleted, body, recurrer, recur_next
|
||||
SELECT id, kind, updated, deleted, date, recurrer, recur_next, body
|
||||
FROM items
|
||||
WHERE recur_next <= $1 AND recurrer IS NOT NULL`
|
||||
|
||||
rows, err := p.db.Query(query, date)
|
||||
WHERE recurrer <> '' AND recur_next <= $1 `
|
||||
rows, err := p.db.Query(query, date.String())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
|
@ -149,61 +133,19 @@ func (p *Postgres) RecursBefore(date time.Time) ([]item.Item, error) {
|
|||
result := make([]item.Item, 0)
|
||||
for rows.Next() {
|
||||
var i item.Item
|
||||
var recurNext sql.NullTime
|
||||
var recurrerJSON sql.NullString
|
||||
if err := rows.Scan(&i.ID, &i.Kind, &i.Updated, &i.Deleted, &i.Body, &recurrerJSON, &recurNext); err != nil {
|
||||
var date, recurrer, recurNext string
|
||||
if err := rows.Scan(&i.ID, &i.Kind, &i.Updated, &i.Deleted, &i.Body, &recurrer, &recurNext); err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
if recurrerJSON.Valid && recurrerJSON.String != "" {
|
||||
var recurrer item.Recur
|
||||
if err := json.Unmarshal([]byte(recurrerJSON.String), &recurrer); err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
i.Recurrer = &recurrer
|
||||
}
|
||||
if recurNext.Valid {
|
||||
i.RecurNext = recurNext.Time
|
||||
}
|
||||
i.Date = item.NewDateFromString(date)
|
||||
i.Recurrer = item.NewRecurrer(recurrer)
|
||||
i.RecurNext = item.NewDateFromString(recurNext)
|
||||
result = append(result, i)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (p *Postgres) RecursNext(id string, date time.Time, ts time.Time) error {
|
||||
var recurrer *item.Recur
|
||||
err := p.db.QueryRow(`
|
||||
SELECT recurrer
|
||||
FROM items
|
||||
WHERE id = $1`, id).Scan(&recurrer)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return ErrNotFound
|
||||
}
|
||||
return fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
|
||||
if recurrer == nil {
|
||||
return ErrNotARecurrer
|
||||
}
|
||||
|
||||
// Verify that the new date is actually a valid recurrence
|
||||
if !recurrer.On(date) {
|
||||
return fmt.Errorf("%w: date %v is not a valid recurrence", ErrPostgresFailure, date)
|
||||
}
|
||||
|
||||
_, err = p.db.Exec(`
|
||||
UPDATE items
|
||||
SET recur_next = $1,
|
||||
updated = $2
|
||||
WHERE id = $3`, date, ts, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Postgres) migrate(wanted []string) error {
|
||||
// Create migration table if not exists
|
||||
_, err := p.db.Exec(`
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
|
@ -37,41 +36,30 @@ func (r *Recur) Run(interval time.Duration) {
|
|||
|
||||
func (r *Recur) Recur() error {
|
||||
r.logger.Info("start looking for recurring items")
|
||||
items, err := r.repoRecur.RecursBefore(time.Now())
|
||||
today := item.NewDateFromString(time.Now().Format(item.DateFormat))
|
||||
items, err := r.repoRecur.ShouldRecur(today)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.logger.Info("found recurring items", "count", len(items))
|
||||
for _, i := range items {
|
||||
r.logger.Info("processing recurring item", "item", fmt.Sprintf("%+v", i))
|
||||
r.logger.Info("processing recurring item", "id", i.ID)
|
||||
// spawn instance
|
||||
ne, err := item.NewEvent(i)
|
||||
if err != nil {
|
||||
newItem := i
|
||||
newItem.ID = uuid.New().String()
|
||||
newItem.Date = i.RecurNext
|
||||
newItem.Recurrer = nil
|
||||
newItem.RecurNext = item.Date{}
|
||||
if err := r.repoSync.Update(newItem, time.Now()); err != nil {
|
||||
return err
|
||||
}
|
||||
r.logger.Info("processing recurring event", "event", fmt.Sprintf("%+v", ne))
|
||||
y, m, d := i.RecurNext.Date()
|
||||
ne.ID = uuid.New().String()
|
||||
ne.Recurrer = nil
|
||||
ne.RecurNext = time.Time{}
|
||||
ne.Start = time.Date(y, m, d, ne.Start.Hour(), ne.Start.Minute(), 0, 0, time.UTC)
|
||||
r.logger.Info("created instance of recurring event", "event", fmt.Sprintf("%+v", ne))
|
||||
|
||||
ni, err := ne.Item()
|
||||
if err != nil {
|
||||
// update recurrer
|
||||
i.RecurNext = item.FirstRecurAfter(i.Recurrer, i.RecurNext)
|
||||
if err := r.repoSync.Update(i, time.Now()); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.repoSync.Update(ni, time.Now()); err != nil {
|
||||
return err
|
||||
}
|
||||
r.logger.Info("storen instance of recurring event", "recEventID", ne.ID, "instanceID", ni.ID)
|
||||
|
||||
// set next
|
||||
next := i.Recurrer.NextAfter(i.RecurNext)
|
||||
if err := r.repoRecur.RecursNext(i.ID, next, time.Now()); err != nil {
|
||||
return err
|
||||
}
|
||||
r.logger.Info("updated recur date", "recEventID", ne.ID, "next", next)
|
||||
r.logger.Info("recurring item processed", "id", i.ID, "recurNext", i.RecurNext.String())
|
||||
}
|
||||
r.logger.Info("processed recurring items", "count", len(items))
|
||||
|
||||
|
|
|
@ -18,6 +18,5 @@ type Syncer interface {
|
|||
}
|
||||
|
||||
type Recurrer interface {
|
||||
RecursBefore(date time.Time) ([]item.Item, error)
|
||||
RecursNext(id string, date time.Time, t time.Time) error
|
||||
ShouldRecur(date item.Date) ([]item.Item, error)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue