From 42087d53871c53f771cbe7fb1385d27907e06f8f Mon Sep 17 00:00:00 2001 From: hkfires <10558748+hkfires@users.noreply.github.com> Date: Mon, 13 Oct 2025 21:05:43 +0800 Subject: [PATCH] feat(store): add PostgreSQL-backed config store with env selection --- cmd/server/main.go | 64 +++- go.mod | 5 + go.sum | 10 + internal/store/gitstore.go | 8 +- internal/store/postgresstore.go | 658 ++++++++++++++++++++++++++++++++ internal/watcher/watcher.go | 38 +- 6 files changed, 755 insertions(+), 28 deletions(-) create mode 100644 internal/store/postgresstore.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 7ec1f6ae..6690ac64 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -12,6 +12,7 @@ import ( "os" "path/filepath" "strings" + "time" configaccess "github.com/router-for-me/CLIProxyAPI/v6/internal/access/config_access" "github.com/router-for-me/CLIProxyAPI/v6/internal/cmd" @@ -101,6 +102,12 @@ func main() { var cfg *config.Config var isCloudDeploy bool var ( + usePostgresStore bool + pgStoreDSN string + pgStoreSchema string + pgStoreConfigKey string + pgStoreCacheDir string + pgStoreInst *store.PostgresStore gitStoreLocalPath string useGitStore bool gitStoreRemoteURL string @@ -125,6 +132,22 @@ func main() { } return "", false } + if value, ok := lookupEnv("PGSTORE_DSN", "pgstore_dsn"); ok { + usePostgresStore = true + pgStoreDSN = value + } + if usePostgresStore { + if value, ok := lookupEnv("PGSTORE_SCHEMA", "pgstore_schema"); ok { + pgStoreSchema = value + } + if value, ok := lookupEnv("PGSTORE_CONFIG_KEY", "pgstore_config_key"); ok { + pgStoreConfigKey = value + } + if value, ok := lookupEnv("PGSTORE_CACHE_DIR", "pgstore_cache_dir"); ok { + pgStoreCacheDir = value + } + useGitStore = false + } if value, ok := lookupEnv("GITSTORE_GIT_URL", "gitstore_git_url"); ok { useGitStore = true gitStoreRemoteURL = value @@ -147,13 +170,42 @@ func main() { } // Determine and load the configuration file. - // If gitstore is configured, load from the cloned repository; otherwise use the provided path or default. + // Prefer the Postgres store when configured, otherwise fallback to git or local files. var configFilePath string - if useGitStore { + if usePostgresStore { + if pgStoreCacheDir == "" { + pgStoreCacheDir = wd + } + pgStoreCacheDir = filepath.Join(pgStoreCacheDir, "pgstore") + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + pgStoreInst, err = store.NewPostgresStore(ctx, store.PostgresStoreConfig{ + DSN: pgStoreDSN, + Schema: pgStoreSchema, + ConfigKey: pgStoreConfigKey, + SpoolDir: pgStoreCacheDir, + }) + cancel() + if err != nil { + log.Fatalf("failed to initialize postgres token store: %v", err) + } + examplePath := filepath.Join(wd, "config.example.yaml") + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + if errBootstrap := pgStoreInst.Bootstrap(ctx, examplePath); errBootstrap != nil { + cancel() + log.Fatalf("failed to bootstrap postgres-backed config: %v", errBootstrap) + } + cancel() + configFilePath = pgStoreInst.ConfigPath() + cfg, err = config.LoadConfigOptional(configFilePath, isCloudDeploy) + if err == nil { + cfg.AuthDir = pgStoreInst.AuthDir() + log.Infof("postgres-backed token store enabled, workspace path: %s", pgStoreInst.WorkDir()) + } + } else if useGitStore { if gitStoreLocalPath == "" { gitStoreLocalPath = wd } - gitStoreRoot = filepath.Join(gitStoreLocalPath, "remote") + gitStoreRoot = filepath.Join(gitStoreLocalPath, "gitstore") authDir := filepath.Join(gitStoreRoot, "auths") gitStoreInst = store.NewGitTokenStore(gitStoreRemoteURL, gitStoreUser, gitStorePassword) gitStoreInst.SetBaseDir(authDir) @@ -172,7 +224,7 @@ func main() { if errCopy := misc.CopyConfigTemplate(examplePath, configFilePath); errCopy != nil { log.Fatalf("failed to bootstrap git-backed config: %v", errCopy) } - if errCommit := gitStoreInst.CommitConfig(context.Background()); errCommit != nil { + if errCommit := gitStoreInst.PersistConfig(context.Background()); errCommit != nil { log.Fatalf("failed to commit initial git-backed config: %v", errCommit) } log.Infof("git-backed config initialized from template: %s", configFilePath) @@ -245,7 +297,9 @@ func main() { } // Register the shared token store once so all components use the same persistence backend. - if useGitStore { + if usePostgresStore { + sdkAuth.RegisterTokenStore(pgStoreInst) + } else if useGitStore { sdkAuth.RegisterTokenStore(gitStoreInst) } else { sdkAuth.RegisterTokenStore(sdkAuth.NewFileTokenStore()) diff --git a/go.mod b/go.mod index f8970674..a4b9c5af 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/gin-gonic/gin v1.10.1 github.com/go-git/go-git/v6 v6.0.0-20251009132922-75a182125145 github.com/google/uuid v1.6.0 + github.com/jackc/pgx/v5 v5.7.6 github.com/klauspost/compress v1.17.3 github.com/sirupsen/logrus v1.9.3 github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 @@ -39,6 +40,9 @@ require ( github.com/go-playground/validator/v10 v10.20.0 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kevinburke/ssh_config v1.4.0 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect @@ -54,6 +58,7 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect golang.org/x/arch v0.8.0 // indirect + golang.org/x/sync v0.17.0 // indirect golang.org/x/sys v0.37.0 // indirect golang.org/x/text v0.30.0 // indirect google.golang.org/protobuf v1.34.1 // indirect diff --git a/go.sum b/go.sum index d4718d1c..7ed1f83f 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,14 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= +github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kevinburke/ssh_config v1.4.0 h1:6xxtP5bZ2E4NF5tuQulISpTO2z8XbtH8cg1PWkxoFkQ= @@ -137,6 +145,8 @@ golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= diff --git a/internal/store/gitstore.go b/internal/store/gitstore.go index 95d2c1f8..3b68e4b0 100644 --- a/internal/store/gitstore.go +++ b/internal/store/gitstore.go @@ -359,9 +359,9 @@ func (s *GitTokenStore) Delete(_ context.Context, id string) error { return nil } -// CommitPaths commits and pushes the provided paths to the remote repository. +// PersistAuthFiles commits and pushes the provided paths to the remote repository. // It no-ops when the store is not fully configured or when there are no paths. -func (s *GitTokenStore) CommitPaths(_ context.Context, message string, paths ...string) error { +func (s *GitTokenStore) PersistAuthFiles(_ context.Context, message string, paths ...string) error { if len(paths) == 0 { return nil } @@ -652,8 +652,8 @@ func (s *GitTokenStore) rewriteHeadAsSingleCommit(repo *git.Repository, branch p return nil } -// CommitConfig commits and pushes configuration changes to git. -func (s *GitTokenStore) CommitConfig(_ context.Context) error { +// PersistConfig commits and pushes configuration changes to git. +func (s *GitTokenStore) PersistConfig(_ context.Context) error { if err := s.EnsureRepository(); err != nil { return err } diff --git a/internal/store/postgresstore.go b/internal/store/postgresstore.go new file mode 100644 index 00000000..eb42e743 --- /dev/null +++ b/internal/store/postgresstore.go @@ -0,0 +1,658 @@ +package store + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "strings" + "sync" + "time" + + _ "github.com/jackc/pgx/v5/stdlib" + "github.com/router-for-me/CLIProxyAPI/v6/internal/misc" + cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" + log "github.com/sirupsen/logrus" +) + +const ( + defaultConfigTable = "config_store" + defaultAuthTable = "auth_store" + defaultConfigKey = "default" +) + +// PostgresStoreConfig captures configuration required to initialize a Postgres-backed store. +type PostgresStoreConfig struct { + DSN string + Schema string + ConfigTable string + AuthTable string + ConfigKey string + SpoolDir string +} + +// PostgresStore persists configuration and authentication metadata using PostgreSQL as backend +// while mirroring data to a local workspace so existing file-based workflows continue to operate. +type PostgresStore struct { + db *sql.DB + cfg PostgresStoreConfig + spoolRoot string + configPath string + authDir string + mu sync.Mutex +} + +// NewPostgresStore establishes a connection to PostgreSQL and prepares the local workspace. +func NewPostgresStore(ctx context.Context, cfg PostgresStoreConfig) (*PostgresStore, error) { + trimmedDSN := strings.TrimSpace(cfg.DSN) + if trimmedDSN == "" { + return nil, fmt.Errorf("postgres store: DSN is required") + } + cfg.DSN = trimmedDSN + if cfg.ConfigTable == "" { + cfg.ConfigTable = defaultConfigTable + } + if cfg.AuthTable == "" { + cfg.AuthTable = defaultAuthTable + } + if cfg.ConfigKey == "" { + cfg.ConfigKey = defaultConfigKey + } + + spoolRoot := strings.TrimSpace(cfg.SpoolDir) + if spoolRoot == "" { + if cwd, err := os.Getwd(); err == nil { + spoolRoot = filepath.Join(cwd, "pgstore") + } else { + spoolRoot = filepath.Join(os.TempDir(), "pgstore") + } + } + absSpool, err := filepath.Abs(spoolRoot) + if err != nil { + return nil, fmt.Errorf("postgres store: resolve spool directory: %w", err) + } + configDir := filepath.Join(absSpool, "config") + authDir := filepath.Join(absSpool, "auths") + if err = os.MkdirAll(configDir, 0o700); err != nil { + return nil, fmt.Errorf("postgres store: create config directory: %w", err) + } + if err = os.MkdirAll(authDir, 0o700); err != nil { + return nil, fmt.Errorf("postgres store: create auth directory: %w", err) + } + + db, err := sql.Open("pgx", cfg.DSN) + if err != nil { + return nil, fmt.Errorf("postgres store: open database connection: %w", err) + } + if err = db.PingContext(ctx); err != nil { + _ = db.Close() + return nil, fmt.Errorf("postgres store: ping database: %w", err) + } + + store := &PostgresStore{ + db: db, + cfg: cfg, + spoolRoot: absSpool, + configPath: filepath.Join(configDir, "config.yaml"), + authDir: authDir, + } + return store, nil +} + +// Close releases the underlying database connection. +func (s *PostgresStore) Close() error { + if s == nil || s.db == nil { + return nil + } + return s.db.Close() +} + +// EnsureSchema creates the required tables (and schema when provided). +func (s *PostgresStore) EnsureSchema(ctx context.Context) error { + if s == nil || s.db == nil { + return fmt.Errorf("postgres store: not initialized") + } + if schema := strings.TrimSpace(s.cfg.Schema); schema != "" { + query := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", quoteIdentifier(schema)) + if _, err := s.db.ExecContext(ctx, query); err != nil { + return fmt.Errorf("postgres store: create schema: %w", err) + } + } + configTable := s.fullTableName(s.cfg.ConfigTable) + if _, err := s.db.ExecContext(ctx, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id TEXT PRIMARY KEY, + content TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + `, configTable)); err != nil { + return fmt.Errorf("postgres store: create config table: %w", err) + } + authTable := s.fullTableName(s.cfg.AuthTable) + if _, err := s.db.ExecContext(ctx, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id TEXT PRIMARY KEY, + content JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + `, authTable)); err != nil { + return fmt.Errorf("postgres store: create auth table: %w", err) + } + return nil +} + +// Bootstrap synchronizes configuration and auth records between PostgreSQL and the local workspace. +func (s *PostgresStore) Bootstrap(ctx context.Context, exampleConfigPath string) error { + if err := s.EnsureSchema(ctx); err != nil { + return err + } + if err := s.syncConfigFromDatabase(ctx, exampleConfigPath); err != nil { + return err + } + if err := s.syncAuthFromDatabase(ctx); err != nil { + return err + } + return nil +} + +// ConfigPath returns the managed configuration file path inside the spool directory. +func (s *PostgresStore) ConfigPath() string { + if s == nil { + return "" + } + return s.configPath +} + +// AuthDir returns the local directory containing mirrored auth files. +func (s *PostgresStore) AuthDir() string { + if s == nil { + return "" + } + return s.authDir +} + +// WorkDir exposes the root spool directory used for mirroring. +func (s *PostgresStore) WorkDir() string { + if s == nil { + return "" + } + return s.spoolRoot +} + +// SetBaseDir implements the optional interface used by authenticators; it is a no-op because +// the Postgres-backed store controls its own workspace. +func (s *PostgresStore) SetBaseDir(string) {} + +// Save persists authentication metadata to disk and PostgreSQL. +func (s *PostgresStore) Save(ctx context.Context, auth *cliproxyauth.Auth) (string, error) { + if auth == nil { + return "", fmt.Errorf("postgres store: auth is nil") + } + + path, err := s.resolveAuthPath(auth) + if err != nil { + return "", err + } + if path == "" { + return "", fmt.Errorf("postgres store: missing file path attribute for %s", auth.ID) + } + + if auth.Disabled { + if _, statErr := os.Stat(path); errors.Is(statErr, fs.ErrNotExist) { + return "", nil + } + } + + s.mu.Lock() + defer s.mu.Unlock() + + if err = os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return "", fmt.Errorf("postgres store: create auth directory: %w", err) + } + + switch { + case auth.Storage != nil: + if err = auth.Storage.SaveTokenToFile(path); err != nil { + return "", err + } + case auth.Metadata != nil: + raw, errMarshal := json.Marshal(auth.Metadata) + if errMarshal != nil { + return "", fmt.Errorf("postgres store: marshal metadata: %w", errMarshal) + } + if existing, errRead := os.ReadFile(path); errRead == nil { + if jsonEqual(existing, raw) { + return path, nil + } + } else if errRead != nil && !errors.Is(errRead, fs.ErrNotExist) { + return "", fmt.Errorf("postgres store: read existing metadata: %w", errRead) + } + tmp := path + ".tmp" + if errWrite := os.WriteFile(tmp, raw, 0o600); errWrite != nil { + return "", fmt.Errorf("postgres store: write temp auth file: %w", errWrite) + } + if errRename := os.Rename(tmp, path); errRename != nil { + return "", fmt.Errorf("postgres store: rename auth file: %w", errRename) + } + default: + return "", fmt.Errorf("postgres store: nothing to persist for %s", auth.ID) + } + + if auth.Attributes == nil { + auth.Attributes = make(map[string]string) + } + auth.Attributes["path"] = path + + if strings.TrimSpace(auth.FileName) == "" { + auth.FileName = auth.ID + } + + relID, err := s.relativeAuthID(path) + if err != nil { + return "", err + } + if err = s.upsertAuthRecord(ctx, relID, path); err != nil { + return "", err + } + return path, nil +} + +// List enumerates all auth records stored in PostgreSQL. +func (s *PostgresStore) List(ctx context.Context) ([]*cliproxyauth.Auth, error) { + query := fmt.Sprintf("SELECT id, content, created_at, updated_at FROM %s ORDER BY id", s.fullTableName(s.cfg.AuthTable)) + rows, err := s.db.QueryContext(ctx, query) + if err != nil { + return nil, fmt.Errorf("postgres store: list auth: %w", err) + } + defer rows.Close() + + auths := make([]*cliproxyauth.Auth, 0, 32) + for rows.Next() { + var ( + id string + payload string + createdAt time.Time + updatedAt time.Time + ) + if err = rows.Scan(&id, &payload, &createdAt, &updatedAt); err != nil { + return nil, fmt.Errorf("postgres store: scan auth row: %w", err) + } + path, errPath := s.absoluteAuthPath(id) + if errPath != nil { + log.WithError(errPath).Warnf("postgres store: skipping auth %s outside spool", id) + continue + } + metadata := make(map[string]any) + if err = json.Unmarshal([]byte(payload), &metadata); err != nil { + log.WithError(err).Warnf("postgres store: skipping auth %s with invalid json", id) + continue + } + provider := strings.TrimSpace(valueAsString(metadata["type"])) + if provider == "" { + provider = "unknown" + } + attr := map[string]string{"path": path} + if email := strings.TrimSpace(valueAsString(metadata["email"])); email != "" { + attr["email"] = email + } + auth := &cliproxyauth.Auth{ + ID: normalizeAuthID(id), + Provider: provider, + FileName: normalizeAuthID(id), + Label: labelFor(metadata), + Status: cliproxyauth.StatusActive, + Attributes: attr, + Metadata: metadata, + CreatedAt: createdAt, + UpdatedAt: updatedAt, + LastRefreshedAt: time.Time{}, + NextRefreshAfter: time.Time{}, + } + auths = append(auths, auth) + } + if err = rows.Err(); err != nil { + return nil, fmt.Errorf("postgres store: iterate auth rows: %w", err) + } + return auths, nil +} + +// Delete removes an auth file and the corresponding database record. +func (s *PostgresStore) Delete(ctx context.Context, id string) error { + id = strings.TrimSpace(id) + if id == "" { + return fmt.Errorf("postgres store: id is empty") + } + path, err := s.resolveDeletePath(id) + if err != nil { + return err + } + + s.mu.Lock() + defer s.mu.Unlock() + + if err = os.Remove(path); err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("postgres store: delete auth file: %w", err) + } + relID, err := s.relativeAuthID(path) + if err != nil { + return err + } + return s.deleteAuthRecord(ctx, relID) +} + +// PersistAuthFiles stores the provided auth file changes in PostgreSQL. +func (s *PostgresStore) PersistAuthFiles(ctx context.Context, _ string, paths ...string) error { + if len(paths) == 0 { + return nil + } + s.mu.Lock() + defer s.mu.Unlock() + + for _, p := range paths { + trimmed := strings.TrimSpace(p) + if trimmed == "" { + continue + } + relID, err := s.relativeAuthID(trimmed) + if err != nil { + // Attempt to resolve absolute path under authDir. + abs := trimmed + if !filepath.IsAbs(abs) { + abs = filepath.Join(s.authDir, trimmed) + } + relID, err = s.relativeAuthID(abs) + if err != nil { + log.WithError(err).Warnf("postgres store: ignoring auth path %s", trimmed) + continue + } + trimmed = abs + } + if err = s.syncAuthFile(ctx, relID, trimmed); err != nil { + return err + } + } + return nil +} + +// PersistConfig mirrors the local configuration file to PostgreSQL. +func (s *PostgresStore) PersistConfig(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + data, err := os.ReadFile(s.configPath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return s.deleteConfigRecord(ctx) + } + return fmt.Errorf("postgres store: read config file: %w", err) + } + return s.persistConfig(ctx, data) +} + +// syncConfigFromDatabase writes the database-stored config to disk or seeds the database from template. +func (s *PostgresStore) syncConfigFromDatabase(ctx context.Context, exampleConfigPath string) error { + query := fmt.Sprintf("SELECT content FROM %s WHERE id = $1", s.fullTableName(s.cfg.ConfigTable)) + var content string + err := s.db.QueryRowContext(ctx, query, s.cfg.ConfigKey).Scan(&content) + switch { + case errors.Is(err, sql.ErrNoRows): + if _, errStat := os.Stat(s.configPath); errors.Is(errStat, fs.ErrNotExist) { + if exampleConfigPath != "" { + if errCopy := misc.CopyConfigTemplate(exampleConfigPath, s.configPath); errCopy != nil { + return fmt.Errorf("postgres store: copy example config: %w", errCopy) + } + } else { + if errCreate := os.MkdirAll(filepath.Dir(s.configPath), 0o700); errCreate != nil { + return fmt.Errorf("postgres store: prepare config directory: %w", errCreate) + } + if errWrite := os.WriteFile(s.configPath, []byte{}, 0o600); errWrite != nil { + return fmt.Errorf("postgres store: create empty config: %w", errWrite) + } + } + } + data, errRead := os.ReadFile(s.configPath) + if errRead != nil { + return fmt.Errorf("postgres store: read local config: %w", errRead) + } + if errPersist := s.persistConfig(ctx, data); errPersist != nil { + return errPersist + } + case err != nil: + return fmt.Errorf("postgres store: load config from database: %w", err) + default: + if err = os.MkdirAll(filepath.Dir(s.configPath), 0o700); err != nil { + return fmt.Errorf("postgres store: prepare config directory: %w", err) + } + if err = os.WriteFile(s.configPath, []byte(content), 0o600); err != nil { + return fmt.Errorf("postgres store: write config to spool: %w", err) + } + } + return nil +} + +// syncAuthFromDatabase populates the local auth directory from PostgreSQL data. +func (s *PostgresStore) syncAuthFromDatabase(ctx context.Context) error { + query := fmt.Sprintf("SELECT id, content FROM %s", s.fullTableName(s.cfg.AuthTable)) + rows, err := s.db.QueryContext(ctx, query) + if err != nil { + return fmt.Errorf("postgres store: load auth from database: %w", err) + } + defer rows.Close() + + if err = os.RemoveAll(s.authDir); err != nil { + return fmt.Errorf("postgres store: reset auth directory: %w", err) + } + if err = os.MkdirAll(s.authDir, 0o700); err != nil { + return fmt.Errorf("postgres store: recreate auth directory: %w", err) + } + + for rows.Next() { + var ( + id string + payload string + ) + if err = rows.Scan(&id, &payload); err != nil { + return fmt.Errorf("postgres store: scan auth row: %w", err) + } + path, errPath := s.absoluteAuthPath(id) + if errPath != nil { + log.WithError(errPath).Warnf("postgres store: skipping auth %s outside spool", id) + continue + } + if err = os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return fmt.Errorf("postgres store: create auth subdir: %w", err) + } + if err = os.WriteFile(path, []byte(payload), 0o600); err != nil { + return fmt.Errorf("postgres store: write auth file: %w", err) + } + } + if err = rows.Err(); err != nil { + return fmt.Errorf("postgres store: iterate auth rows: %w", err) + } + return nil +} + +func (s *PostgresStore) syncAuthFile(ctx context.Context, relID, path string) error { + data, err := os.ReadFile(path) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return s.deleteAuthRecord(ctx, relID) + } + return fmt.Errorf("postgres store: read auth file: %w", err) + } + if len(data) == 0 { + return s.deleteAuthRecord(ctx, relID) + } + return s.persistAuth(ctx, relID, data) +} + +func (s *PostgresStore) upsertAuthRecord(ctx context.Context, relID, path string) error { + data, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("postgres store: read auth file: %w", err) + } + if len(data) == 0 { + return s.deleteAuthRecord(ctx, relID) + } + return s.persistAuth(ctx, relID, data) +} + +func (s *PostgresStore) persistAuth(ctx context.Context, relID string, data []byte) error { + jsonPayload := json.RawMessage(data) + query := fmt.Sprintf(` + INSERT INTO %s (id, content, created_at, updated_at) + VALUES ($1, $2, NOW(), NOW()) + ON CONFLICT (id) + DO UPDATE SET content = EXCLUDED.content, updated_at = NOW() + `, s.fullTableName(s.cfg.AuthTable)) + if _, err := s.db.ExecContext(ctx, query, relID, jsonPayload); err != nil { + return fmt.Errorf("postgres store: upsert auth record: %w", err) + } + return nil +} + +func (s *PostgresStore) deleteAuthRecord(ctx context.Context, relID string) error { + query := fmt.Sprintf("DELETE FROM %s WHERE id = $1", s.fullTableName(s.cfg.AuthTable)) + if _, err := s.db.ExecContext(ctx, query, relID); err != nil { + return fmt.Errorf("postgres store: delete auth record: %w", err) + } + return nil +} + +func (s *PostgresStore) persistConfig(ctx context.Context, data []byte) error { + query := fmt.Sprintf(` + INSERT INTO %s (id, content, created_at, updated_at) + VALUES ($1, $2, NOW(), NOW()) + ON CONFLICT (id) + DO UPDATE SET content = EXCLUDED.content, updated_at = NOW() + `, s.fullTableName(s.cfg.ConfigTable)) + if _, err := s.db.ExecContext(ctx, query, s.cfg.ConfigKey, string(data)); err != nil { + return fmt.Errorf("postgres store: upsert config: %w", err) + } + return nil +} + +func (s *PostgresStore) deleteConfigRecord(ctx context.Context) error { + query := fmt.Sprintf("DELETE FROM %s WHERE id = $1", s.fullTableName(s.cfg.ConfigTable)) + if _, err := s.db.ExecContext(ctx, query, s.cfg.ConfigKey); err != nil { + return fmt.Errorf("postgres store: delete config: %w", err) + } + return nil +} + +func (s *PostgresStore) resolveAuthPath(auth *cliproxyauth.Auth) (string, error) { + if auth == nil { + return "", fmt.Errorf("postgres store: auth is nil") + } + if auth.Attributes != nil { + if p := strings.TrimSpace(auth.Attributes["path"]); p != "" { + return p, nil + } + } + if fileName := strings.TrimSpace(auth.FileName); fileName != "" { + if filepath.IsAbs(fileName) { + return fileName, nil + } + return filepath.Join(s.authDir, fileName), nil + } + if auth.ID == "" { + return "", fmt.Errorf("postgres store: missing id") + } + if filepath.IsAbs(auth.ID) { + return auth.ID, nil + } + return filepath.Join(s.authDir, filepath.FromSlash(auth.ID)), nil +} + +func (s *PostgresStore) resolveDeletePath(id string) (string, error) { + if strings.ContainsRune(id, os.PathSeparator) || filepath.IsAbs(id) { + return id, nil + } + return filepath.Join(s.authDir, filepath.FromSlash(id)), nil +} + +func (s *PostgresStore) relativeAuthID(path string) (string, error) { + if s == nil { + return "", fmt.Errorf("postgres store: store not initialized") + } + if !filepath.IsAbs(path) { + path = filepath.Join(s.authDir, path) + } + clean := filepath.Clean(path) + rel, err := filepath.Rel(s.authDir, clean) + if err != nil { + return "", fmt.Errorf("postgres store: compute relative path: %w", err) + } + if strings.HasPrefix(rel, "..") { + return "", fmt.Errorf("postgres store: path %s outside managed directory", path) + } + return filepath.ToSlash(rel), nil +} + +func (s *PostgresStore) absoluteAuthPath(id string) (string, error) { + if s == nil { + return "", fmt.Errorf("postgres store: store not initialized") + } + clean := filepath.Clean(filepath.FromSlash(id)) + if strings.HasPrefix(clean, "..") { + return "", fmt.Errorf("postgres store: invalid auth identifier %s", id) + } + path := filepath.Join(s.authDir, clean) + rel, err := filepath.Rel(s.authDir, path) + if err != nil { + return "", err + } + if strings.HasPrefix(rel, "..") { + return "", fmt.Errorf("postgres store: resolved auth path escapes auth directory") + } + return path, nil +} + +func (s *PostgresStore) fullTableName(name string) string { + if strings.TrimSpace(s.cfg.Schema) == "" { + return quoteIdentifier(name) + } + return quoteIdentifier(s.cfg.Schema) + "." + quoteIdentifier(name) +} + +func quoteIdentifier(identifier string) string { + replaced := strings.ReplaceAll(identifier, "\"", "\"\"") + return "\"" + replaced + "\"" +} + +func valueAsString(v any) string { + switch t := v.(type) { + case string: + return t + case fmt.Stringer: + return t.String() + default: + return "" + } +} + +func labelFor(metadata map[string]any) string { + if metadata == nil { + return "" + } + if v := strings.TrimSpace(valueAsString(metadata["label"])); v != "" { + return v + } + if v := strings.TrimSpace(valueAsString(metadata["email"])); v != "" { + return v + } + if v := strings.TrimSpace(valueAsString(metadata["project_id"])); v != "" { + return v + } + return "" +} + +func normalizeAuthID(id string) string { + return filepath.ToSlash(filepath.Clean(id)) +} diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 5bc03b17..7f440624 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -29,10 +29,10 @@ import ( log "github.com/sirupsen/logrus" ) -// gitCommitter captures the subset of git-backed token store capabilities used by the watcher. -type gitCommitter interface { - CommitConfig(ctx context.Context) error - CommitPaths(ctx context.Context, message string, paths ...string) error +// storePersister captures persistence-capable token store methods used by the watcher. +type storePersister interface { + PersistConfig(ctx context.Context) error + PersistAuthFiles(ctx context.Context, message string, paths ...string) error } // Watcher manages file watching for configuration and authentication files @@ -52,7 +52,7 @@ type Watcher struct { pendingUpdates map[string]AuthUpdate pendingOrder []string dispatchCancel context.CancelFunc - gitCommitter gitCommitter + storePersister storePersister oldConfigYaml []byte } @@ -126,9 +126,9 @@ func NewWatcher(configPath, authDir string, reloadCallback func(*config.Config)) } w.dispatchCond = sync.NewCond(&w.dispatchMu) if store := sdkAuth.GetTokenStore(); store != nil { - if committer, ok := store.(gitCommitter); ok { - w.gitCommitter = committer - log.Debug("gitstore mode detected; watcher will commit changes to remote repository") + if persister, ok := store.(storePersister); ok { + w.storePersister = persister + log.Debug("persistence-capable token store detected; watcher will propagate persisted changes") } } return w, nil @@ -345,21 +345,21 @@ func (w *Watcher) stopDispatch() { w.clientsMutex.Unlock() } -func (w *Watcher) commitConfigAsync() { - if w == nil || w.gitCommitter == nil { +func (w *Watcher) persistConfigAsync() { + if w == nil || w.storePersister == nil { return } go func() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - if err := w.gitCommitter.CommitConfig(ctx); err != nil { - log.Errorf("failed to commit config change: %v", err) + if err := w.storePersister.PersistConfig(ctx); err != nil { + log.Errorf("failed to persist config change: %v", err) } }() } -func (w *Watcher) commitAuthAsync(message string, paths ...string) { - if w == nil || w.gitCommitter == nil { +func (w *Watcher) persistAuthAsync(message string, paths ...string) { + if w == nil || w.storePersister == nil { return } filtered := make([]string, 0, len(paths)) @@ -374,8 +374,8 @@ func (w *Watcher) commitAuthAsync(message string, paths ...string) { go func() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - if err := w.gitCommitter.CommitPaths(ctx, message, filtered...); err != nil { - log.Errorf("failed to commit auth changes: %v", err) + if err := w.storePersister.PersistAuthFiles(ctx, message, filtered...); err != nil { + log.Errorf("failed to persist auth changes: %v", err) } }() } @@ -484,7 +484,7 @@ func (w *Watcher) handleEvent(event fsnotify.Event) { w.clientsMutex.Lock() w.lastConfigHash = finalHash w.clientsMutex.Unlock() - w.commitConfigAsync() + w.persistConfigAsync() } return } @@ -683,7 +683,7 @@ func (w *Watcher) addOrUpdateClient(path string) { log.Debugf("triggering server update callback after add/update") w.reloadCallback(cfg) } - w.commitAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path) + w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path) } // removeClient handles the removal of a single client. @@ -701,7 +701,7 @@ func (w *Watcher) removeClient(path string) { log.Debugf("triggering server update callback after removal") w.reloadCallback(cfg) } - w.commitAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path) + w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path) } // SnapshotCombinedClients returns a snapshot of current combined clients.