mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-02-02 20:40:52 +08:00
feat(store): add PostgreSQL-backed config store with env selection
This commit is contained in:
@@ -12,6 +12,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
configaccess "github.com/router-for-me/CLIProxyAPI/v6/internal/access/config_access"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/cmd"
|
||||
@@ -101,6 +102,12 @@ func main() {
|
||||
var cfg *config.Config
|
||||
var isCloudDeploy bool
|
||||
var (
|
||||
usePostgresStore bool
|
||||
pgStoreDSN string
|
||||
pgStoreSchema string
|
||||
pgStoreConfigKey string
|
||||
pgStoreCacheDir string
|
||||
pgStoreInst *store.PostgresStore
|
||||
gitStoreLocalPath string
|
||||
useGitStore bool
|
||||
gitStoreRemoteURL string
|
||||
@@ -125,6 +132,22 @@ func main() {
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
if value, ok := lookupEnv("PGSTORE_DSN", "pgstore_dsn"); ok {
|
||||
usePostgresStore = true
|
||||
pgStoreDSN = value
|
||||
}
|
||||
if usePostgresStore {
|
||||
if value, ok := lookupEnv("PGSTORE_SCHEMA", "pgstore_schema"); ok {
|
||||
pgStoreSchema = value
|
||||
}
|
||||
if value, ok := lookupEnv("PGSTORE_CONFIG_KEY", "pgstore_config_key"); ok {
|
||||
pgStoreConfigKey = value
|
||||
}
|
||||
if value, ok := lookupEnv("PGSTORE_CACHE_DIR", "pgstore_cache_dir"); ok {
|
||||
pgStoreCacheDir = value
|
||||
}
|
||||
useGitStore = false
|
||||
}
|
||||
if value, ok := lookupEnv("GITSTORE_GIT_URL", "gitstore_git_url"); ok {
|
||||
useGitStore = true
|
||||
gitStoreRemoteURL = value
|
||||
@@ -147,13 +170,42 @@ func main() {
|
||||
}
|
||||
|
||||
// Determine and load the configuration file.
|
||||
// If gitstore is configured, load from the cloned repository; otherwise use the provided path or default.
|
||||
// Prefer the Postgres store when configured, otherwise fallback to git or local files.
|
||||
var configFilePath string
|
||||
if useGitStore {
|
||||
if usePostgresStore {
|
||||
if pgStoreCacheDir == "" {
|
||||
pgStoreCacheDir = wd
|
||||
}
|
||||
pgStoreCacheDir = filepath.Join(pgStoreCacheDir, "pgstore")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
pgStoreInst, err = store.NewPostgresStore(ctx, store.PostgresStoreConfig{
|
||||
DSN: pgStoreDSN,
|
||||
Schema: pgStoreSchema,
|
||||
ConfigKey: pgStoreConfigKey,
|
||||
SpoolDir: pgStoreCacheDir,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to initialize postgres token store: %v", err)
|
||||
}
|
||||
examplePath := filepath.Join(wd, "config.example.yaml")
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
|
||||
if errBootstrap := pgStoreInst.Bootstrap(ctx, examplePath); errBootstrap != nil {
|
||||
cancel()
|
||||
log.Fatalf("failed to bootstrap postgres-backed config: %v", errBootstrap)
|
||||
}
|
||||
cancel()
|
||||
configFilePath = pgStoreInst.ConfigPath()
|
||||
cfg, err = config.LoadConfigOptional(configFilePath, isCloudDeploy)
|
||||
if err == nil {
|
||||
cfg.AuthDir = pgStoreInst.AuthDir()
|
||||
log.Infof("postgres-backed token store enabled, workspace path: %s", pgStoreInst.WorkDir())
|
||||
}
|
||||
} else if useGitStore {
|
||||
if gitStoreLocalPath == "" {
|
||||
gitStoreLocalPath = wd
|
||||
}
|
||||
gitStoreRoot = filepath.Join(gitStoreLocalPath, "remote")
|
||||
gitStoreRoot = filepath.Join(gitStoreLocalPath, "gitstore")
|
||||
authDir := filepath.Join(gitStoreRoot, "auths")
|
||||
gitStoreInst = store.NewGitTokenStore(gitStoreRemoteURL, gitStoreUser, gitStorePassword)
|
||||
gitStoreInst.SetBaseDir(authDir)
|
||||
@@ -172,7 +224,7 @@ func main() {
|
||||
if errCopy := misc.CopyConfigTemplate(examplePath, configFilePath); errCopy != nil {
|
||||
log.Fatalf("failed to bootstrap git-backed config: %v", errCopy)
|
||||
}
|
||||
if errCommit := gitStoreInst.CommitConfig(context.Background()); errCommit != nil {
|
||||
if errCommit := gitStoreInst.PersistConfig(context.Background()); errCommit != nil {
|
||||
log.Fatalf("failed to commit initial git-backed config: %v", errCommit)
|
||||
}
|
||||
log.Infof("git-backed config initialized from template: %s", configFilePath)
|
||||
@@ -245,7 +297,9 @@ func main() {
|
||||
}
|
||||
|
||||
// Register the shared token store once so all components use the same persistence backend.
|
||||
if useGitStore {
|
||||
if usePostgresStore {
|
||||
sdkAuth.RegisterTokenStore(pgStoreInst)
|
||||
} else if useGitStore {
|
||||
sdkAuth.RegisterTokenStore(gitStoreInst)
|
||||
} else {
|
||||
sdkAuth.RegisterTokenStore(sdkAuth.NewFileTokenStore())
|
||||
|
||||
5
go.mod
5
go.mod
@@ -7,6 +7,7 @@ require (
|
||||
github.com/gin-gonic/gin v1.10.1
|
||||
github.com/go-git/go-git/v6 v6.0.0-20251009132922-75a182125145
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/jackc/pgx/v5 v5.7.6
|
||||
github.com/klauspost/compress v1.17.3
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
|
||||
@@ -39,6 +40,9 @@ require (
|
||||
github.com/go-playground/validator/v10 v10.20.0 // indirect
|
||||
github.com/goccy/go-json v0.10.2 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/kevinburke/ssh_config v1.4.0 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
||||
@@ -54,6 +58,7 @@ require (
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
github.com/ugorji/go/codec v1.2.12 // indirect
|
||||
golang.org/x/arch v0.8.0 // indirect
|
||||
golang.org/x/sync v0.17.0 // indirect
|
||||
golang.org/x/sys v0.37.0 // indirect
|
||||
golang.org/x/text v0.30.0 // indirect
|
||||
google.golang.org/protobuf v1.34.1 // indirect
|
||||
|
||||
10
go.sum
10
go.sum
@@ -62,6 +62,14 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
||||
github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk=
|
||||
github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
|
||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/kevinburke/ssh_config v1.4.0 h1:6xxtP5bZ2E4NF5tuQulISpTO2z8XbtH8cg1PWkxoFkQ=
|
||||
@@ -137,6 +145,8 @@ golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
|
||||
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
|
||||
golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
|
||||
golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU=
|
||||
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
|
||||
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
|
||||
|
||||
@@ -359,9 +359,9 @@ func (s *GitTokenStore) Delete(_ context.Context, id string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// CommitPaths commits and pushes the provided paths to the remote repository.
|
||||
// PersistAuthFiles commits and pushes the provided paths to the remote repository.
|
||||
// It no-ops when the store is not fully configured or when there are no paths.
|
||||
func (s *GitTokenStore) CommitPaths(_ context.Context, message string, paths ...string) error {
|
||||
func (s *GitTokenStore) PersistAuthFiles(_ context.Context, message string, paths ...string) error {
|
||||
if len(paths) == 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -652,8 +652,8 @@ func (s *GitTokenStore) rewriteHeadAsSingleCommit(repo *git.Repository, branch p
|
||||
return nil
|
||||
}
|
||||
|
||||
// CommitConfig commits and pushes configuration changes to git.
|
||||
func (s *GitTokenStore) CommitConfig(_ context.Context) error {
|
||||
// PersistConfig commits and pushes configuration changes to git.
|
||||
func (s *GitTokenStore) PersistConfig(_ context.Context) error {
|
||||
if err := s.EnsureRepository(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
658
internal/store/postgresstore.go
Normal file
658
internal/store/postgresstore.go
Normal file
@@ -0,0 +1,658 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "github.com/jackc/pgx/v5/stdlib"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/misc"
|
||||
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultConfigTable = "config_store"
|
||||
defaultAuthTable = "auth_store"
|
||||
defaultConfigKey = "default"
|
||||
)
|
||||
|
||||
// PostgresStoreConfig captures configuration required to initialize a Postgres-backed store.
|
||||
type PostgresStoreConfig struct {
|
||||
DSN string
|
||||
Schema string
|
||||
ConfigTable string
|
||||
AuthTable string
|
||||
ConfigKey string
|
||||
SpoolDir string
|
||||
}
|
||||
|
||||
// PostgresStore persists configuration and authentication metadata using PostgreSQL as backend
|
||||
// while mirroring data to a local workspace so existing file-based workflows continue to operate.
|
||||
type PostgresStore struct {
|
||||
db *sql.DB
|
||||
cfg PostgresStoreConfig
|
||||
spoolRoot string
|
||||
configPath string
|
||||
authDir string
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewPostgresStore establishes a connection to PostgreSQL and prepares the local workspace.
|
||||
func NewPostgresStore(ctx context.Context, cfg PostgresStoreConfig) (*PostgresStore, error) {
|
||||
trimmedDSN := strings.TrimSpace(cfg.DSN)
|
||||
if trimmedDSN == "" {
|
||||
return nil, fmt.Errorf("postgres store: DSN is required")
|
||||
}
|
||||
cfg.DSN = trimmedDSN
|
||||
if cfg.ConfigTable == "" {
|
||||
cfg.ConfigTable = defaultConfigTable
|
||||
}
|
||||
if cfg.AuthTable == "" {
|
||||
cfg.AuthTable = defaultAuthTable
|
||||
}
|
||||
if cfg.ConfigKey == "" {
|
||||
cfg.ConfigKey = defaultConfigKey
|
||||
}
|
||||
|
||||
spoolRoot := strings.TrimSpace(cfg.SpoolDir)
|
||||
if spoolRoot == "" {
|
||||
if cwd, err := os.Getwd(); err == nil {
|
||||
spoolRoot = filepath.Join(cwd, "pgstore")
|
||||
} else {
|
||||
spoolRoot = filepath.Join(os.TempDir(), "pgstore")
|
||||
}
|
||||
}
|
||||
absSpool, err := filepath.Abs(spoolRoot)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("postgres store: resolve spool directory: %w", err)
|
||||
}
|
||||
configDir := filepath.Join(absSpool, "config")
|
||||
authDir := filepath.Join(absSpool, "auths")
|
||||
if err = os.MkdirAll(configDir, 0o700); err != nil {
|
||||
return nil, fmt.Errorf("postgres store: create config directory: %w", err)
|
||||
}
|
||||
if err = os.MkdirAll(authDir, 0o700); err != nil {
|
||||
return nil, fmt.Errorf("postgres store: create auth directory: %w", err)
|
||||
}
|
||||
|
||||
db, err := sql.Open("pgx", cfg.DSN)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("postgres store: open database connection: %w", err)
|
||||
}
|
||||
if err = db.PingContext(ctx); err != nil {
|
||||
_ = db.Close()
|
||||
return nil, fmt.Errorf("postgres store: ping database: %w", err)
|
||||
}
|
||||
|
||||
store := &PostgresStore{
|
||||
db: db,
|
||||
cfg: cfg,
|
||||
spoolRoot: absSpool,
|
||||
configPath: filepath.Join(configDir, "config.yaml"),
|
||||
authDir: authDir,
|
||||
}
|
||||
return store, nil
|
||||
}
|
||||
|
||||
// Close releases the underlying database connection.
|
||||
func (s *PostgresStore) Close() error {
|
||||
if s == nil || s.db == nil {
|
||||
return nil
|
||||
}
|
||||
return s.db.Close()
|
||||
}
|
||||
|
||||
// EnsureSchema creates the required tables (and schema when provided).
|
||||
func (s *PostgresStore) EnsureSchema(ctx context.Context) error {
|
||||
if s == nil || s.db == nil {
|
||||
return fmt.Errorf("postgres store: not initialized")
|
||||
}
|
||||
if schema := strings.TrimSpace(s.cfg.Schema); schema != "" {
|
||||
query := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", quoteIdentifier(schema))
|
||||
if _, err := s.db.ExecContext(ctx, query); err != nil {
|
||||
return fmt.Errorf("postgres store: create schema: %w", err)
|
||||
}
|
||||
}
|
||||
configTable := s.fullTableName(s.cfg.ConfigTable)
|
||||
if _, err := s.db.ExecContext(ctx, fmt.Sprintf(`
|
||||
CREATE TABLE IF NOT EXISTS %s (
|
||||
id TEXT PRIMARY KEY,
|
||||
content TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
)
|
||||
`, configTable)); err != nil {
|
||||
return fmt.Errorf("postgres store: create config table: %w", err)
|
||||
}
|
||||
authTable := s.fullTableName(s.cfg.AuthTable)
|
||||
if _, err := s.db.ExecContext(ctx, fmt.Sprintf(`
|
||||
CREATE TABLE IF NOT EXISTS %s (
|
||||
id TEXT PRIMARY KEY,
|
||||
content JSONB NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
)
|
||||
`, authTable)); err != nil {
|
||||
return fmt.Errorf("postgres store: create auth table: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Bootstrap synchronizes configuration and auth records between PostgreSQL and the local workspace.
|
||||
func (s *PostgresStore) Bootstrap(ctx context.Context, exampleConfigPath string) error {
|
||||
if err := s.EnsureSchema(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.syncConfigFromDatabase(ctx, exampleConfigPath); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.syncAuthFromDatabase(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConfigPath returns the managed configuration file path inside the spool directory.
|
||||
func (s *PostgresStore) ConfigPath() string {
|
||||
if s == nil {
|
||||
return ""
|
||||
}
|
||||
return s.configPath
|
||||
}
|
||||
|
||||
// AuthDir returns the local directory containing mirrored auth files.
|
||||
func (s *PostgresStore) AuthDir() string {
|
||||
if s == nil {
|
||||
return ""
|
||||
}
|
||||
return s.authDir
|
||||
}
|
||||
|
||||
// WorkDir exposes the root spool directory used for mirroring.
|
||||
func (s *PostgresStore) WorkDir() string {
|
||||
if s == nil {
|
||||
return ""
|
||||
}
|
||||
return s.spoolRoot
|
||||
}
|
||||
|
||||
// SetBaseDir implements the optional interface used by authenticators; it is a no-op because
|
||||
// the Postgres-backed store controls its own workspace.
|
||||
func (s *PostgresStore) SetBaseDir(string) {}
|
||||
|
||||
// Save persists authentication metadata to disk and PostgreSQL.
|
||||
func (s *PostgresStore) Save(ctx context.Context, auth *cliproxyauth.Auth) (string, error) {
|
||||
if auth == nil {
|
||||
return "", fmt.Errorf("postgres store: auth is nil")
|
||||
}
|
||||
|
||||
path, err := s.resolveAuthPath(auth)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if path == "" {
|
||||
return "", fmt.Errorf("postgres store: missing file path attribute for %s", auth.ID)
|
||||
}
|
||||
|
||||
if auth.Disabled {
|
||||
if _, statErr := os.Stat(path); errors.Is(statErr, fs.ErrNotExist) {
|
||||
return "", nil
|
||||
}
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if err = os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
|
||||
return "", fmt.Errorf("postgres store: create auth directory: %w", err)
|
||||
}
|
||||
|
||||
switch {
|
||||
case auth.Storage != nil:
|
||||
if err = auth.Storage.SaveTokenToFile(path); err != nil {
|
||||
return "", err
|
||||
}
|
||||
case auth.Metadata != nil:
|
||||
raw, errMarshal := json.Marshal(auth.Metadata)
|
||||
if errMarshal != nil {
|
||||
return "", fmt.Errorf("postgres store: marshal metadata: %w", errMarshal)
|
||||
}
|
||||
if existing, errRead := os.ReadFile(path); errRead == nil {
|
||||
if jsonEqual(existing, raw) {
|
||||
return path, nil
|
||||
}
|
||||
} else if errRead != nil && !errors.Is(errRead, fs.ErrNotExist) {
|
||||
return "", fmt.Errorf("postgres store: read existing metadata: %w", errRead)
|
||||
}
|
||||
tmp := path + ".tmp"
|
||||
if errWrite := os.WriteFile(tmp, raw, 0o600); errWrite != nil {
|
||||
return "", fmt.Errorf("postgres store: write temp auth file: %w", errWrite)
|
||||
}
|
||||
if errRename := os.Rename(tmp, path); errRename != nil {
|
||||
return "", fmt.Errorf("postgres store: rename auth file: %w", errRename)
|
||||
}
|
||||
default:
|
||||
return "", fmt.Errorf("postgres store: nothing to persist for %s", auth.ID)
|
||||
}
|
||||
|
||||
if auth.Attributes == nil {
|
||||
auth.Attributes = make(map[string]string)
|
||||
}
|
||||
auth.Attributes["path"] = path
|
||||
|
||||
if strings.TrimSpace(auth.FileName) == "" {
|
||||
auth.FileName = auth.ID
|
||||
}
|
||||
|
||||
relID, err := s.relativeAuthID(path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if err = s.upsertAuthRecord(ctx, relID, path); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return path, nil
|
||||
}
|
||||
|
||||
// List enumerates all auth records stored in PostgreSQL.
|
||||
func (s *PostgresStore) List(ctx context.Context) ([]*cliproxyauth.Auth, error) {
|
||||
query := fmt.Sprintf("SELECT id, content, created_at, updated_at FROM %s ORDER BY id", s.fullTableName(s.cfg.AuthTable))
|
||||
rows, err := s.db.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("postgres store: list auth: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
auths := make([]*cliproxyauth.Auth, 0, 32)
|
||||
for rows.Next() {
|
||||
var (
|
||||
id string
|
||||
payload string
|
||||
createdAt time.Time
|
||||
updatedAt time.Time
|
||||
)
|
||||
if err = rows.Scan(&id, &payload, &createdAt, &updatedAt); err != nil {
|
||||
return nil, fmt.Errorf("postgres store: scan auth row: %w", err)
|
||||
}
|
||||
path, errPath := s.absoluteAuthPath(id)
|
||||
if errPath != nil {
|
||||
log.WithError(errPath).Warnf("postgres store: skipping auth %s outside spool", id)
|
||||
continue
|
||||
}
|
||||
metadata := make(map[string]any)
|
||||
if err = json.Unmarshal([]byte(payload), &metadata); err != nil {
|
||||
log.WithError(err).Warnf("postgres store: skipping auth %s with invalid json", id)
|
||||
continue
|
||||
}
|
||||
provider := strings.TrimSpace(valueAsString(metadata["type"]))
|
||||
if provider == "" {
|
||||
provider = "unknown"
|
||||
}
|
||||
attr := map[string]string{"path": path}
|
||||
if email := strings.TrimSpace(valueAsString(metadata["email"])); email != "" {
|
||||
attr["email"] = email
|
||||
}
|
||||
auth := &cliproxyauth.Auth{
|
||||
ID: normalizeAuthID(id),
|
||||
Provider: provider,
|
||||
FileName: normalizeAuthID(id),
|
||||
Label: labelFor(metadata),
|
||||
Status: cliproxyauth.StatusActive,
|
||||
Attributes: attr,
|
||||
Metadata: metadata,
|
||||
CreatedAt: createdAt,
|
||||
UpdatedAt: updatedAt,
|
||||
LastRefreshedAt: time.Time{},
|
||||
NextRefreshAfter: time.Time{},
|
||||
}
|
||||
auths = append(auths, auth)
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("postgres store: iterate auth rows: %w", err)
|
||||
}
|
||||
return auths, nil
|
||||
}
|
||||
|
||||
// Delete removes an auth file and the corresponding database record.
|
||||
func (s *PostgresStore) Delete(ctx context.Context, id string) error {
|
||||
id = strings.TrimSpace(id)
|
||||
if id == "" {
|
||||
return fmt.Errorf("postgres store: id is empty")
|
||||
}
|
||||
path, err := s.resolveDeletePath(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if err = os.Remove(path); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||
return fmt.Errorf("postgres store: delete auth file: %w", err)
|
||||
}
|
||||
relID, err := s.relativeAuthID(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.deleteAuthRecord(ctx, relID)
|
||||
}
|
||||
|
||||
// PersistAuthFiles stores the provided auth file changes in PostgreSQL.
|
||||
func (s *PostgresStore) PersistAuthFiles(ctx context.Context, _ string, paths ...string) error {
|
||||
if len(paths) == 0 {
|
||||
return nil
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
for _, p := range paths {
|
||||
trimmed := strings.TrimSpace(p)
|
||||
if trimmed == "" {
|
||||
continue
|
||||
}
|
||||
relID, err := s.relativeAuthID(trimmed)
|
||||
if err != nil {
|
||||
// Attempt to resolve absolute path under authDir.
|
||||
abs := trimmed
|
||||
if !filepath.IsAbs(abs) {
|
||||
abs = filepath.Join(s.authDir, trimmed)
|
||||
}
|
||||
relID, err = s.relativeAuthID(abs)
|
||||
if err != nil {
|
||||
log.WithError(err).Warnf("postgres store: ignoring auth path %s", trimmed)
|
||||
continue
|
||||
}
|
||||
trimmed = abs
|
||||
}
|
||||
if err = s.syncAuthFile(ctx, relID, trimmed); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PersistConfig mirrors the local configuration file to PostgreSQL.
|
||||
func (s *PostgresStore) PersistConfig(ctx context.Context) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
data, err := os.ReadFile(s.configPath)
|
||||
if err != nil {
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
return s.deleteConfigRecord(ctx)
|
||||
}
|
||||
return fmt.Errorf("postgres store: read config file: %w", err)
|
||||
}
|
||||
return s.persistConfig(ctx, data)
|
||||
}
|
||||
|
||||
// syncConfigFromDatabase writes the database-stored config to disk or seeds the database from template.
|
||||
func (s *PostgresStore) syncConfigFromDatabase(ctx context.Context, exampleConfigPath string) error {
|
||||
query := fmt.Sprintf("SELECT content FROM %s WHERE id = $1", s.fullTableName(s.cfg.ConfigTable))
|
||||
var content string
|
||||
err := s.db.QueryRowContext(ctx, query, s.cfg.ConfigKey).Scan(&content)
|
||||
switch {
|
||||
case errors.Is(err, sql.ErrNoRows):
|
||||
if _, errStat := os.Stat(s.configPath); errors.Is(errStat, fs.ErrNotExist) {
|
||||
if exampleConfigPath != "" {
|
||||
if errCopy := misc.CopyConfigTemplate(exampleConfigPath, s.configPath); errCopy != nil {
|
||||
return fmt.Errorf("postgres store: copy example config: %w", errCopy)
|
||||
}
|
||||
} else {
|
||||
if errCreate := os.MkdirAll(filepath.Dir(s.configPath), 0o700); errCreate != nil {
|
||||
return fmt.Errorf("postgres store: prepare config directory: %w", errCreate)
|
||||
}
|
||||
if errWrite := os.WriteFile(s.configPath, []byte{}, 0o600); errWrite != nil {
|
||||
return fmt.Errorf("postgres store: create empty config: %w", errWrite)
|
||||
}
|
||||
}
|
||||
}
|
||||
data, errRead := os.ReadFile(s.configPath)
|
||||
if errRead != nil {
|
||||
return fmt.Errorf("postgres store: read local config: %w", errRead)
|
||||
}
|
||||
if errPersist := s.persistConfig(ctx, data); errPersist != nil {
|
||||
return errPersist
|
||||
}
|
||||
case err != nil:
|
||||
return fmt.Errorf("postgres store: load config from database: %w", err)
|
||||
default:
|
||||
if err = os.MkdirAll(filepath.Dir(s.configPath), 0o700); err != nil {
|
||||
return fmt.Errorf("postgres store: prepare config directory: %w", err)
|
||||
}
|
||||
if err = os.WriteFile(s.configPath, []byte(content), 0o600); err != nil {
|
||||
return fmt.Errorf("postgres store: write config to spool: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// syncAuthFromDatabase populates the local auth directory from PostgreSQL data.
|
||||
func (s *PostgresStore) syncAuthFromDatabase(ctx context.Context) error {
|
||||
query := fmt.Sprintf("SELECT id, content FROM %s", s.fullTableName(s.cfg.AuthTable))
|
||||
rows, err := s.db.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("postgres store: load auth from database: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
if err = os.RemoveAll(s.authDir); err != nil {
|
||||
return fmt.Errorf("postgres store: reset auth directory: %w", err)
|
||||
}
|
||||
if err = os.MkdirAll(s.authDir, 0o700); err != nil {
|
||||
return fmt.Errorf("postgres store: recreate auth directory: %w", err)
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var (
|
||||
id string
|
||||
payload string
|
||||
)
|
||||
if err = rows.Scan(&id, &payload); err != nil {
|
||||
return fmt.Errorf("postgres store: scan auth row: %w", err)
|
||||
}
|
||||
path, errPath := s.absoluteAuthPath(id)
|
||||
if errPath != nil {
|
||||
log.WithError(errPath).Warnf("postgres store: skipping auth %s outside spool", id)
|
||||
continue
|
||||
}
|
||||
if err = os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
|
||||
return fmt.Errorf("postgres store: create auth subdir: %w", err)
|
||||
}
|
||||
if err = os.WriteFile(path, []byte(payload), 0o600); err != nil {
|
||||
return fmt.Errorf("postgres store: write auth file: %w", err)
|
||||
}
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return fmt.Errorf("postgres store: iterate auth rows: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PostgresStore) syncAuthFile(ctx context.Context, relID, path string) error {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
return s.deleteAuthRecord(ctx, relID)
|
||||
}
|
||||
return fmt.Errorf("postgres store: read auth file: %w", err)
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return s.deleteAuthRecord(ctx, relID)
|
||||
}
|
||||
return s.persistAuth(ctx, relID, data)
|
||||
}
|
||||
|
||||
func (s *PostgresStore) upsertAuthRecord(ctx context.Context, relID, path string) error {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("postgres store: read auth file: %w", err)
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return s.deleteAuthRecord(ctx, relID)
|
||||
}
|
||||
return s.persistAuth(ctx, relID, data)
|
||||
}
|
||||
|
||||
func (s *PostgresStore) persistAuth(ctx context.Context, relID string, data []byte) error {
|
||||
jsonPayload := json.RawMessage(data)
|
||||
query := fmt.Sprintf(`
|
||||
INSERT INTO %s (id, content, created_at, updated_at)
|
||||
VALUES ($1, $2, NOW(), NOW())
|
||||
ON CONFLICT (id)
|
||||
DO UPDATE SET content = EXCLUDED.content, updated_at = NOW()
|
||||
`, s.fullTableName(s.cfg.AuthTable))
|
||||
if _, err := s.db.ExecContext(ctx, query, relID, jsonPayload); err != nil {
|
||||
return fmt.Errorf("postgres store: upsert auth record: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PostgresStore) deleteAuthRecord(ctx context.Context, relID string) error {
|
||||
query := fmt.Sprintf("DELETE FROM %s WHERE id = $1", s.fullTableName(s.cfg.AuthTable))
|
||||
if _, err := s.db.ExecContext(ctx, query, relID); err != nil {
|
||||
return fmt.Errorf("postgres store: delete auth record: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PostgresStore) persistConfig(ctx context.Context, data []byte) error {
|
||||
query := fmt.Sprintf(`
|
||||
INSERT INTO %s (id, content, created_at, updated_at)
|
||||
VALUES ($1, $2, NOW(), NOW())
|
||||
ON CONFLICT (id)
|
||||
DO UPDATE SET content = EXCLUDED.content, updated_at = NOW()
|
||||
`, s.fullTableName(s.cfg.ConfigTable))
|
||||
if _, err := s.db.ExecContext(ctx, query, s.cfg.ConfigKey, string(data)); err != nil {
|
||||
return fmt.Errorf("postgres store: upsert config: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PostgresStore) deleteConfigRecord(ctx context.Context) error {
|
||||
query := fmt.Sprintf("DELETE FROM %s WHERE id = $1", s.fullTableName(s.cfg.ConfigTable))
|
||||
if _, err := s.db.ExecContext(ctx, query, s.cfg.ConfigKey); err != nil {
|
||||
return fmt.Errorf("postgres store: delete config: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PostgresStore) resolveAuthPath(auth *cliproxyauth.Auth) (string, error) {
|
||||
if auth == nil {
|
||||
return "", fmt.Errorf("postgres store: auth is nil")
|
||||
}
|
||||
if auth.Attributes != nil {
|
||||
if p := strings.TrimSpace(auth.Attributes["path"]); p != "" {
|
||||
return p, nil
|
||||
}
|
||||
}
|
||||
if fileName := strings.TrimSpace(auth.FileName); fileName != "" {
|
||||
if filepath.IsAbs(fileName) {
|
||||
return fileName, nil
|
||||
}
|
||||
return filepath.Join(s.authDir, fileName), nil
|
||||
}
|
||||
if auth.ID == "" {
|
||||
return "", fmt.Errorf("postgres store: missing id")
|
||||
}
|
||||
if filepath.IsAbs(auth.ID) {
|
||||
return auth.ID, nil
|
||||
}
|
||||
return filepath.Join(s.authDir, filepath.FromSlash(auth.ID)), nil
|
||||
}
|
||||
|
||||
func (s *PostgresStore) resolveDeletePath(id string) (string, error) {
|
||||
if strings.ContainsRune(id, os.PathSeparator) || filepath.IsAbs(id) {
|
||||
return id, nil
|
||||
}
|
||||
return filepath.Join(s.authDir, filepath.FromSlash(id)), nil
|
||||
}
|
||||
|
||||
func (s *PostgresStore) relativeAuthID(path string) (string, error) {
|
||||
if s == nil {
|
||||
return "", fmt.Errorf("postgres store: store not initialized")
|
||||
}
|
||||
if !filepath.IsAbs(path) {
|
||||
path = filepath.Join(s.authDir, path)
|
||||
}
|
||||
clean := filepath.Clean(path)
|
||||
rel, err := filepath.Rel(s.authDir, clean)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("postgres store: compute relative path: %w", err)
|
||||
}
|
||||
if strings.HasPrefix(rel, "..") {
|
||||
return "", fmt.Errorf("postgres store: path %s outside managed directory", path)
|
||||
}
|
||||
return filepath.ToSlash(rel), nil
|
||||
}
|
||||
|
||||
func (s *PostgresStore) absoluteAuthPath(id string) (string, error) {
|
||||
if s == nil {
|
||||
return "", fmt.Errorf("postgres store: store not initialized")
|
||||
}
|
||||
clean := filepath.Clean(filepath.FromSlash(id))
|
||||
if strings.HasPrefix(clean, "..") {
|
||||
return "", fmt.Errorf("postgres store: invalid auth identifier %s", id)
|
||||
}
|
||||
path := filepath.Join(s.authDir, clean)
|
||||
rel, err := filepath.Rel(s.authDir, path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if strings.HasPrefix(rel, "..") {
|
||||
return "", fmt.Errorf("postgres store: resolved auth path escapes auth directory")
|
||||
}
|
||||
return path, nil
|
||||
}
|
||||
|
||||
func (s *PostgresStore) fullTableName(name string) string {
|
||||
if strings.TrimSpace(s.cfg.Schema) == "" {
|
||||
return quoteIdentifier(name)
|
||||
}
|
||||
return quoteIdentifier(s.cfg.Schema) + "." + quoteIdentifier(name)
|
||||
}
|
||||
|
||||
func quoteIdentifier(identifier string) string {
|
||||
replaced := strings.ReplaceAll(identifier, "\"", "\"\"")
|
||||
return "\"" + replaced + "\""
|
||||
}
|
||||
|
||||
func valueAsString(v any) string {
|
||||
switch t := v.(type) {
|
||||
case string:
|
||||
return t
|
||||
case fmt.Stringer:
|
||||
return t.String()
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
func labelFor(metadata map[string]any) string {
|
||||
if metadata == nil {
|
||||
return ""
|
||||
}
|
||||
if v := strings.TrimSpace(valueAsString(metadata["label"])); v != "" {
|
||||
return v
|
||||
}
|
||||
if v := strings.TrimSpace(valueAsString(metadata["email"])); v != "" {
|
||||
return v
|
||||
}
|
||||
if v := strings.TrimSpace(valueAsString(metadata["project_id"])); v != "" {
|
||||
return v
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func normalizeAuthID(id string) string {
|
||||
return filepath.ToSlash(filepath.Clean(id))
|
||||
}
|
||||
@@ -29,10 +29,10 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// gitCommitter captures the subset of git-backed token store capabilities used by the watcher.
|
||||
type gitCommitter interface {
|
||||
CommitConfig(ctx context.Context) error
|
||||
CommitPaths(ctx context.Context, message string, paths ...string) error
|
||||
// storePersister captures persistence-capable token store methods used by the watcher.
|
||||
type storePersister interface {
|
||||
PersistConfig(ctx context.Context) error
|
||||
PersistAuthFiles(ctx context.Context, message string, paths ...string) error
|
||||
}
|
||||
|
||||
// Watcher manages file watching for configuration and authentication files
|
||||
@@ -52,7 +52,7 @@ type Watcher struct {
|
||||
pendingUpdates map[string]AuthUpdate
|
||||
pendingOrder []string
|
||||
dispatchCancel context.CancelFunc
|
||||
gitCommitter gitCommitter
|
||||
storePersister storePersister
|
||||
oldConfigYaml []byte
|
||||
}
|
||||
|
||||
@@ -126,9 +126,9 @@ func NewWatcher(configPath, authDir string, reloadCallback func(*config.Config))
|
||||
}
|
||||
w.dispatchCond = sync.NewCond(&w.dispatchMu)
|
||||
if store := sdkAuth.GetTokenStore(); store != nil {
|
||||
if committer, ok := store.(gitCommitter); ok {
|
||||
w.gitCommitter = committer
|
||||
log.Debug("gitstore mode detected; watcher will commit changes to remote repository")
|
||||
if persister, ok := store.(storePersister); ok {
|
||||
w.storePersister = persister
|
||||
log.Debug("persistence-capable token store detected; watcher will propagate persisted changes")
|
||||
}
|
||||
}
|
||||
return w, nil
|
||||
@@ -345,21 +345,21 @@ func (w *Watcher) stopDispatch() {
|
||||
w.clientsMutex.Unlock()
|
||||
}
|
||||
|
||||
func (w *Watcher) commitConfigAsync() {
|
||||
if w == nil || w.gitCommitter == nil {
|
||||
func (w *Watcher) persistConfigAsync() {
|
||||
if w == nil || w.storePersister == nil {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
if err := w.gitCommitter.CommitConfig(ctx); err != nil {
|
||||
log.Errorf("failed to commit config change: %v", err)
|
||||
if err := w.storePersister.PersistConfig(ctx); err != nil {
|
||||
log.Errorf("failed to persist config change: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (w *Watcher) commitAuthAsync(message string, paths ...string) {
|
||||
if w == nil || w.gitCommitter == nil {
|
||||
func (w *Watcher) persistAuthAsync(message string, paths ...string) {
|
||||
if w == nil || w.storePersister == nil {
|
||||
return
|
||||
}
|
||||
filtered := make([]string, 0, len(paths))
|
||||
@@ -374,8 +374,8 @@ func (w *Watcher) commitAuthAsync(message string, paths ...string) {
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
if err := w.gitCommitter.CommitPaths(ctx, message, filtered...); err != nil {
|
||||
log.Errorf("failed to commit auth changes: %v", err)
|
||||
if err := w.storePersister.PersistAuthFiles(ctx, message, filtered...); err != nil {
|
||||
log.Errorf("failed to persist auth changes: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -484,7 +484,7 @@ func (w *Watcher) handleEvent(event fsnotify.Event) {
|
||||
w.clientsMutex.Lock()
|
||||
w.lastConfigHash = finalHash
|
||||
w.clientsMutex.Unlock()
|
||||
w.commitConfigAsync()
|
||||
w.persistConfigAsync()
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -683,7 +683,7 @@ func (w *Watcher) addOrUpdateClient(path string) {
|
||||
log.Debugf("triggering server update callback after add/update")
|
||||
w.reloadCallback(cfg)
|
||||
}
|
||||
w.commitAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path)
|
||||
w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path)
|
||||
}
|
||||
|
||||
// removeClient handles the removal of a single client.
|
||||
@@ -701,7 +701,7 @@ func (w *Watcher) removeClient(path string) {
|
||||
log.Debugf("triggering server update callback after removal")
|
||||
w.reloadCallback(cfg)
|
||||
}
|
||||
w.commitAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
|
||||
w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
|
||||
}
|
||||
|
||||
// SnapshotCombinedClients returns a snapshot of current combined clients.
|
||||
|
||||
Reference in New Issue
Block a user