From 89aadbf31a9e251379389874f1753b3efacd0bdf Mon Sep 17 00:00:00 2001 From: Erik Winter Date: Wed, 18 Sep 2024 18:04:27 +0200 Subject: [PATCH] switch to postgres --- Makefile | 4 +- go.mod | 1 + go.sum | 2 + sync/service/main.go | 30 +++---- sync/service/postgres.go | 175 ++++++++++++++++++++++++++++++++++++++ sync/service/sqlite.go | 177 --------------------------------------- 6 files changed, 194 insertions(+), 195 deletions(-) create mode 100644 sync/service/postgres.go delete mode 100644 sync/service/sqlite.go diff --git a/Makefile b/Makefile index 5eefc71..630adca 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,6 @@ sync-run: - cd sync/service && PLANNER_DB_PATH=test.db PLANNER_PORT=8092 PLANNER_API_KEY=testKey go run . + cd sync/service && PLANNER_DB_HOST=localhost PLANNER_DB_PORT=5432 PLANNER_DB_NAME=planner PLANNER_DB_USER=test PLANNER_DB_PASSWORD=test PLANNER_PORT=8092 PLANNER_API_KEY=testKey go run . +database: + docker run -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test -e POSTGRES_DB=planner -p 5432:5432 postgres:16 diff --git a/go.mod b/go.mod index a5d95fa..59e2029 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect diff --git a/go.sum b/go.sum index db23520..b99f05c 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= diff --git a/sync/service/main.go b/sync/service/main.go index 963cf5e..fbd5f08 100644 --- a/sync/service/main.go +++ b/sync/service/main.go @@ -6,28 +6,23 @@ import ( "net/http" "os" "os/signal" - "strconv" "syscall" ) func main() { - dbPath := os.Getenv("PLANNER_DB_PATH") - if dbPath == "" { - fmt.Println("PLANNER_DB_PATH is empty") - os.Exit(1) - } - port, err := strconv.Atoi(os.Getenv("PLANNER_PORT")) - if err != nil { - fmt.Println("PLANNER_PORT env is not an integer") - os.Exit(1) - } + port := os.Getenv("PLANNER_PORT") apiKey := os.Getenv("PLANNER_API_KEY") if apiKey == "" { fmt.Println("PLANNER_API_KEY is empty") os.Exit(1) } - repo, err := NewSqlite(dbPath) + dbHost := os.Getenv("PLANNER_DB_HOST") + dbPort := os.Getenv("PLANNER_DB_PORT") + dbName := os.Getenv("PLANNER_DB_NAME") + dbUser := os.Getenv("PLANNER_DB_USER") + dbPassword := os.Getenv("PLANNER_DB_PASSWORD") + repo, err := NewPostgres(dbHost, dbPort, dbName, dbUser, dbPassword) if err != nil { fmt.Printf("could not open sqlite db: %s", err.Error()) os.Exit(1) @@ -35,14 +30,15 @@ func main() { logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) logger.Info("configuration", "configuration", map[string]string{ - "dbPath": dbPath, - "port": fmt.Sprintf("%d", port), - "apiKey": "***", + "port": port, + "dbHost": dbHost, + "dbPort": dbPort, + "dbName": dbName, + "dbUser": dbUser, }) - address := fmt.Sprintf(":%d", port) srv := NewServer(repo, apiKey, logger) - go http.ListenAndServe(address, srv) + go http.ListenAndServe(fmt.Sprintf(":%s", port), srv) logger.Info("service started") diff --git a/sync/service/postgres.go b/sync/service/postgres.go new file mode 100644 index 0000000..6c27df7 --- /dev/null +++ b/sync/service/postgres.go @@ -0,0 +1,175 @@ +package main + +import ( + "database/sql" + "errors" + "fmt" + "strings" + "time" + + _ "github.com/lib/pq" + "go-mod.ewintr.nl/planner/sync/item" +) + +const ( + timestampFormat = "2006-01-02 15:04:05" +) + +var migrations = []string{ + `CREATE TABLE items (id TEXT PRIMARY KEY, kind TEXT, updated TIMESTAMP, deleted BOOLEAN, body TEXT)`, + `CREATE INDEX idx_items_updated ON items(updated)`, + `CREATE INDEX idx_items_kind ON items(kind)`, +} + +var ( + ErrInvalidConfiguration = errors.New("invalid configuration") + ErrIncompatibleSQLMigration = errors.New("incompatible migration") + ErrNotEnoughSQLMigrations = errors.New("already more migrations than wanted") + ErrPostgresFailure = errors.New("postgres returned an error") +) + +type Postgres struct { + db *sql.DB +} + +func NewPostgres(host, port, dbname, user, password string) (*Postgres, error) { + connStr := fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", host, port, dbname, user, password) + + db, err := sql.Open("postgres", connStr) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrInvalidConfiguration, err) + } + + // Test the connection + if err := db.Ping(); err != nil { + return nil, fmt.Errorf("%w: %v", ErrInvalidConfiguration, err) + } + + p := &Postgres{ + db: db, + } + + if err := p.migrate(migrations); err != nil { + return nil, err + } + + return p, nil +} + +func (p *Postgres) Update(item item.Item) error { + _, err := p.db.Exec(` + INSERT INTO items (id, kind, updated, deleted, body) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (id) DO UPDATE + SET kind = EXCLUDED.kind, + updated = EXCLUDED.updated, + deleted = EXCLUDED.deleted, + body = EXCLUDED.body`, + item.ID, item.Kind, item.Updated, item.Deleted, item.Body) + if err != nil { + return fmt.Errorf("%w: %v", ErrPostgresFailure, err) + } + return nil +} + +func (p *Postgres) Updated(ks []item.Kind, t time.Time) ([]item.Item, error) { + query := ` + SELECT id, kind, updated, deleted, body + FROM items + WHERE updated > $1` + args := []interface{}{t} + + if len(ks) > 0 { + placeholder := make([]string, len(ks)) + for i := range ks { + placeholder[i] = fmt.Sprintf("$%d", i+2) + args = append(args, string(ks[i])) + } + query += fmt.Sprintf(" AND kind = ANY(ARRAY[%s])", strings.Join(placeholder, ",")) + } + + rows, err := p.db.Query(query, args...) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err) + } + defer rows.Close() + + result := make([]item.Item, 0) + for rows.Next() { + var item item.Item + if err := rows.Scan(&item.ID, &item.Kind, &item.Updated, &item.Deleted, &item.Body); err != nil { + return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err) + } + result = append(result, item) + } + + return result, nil +} + +func (p *Postgres) migrate(wanted []string) error { + // Create migration table if not exists + _, err := p.db.Exec(` + CREATE TABLE IF NOT EXISTS migration + (id SERIAL PRIMARY KEY, query TEXT) + `) + if err != nil { + return fmt.Errorf("%w: %v", ErrPostgresFailure, err) + } + + // Find existing migrations + rows, err := p.db.Query(`SELECT query FROM migration ORDER BY id`) + if err != nil { + return fmt.Errorf("%w: %v", ErrPostgresFailure, err) + } + defer rows.Close() + + var existing []string + for rows.Next() { + var query string + if err := rows.Scan(&query); err != nil { + return fmt.Errorf("%w: %v", ErrPostgresFailure, err) + } + existing = append(existing, query) + } + + // Compare and execute missing migrations + missing, err := compareMigrations(wanted, existing) + if err != nil { + return fmt.Errorf("%w: %v", ErrPostgresFailure, err) + } + + for _, query := range missing { + if _, err := p.db.Exec(query); err != nil { + return fmt.Errorf("%w: %v", ErrPostgresFailure, err) + } + + // Register migration + if _, err := p.db.Exec(` + INSERT INTO migration (query) VALUES ($1) + `, query); err != nil { + return fmt.Errorf("%w: %v", ErrPostgresFailure, err) + } + } + + return nil +} + +func compareMigrations(wanted, existing []string) ([]string, error) { + var needed []string + if len(wanted) < len(existing) { + return nil, ErrNotEnoughSQLMigrations + } + + for i, want := range wanted { + switch { + case i >= len(existing): + needed = append(needed, want) + case want == existing[i]: + // do nothing + case want != existing[i]: + return nil, fmt.Errorf("%w: %v", ErrIncompatibleSQLMigration, want) + } + } + + return needed, nil +} diff --git a/sync/service/sqlite.go b/sync/service/sqlite.go deleted file mode 100644 index 3359748..0000000 --- a/sync/service/sqlite.go +++ /dev/null @@ -1,177 +0,0 @@ -package main - -import ( - "database/sql" - "errors" - "fmt" - "strings" - "time" - - "go-mod.ewintr.nl/planner/sync/item" - _ "modernc.org/sqlite" -) - -const ( - timestampFormat = "2006-01-02 15:04:05" -) - -var migrations = []string{ - `CREATE TABLE items ("id" TEXT UNIQUE, "kind" TEXT, "updated" TIMESTAMP, "deleted" INTEGER, "body" TEXT)`, - `PRAGMA journal_mode=WAL`, - `PRAGMA synchronous=NORMAL`, - `PRAGMA cache_size=2000`, -} - -var ( - ErrInvalidConfiguration = errors.New("invalid configuration") - ErrIncompatibleSQLMigration = errors.New("incompatible migration") - ErrNotEnoughSQLMigrations = errors.New("already more migrations than wanted") - ErrSqliteFailure = errors.New("sqlite returned an error") -) - -type Sqlite struct { - db *sql.DB -} - -func NewSqlite(dbPath string) (*Sqlite, error) { - db, err := sql.Open("sqlite", dbPath) - if err != nil { - return &Sqlite{}, fmt.Errorf("%w: %v", ErrInvalidConfiguration, err) - } - - s := &Sqlite{ - db: db, - } - - if err := s.migrate(migrations); err != nil { - return &Sqlite{}, err - } - - return s, nil -} - -func (s *Sqlite) Update(item item.Item) error { - if _, err := s.db.Exec(` -INSERT INTO items -(id, kind, updated, deleted, body) -VALUES -(?, ?, ?, ?, ?) -ON CONFLICT(id) DO UPDATE -SET -kind=excluded.kind, -updated=excluded.updated, -deleted=excluded.deleted, -body=excluded.body`, - item.ID, item.Kind, item.Updated.Format(timestampFormat), item.Deleted, item.Body); err != nil { - return fmt.Errorf("%w: %v", ErrSqliteFailure, err) - } - return nil -} - -func (s *Sqlite) Updated(ks []item.Kind, t time.Time) ([]item.Item, error) { - query := ` -SELECT id, kind, updated, deleted, body -FROM items -WHERE updated > ?` - var rows *sql.Rows - var err error - if len(ks) == 0 { - rows, err = s.db.Query(query, t.Format(timestampFormat)) - if err != nil { - return nil, fmt.Errorf("%w: %v", ErrSqliteFailure, err) - } - } else { - args := []any{t.Format(timestampFormat)} - ph := make([]string, 0, len(ks)) - for _, k := range ks { - args = append(args, string(k)) - ph = append(ph, "?") - } - query = fmt.Sprintf("%s AND kind in (%s)", query, strings.Join(ph, ",")) - rows, err = s.db.Query(query, args...) - if err != nil { - return nil, fmt.Errorf("%w: %v", ErrSqliteFailure, err) - } - } - - result := make([]item.Item, 0) - defer rows.Close() - for rows.Next() { - var item item.Item - if err := rows.Scan(&item.ID, &item.Kind, &item.Updated, &item.Deleted, &item.Body); err != nil { - return nil, fmt.Errorf("%w: %v", ErrSqliteFailure, err) - } - result = append(result, item) - } - - return result, nil -} - -func (s *Sqlite) migrate(wanted []string) error { - // admin table - if _, err := s.db.Exec(` -CREATE TABLE IF NOT EXISTS migration -("id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, "query" TEXT) -`); err != nil { - return err - } - - // find existing - rows, err := s.db.Query(`SELECT query FROM migration ORDER BY id`) - if err != nil { - return fmt.Errorf("%w: %v", ErrSqliteFailure, err) - } - - existing := []string{} - for rows.Next() { - var query string - if err := rows.Scan(&query); err != nil { - return fmt.Errorf("%w: %v", ErrSqliteFailure, err) - } - existing = append(existing, string(query)) - } - rows.Close() - - // compare - missing, err := compareMigrations(wanted, existing) - if err != nil { - return fmt.Errorf("%w: %v", ErrSqliteFailure, err) - } - - // execute missing - for _, query := range missing { - if _, err := s.db.Exec(string(query)); err != nil { - return fmt.Errorf("%w: %v", ErrSqliteFailure, err) - } - - // register - if _, err := s.db.Exec(` -INSERT INTO migration -(query) VALUES (?) -`, query); err != nil { - return fmt.Errorf("%w: %v", ErrSqliteFailure, err) - } - } - - return nil -} - -func compareMigrations(wanted, existing []string) ([]string, error) { - needed := []string{} - if len(wanted) < len(existing) { - return []string{}, ErrNotEnoughSQLMigrations - } - - for i, want := range wanted { - switch { - case i >= len(existing): - needed = append(needed, want) - case want == existing[i]: - // do nothing - case want != existing[i]: - return []string{}, fmt.Errorf("%w: %v", ErrIncompatibleSQLMigration, want) - } - } - - return needed, nil -}