switch to postgres
This commit is contained in:
parent
671deb355a
commit
89aadbf31a
4
Makefile
4
Makefile
|
@ -1,4 +1,6 @@
|
|||
|
||||
sync-run:
|
||||
cd sync/service && PLANNER_DB_PATH=test.db PLANNER_PORT=8092 PLANNER_API_KEY=testKey go run .
|
||||
cd sync/service && PLANNER_DB_HOST=localhost PLANNER_DB_PORT=5432 PLANNER_DB_NAME=planner PLANNER_DB_USER=test PLANNER_DB_PASSWORD=test PLANNER_PORT=8092 PLANNER_API_KEY=testKey go run .
|
||||
|
||||
database:
|
||||
docker run -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test -e POSTGRES_DB=planner -p 5432:5432 postgres:16
|
||||
|
|
1
go.mod
1
go.mod
|
@ -6,6 +6,7 @@ require (
|
|||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
|
||||
github.com/lib/pq v1.10.9 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -4,6 +4,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
|||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
||||
|
|
|
@ -6,28 +6,23 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dbPath := os.Getenv("PLANNER_DB_PATH")
|
||||
if dbPath == "" {
|
||||
fmt.Println("PLANNER_DB_PATH is empty")
|
||||
os.Exit(1)
|
||||
}
|
||||
port, err := strconv.Atoi(os.Getenv("PLANNER_PORT"))
|
||||
if err != nil {
|
||||
fmt.Println("PLANNER_PORT env is not an integer")
|
||||
os.Exit(1)
|
||||
}
|
||||
port := os.Getenv("PLANNER_PORT")
|
||||
apiKey := os.Getenv("PLANNER_API_KEY")
|
||||
if apiKey == "" {
|
||||
fmt.Println("PLANNER_API_KEY is empty")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
repo, err := NewSqlite(dbPath)
|
||||
dbHost := os.Getenv("PLANNER_DB_HOST")
|
||||
dbPort := os.Getenv("PLANNER_DB_PORT")
|
||||
dbName := os.Getenv("PLANNER_DB_NAME")
|
||||
dbUser := os.Getenv("PLANNER_DB_USER")
|
||||
dbPassword := os.Getenv("PLANNER_DB_PASSWORD")
|
||||
repo, err := NewPostgres(dbHost, dbPort, dbName, dbUser, dbPassword)
|
||||
if err != nil {
|
||||
fmt.Printf("could not open sqlite db: %s", err.Error())
|
||||
os.Exit(1)
|
||||
|
@ -35,14 +30,15 @@ func main() {
|
|||
|
||||
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
||||
logger.Info("configuration", "configuration", map[string]string{
|
||||
"dbPath": dbPath,
|
||||
"port": fmt.Sprintf("%d", port),
|
||||
"apiKey": "***",
|
||||
"port": port,
|
||||
"dbHost": dbHost,
|
||||
"dbPort": dbPort,
|
||||
"dbName": dbName,
|
||||
"dbUser": dbUser,
|
||||
})
|
||||
|
||||
address := fmt.Sprintf(":%d", port)
|
||||
srv := NewServer(repo, apiKey, logger)
|
||||
go http.ListenAndServe(address, srv)
|
||||
go http.ListenAndServe(fmt.Sprintf(":%s", port), srv)
|
||||
|
||||
logger.Info("service started")
|
||||
|
||||
|
|
|
@ -0,0 +1,175 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
"go-mod.ewintr.nl/planner/sync/item"
|
||||
)
|
||||
|
||||
const (
|
||||
timestampFormat = "2006-01-02 15:04:05"
|
||||
)
|
||||
|
||||
var migrations = []string{
|
||||
`CREATE TABLE items (id TEXT PRIMARY KEY, kind TEXT, updated TIMESTAMP, deleted BOOLEAN, body TEXT)`,
|
||||
`CREATE INDEX idx_items_updated ON items(updated)`,
|
||||
`CREATE INDEX idx_items_kind ON items(kind)`,
|
||||
}
|
||||
|
||||
var (
|
||||
ErrInvalidConfiguration = errors.New("invalid configuration")
|
||||
ErrIncompatibleSQLMigration = errors.New("incompatible migration")
|
||||
ErrNotEnoughSQLMigrations = errors.New("already more migrations than wanted")
|
||||
ErrPostgresFailure = errors.New("postgres returned an error")
|
||||
)
|
||||
|
||||
type Postgres struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func NewPostgres(host, port, dbname, user, password string) (*Postgres, error) {
|
||||
connStr := fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", host, port, dbname, user, password)
|
||||
|
||||
db, err := sql.Open("postgres", connStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrInvalidConfiguration, err)
|
||||
}
|
||||
|
||||
// Test the connection
|
||||
if err := db.Ping(); err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrInvalidConfiguration, err)
|
||||
}
|
||||
|
||||
p := &Postgres{
|
||||
db: db,
|
||||
}
|
||||
|
||||
if err := p.migrate(migrations); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *Postgres) Update(item item.Item) error {
|
||||
_, err := p.db.Exec(`
|
||||
INSERT INTO items (id, kind, updated, deleted, body)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET kind = EXCLUDED.kind,
|
||||
updated = EXCLUDED.updated,
|
||||
deleted = EXCLUDED.deleted,
|
||||
body = EXCLUDED.body`,
|
||||
item.ID, item.Kind, item.Updated, item.Deleted, item.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Postgres) Updated(ks []item.Kind, t time.Time) ([]item.Item, error) {
|
||||
query := `
|
||||
SELECT id, kind, updated, deleted, body
|
||||
FROM items
|
||||
WHERE updated > $1`
|
||||
args := []interface{}{t}
|
||||
|
||||
if len(ks) > 0 {
|
||||
placeholder := make([]string, len(ks))
|
||||
for i := range ks {
|
||||
placeholder[i] = fmt.Sprintf("$%d", i+2)
|
||||
args = append(args, string(ks[i]))
|
||||
}
|
||||
query += fmt.Sprintf(" AND kind = ANY(ARRAY[%s])", strings.Join(placeholder, ","))
|
||||
}
|
||||
|
||||
rows, err := p.db.Query(query, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
result := make([]item.Item, 0)
|
||||
for rows.Next() {
|
||||
var item item.Item
|
||||
if err := rows.Scan(&item.ID, &item.Kind, &item.Updated, &item.Deleted, &item.Body); err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
result = append(result, item)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (p *Postgres) migrate(wanted []string) error {
|
||||
// Create migration table if not exists
|
||||
_, err := p.db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS migration
|
||||
(id SERIAL PRIMARY KEY, query TEXT)
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
|
||||
// Find existing migrations
|
||||
rows, err := p.db.Query(`SELECT query FROM migration ORDER BY id`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var existing []string
|
||||
for rows.Next() {
|
||||
var query string
|
||||
if err := rows.Scan(&query); err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
existing = append(existing, query)
|
||||
}
|
||||
|
||||
// Compare and execute missing migrations
|
||||
missing, err := compareMigrations(wanted, existing)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
|
||||
for _, query := range missing {
|
||||
if _, err := p.db.Exec(query); err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
|
||||
// Register migration
|
||||
if _, err := p.db.Exec(`
|
||||
INSERT INTO migration (query) VALUES ($1)
|
||||
`, query); err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrPostgresFailure, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func compareMigrations(wanted, existing []string) ([]string, error) {
|
||||
var needed []string
|
||||
if len(wanted) < len(existing) {
|
||||
return nil, ErrNotEnoughSQLMigrations
|
||||
}
|
||||
|
||||
for i, want := range wanted {
|
||||
switch {
|
||||
case i >= len(existing):
|
||||
needed = append(needed, want)
|
||||
case want == existing[i]:
|
||||
// do nothing
|
||||
case want != existing[i]:
|
||||
return nil, fmt.Errorf("%w: %v", ErrIncompatibleSQLMigration, want)
|
||||
}
|
||||
}
|
||||
|
||||
return needed, nil
|
||||
}
|
|
@ -1,177 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go-mod.ewintr.nl/planner/sync/item"
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
const (
|
||||
timestampFormat = "2006-01-02 15:04:05"
|
||||
)
|
||||
|
||||
var migrations = []string{
|
||||
`CREATE TABLE items ("id" TEXT UNIQUE, "kind" TEXT, "updated" TIMESTAMP, "deleted" INTEGER, "body" TEXT)`,
|
||||
`PRAGMA journal_mode=WAL`,
|
||||
`PRAGMA synchronous=NORMAL`,
|
||||
`PRAGMA cache_size=2000`,
|
||||
}
|
||||
|
||||
var (
|
||||
ErrInvalidConfiguration = errors.New("invalid configuration")
|
||||
ErrIncompatibleSQLMigration = errors.New("incompatible migration")
|
||||
ErrNotEnoughSQLMigrations = errors.New("already more migrations than wanted")
|
||||
ErrSqliteFailure = errors.New("sqlite returned an error")
|
||||
)
|
||||
|
||||
type Sqlite struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func NewSqlite(dbPath string) (*Sqlite, error) {
|
||||
db, err := sql.Open("sqlite", dbPath)
|
||||
if err != nil {
|
||||
return &Sqlite{}, fmt.Errorf("%w: %v", ErrInvalidConfiguration, err)
|
||||
}
|
||||
|
||||
s := &Sqlite{
|
||||
db: db,
|
||||
}
|
||||
|
||||
if err := s.migrate(migrations); err != nil {
|
||||
return &Sqlite{}, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Sqlite) Update(item item.Item) error {
|
||||
if _, err := s.db.Exec(`
|
||||
INSERT INTO items
|
||||
(id, kind, updated, deleted, body)
|
||||
VALUES
|
||||
(?, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO UPDATE
|
||||
SET
|
||||
kind=excluded.kind,
|
||||
updated=excluded.updated,
|
||||
deleted=excluded.deleted,
|
||||
body=excluded.body`,
|
||||
item.ID, item.Kind, item.Updated.Format(timestampFormat), item.Deleted, item.Body); err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrSqliteFailure, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Sqlite) Updated(ks []item.Kind, t time.Time) ([]item.Item, error) {
|
||||
query := `
|
||||
SELECT id, kind, updated, deleted, body
|
||||
FROM items
|
||||
WHERE updated > ?`
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
if len(ks) == 0 {
|
||||
rows, err = s.db.Query(query, t.Format(timestampFormat))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrSqliteFailure, err)
|
||||
}
|
||||
} else {
|
||||
args := []any{t.Format(timestampFormat)}
|
||||
ph := make([]string, 0, len(ks))
|
||||
for _, k := range ks {
|
||||
args = append(args, string(k))
|
||||
ph = append(ph, "?")
|
||||
}
|
||||
query = fmt.Sprintf("%s AND kind in (%s)", query, strings.Join(ph, ","))
|
||||
rows, err = s.db.Query(query, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrSqliteFailure, err)
|
||||
}
|
||||
}
|
||||
|
||||
result := make([]item.Item, 0)
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var item item.Item
|
||||
if err := rows.Scan(&item.ID, &item.Kind, &item.Updated, &item.Deleted, &item.Body); err != nil {
|
||||
return nil, fmt.Errorf("%w: %v", ErrSqliteFailure, err)
|
||||
}
|
||||
result = append(result, item)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *Sqlite) migrate(wanted []string) error {
|
||||
// admin table
|
||||
if _, err := s.db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS migration
|
||||
("id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, "query" TEXT)
|
||||
`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// find existing
|
||||
rows, err := s.db.Query(`SELECT query FROM migration ORDER BY id`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrSqliteFailure, err)
|
||||
}
|
||||
|
||||
existing := []string{}
|
||||
for rows.Next() {
|
||||
var query string
|
||||
if err := rows.Scan(&query); err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrSqliteFailure, err)
|
||||
}
|
||||
existing = append(existing, string(query))
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
// compare
|
||||
missing, err := compareMigrations(wanted, existing)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrSqliteFailure, err)
|
||||
}
|
||||
|
||||
// execute missing
|
||||
for _, query := range missing {
|
||||
if _, err := s.db.Exec(string(query)); err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrSqliteFailure, err)
|
||||
}
|
||||
|
||||
// register
|
||||
if _, err := s.db.Exec(`
|
||||
INSERT INTO migration
|
||||
(query) VALUES (?)
|
||||
`, query); err != nil {
|
||||
return fmt.Errorf("%w: %v", ErrSqliteFailure, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func compareMigrations(wanted, existing []string) ([]string, error) {
|
||||
needed := []string{}
|
||||
if len(wanted) < len(existing) {
|
||||
return []string{}, ErrNotEnoughSQLMigrations
|
||||
}
|
||||
|
||||
for i, want := range wanted {
|
||||
switch {
|
||||
case i >= len(existing):
|
||||
needed = append(needed, want)
|
||||
case want == existing[i]:
|
||||
// do nothing
|
||||
case want != existing[i]:
|
||||
return []string{}, fmt.Errorf("%w: %v", ErrIncompatibleSQLMigration, want)
|
||||
}
|
||||
}
|
||||
|
||||
return needed, nil
|
||||
}
|
Loading…
Reference in New Issue