diff --git a/server/cmd/root.go b/server/cmd/root.go index c0945fcf..3bec779d 100644 --- a/server/cmd/root.go +++ b/server/cmd/root.go @@ -22,6 +22,7 @@ import ( "github.com/pgEdge/control-plane/server/internal/migrate" "github.com/pgEdge/control-plane/server/internal/monitor" "github.com/pgEdge/control-plane/server/internal/orchestrator" + "github.com/pgEdge/control-plane/server/internal/orchestrator/common" "github.com/pgEdge/control-plane/server/internal/orchestrator/swarm" "github.com/pgEdge/control-plane/server/internal/ports" "github.com/pgEdge/control-plane/server/internal/resource" @@ -95,6 +96,7 @@ func newRootCmd(i *do.Injector) *cobra.Command { filesystem.RegisterResourceTypes(registry) monitor.RegisterResourceTypes(registry) scheduler.RegisterResourceTypes(registry) + common.RegisterResourceTypes(registry) swarm.RegisterResourceTypes(registry) if err := orchestrator.Provide(i); err != nil { diff --git a/server/internal/database/orchestrator.go b/server/internal/database/orchestrator.go index 3e4f97d2..2871a000 100644 --- a/server/internal/database/orchestrator.go +++ b/server/internal/database/orchestrator.go @@ -172,6 +172,7 @@ type Orchestrator interface { GetInstanceConnectionInfo(ctx context.Context, databaseID, instanceID string) (*ConnectionInfo, error) GetServiceInstanceStatus(ctx context.Context, serviceInstanceID string) (*ServiceInstanceStatus, error) CreatePgBackRestBackup(ctx context.Context, w io.Writer, instanceID string, options *pgbackrest.BackupOptions) error + ExecuteInstanceCommand(ctx context.Context, w io.Writer, databaseID, instanceID string, args ...string) error ValidateInstanceSpecs(ctx context.Context, changes []*InstanceSpecChange) ([]*ValidationResult, error) StopInstance(ctx context.Context, instanceID string) error StartInstance(ctx context.Context, instanceID string) error diff --git a/server/internal/orchestrator/common/etcd_creds.go b/server/internal/orchestrator/common/etcd_creds.go new file mode 100644 index 00000000..b00dd410 --- /dev/null +++ b/server/internal/orchestrator/common/etcd_creds.go @@ -0,0 +1,222 @@ +package common + +import ( + "context" + "errors" + "fmt" + "path/filepath" + + "github.com/samber/do" + "github.com/spf13/afero" + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/pgEdge/control-plane/server/internal/certificates" + "github.com/pgEdge/control-plane/server/internal/etcd" + "github.com/pgEdge/control-plane/server/internal/filesystem" + "github.com/pgEdge/control-plane/server/internal/patroni" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +const ( + etcdCaCertName = "ca.crt" + etcdClientCertName = "client.crt" + etcdClientKeyName = "client.key" +) + +var _ resource.Resource = (*EtcdCreds)(nil) + +const ResourceTypeEtcdCreds resource.Type = "common.etcd_creds" + +func EtcdCredsIdentifier(instanceID string) resource.Identifier { + return resource.Identifier{ + ID: instanceID, + Type: ResourceTypeEtcdCreds, + } +} + +type EtcdCreds struct { + InstanceID string `json:"instance_id"` + DatabaseID string `json:"database_id"` + HostID string `json:"host_id"` + NodeName string `json:"node_name"` + ParentID string `json:"parent_id"` + OwnerUID int `json:"owner_uid"` + OwnerGID int `json:"owner_gid"` + Username string `json:"username"` + Password string `json:"password"` + CaCert []byte `json:"ca_cert"` + ClientCert []byte `json:"client_cert"` + ClientKey []byte `json:"client_key"` +} + +func (c *EtcdCreds) ResourceVersion() string { + return "1" +} + +func (c *EtcdCreds) DiffIgnore() []string { + return []string{ + "/username", + "/password", + "/ca_cert", + "/client_cert", + "/client_key", + } +} + +func (c *EtcdCreds) Executor() resource.Executor { + return resource.HostExecutor(c.HostID) +} + +func (c *EtcdCreds) Identifier() resource.Identifier { + return EtcdCredsIdentifier(c.InstanceID) +} + +func (c *EtcdCreds) Dependencies() []resource.Identifier { + return []resource.Identifier{ + filesystem.DirResourceIdentifier(c.ParentID), + } +} + +func (c *EtcdCreds) TypeDependencies() []resource.Type { + return nil +} + +func (c *EtcdCreds) Refresh(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + certsDir := filepath.Join(parentFullPath, "etcd") + + caCert, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdCaCertName)) + if err != nil { + return fmt.Errorf("failed to read CA cert: %w", err) + } + clientCert, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdClientCertName)) + if err != nil { + return fmt.Errorf("failed to read client cert: %w", err) + } + clientKey, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdClientKeyName)) + if err != nil { + return fmt.Errorf("failed to read client key: %w", err) + } + + c.CaCert = caCert + c.ClientCert = clientCert + c.ClientKey = clientKey + + return nil +} + +func (c *EtcdCreds) Create(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + certService, err := do.Invoke[*certificates.Service](rc.Injector) + if err != nil { + return err + } + etcdClient, err := do.Invoke[*clientv3.Client](rc.Injector) + if err != nil { + return err + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + certsDir := filepath.Join(parentFullPath, "etcd") + + etcdCreds, err := etcd.CreateInstanceEtcdUser(ctx, + etcdClient, + certService, + etcd.InstanceUserOptions{ + InstanceID: c.InstanceID, + KeyPrefix: patroni.ClusterPrefix(c.DatabaseID, c.NodeName), + Password: c.Password, + }, + ) + if err != nil { + return fmt.Errorf("failed to create etcd user: %w", err) + } + + c.Username = etcdCreds.Username + c.Password = etcdCreds.Password + c.CaCert = etcdCreds.CaCert + c.ClientCert = etcdCreds.ClientCert + c.ClientKey = etcdCreds.ClientKey + + if err := fs.MkdirAll(certsDir, 0o700); err != nil { + return fmt.Errorf("failed to create etcd certificates directory: %w", err) + } + if err := fs.Chown(certsDir, c.OwnerUID, c.OwnerGID); err != nil { + return fmt.Errorf("failed to change ownership for certificates directory: %w", err) + } + + files := map[string][]byte{ + etcdCaCertName: c.CaCert, + etcdClientCertName: c.ClientCert, + etcdClientKeyName: c.ClientKey, + } + + for name, content := range files { + if err := afero.WriteFile(fs, filepath.Join(certsDir, name), content, 0o600); err != nil { + return fmt.Errorf("failed to write %s: %w", name, err) + } + if err := fs.Chown(filepath.Join(certsDir, name), c.OwnerUID, c.OwnerGID); err != nil { + return fmt.Errorf("failed to change ownership for %s: %w", name, err) + } + } + + return nil +} + +func (c *EtcdCreds) Update(ctx context.Context, rc *resource.Context) error { + return c.Create(ctx, rc) +} + +func (c *EtcdCreds) Delete(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + certService, err := do.Invoke[*certificates.Service](rc.Injector) + if err != nil { + return err + } + etcdClient, err := do.Invoke[*clientv3.Client](rc.Injector) + if err != nil { + return err + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + etcdDir := filepath.Join(parentFullPath, "etcd") + + if err := fs.RemoveAll(etcdDir); err != nil { + return fmt.Errorf("failed to remove certificates directory: %w", err) + } + if err := etcd.RemoveInstanceEtcdUser(ctx, etcdClient, certService, c.InstanceID); err != nil { + return fmt.Errorf("failed to delete etcd user: %w", err) + } + + return nil +} + +func ReadResourceFile(fs afero.Fs, path string) ([]byte, error) { + contents, err := afero.ReadFile(fs, path) + if errors.Is(err, afero.ErrFileNotFound) { + return nil, resource.ErrNotFound + } else if err != nil { + return nil, err + } + return contents, nil +} diff --git a/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/enable_fast_basebackup.yaml b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/enable_fast_basebackup.yaml new file mode 100644 index 00000000..84cf7ccd --- /dev/null +++ b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/enable_fast_basebackup.yaml @@ -0,0 +1,116 @@ +name: storefront-n1-689qacsi +namespace: /patroni/ +scope: storefront:n1 +log: + type: json + level: INFO + static_fields: + database_id: storefront + instance_id: storefront-n1-689qacsi + node_name: n1 +bootstrap: + dcs: + loop_wait: 10 + ttl: 30 + retry_timeout: 10 + postgresql: + parameters: + max_connections: 901 + max_replication_slots: 16 + max_wal_senders: 16 + max_worker_processes: 12 + track_commit_timestamp: "on" + wal_level: logical + ignore_slots: + - plugin: spock_output + initdb: + - data-checksums +etcd3: + hosts: + - i-0123456789abcdef.ec2.internal:2379 + protocol: https + username: instance.storefront-n1-689qacsi + password: password + cacert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/etcd/ca.crt + cert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/etcd/client.crt + key: /var/lib/pgsql/storefront-n1-689qacsi/certificates/etcd/client.key +postgresql: + authentication: + superuser: + username: pgedge + sslmode: verify-full + sslkey: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/superuser.key + sslcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/superuser.crt + sslrootcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/ca.crt + replication: + username: patroni_replicator + sslmode: verify-full + sslkey: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/replication.key + sslcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/replication.crt + sslrootcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/ca.crt + connect_address: storefront-n1-689qacsi.storefront-database:5432 + data_dir: /var/lib/pgsql/storefront-n1-689qacsi/data/pgdata + listen: "*:5432" + parameters: + archive_command: /bin/true + archive_mode: "on" + autovacuum_max_workers: 3 + autovacuum_vacuum_cost_limit: 200 + autovacuum_work_mem: 262144 + checkpoint_completion_target: "0.9" + checkpoint_timeout: 15min + dynamic_shared_memory_type: posix + effective_cache_size: 524288 + hot_standby_feedback: "on" + lolor.node: 1 + maintenance_work_mem: 137518 + max_parallel_workers: 8 + password_encryption: md5 + shared_buffers: 262144 + shared_preload_libraries: pg_stat_statements,spock + snowflake.node: 1 + spock.allow_ddl_from_functions: "on" + spock.conflict_log_level: DEBUG + spock.conflict_resolution: last_update_wins + spock.enable_ddl_replication: "on" + spock.include_ddl_repset: "on" + spock.save_resolutions: "on" + ssl: "on" + ssl_ca_file: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/ca.crt + ssl_cert_file: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/server.crt + ssl_key_file: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/server.key + track_io_timing: "on" + wal_log_hints: "on" + wal_sender_timeout: 5s + pg_hba: + - local all all trust + - host all all 127.0.0.1/32 trust + - host all all ::1/128 trust + - local replication all trust + - host replication all 127.0.0.1/32 trust + - host replication all ::1/128 trust + - hostssl all pgedge,patroni_replicator 10.10.0.2/32 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 10.10.0.2/32 cert clientcert=verify-full + - hostssl all pgedge,patroni_replicator 10.10.0.3/32 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 10.10.0.3/32 cert clientcert=verify-full + - hostssl all pgedge,patroni_replicator 10.10.0.4/32 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 10.10.0.4/32 cert clientcert=verify-full + - host all pgedge,patroni_replicator 0.0.0.0/0 reject + - host all pgedge,patroni_replicator ::0/0 reject + - host all all 0.0.0.0/0 md5 + use_pg_rewind: true + remove_data_directory_on_rewind_failure: true + remove_data_directory_on_diverged_timelines: true + basebackup: + - checkpoint: fast +restapi: + connect_address: storefront-n1-689qacsi.storefront-database:8888 + listen: 0.0.0.0:8888 + allowlist: + - 10.10.0.2 + - 10.10.0.3 + - 10.10.0.4 + - 127.0.0.1 + - localhost +watchdog: + mode: "off" diff --git a/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/in-place_restore.yaml b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/in-place_restore.yaml new file mode 100644 index 00000000..7531d1fb --- /dev/null +++ b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/in-place_restore.yaml @@ -0,0 +1,118 @@ +name: storefront-n1-689qacsi +namespace: /patroni/ +scope: storefront:n1 +log: + type: json + level: INFO + static_fields: + database_id: storefront + instance_id: storefront-n1-689qacsi + node_name: n1 +bootstrap: + dcs: + loop_wait: 10 + ttl: 30 + retry_timeout: 10 + postgresql: + parameters: + max_connections: 901 + max_replication_slots: 16 + max_wal_senders: 16 + max_worker_processes: 12 + track_commit_timestamp: "on" + wal_level: logical + ignore_slots: + - plugin: spock_output + method: restore + initdb: + - data-checksums + restore: + command: mv /opt/pgedge/data/pgdata-restore /opt/pgedge/data/pgdata + keep_existing_recovery_conf: true + no_params: true +etcd3: + hosts: + - i-0123456789abcdef.ec2.internal:2379 + protocol: https + username: instance.storefront-n1-689qacsi + password: password + cacert: /opt/pgedge/certificates/etcd/ca.crt + cert: /opt/pgedge/certificates/etcd/client.crt + key: /opt/pgedge/certificates/etcd/client.key +postgresql: + authentication: + superuser: + username: pgedge + sslmode: verify-full + sslkey: /opt/pgedge/certificates/postgres/superuser.key + sslcert: /opt/pgedge/certificates/postgres/superuser.crt + sslrootcert: /opt/pgedge/certificates/postgres/ca.crt + replication: + username: patroni_replicator + sslmode: verify-full + sslkey: /opt/pgedge/certificates/postgres/replication.key + sslcert: /opt/pgedge/certificates/postgres/replication.crt + sslrootcert: /opt/pgedge/certificates/postgres/ca.crt + connect_address: storefront-n1-689qacsi.storefront-database:5432 + data_dir: /opt/pgedge/data/pgdata + listen: "*:5432" + parameters: + archive_command: /bin/true + archive_mode: "on" + autovacuum_max_workers: 3 + autovacuum_vacuum_cost_limit: 200 + autovacuum_work_mem: 262144 + checkpoint_completion_target: "0.9" + checkpoint_timeout: 15min + dynamic_shared_memory_type: posix + effective_cache_size: 524288 + hot_standby_feedback: "on" + lolor.node: 1 + maintenance_work_mem: 137518 + max_parallel_workers: 8 + password_encryption: md5 + shared_buffers: 262144 + shared_preload_libraries: pg_stat_statements,snowflake,spock,postgis-3 + snowflake.node: 1 + spock.allow_ddl_from_functions: "on" + spock.conflict_log_level: DEBUG + spock.conflict_resolution: last_update_wins + spock.enable_ddl_replication: "on" + spock.include_ddl_repset: "on" + spock.save_resolutions: "on" + ssl: "on" + ssl_ca_file: /opt/pgedge/certificates/postgres/ca.crt + ssl_cert_file: /opt/pgedge/certificates/postgres/server.crt + ssl_key_file: /opt/pgedge/certificates/postgres/server.key + track_io_timing: "on" + wal_log_hints: "on" + wal_sender_timeout: 5s + pg_hba: + - local all all trust + - host all all 127.0.0.1/32 trust + - host all all ::1/128 trust + - local replication all trust + - host replication all 127.0.0.1/32 trust + - host replication all ::1/128 trust + - hostssl all pgedge,patroni_replicator 172.17.0.1/32 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 172.17.0.1/32 cert clientcert=verify-full + - hostssl all pgedge,patroni_replicator 10.128.165.128/26 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 10.128.165.128/26 cert clientcert=verify-full + - host all pgedge,patroni_replicator 0.0.0.0/0 reject + - host all pgedge,patroni_replicator ::0/0 reject + - host all all 172.17.0.1/32 md5 + - host all all 172.17.0.1/16 reject + - host all all 0.0.0.0/0 md5 + use_pg_rewind: true + remove_data_directory_on_rewind_failure: true + remove_data_directory_on_diverged_timelines: true +restapi: + connect_address: storefront-n1-689qacsi.storefront-database:8888 + listen: 0.0.0.0:8888 + allowlist: + - 172.17.0.1/32 + - 10.128.165.128/26 + - 127.0.0.1 + - localhost +watchdog: + mode: "off" diff --git a/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/minimal_swarm.yaml b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/minimal_swarm.yaml new file mode 100644 index 00000000..ca6257d7 --- /dev/null +++ b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/minimal_swarm.yaml @@ -0,0 +1,113 @@ +name: storefront-n1-689qacsi +namespace: /patroni/ +scope: storefront:n1 +log: + type: json + level: INFO + static_fields: + database_id: storefront + instance_id: storefront-n1-689qacsi + node_name: n1 +bootstrap: + dcs: + loop_wait: 10 + ttl: 30 + retry_timeout: 10 + postgresql: + parameters: + max_connections: 901 + max_replication_slots: 16 + max_wal_senders: 16 + max_worker_processes: 12 + track_commit_timestamp: "on" + wal_level: logical + ignore_slots: + - plugin: spock_output + initdb: + - data-checksums +etcd3: + hosts: + - i-0123456789abcdef.ec2.internal:2379 + protocol: https + username: instance.storefront-n1-689qacsi + password: password + cacert: /opt/pgedge/certificates/etcd/ca.crt + cert: /opt/pgedge/certificates/etcd/client.crt + key: /opt/pgedge/certificates/etcd/client.key +postgresql: + authentication: + superuser: + username: pgedge + sslmode: verify-full + sslkey: /opt/pgedge/certificates/postgres/superuser.key + sslcert: /opt/pgedge/certificates/postgres/superuser.crt + sslrootcert: /opt/pgedge/certificates/postgres/ca.crt + replication: + username: patroni_replicator + sslmode: verify-full + sslkey: /opt/pgedge/certificates/postgres/replication.key + sslcert: /opt/pgedge/certificates/postgres/replication.crt + sslrootcert: /opt/pgedge/certificates/postgres/ca.crt + connect_address: storefront-n1-689qacsi.storefront-database:5432 + data_dir: /opt/pgedge/data/pgdata + listen: "*:5432" + parameters: + archive_command: /bin/true + archive_mode: "on" + autovacuum_max_workers: 3 + autovacuum_vacuum_cost_limit: 200 + autovacuum_work_mem: 262144 + checkpoint_completion_target: "0.9" + checkpoint_timeout: 15min + dynamic_shared_memory_type: posix + effective_cache_size: 524288 + hot_standby_feedback: "on" + lolor.node: 1 + maintenance_work_mem: 137518 + max_parallel_workers: 8 + password_encryption: md5 + shared_buffers: 262144 + shared_preload_libraries: pg_stat_statements,snowflake,spock,postgis-3 + snowflake.node: 1 + spock.allow_ddl_from_functions: "on" + spock.conflict_log_level: DEBUG + spock.conflict_resolution: last_update_wins + spock.enable_ddl_replication: "on" + spock.include_ddl_repset: "on" + spock.save_resolutions: "on" + ssl: "on" + ssl_ca_file: /opt/pgedge/certificates/postgres/ca.crt + ssl_cert_file: /opt/pgedge/certificates/postgres/server.crt + ssl_key_file: /opt/pgedge/certificates/postgres/server.key + track_io_timing: "on" + wal_log_hints: "on" + wal_sender_timeout: 5s + pg_hba: + - local all all trust + - host all all 127.0.0.1/32 trust + - host all all ::1/128 trust + - local replication all trust + - host replication all 127.0.0.1/32 trust + - host replication all ::1/128 trust + - hostssl all pgedge,patroni_replicator 172.17.0.1/32 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 172.17.0.1/32 cert clientcert=verify-full + - hostssl all pgedge,patroni_replicator 10.128.165.128/26 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 10.128.165.128/26 cert clientcert=verify-full + - host all pgedge,patroni_replicator 0.0.0.0/0 reject + - host all pgedge,patroni_replicator ::0/0 reject + - host all all 172.17.0.1/32 md5 + - host all all 172.17.0.1/16 reject + - host all all 0.0.0.0/0 md5 + use_pg_rewind: true + remove_data_directory_on_rewind_failure: true + remove_data_directory_on_diverged_timelines: true +restapi: + connect_address: storefront-n1-689qacsi.storefront-database:8888 + listen: 0.0.0.0:8888 + allowlist: + - 172.17.0.1/32 + - 10.128.165.128/26 + - 127.0.0.1 + - localhost +watchdog: + mode: "off" diff --git a/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/minimal_systemd.yaml b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/minimal_systemd.yaml new file mode 100644 index 00000000..74122335 --- /dev/null +++ b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/minimal_systemd.yaml @@ -0,0 +1,114 @@ +name: storefront-n1-689qacsi +namespace: /patroni/ +scope: storefront:n1 +log: + type: json + level: INFO + static_fields: + database_id: storefront + instance_id: storefront-n1-689qacsi + node_name: n1 +bootstrap: + dcs: + loop_wait: 10 + ttl: 30 + retry_timeout: 10 + postgresql: + parameters: + max_connections: 901 + max_replication_slots: 16 + max_wal_senders: 16 + max_worker_processes: 12 + track_commit_timestamp: "on" + wal_level: logical + ignore_slots: + - plugin: spock_output + initdb: + - data-checksums +etcd3: + hosts: + - i-0123456789abcdef.ec2.internal:2379 + protocol: https + username: instance.storefront-n1-689qacsi + password: password + cacert: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/etcd/ca.crt + cert: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/etcd/client.crt + key: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/etcd/client.key +postgresql: + authentication: + superuser: + username: pgedge + sslmode: verify-full + sslkey: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/postgres/superuser.key + sslcert: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/postgres/superuser.crt + sslrootcert: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/postgres/ca.crt + replication: + username: patroni_replicator + sslmode: verify-full + sslkey: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/postgres/replication.key + sslcert: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/postgres/replication.crt + sslrootcert: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/postgres/ca.crt + connect_address: storefront-n1-689qacsi.storefront-database:5432 + data_dir: /var/lib/pgsql/18/storefront-n1-689qacsi/data/pgdata + listen: "*:5432" + parameters: + archive_command: /bin/true + archive_mode: "on" + autovacuum_max_workers: 3 + autovacuum_vacuum_cost_limit: 200 + autovacuum_work_mem: 262144 + checkpoint_completion_target: "0.9" + checkpoint_timeout: 15min + dynamic_shared_memory_type: posix + effective_cache_size: 524288 + hot_standby_feedback: "on" + lolor.node: 1 + maintenance_work_mem: 137518 + max_parallel_workers: 8 + password_encryption: md5 + shared_buffers: 262144 + shared_preload_libraries: pg_stat_statements,spock + snowflake.node: 1 + spock.allow_ddl_from_functions: "on" + spock.conflict_log_level: DEBUG + spock.conflict_resolution: last_update_wins + spock.enable_ddl_replication: "on" + spock.include_ddl_repset: "on" + spock.save_resolutions: "on" + ssl: "on" + ssl_ca_file: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/postgres/ca.crt + ssl_cert_file: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/postgres/server.crt + ssl_key_file: /var/lib/pgsql/18/storefront-n1-689qacsi/certificates/postgres/server.key + track_io_timing: "on" + wal_log_hints: "on" + wal_sender_timeout: 5s + pg_hba: + - local all all trust + - host all all 127.0.0.1/32 trust + - host all all ::1/128 trust + - local replication all trust + - host replication all 127.0.0.1/32 trust + - host replication all ::1/128 trust + - hostssl all pgedge,patroni_replicator 10.10.0.2/32 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 10.10.0.2/32 cert clientcert=verify-full + - hostssl all pgedge,patroni_replicator 10.10.0.3/32 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 10.10.0.3/32 cert clientcert=verify-full + - hostssl all pgedge,patroni_replicator 10.10.0.4/32 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 10.10.0.4/32 cert clientcert=verify-full + - host all pgedge,patroni_replicator 0.0.0.0/0 reject + - host all pgedge,patroni_replicator ::0/0 reject + - host all all 0.0.0.0/0 md5 + use_pg_rewind: true + remove_data_directory_on_rewind_failure: true + remove_data_directory_on_diverged_timelines: true +restapi: + connect_address: storefront-n1-689qacsi.storefront-database:8888 + listen: 0.0.0.0:8888 + allowlist: + - 10.10.0.2 + - 10.10.0.3 + - 10.10.0.4 + - 127.0.0.1 + - localhost +watchdog: + mode: "off" diff --git a/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/with_backup_config.yaml b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/with_backup_config.yaml new file mode 100644 index 00000000..c6e75d9b --- /dev/null +++ b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/with_backup_config.yaml @@ -0,0 +1,113 @@ +name: storefront-n1-689qacsi +namespace: /patroni/ +scope: storefront:n1 +log: + type: json + level: INFO + static_fields: + database_id: storefront + instance_id: storefront-n1-689qacsi + node_name: n1 +bootstrap: + dcs: + loop_wait: 10 + ttl: 30 + retry_timeout: 10 + postgresql: + parameters: + max_connections: 901 + max_replication_slots: 16 + max_wal_senders: 16 + max_worker_processes: 12 + track_commit_timestamp: "on" + wal_level: logical + ignore_slots: + - plugin: spock_output + initdb: + - data-checksums +etcd3: + hosts: + - i-0123456789abcdef.ec2.internal:2379 + protocol: https + username: instance.storefront-n1-689qacsi + password: password + cacert: /opt/pgedge/certificates/etcd/ca.crt + cert: /opt/pgedge/certificates/etcd/client.crt + key: /opt/pgedge/certificates/etcd/client.key +postgresql: + authentication: + superuser: + username: pgedge + sslmode: verify-full + sslkey: /opt/pgedge/certificates/postgres/superuser.key + sslcert: /opt/pgedge/certificates/postgres/superuser.crt + sslrootcert: /opt/pgedge/certificates/postgres/ca.crt + replication: + username: patroni_replicator + sslmode: verify-full + sslkey: /opt/pgedge/certificates/postgres/replication.key + sslcert: /opt/pgedge/certificates/postgres/replication.crt + sslrootcert: /opt/pgedge/certificates/postgres/ca.crt + connect_address: storefront-n1-689qacsi.storefront-database:5432 + data_dir: /opt/pgedge/data/pgdata + listen: "*:5432" + parameters: + archive_command: /usr/bin/pgbackrest --config /opt/pgedge/configs/pgbackrest.backup.conf --stanza db archive-push "%p" + archive_mode: "on" + autovacuum_max_workers: 3 + autovacuum_vacuum_cost_limit: 200 + autovacuum_work_mem: 262144 + checkpoint_completion_target: "0.9" + checkpoint_timeout: 15min + dynamic_shared_memory_type: posix + effective_cache_size: 524288 + hot_standby_feedback: "on" + lolor.node: 1 + maintenance_work_mem: 137518 + max_parallel_workers: 8 + password_encryption: md5 + shared_buffers: 262144 + shared_preload_libraries: pg_stat_statements,snowflake,spock,postgis-3 + snowflake.node: 1 + spock.allow_ddl_from_functions: "on" + spock.conflict_log_level: DEBUG + spock.conflict_resolution: last_update_wins + spock.enable_ddl_replication: "on" + spock.include_ddl_repset: "on" + spock.save_resolutions: "on" + ssl: "on" + ssl_ca_file: /opt/pgedge/certificates/postgres/ca.crt + ssl_cert_file: /opt/pgedge/certificates/postgres/server.crt + ssl_key_file: /opt/pgedge/certificates/postgres/server.key + track_io_timing: "on" + wal_log_hints: "on" + wal_sender_timeout: 5s + pg_hba: + - local all all trust + - host all all 127.0.0.1/32 trust + - host all all ::1/128 trust + - local replication all trust + - host replication all 127.0.0.1/32 trust + - host replication all ::1/128 trust + - hostssl all pgedge,patroni_replicator 172.17.0.1/32 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 172.17.0.1/32 cert clientcert=verify-full + - hostssl all pgedge,patroni_replicator 10.128.165.128/26 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 10.128.165.128/26 cert clientcert=verify-full + - host all pgedge,patroni_replicator 0.0.0.0/0 reject + - host all pgedge,patroni_replicator ::0/0 reject + - host all all 172.17.0.1/32 md5 + - host all all 172.17.0.1/16 reject + - host all all 0.0.0.0/0 md5 + use_pg_rewind: true + remove_data_directory_on_rewind_failure: true + remove_data_directory_on_diverged_timelines: true +restapi: + connect_address: storefront-n1-689qacsi.storefront-database:8888 + listen: 0.0.0.0:8888 + allowlist: + - 172.17.0.1/32 + - 10.128.165.128/26 + - 127.0.0.1 + - localhost +watchdog: + mode: "off" diff --git a/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/with_restore_config.yaml b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/with_restore_config.yaml new file mode 100644 index 00000000..f1b8b283 --- /dev/null +++ b/server/internal/orchestrator/common/golden_test/TestPatroniConfigGenerator/with_restore_config.yaml @@ -0,0 +1,118 @@ +name: storefront-n1-689qacsi +namespace: /patroni/ +scope: storefront:n1 +log: + type: json + level: INFO + static_fields: + database_id: storefront + instance_id: storefront-n1-689qacsi + node_name: n1 +bootstrap: + dcs: + loop_wait: 10 + ttl: 30 + retry_timeout: 10 + postgresql: + parameters: + max_connections: 901 + max_replication_slots: 16 + max_wal_senders: 16 + max_worker_processes: 12 + track_commit_timestamp: "on" + wal_level: logical + ignore_slots: + - plugin: spock_output + method: restore + initdb: + - data-checksums + restore: + command: /usr/bin/pgbackrest --config /opt/pgedge/configs/pgbackrest.restore.conf --stanza db restore + keep_existing_recovery_conf: true + no_params: true +etcd3: + hosts: + - i-0123456789abcdef.ec2.internal:2379 + protocol: https + username: instance.storefront-n1-689qacsi + password: password + cacert: /opt/pgedge/certificates/etcd/ca.crt + cert: /opt/pgedge/certificates/etcd/client.crt + key: /opt/pgedge/certificates/etcd/client.key +postgresql: + authentication: + superuser: + username: pgedge + sslmode: verify-full + sslkey: /opt/pgedge/certificates/postgres/superuser.key + sslcert: /opt/pgedge/certificates/postgres/superuser.crt + sslrootcert: /opt/pgedge/certificates/postgres/ca.crt + replication: + username: patroni_replicator + sslmode: verify-full + sslkey: /opt/pgedge/certificates/postgres/replication.key + sslcert: /opt/pgedge/certificates/postgres/replication.crt + sslrootcert: /opt/pgedge/certificates/postgres/ca.crt + connect_address: storefront-n1-689qacsi.storefront-database:5432 + data_dir: /opt/pgedge/data/pgdata + listen: "*:5432" + parameters: + archive_command: /bin/true + archive_mode: "on" + autovacuum_max_workers: 3 + autovacuum_vacuum_cost_limit: 200 + autovacuum_work_mem: 262144 + checkpoint_completion_target: "0.9" + checkpoint_timeout: 15min + dynamic_shared_memory_type: posix + effective_cache_size: 524288 + hot_standby_feedback: "on" + lolor.node: 1 + maintenance_work_mem: 137518 + max_parallel_workers: 8 + password_encryption: md5 + shared_buffers: 262144 + shared_preload_libraries: pg_stat_statements,snowflake,spock,postgis-3 + snowflake.node: 1 + spock.allow_ddl_from_functions: "on" + spock.conflict_log_level: DEBUG + spock.conflict_resolution: last_update_wins + spock.enable_ddl_replication: "on" + spock.include_ddl_repset: "on" + spock.save_resolutions: "on" + ssl: "on" + ssl_ca_file: /opt/pgedge/certificates/postgres/ca.crt + ssl_cert_file: /opt/pgedge/certificates/postgres/server.crt + ssl_key_file: /opt/pgedge/certificates/postgres/server.key + track_io_timing: "on" + wal_log_hints: "on" + wal_sender_timeout: 5s + pg_hba: + - local all all trust + - host all all 127.0.0.1/32 trust + - host all all ::1/128 trust + - local replication all trust + - host replication all 127.0.0.1/32 trust + - host replication all ::1/128 trust + - hostssl all pgedge,patroni_replicator 172.17.0.1/32 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 172.17.0.1/32 cert clientcert=verify-full + - hostssl all pgedge,patroni_replicator 10.128.165.128/26 cert clientcert=verify-full + - hostssl replication pgedge,patroni_replicator 10.128.165.128/26 cert clientcert=verify-full + - host all pgedge,patroni_replicator 0.0.0.0/0 reject + - host all pgedge,patroni_replicator ::0/0 reject + - host all all 172.17.0.1/32 md5 + - host all all 172.17.0.1/16 reject + - host all all 0.0.0.0/0 md5 + use_pg_rewind: true + remove_data_directory_on_rewind_failure: true + remove_data_directory_on_diverged_timelines: true +restapi: + connect_address: storefront-n1-689qacsi.storefront-database:8888 + listen: 0.0.0.0:8888 + allowlist: + - 172.17.0.1/32 + - 10.128.165.128/26 + - 127.0.0.1 + - localhost +watchdog: + mode: "off" diff --git a/server/internal/orchestrator/common/main_test.go b/server/internal/orchestrator/common/main_test.go new file mode 100644 index 00000000..3ce19fb2 --- /dev/null +++ b/server/internal/orchestrator/common/main_test.go @@ -0,0 +1,16 @@ +package common_test + +import ( + "flag" + "os" + "testing" +) + +var update bool + +func TestMain(m *testing.M) { + flag.BoolVar(&update, "update", false, "update golden test outputs") + flag.Parse() + + os.Exit(m.Run()) +} diff --git a/server/internal/orchestrator/common/paths.go b/server/internal/orchestrator/common/paths.go new file mode 100644 index 00000000..73e14c1c --- /dev/null +++ b/server/internal/orchestrator/common/paths.go @@ -0,0 +1,157 @@ +package common + +import ( + "fmt" + "path/filepath" + "strings" + + "github.com/pgEdge/control-plane/server/internal/ds" + "github.com/pgEdge/control-plane/server/internal/pgbackrest" +) + +type InstancePaths struct { + Instance Paths `json:"instance"` + Host Paths `json:"host"` + PgBackRestPath string `json:"pg_backrest_path"` + PatroniPath string `json:"patroni_path"` +} + +func (p *InstancePaths) HostMvDataToRestoreCmd() []string { + return []string{"mv", p.Host.PgData(), p.Host.PgDataRestore()} +} + +func (p *InstancePaths) InstanceMvRestoreToDataCmd() []string { + return []string{"mv", p.Instance.PgDataRestore(), p.Instance.PgData()} +} + +func (p *InstancePaths) PgBackRestBackupCmd(command string, args ...string) pgbackrest.Cmd { + return pgbackrest.Cmd{ + PgBackrestCmd: p.PgBackRestPath, + Config: p.Instance.PgBackRestConfig(PgBackRestConfigTypeBackup), + Stanza: "db", + Command: command, + Args: args, + } +} + +var targetActionRestoreTypes = ds.NewSet( + "immediate", + "lsn", + "name", + "time", + "xid", +) + +func (p *InstancePaths) PgBackRestRestoreCmd(command string, args ...string) pgbackrest.Cmd { + var hasTargetAction, needsTargetAction bool + for i := 0; i < len(args); i++ { + arg := args[i] + if strings.HasPrefix(arg, "--target-action") { + hasTargetAction = true + continue // skip the next arg since it's the value of --target-action no further checks needed + } + var restoreType string + if arg == "--type" && i+1 < len(args) { + restoreType = args[i+1] + i++ // skip the next arg since it's the value of --type + } else if strings.HasPrefix(arg, "--type=") { + restoreType = strings.TrimPrefix(arg, "--type=") + } else { + continue + } + if targetActionRestoreTypes.Has(restoreType) { + needsTargetAction = true + } + } + if needsTargetAction && !hasTargetAction { + args = append(args, "--target-action=promote") + } + + return pgbackrest.Cmd{ + PgBackrestCmd: p.PgBackRestPath, + Config: p.Instance.PgBackRestConfig(PgBackRestConfigTypeRestore), + Stanza: "db", + Command: command, + Args: args, + } +} + +type Paths struct { + BaseDir string `json:"base_dir"` +} + +func (p *Paths) Data() string { + return filepath.Join(p.BaseDir, "data") +} + +func (p *Paths) Configs() string { + return filepath.Join(p.BaseDir, "configs") +} + +func (p *Paths) Certificates() string { + return filepath.Join(p.BaseDir, "certificates") +} + +func (p *Paths) PgData() string { + return filepath.Join(p.Data(), "pgdata") +} + +func (p *Paths) PgDataRestore() string { + return filepath.Join(p.Data(), "pgdata-restore") +} + +func (p *Paths) PatroniConfig() string { + return filepath.Join(p.Configs(), "patroni.yaml") +} + +func (p *Paths) PgBackRestConfig(confType PgBackRestConfigType) string { + return filepath.Join(p.Configs(), fmt.Sprintf("pgbackrest.%s.conf", confType)) +} + +func (p *Paths) EtcdCertificates() string { + return filepath.Join(p.Certificates(), "etcd") +} + +func (p *Paths) EtcdCaCert() string { + return filepath.Join(p.EtcdCertificates(), etcdCaCertName) +} + +func (p *Paths) EtcdClientCert() string { + return filepath.Join(p.EtcdCertificates(), etcdClientCertName) +} + +func (p *Paths) EtcdClientKey() string { + return filepath.Join(p.EtcdCertificates(), etcdClientKeyName) +} + +func (p *Paths) PostgresCertificates() string { + return filepath.Join(p.Certificates(), "postgres") +} + +func (p *Paths) PostgresCaCert() string { + return filepath.Join(p.PostgresCertificates(), postgresCaCertName) +} + +func (p *Paths) PostgresServerCert() string { + return filepath.Join(p.PostgresCertificates(), postgresServerCertName) +} + +func (p *Paths) PostgresServerKey() string { + return filepath.Join(p.PostgresCertificates(), postgresServerKeyName) +} + +func (p *Paths) PostgresSuperuserCert() string { + return filepath.Join(p.PostgresCertificates(), postgresSuperuserCertName) +} + +func (p *Paths) PostgresSuperuserKey() string { + return filepath.Join(p.PostgresCertificates(), postgresSuperuserKeyName) +} + +func (p *Paths) PostgresReplicatorCert() string { + return filepath.Join(p.PostgresCertificates(), postgresReplicatorCertName) +} + +func (p *Paths) PostgresReplicatorKey() string { + return filepath.Join(p.PostgresCertificates(), postgresReplicatorKeyName) +} diff --git a/server/internal/orchestrator/common/paths_test.go b/server/internal/orchestrator/common/paths_test.go new file mode 100644 index 00000000..6a8925e5 --- /dev/null +++ b/server/internal/orchestrator/common/paths_test.go @@ -0,0 +1,93 @@ +package common_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/pgEdge/control-plane/server/internal/orchestrator/common" + "github.com/pgEdge/control-plane/server/internal/pgbackrest" +) + +func TestInstancePaths(t *testing.T) { + t.Run("PgBackRestRestoreCmd", func(t *testing.T) { + for _, tc := range []struct { + name string + command string + args []string + expected pgbackrest.Cmd + }{ + { + name: "default", + command: "restore", + args: nil, + expected: pgbackrest.Cmd{ + PgBackrestCmd: "/usr/bin/pgbackrest", + Config: "/opt/pgedge/configs/pgbackrest.backup.conf", + Stanza: "db", + Command: "restore", + Args: nil, + }, + }, + { + name: "needs target action", + command: "restore", + args: []string{"--type", "immediate"}, + expected: pgbackrest.Cmd{ + PgBackrestCmd: "/usr/bin/pgbackrest", + Config: "/opt/pgedge/configs/pgbackrest.backup.conf", + Stanza: "db", + Command: "restore", + Args: []string{"--type", "immediate", "--target-action=promote"}, + }, + }, + { + name: "needs target action with =", + command: "restore", + args: []string{"--type=name"}, + expected: pgbackrest.Cmd{ + PgBackrestCmd: "/usr/bin/pgbackrest", + Config: "/opt/pgedge/configs/pgbackrest.backup.conf", + Stanza: "db", + Command: "restore", + Args: []string{"--type=name", "--target-action=promote"}, + }, + }, + { + name: "already has target action", + command: "restore", + args: []string{"--type=name", "--target-action", "pause"}, + expected: pgbackrest.Cmd{ + PgBackrestCmd: "/usr/bin/pgbackrest", + Config: "/opt/pgedge/configs/pgbackrest.backup.conf", + Stanza: "db", + Command: "restore", + Args: []string{"--type=name", "--target-action", "pause"}, + }, + }, + { + name: "already has target action with =", + command: "restore", + args: []string{"--type=name", "--target-action=pause"}, + expected: pgbackrest.Cmd{ + PgBackrestCmd: "/usr/bin/pgbackrest", + Config: "/opt/pgedge/configs/pgbackrest.backup.conf", + Stanza: "db", + Command: "restore", + Args: []string{"--type=name", "--target-action=pause"}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + paths := &common.InstancePaths{ + Instance: common.Paths{BaseDir: "/opt/pgedge"}, + Host: common.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, + PgBackRestPath: "/usr/bin/pgbackrest", + PatroniPath: "/usr/bin/patroni", + } + result := paths.PgBackRestRestoreCmd(tc.command, tc.args...) + assert.Equal(t, tc.expected.PgBackrestCmd, result.PgBackrestCmd) + }) + } + }) +} diff --git a/server/internal/orchestrator/common/patroni_cluster.go b/server/internal/orchestrator/common/patroni_cluster.go new file mode 100644 index 00000000..61b6651b --- /dev/null +++ b/server/internal/orchestrator/common/patroni_cluster.go @@ -0,0 +1,79 @@ +package common + +import ( + "context" + "fmt" + + "github.com/samber/do" + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/pgEdge/control-plane/server/internal/resource" + "github.com/pgEdge/control-plane/server/internal/storage" +) + +var _ resource.Resource = (*PatroniCluster)(nil) + +const ResourceTypePatroniCluster resource.Type = "common.patroni_cluster" + +func PatroniClusterResourceIdentifier(nodeName string) resource.Identifier { + return resource.Identifier{ + ID: nodeName, + Type: ResourceTypePatroniCluster, + } +} + +type PatroniCluster struct { + DatabaseID string `json:"database_id"` + NodeName string `json:"node_name"` + PatroniClusterPrefix string `json:"patroni_namespace"` +} + +func (p *PatroniCluster) ResourceVersion() string { + return "1" +} + +func (p *PatroniCluster) DiffIgnore() []string { + return nil +} + +func (p *PatroniCluster) Executor() resource.Executor { + return resource.AnyExecutor() +} + +func (p *PatroniCluster) Identifier() resource.Identifier { + return PatroniClusterResourceIdentifier(p.NodeName) +} + +func (p *PatroniCluster) Dependencies() []resource.Identifier { + return nil +} + +func (p *PatroniCluster) TypeDependencies() []resource.Type { + return nil +} + +func (p *PatroniCluster) Refresh(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (p *PatroniCluster) Create(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (p *PatroniCluster) Update(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (p *PatroniCluster) Delete(ctx context.Context, rc *resource.Context) error { + client, err := do.Invoke[*clientv3.Client](rc.Injector) + if err != nil { + return err + } + + _, err = storage.NewDeletePrefixOp(client, p.PatroniClusterPrefix).Exec(ctx) + if err != nil { + return fmt.Errorf("failed to delete patroni namespace from DCS: %w", err) + } + + return nil +} diff --git a/server/internal/orchestrator/common/patroni_config.go b/server/internal/orchestrator/common/patroni_config.go new file mode 100644 index 00000000..dc267bcb --- /dev/null +++ b/server/internal/orchestrator/common/patroni_config.go @@ -0,0 +1,159 @@ +package common + +import ( + "context" + "errors" + "fmt" + "path/filepath" + + "github.com/goccy/go-yaml" + "github.com/samber/do" + "github.com/spf13/afero" + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/filesystem" + "github.com/pgEdge/control-plane/server/internal/patroni" + "github.com/pgEdge/control-plane/server/internal/postgres/hba" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +type PatroniConfig struct { + InstanceID string `json:"instance_id"` + HostID string `json:"host_id"` + NodeName string `json:"node_name"` + Generator *PatroniConfigGenerator `json:"generator"` + ParentID string `json:"parent_id"` + OwnerUID int `json:"owner_uid"` + OwnerGID int `json:"owner_gid"` +} + +func (c *PatroniConfig) Dependencies() []resource.Identifier { + deps := []resource.Identifier{ + filesystem.DirResourceIdentifier(c.ParentID), + EtcdCredsIdentifier(c.InstanceID), + PatroniMemberResourceIdentifier(c.InstanceID), + PatroniClusterResourceIdentifier(c.NodeName), + } + return deps +} + +func (c *PatroniConfig) TypeDependencies() []resource.Type { + return nil +} + +func (c *PatroniConfig) Refresh(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + + contents, err := ReadResourceFile(fs, filepath.Join(parentFullPath, "patroni.yaml")) + if err != nil { + return fmt.Errorf("failed to read patroni config: %w", err) + } + + // Test that we can parse the file. We'll want to recreate the file if it's + // malformed. + var config patroni.Config + if err := yaml.Unmarshal(contents, &config); err != nil { + return fmt.Errorf("%w: failed to unmarshal patroni config", resource.ErrNotFound) + } + + return nil +} + +func (c *PatroniConfig) Create( + ctx context.Context, + rc *resource.Context, + systemAddresses []string, + extraHbaEntries []hba.Entry, +) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + etcdClient, err := do.Invoke[*clientv3.Client](rc.Injector) + if err != nil { + return err + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + + etcdCreds, err := resource.FromContext[*EtcdCreds](rc, EtcdCredsIdentifier(c.InstanceID)) + if err != nil { + return fmt.Errorf("failed to get etcd creds from state: %w", err) + } + + etcdHosts, err := patroni.EtcdHosts(ctx, etcdClient) + if err != nil { + return fmt.Errorf("failed to get etcd hosts: %w", err) + } + + enableFastBasebackup, err := c.isNewNode(rc) + if err != nil { + return err + } + + config := c.Generator.Generate(etcdHosts, etcdCreds, GenerateOptions{ + EnableFastBasebackup: enableFastBasebackup, + SystemAddresses: systemAddresses, + ExtraHbaEntries: extraHbaEntries, + }) + + content, err := yaml.Marshal(config) + if err != nil { + return fmt.Errorf("failed to marshal patroni config: %w", err) + } + + configPath := filepath.Join(parentFullPath, "patroni.yaml") + if err := afero.WriteFile(fs, configPath, content, 0o600); err != nil { + return fmt.Errorf("failed to write %s: %w", configPath, err) + } + if err := fs.Chown(configPath, c.OwnerUID, c.OwnerGID); err != nil { + return fmt.Errorf("failed to change ownership for %s: %w", configPath, err) + } + + return nil +} + +func (c *PatroniConfig) Delete(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + + err = fs.Remove(filepath.Join(parentFullPath, "patroni.yaml")) + if errors.Is(err, afero.ErrFileNotFound) { + return nil + } else if err != nil { + return fmt.Errorf("failed to remove patroni.yaml: %w", err) + } + + return nil +} + +func (c *PatroniConfig) isNewNode(rc *resource.Context) (bool, error) { + _, err := resource.FromContext[*database.NodeResource](rc, database.NodeResourceIdentifier(c.NodeName)) + switch { + case errors.Is(err, resource.ErrNotFound): + return true, nil + case err != nil: + return false, fmt.Errorf("failed to check if node already exists: %w", err) + default: + return false, nil + } +} diff --git a/server/internal/orchestrator/common/patroni_config_generator.go b/server/internal/orchestrator/common/patroni_config_generator.go new file mode 100644 index 00000000..09d5203b --- /dev/null +++ b/server/internal/orchestrator/common/patroni_config_generator.go @@ -0,0 +1,432 @@ +package common + +import ( + "fmt" + "maps" + "net" + "path/filepath" + "slices" + "strconv" + "strings" + + "github.com/alessio/shellescape" + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/patroni" + "github.com/pgEdge/control-plane/server/internal/postgres" + "github.com/pgEdge/control-plane/server/internal/postgres/hba" + "github.com/pgEdge/control-plane/server/internal/utils" +) + +type PatroniConfigGenerator struct { + // ArchiveCommand sets the Postgres archive command parameter. + ArchiveCommand string `json:"archive_command,omitempty"` + // ClusterSize is the number of nodes in the Spock cluster. This is used for + // the tunable Postgres parameters. + ClusterSize int `json:"cluster_size"` + // CPUs is the number of CPUs allocated for this instance. This is used for + // the tunable Postgres parameters. + CPUs float64 `json:"cpus,omitempty"` + // DatabaseID is the Database's ID. + DatabaseID string `json:"database_id"` + // DataDir is the Postgres data directory. + DataDir string `json:"data_dir"` + // EtcdCertsDir is the Etcd certificates directory. + EtcdCertsDir string `json:"etcd_certs_dir"` + // FQDN is the fully-qualified domain name for this instance. This name must + // be reachable by sibling instances within the Spock node. + FQDN string `json:"fqdn"` + // InstanceID is this instance's ID. + InstanceID string `json:"instance_id"` + // MemoryBytes is the amount of memory that is allocated for this instance. + // This is used for the tunable Postgres parameters. + MemoryBytes uint64 `json:"memory_bytes,omitempty"` + // NodeName is the Spock node name. + NodeName string `json:"node_name"` + // NodeOrdinal is the ordinal part of the Spock node name, e.g. for 'n1' + // this would be '1'. This is used to configure the Snowflake and LOLOR + // extensions. + NodeOrdinal int `json:"node_ordinal"` + // OrchestratorParameters are additional parameters to be provided by the + // orchestrator implementation. + OrchestratorParameters map[string]any `json:"orchestrator_parameters,omitempty"` + // PatroniAllowlist is a user-specified list of addresses, hostnames, or + // CIDR ranges to include in the allowlist for Patroni's REST API. + PatroniAllowlist []string `json:"patroni_allowlist"` + // PatroniPort is the port that Patroni will listen on. + PatroniPort int `json:"patroni_port"` + // PostgresCertsDir is the Postgres certificates directory. + PostgresCertsDir string `json:"postgres_certs_dir"` + // PostgresPort is the port that Postgres will listen on. + PostgresPort int `json:"postgres_port"` + // RestoreCommand is an alternate command to use to bootstrap this instance. + RestoreCommand string `json:"restore_command"` + // SpecParameters are user-specified Postgres parameters that are included + // in the database spec. + SpecParameters map[string]any `json:"spec_parameters,omitempty"` + // TenantID is an optional tenant ID that is associated with this instance. + TenantID *string `json:"tenant_id,omitempty"` +} + +type PatroniConfigGeneratorOptions struct { + // Instance is the instance spec for this instance. + Instance *database.InstanceSpec + // HostCPUs is the total number of CPUs available on the host. This is used for + // the tunable Postgres parameters. + HostCPUs float64 + // HostMemoryBytes is the total amount of memory available on the host. This + // is used for the tunable Postgres parameters. + HostMemoryBytes uint64 + // FQDN is the fully-qualified domain name for this instance. This name must + // be reachable by sibling instances within the Spock node. + FQDN string + // OrchestratorParameters are additional parameters to be provided by the + // orchestrator implementation. + OrchestratorParameters map[string]any + // PatroniPort is the port that Patroni will listen on. + PatroniPort int + // PostgresPort is the port that Postgres will listen on. + PostgresPort int + // Paths is used to compute the paths of directories and executables. + Paths InstancePaths +} + +func NewPatroniConfigGenerator(opts PatroniConfigGeneratorOptions) *PatroniConfigGenerator { + cpus := opts.Instance.CPUs + if cpus == 0 { + cpus = opts.HostCPUs + } + memoryBytes := opts.Instance.MemoryBytes + if memoryBytes == 0 { + memoryBytes = opts.HostMemoryBytes + } + var archiveCommand string + if opts.Instance.BackupConfig != nil { + archiveCommand = opts.Paths.PgBackRestBackupCmd("archive-push", `"%p"`).String() + } + var restoreCommand string + if opts.Instance.RestoreConfig != nil { + if opts.Instance.InPlaceRestore { + restoreCommand = strings.Join(opts.Paths.InstanceMvRestoreToDataCmd(), " ") + } else { + restoreOptions := utils.BuildOptionArgs(opts.Instance.RestoreConfig.RestoreOptions) + for i, o := range restoreOptions { + restoreOptions[i] = shellescape.Quote(o) + } + restoreCommand = opts.Paths.PgBackRestRestoreCmd("restore", restoreOptions...).String() + } + } + return &PatroniConfigGenerator{ + ArchiveCommand: archiveCommand, + ClusterSize: opts.Instance.ClusterSize, + CPUs: cpus, + DatabaseID: opts.Instance.DatabaseID, + DataDir: opts.Paths.Instance.PgData(), + EtcdCertsDir: opts.Paths.Instance.EtcdCertificates(), + FQDN: opts.FQDN, + InstanceID: opts.Instance.InstanceID, + MemoryBytes: memoryBytes, + NodeName: opts.Instance.NodeName, + NodeOrdinal: opts.Instance.NodeOrdinal, + OrchestratorParameters: opts.OrchestratorParameters, + PatroniPort: opts.PatroniPort, + PostgresCertsDir: opts.Paths.Instance.PostgresCertificates(), + PostgresPort: opts.PostgresPort, + RestoreCommand: restoreCommand, + SpecParameters: opts.Instance.PostgreSQLConf, + TenantID: opts.Instance.TenantID, + // TODO: Add allowlist field to instance and database types + // PatroniAllowlist: opts.Instance.PatroniAllowlist, + } +} + +type GenerateOptions struct { + // SystemAddresses are IPs, hostnames, or CIDR ranges that pgedge or + // patroni_replicator connections will originate from. + SystemAddresses []string + // ExtraHbaEntries are orchestrator-specific entries to include in the + // pg_hba.conf. + ExtraHbaEntries []hba.Entry + // EnableFastBasebackup enables basebackup's "fast" checkpoint option when + // bootstrapping this instance from another existing instance. + EnableFastBasebackup bool +} + +func (p *PatroniConfigGenerator) Generate( + etcdHosts []string, + etcdCreds *EtcdCreds, + opts GenerateOptions, +) *patroni.Config { + parameters := p.parameters() + dcsParameters := patroni.ExtractPatroniControlledGUCs(parameters) + + return &patroni.Config{ + Name: utils.PointerTo(p.InstanceID), + Namespace: utils.PointerTo(patroni.Namespace()), + Scope: utils.PointerTo(patroni.ClusterName(p.DatabaseID, p.NodeName)), + Log: p.log(), + Bootstrap: p.bootstrap(dcsParameters), + Etcd3: p.etcd(etcdHosts, etcdCreds), + RestAPI: p.restAPI(opts.SystemAddresses), + Watchdog: &patroni.Watchdog{ + Mode: utils.PointerTo(patroni.WatchdogModeOff), + }, + Postgresql: p.postgreSQL( + opts.EnableFastBasebackup, + parameters, + opts.SystemAddresses, + opts.ExtraHbaEntries, + ), + } +} + +func (p *PatroniConfigGenerator) parameters() map[string]any { + parameters := postgres.DefaultGUCs() + maps.Copy(parameters, postgres.Spock4DefaultGUCs()) + maps.Copy(parameters, postgres.DefaultTunableGUCs(p.MemoryBytes, p.CPUs, p.ClusterSize)) + maps.Copy(parameters, map[string]any{ + "ssl": "on", + "ssl_ca_file": filepath.Join(p.PostgresCertsDir, postgresCaCertName), + "ssl_cert_file": filepath.Join(p.PostgresCertsDir, postgresServerCertName), + "ssl_key_file": filepath.Join(p.PostgresCertsDir, postgresServerKeyName), + }) + maps.Copy(parameters, p.OrchestratorParameters) + if p.ArchiveCommand != "" { + maps.Copy(parameters, map[string]any{ + // It's safe to set this to "on" on every instance in the node + // because "on" (as opposed to "always") will only archive from the + // primary instance. + "archive_mode": "on", + "archive_command": p.ArchiveCommand, + }) + } + maps.Copy(parameters, postgres.SnowflakeLolorGUCs(p.NodeOrdinal)) + maps.Copy(parameters, p.SpecParameters) + + return parameters +} + +func (p *PatroniConfigGenerator) bootstrap(dcsParameters map[string]any) *patroni.Bootstrap { + bootstrap := &patroni.Bootstrap{ + DCS: &patroni.DCS{ + Postgresql: &patroni.DCSPostgreSQL{ + Parameters: &dcsParameters, + }, + IgnoreSlots: &[]patroni.IgnoreSlot{ + {Plugin: utils.PointerTo("spock_output")}, + }, + TTL: utils.PointerTo(30), + LoopWait: utils.PointerTo(10), + RetryTimeout: utils.PointerTo(10), + }, + InitDB: utils.PointerTo([]string{"data-checksums"}), + } + + if p.RestoreCommand != "" { + bootstrap.Method = utils.PointerTo(patroni.BootstrapMethodNameRestore) + bootstrap.Restore = &patroni.BootstrapMethodConf{ + Command: utils.PointerTo(p.RestoreCommand), + NoParams: utils.PointerTo(true), + KeepExistingRecoveryConf: utils.PointerTo(true), + } + } + + return bootstrap +} + +func (p *PatroniConfigGenerator) log() *patroni.Log { + fields := map[string]string{ + "database_id": p.DatabaseID, + "instance_id": p.InstanceID, + "node_name": p.NodeName, + } + if p.TenantID != nil { + fields["tenant_id"] = *p.TenantID + } + + return &patroni.Log{ + Type: utils.PointerTo(patroni.LogTypeJson), + Level: utils.PointerTo(patroni.LogLevelInfo), + StaticFields: &fields, + } +} + +func (p *PatroniConfigGenerator) etcd(hosts []string, creds *EtcdCreds) *patroni.Etcd { + return &patroni.Etcd{ + Hosts: &hosts, + CACert: utils.PointerTo(filepath.Join(p.EtcdCertsDir, etcdCaCertName)), + Cert: utils.PointerTo(filepath.Join(p.EtcdCertsDir, etcdClientCertName)), + Key: utils.PointerTo(filepath.Join(p.EtcdCertsDir, etcdClientKeyName)), + Username: &creds.Username, + Password: &creds.Password, + Protocol: utils.PointerTo("https"), + } +} + +func (p *PatroniConfigGenerator) restAPI(systemAddresses []string) *patroni.RestAPI { + combined := utils.PointerTo(slices.Concat( + p.PatroniAllowlist, + systemAddresses, + []string{ + "127.0.0.1", // Always allow local connections + "localhost", + "::1", + }, + )) + + return &patroni.RestAPI{ + ConnectAddress: utils.PointerTo(net.JoinHostPort(p.FQDN, strconv.Itoa(p.PatroniPort))), + Listen: utils.PointerTo(fmt.Sprintf("0.0.0.0:%d", p.PatroniPort)), + Allowlist: combined, + } +} + +func (p *PatroniConfigGenerator) postgreSQL( + enableFastBasebackup bool, + parameters map[string]any, + systemAddresses []string, + extraEntries []hba.Entry, +) *patroni.PostgreSQL { + var basebackup *[]any + if enableFastBasebackup { + // Causes basebackup to request an immediate checkpoint. The tradeoff + // is that the checkpoint operation can disrupt clients. We enable it + // by default for new nodes because the primary shouldn't have any + // clients outside the control plane. + basebackup = &[]any{ + map[string]string{"checkpoint": "fast"}, + } + } + + return &patroni.PostgreSQL{ + ConnectAddress: utils.PointerTo(net.JoinHostPort(p.FQDN, strconv.Itoa(p.PostgresPort))), + DataDir: &p.DataDir, + Parameters: ¶meters, + Listen: utils.PointerTo(fmt.Sprintf("*:%d", p.PostgresPort)), + BaseBackup: basebackup, + UsePgRewind: utils.PointerTo(true), + RemoveDataDirectoryOnRewindFailure: utils.PointerTo(true), + RemoveDataDirectoryOnDivergedTimelines: utils.PointerTo(true), + Authentication: p.authentication(), + PgHba: p.pgHba(systemAddresses, extraEntries), + } +} + +func (p *PatroniConfigGenerator) authentication() *patroni.Authentication { + return &patroni.Authentication{ + Superuser: &patroni.User{ + Username: utils.PointerTo("pgedge"), + SSLRootCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresCaCertName)), + SSLCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresSuperuserCertName)), + SSLKey: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresSuperuserKeyName)), + SSLMode: utils.PointerTo("verify-full"), + }, + Replication: &patroni.User{ + Username: utils.PointerTo("patroni_replicator"), + SSLRootCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresCaCertName)), + SSLCert: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresReplicatorCertName)), + SSLKey: utils.PointerTo(filepath.Join(p.PostgresCertsDir, postgresReplicatorKeyName)), + SSLMode: utils.PointerTo("verify-full"), + }, + } +} + +func (p *PatroniConfigGenerator) pgHba(systemAddresses []string, extraEntries []hba.Entry) *[]string { + entries := []string{ + // Trust local connections + hba.Entry{ + Type: hba.EntryTypeLocal, + Database: "all", + User: "all", + AuthMethod: hba.AuthMethodTrust, + }.String(), + hba.Entry{ + Type: hba.EntryTypeHost, + Database: "all", + User: "all", + Address: "127.0.0.1/32", + AuthMethod: hba.AuthMethodTrust, + }.String(), + hba.Entry{ + Type: hba.EntryTypeHost, + Database: "all", + User: "all", + Address: "::1/128", + AuthMethod: hba.AuthMethodTrust, + }.String(), + hba.Entry{ + Type: hba.EntryTypeLocal, + Database: "replication", + User: "all", + AuthMethod: hba.AuthMethodTrust, + }.String(), + hba.Entry{ + Type: hba.EntryTypeHost, + Database: "replication", + User: "all", + Address: "127.0.0.1/32", + AuthMethod: hba.AuthMethodTrust, + }.String(), + hba.Entry{ + Type: hba.EntryTypeHost, + Database: "replication", + User: "all", + Address: "::1/128", + AuthMethod: hba.AuthMethodTrust, + }.String(), + } + + // Reject connections for system users except for SSL connections from the + // given system addresses. + for _, address := range systemAddresses { + entries = append(entries, + hba.Entry{ + Type: hba.EntryTypeHostSSL, + Database: "all", + User: "pgedge,patroni_replicator", + Address: address, + AuthMethod: hba.AuthMethodCert, + AuthOptions: "clientcert=verify-full", + }.String(), + hba.Entry{ + Type: hba.EntryTypeHostSSL, + Database: "replication", + User: "pgedge,patroni_replicator", + Address: address, + AuthMethod: hba.AuthMethodCert, + AuthOptions: "clientcert=verify-full", + }.String(), + ) + } + entries = append(entries, + hba.Entry{ + Type: hba.EntryTypeHost, + Database: "all", + User: "pgedge,patroni_replicator", + Address: "0.0.0.0/0", + AuthMethod: hba.AuthMethodReject, + }.String(), + hba.Entry{ + Type: hba.EntryTypeHost, + Database: "all", + User: "pgedge,patroni_replicator", + Address: "::0/0", + AuthMethod: hba.AuthMethodReject, + }.String(), + ) + + for _, entry := range extraEntries { + entries = append(entries, entry.String()) + } + + // Use MD5 for non-system users from all other connections + // TODO: Can we safely upgrade this to scram-sha-256? + entries = append(entries, hba.Entry{ + Type: hba.EntryTypeHost, + Database: "all", + User: "all", + Address: "0.0.0.0/0", + AuthMethod: hba.AuthMethodMD5, + }.String()) + + return &entries +} diff --git a/server/internal/orchestrator/common/patroni_config_generator_test.go b/server/internal/orchestrator/common/patroni_config_generator_test.go new file mode 100644 index 00000000..5500ef95 --- /dev/null +++ b/server/internal/orchestrator/common/patroni_config_generator_test.go @@ -0,0 +1,366 @@ +package common_test + +import ( + "testing" + + "github.com/goccy/go-yaml" + "github.com/stretchr/testify/require" + + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/host" + "github.com/pgEdge/control-plane/server/internal/orchestrator/common" + "github.com/pgEdge/control-plane/server/internal/patroni" + "github.com/pgEdge/control-plane/server/internal/postgres/hba" + "github.com/pgEdge/control-plane/server/internal/testutils" +) + +func TestPatroniConfigGenerator(t *testing.T) { + golden := &testutils.GoldenTest[*patroni.Config]{ + Compare: func(t testing.TB, expected, actual *patroni.Config) { + // The specific number types (e.g. int64) get lost in the conversion + // to and from YAML, so we marshal and unmarshal the actual value + // before comparison to normalize it. + raw, err := yaml.Marshal(actual) + require.NoError(t, err) + + var roundTrippedActual *patroni.Config + require.NoError(t, yaml.Unmarshal(raw, &roundTrippedActual)) + + require.Equal(t, expected, roundTrippedActual) + }, + FileExtension: ".yaml", + Marshal: yaml.Marshal, + Unmarshal: yaml.Unmarshal, + } + + for _, tc := range []struct { + name string + options common.PatroniConfigGeneratorOptions + etcdHosts []string + etcdCreds *common.EtcdCreds + generateOptions common.GenerateOptions + }{ + { + name: "minimal swarm", + options: common.PatroniConfigGeneratorOptions{ + Instance: &database.InstanceSpec{ + InstanceID: "storefront-n1-689qacsi", + DatabaseID: "storefront", + HostID: "host-1", + DatabaseName: "app", + NodeName: "n1", + NodeOrdinal: 1, + PgEdgeVersion: host.MustPgEdgeVersion("18.1", "5.0.4"), + ClusterSize: 3, + }, + HostCPUs: 4, + HostMemoryBytes: 1024 * 1024 * 1024 * 8, + FQDN: "storefront-n1-689qacsi.storefront-database", + OrchestratorParameters: map[string]any{ + "shared_preload_libraries": "pg_stat_statements,snowflake,spock,postgis-3", + }, + PatroniPort: 8888, + PostgresPort: 5432, + Paths: common.InstancePaths{ + Instance: common.Paths{BaseDir: "/opt/pgedge"}, + Host: common.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, + PgBackRestPath: "/usr/bin/pgbackrest", + PatroniPath: "/usr/local/bin/patroni", + }, + }, + etcdHosts: []string{"i-0123456789abcdef.ec2.internal:2379"}, + etcdCreds: &common.EtcdCreds{ + Username: "instance.storefront-n1-689qacsi", + Password: "password", + }, + generateOptions: common.GenerateOptions{ + SystemAddresses: []string{ + "172.17.0.1/32", + "10.128.165.128/26", + }, + ExtraHbaEntries: []hba.Entry{ + { + Type: hba.EntryTypeHost, + Database: "all", + User: "all", + Address: "172.17.0.1/32", + AuthMethod: hba.AuthMethodMD5, + }, + { + Type: hba.EntryTypeHost, + Database: "all", + User: "all", + Address: "172.17.0.1/16", + AuthMethod: hba.AuthMethodReject, + }, + }, + }, + }, + { + name: "with backup config", + options: common.PatroniConfigGeneratorOptions{ + Instance: &database.InstanceSpec{ + InstanceID: "storefront-n1-689qacsi", + DatabaseID: "storefront", + HostID: "host-1", + DatabaseName: "app", + NodeName: "n1", + NodeOrdinal: 1, + PgEdgeVersion: host.MustPgEdgeVersion("18.1", "5.0.4"), + ClusterSize: 3, + BackupConfig: &database.BackupConfig{}, + }, + HostCPUs: 4, + HostMemoryBytes: 1024 * 1024 * 1024 * 8, + FQDN: "storefront-n1-689qacsi.storefront-database", + OrchestratorParameters: map[string]any{ + "shared_preload_libraries": "pg_stat_statements,snowflake,spock,postgis-3", + }, + PatroniPort: 8888, + PostgresPort: 5432, + Paths: common.InstancePaths{ + Instance: common.Paths{BaseDir: "/opt/pgedge"}, + Host: common.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, + PgBackRestPath: "/usr/bin/pgbackrest", + PatroniPath: "/usr/local/bin/patroni", + }, + }, + etcdHosts: []string{"i-0123456789abcdef.ec2.internal:2379"}, + etcdCreds: &common.EtcdCreds{ + Username: "instance.storefront-n1-689qacsi", + Password: "password", + }, + generateOptions: common.GenerateOptions{ + SystemAddresses: []string{ + "172.17.0.1/32", + "10.128.165.128/26", + }, + ExtraHbaEntries: []hba.Entry{ + { + Type: hba.EntryTypeHost, + Database: "all", + User: "all", + Address: "172.17.0.1/32", + AuthMethod: hba.AuthMethodMD5, + }, + { + Type: hba.EntryTypeHost, + Database: "all", + User: "all", + Address: "172.17.0.1/16", + AuthMethod: hba.AuthMethodReject, + }, + }, + }, + }, + { + name: "with restore config", + options: common.PatroniConfigGeneratorOptions{ + Instance: &database.InstanceSpec{ + InstanceID: "storefront-n1-689qacsi", + DatabaseID: "storefront", + HostID: "host-1", + DatabaseName: "app", + NodeName: "n1", + NodeOrdinal: 1, + PgEdgeVersion: host.MustPgEdgeVersion("18.1", "5.0.4"), + ClusterSize: 3, + RestoreConfig: &database.RestoreConfig{}, + }, + HostCPUs: 4, + HostMemoryBytes: 1024 * 1024 * 1024 * 8, + FQDN: "storefront-n1-689qacsi.storefront-database", + OrchestratorParameters: map[string]any{ + "shared_preload_libraries": "pg_stat_statements,snowflake,spock,postgis-3", + }, + PatroniPort: 8888, + PostgresPort: 5432, + Paths: common.InstancePaths{ + Instance: common.Paths{BaseDir: "/opt/pgedge"}, + Host: common.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, + PgBackRestPath: "/usr/bin/pgbackrest", + PatroniPath: "/usr/local/bin/patroni", + }, + }, + etcdHosts: []string{"i-0123456789abcdef.ec2.internal:2379"}, + etcdCreds: &common.EtcdCreds{ + Username: "instance.storefront-n1-689qacsi", + Password: "password", + }, + generateOptions: common.GenerateOptions{ + SystemAddresses: []string{ + "172.17.0.1/32", + "10.128.165.128/26", + }, + ExtraHbaEntries: []hba.Entry{ + { + Type: hba.EntryTypeHost, + Database: "all", + User: "all", + Address: "172.17.0.1/32", + AuthMethod: hba.AuthMethodMD5, + }, + { + Type: hba.EntryTypeHost, + Database: "all", + User: "all", + Address: "172.17.0.1/16", + AuthMethod: hba.AuthMethodReject, + }, + }, + }, + }, + { + name: "in-place restore", + options: common.PatroniConfigGeneratorOptions{ + Instance: &database.InstanceSpec{ + InstanceID: "storefront-n1-689qacsi", + DatabaseID: "storefront", + HostID: "host-1", + DatabaseName: "app", + NodeName: "n1", + NodeOrdinal: 1, + PgEdgeVersion: host.MustPgEdgeVersion("18.1", "5.0.4"), + ClusterSize: 3, + RestoreConfig: &database.RestoreConfig{}, + InPlaceRestore: true, + }, + HostCPUs: 4, + HostMemoryBytes: 1024 * 1024 * 1024 * 8, + FQDN: "storefront-n1-689qacsi.storefront-database", + OrchestratorParameters: map[string]any{ + "shared_preload_libraries": "pg_stat_statements,snowflake,spock,postgis-3", + }, + PatroniPort: 8888, + PostgresPort: 5432, + Paths: common.InstancePaths{ + Instance: common.Paths{BaseDir: "/opt/pgedge"}, + Host: common.Paths{BaseDir: "/data/control-plane/instances/storefront-n1-689qacsi"}, + PgBackRestPath: "/usr/bin/pgbackrest", + PatroniPath: "/usr/local/bin/patroni", + }, + }, + etcdHosts: []string{"i-0123456789abcdef.ec2.internal:2379"}, + etcdCreds: &common.EtcdCreds{ + Username: "instance.storefront-n1-689qacsi", + Password: "password", + }, + generateOptions: common.GenerateOptions{ + SystemAddresses: []string{ + "172.17.0.1/32", + "10.128.165.128/26", + }, + ExtraHbaEntries: []hba.Entry{ + { + Type: hba.EntryTypeHost, + Database: "all", + User: "all", + Address: "172.17.0.1/32", + AuthMethod: hba.AuthMethodMD5, + }, + { + Type: hba.EntryTypeHost, + Database: "all", + User: "all", + Address: "172.17.0.1/16", + AuthMethod: hba.AuthMethodReject, + }, + }, + }, + }, + { + name: "minimal systemd", + options: common.PatroniConfigGeneratorOptions{ + Instance: &database.InstanceSpec{ + InstanceID: "storefront-n1-689qacsi", + DatabaseID: "storefront", + HostID: "host-1", + DatabaseName: "app", + NodeName: "n1", + NodeOrdinal: 1, + PgEdgeVersion: host.MustPgEdgeVersion("18.1", "5.0.4"), + ClusterSize: 3, + }, + HostCPUs: 4, + HostMemoryBytes: 1024 * 1024 * 1024 * 8, + FQDN: "storefront-n1-689qacsi.storefront-database", + OrchestratorParameters: map[string]any{ + "shared_preload_libraries": "pg_stat_statements,spock", + }, + PatroniPort: 8888, + PostgresPort: 5432, + Paths: common.InstancePaths{ + Instance: common.Paths{BaseDir: "/var/lib/pgsql/18/storefront-n1-689qacsi"}, + Host: common.Paths{BaseDir: "/var/lib/pgsql/18/storefront-n1-689qacsi"}, + PgBackRestPath: "/usr/bin/pgbackrest", + PatroniPath: "/usr/local/bin/patroni", + }, + }, + etcdHosts: []string{"i-0123456789abcdef.ec2.internal:2379"}, + etcdCreds: &common.EtcdCreds{ + Username: "instance.storefront-n1-689qacsi", + Password: "password", + }, + generateOptions: common.GenerateOptions{ + SystemAddresses: []string{ + "10.10.0.2", + "10.10.0.3", + "10.10.0.4", + }, + }, + }, + { + name: "enable fast basebackup", + options: common.PatroniConfigGeneratorOptions{ + Instance: &database.InstanceSpec{ + InstanceID: "storefront-n1-689qacsi", + DatabaseID: "storefront", + HostID: "host-1", + DatabaseName: "app", + NodeName: "n1", + NodeOrdinal: 1, + PgEdgeVersion: host.MustPgEdgeVersion("18.1", "5.0.4"), + ClusterSize: 3, + }, + HostCPUs: 4, + HostMemoryBytes: 1024 * 1024 * 1024 * 8, + FQDN: "storefront-n1-689qacsi.storefront-database", + OrchestratorParameters: map[string]any{ + "shared_preload_libraries": "pg_stat_statements,spock", + }, + PatroniPort: 8888, + PostgresPort: 5432, + Paths: common.InstancePaths{ + Instance: common.Paths{BaseDir: "/var/lib/pgsql/storefront-n1-689qacsi"}, + Host: common.Paths{BaseDir: "/var/lib/pgsql/storefront-n1-689qacsi"}, + PgBackRestPath: "/usr/bin/pgbackrest", + PatroniPath: "/usr/local/bin/patroni", + }, + }, + etcdHosts: []string{ + "i-0123456789abcdef.ec2.internal:2379", + }, + etcdCreds: &common.EtcdCreds{ + Username: "instance.storefront-n1-689qacsi", + Password: "password", + }, + generateOptions: common.GenerateOptions{ + // This is a newly-created node, so there are no clients to + // disrupt by enabling this option. + EnableFastBasebackup: true, + SystemAddresses: []string{ + "10.10.0.2", + "10.10.0.3", + "10.10.0.4", + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + gen := common.NewPatroniConfigGenerator(tc.options) + out := gen.Generate(tc.etcdHosts, tc.etcdCreds, tc.generateOptions) + + golden.Run(t, out, update) + }) + } +} diff --git a/server/internal/orchestrator/common/patroni_member.go b/server/internal/orchestrator/common/patroni_member.go new file mode 100644 index 00000000..7c3988e4 --- /dev/null +++ b/server/internal/orchestrator/common/patroni_member.go @@ -0,0 +1,83 @@ +package common + +import ( + "context" + "fmt" + + "github.com/samber/do" + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/pgEdge/control-plane/server/internal/patroni" + "github.com/pgEdge/control-plane/server/internal/resource" + "github.com/pgEdge/control-plane/server/internal/storage" +) + +var _ resource.Resource = (*PatroniMember)(nil) + +const ResourceTypePatroniMember resource.Type = "common.patroni_member" + +func PatroniMemberResourceIdentifier(instanceID string) resource.Identifier { + return resource.Identifier{ + ID: instanceID, + Type: ResourceTypePatroniMember, + } +} + +type PatroniMember struct { + DatabaseID string `json:"database_id"` + NodeName string `json:"node_name"` + InstanceID string `json:"instance_id"` +} + +func (p *PatroniMember) ResourceVersion() string { + return "1" +} + +func (p *PatroniMember) DiffIgnore() []string { + return nil +} + +func (p *PatroniMember) Executor() resource.Executor { + return resource.AnyExecutor() +} + +func (p *PatroniMember) Identifier() resource.Identifier { + return PatroniMemberResourceIdentifier(p.InstanceID) +} + +func (p *PatroniMember) Dependencies() []resource.Identifier { + return []resource.Identifier{ + PatroniClusterResourceIdentifier(p.NodeName), + } +} + +func (p *PatroniMember) TypeDependencies() []resource.Type { + return nil +} + +func (p *PatroniMember) Refresh(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (p *PatroniMember) Create(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (p *PatroniMember) Update(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (p *PatroniMember) Delete(ctx context.Context, rc *resource.Context) error { + client, err := do.Invoke[*clientv3.Client](rc.Injector) + if err != nil { + return err + } + + key := patroni.MemberKey(p.DatabaseID, p.NodeName, p.InstanceID) + _, err = storage.NewDeleteKeyOp(client, key).Exec(ctx) + if err != nil { + return fmt.Errorf("failed to delete patroni cluster member from DCS: %w", err) + } + + return nil +} diff --git a/server/internal/orchestrator/common/pg_service_conf.go b/server/internal/orchestrator/common/pg_service_conf.go new file mode 100644 index 00000000..45ec4901 --- /dev/null +++ b/server/internal/orchestrator/common/pg_service_conf.go @@ -0,0 +1,151 @@ +package common + +import ( + "context" + "errors" + "fmt" + "path/filepath" + + "github.com/samber/do" + "github.com/spf13/afero" + + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/filesystem" + "github.com/pgEdge/control-plane/server/internal/postgres" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +var _ resource.Resource = (*PgServiceConf)(nil) + +const ResourceTypePgServiceConf resource.Type = "common.pg_service_conf" + +func PgServiceConfResourceIdentifier(nodeName string) resource.Identifier { + return resource.Identifier{ + ID: nodeName, + Type: ResourceTypePgServiceConf, + } +} + +type PgServiceConf struct { + ParentID string `json:"parent_id"` + HostID string `json:"host_id"` + InstanceID string `json:"instance_id"` + NodeNames []string `json:"node_names"` + OwnerUID int `json:"owner_uid"` + OwnerGID int `json:"owner_gid"` +} + +func (p *PgServiceConf) ResourceVersion() string { + return "1" +} + +func (p *PgServiceConf) DiffIgnore() []string { + return nil +} + +func (p *PgServiceConf) Executor() resource.Executor { + return resource.HostExecutor(p.HostID) +} + +func (p *PgServiceConf) Identifier() resource.Identifier { + return PgServiceConfResourceIdentifier(p.InstanceID) +} + +func (p *PgServiceConf) Dependencies() []resource.Identifier { + deps := []resource.Identifier{ + filesystem.DirResourceIdentifier(p.ParentID), + database.InstanceResourceIdentifier(p.InstanceID), + } + return deps +} + +func (p *PgServiceConf) TypeDependencies() []resource.Type { + return []resource.Type{database.ResourceTypeNode} +} + +func (p *PgServiceConf) Refresh(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, p.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + + _, err = ReadResourceFile(fs, filepath.Join(parentFullPath, "pg_service.conf")) + if err != nil { + return fmt.Errorf("failed to read pg_service.conf: %w", err) + } + + return nil +} + +func (p *PgServiceConf) Create(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + + instance, err := resource.FromContext[*database.InstanceResource](rc, database.InstanceResourceIdentifier(p.InstanceID)) + if err != nil { + return fmt.Errorf("failed to get instance %q: %w", p.InstanceID, err) + } + nodes, err := resource.AllFromContext[*database.NodeResource](rc, database.ResourceTypeNode) + if err != nil { + return fmt.Errorf("failed to get all nodes from state: %w", err) + } + + conf := postgres.NewPgServiceConf() + for _, node := range nodes { + // We set an empty dbname here because service conf users will set the + // database in their connection string, e.g. 'service=n1 dbname=my_app'. + dsn, err := node.DSN(ctx, rc, instance, "") + if err != nil { + return fmt.Errorf("failed to get dsn for node %q: %w", node.Name, err) + } + conf.Services[node.Name] = dsn + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, p.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + + path := filepath.Join(parentFullPath, "pg_service.conf") + err = afero.WriteFile(fs, path, []byte(conf.String()), 0o600) + if err != nil { + return fmt.Errorf("failed to write pg_service.conf file '%s': %w", path, err) + } + if err := fs.Chown(path, p.OwnerUID, p.OwnerGID); err != nil { + return fmt.Errorf("failed to change ownership for pg_service.conf file '%s': %w", path, err) + } + + return nil +} + +func (p *PgServiceConf) Update(ctx context.Context, rc *resource.Context) error { + return p.Create(ctx, rc) +} + +func (p *PgServiceConf) Delete(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, p.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + + err = fs.Remove(filepath.Join(parentFullPath, "pg_service.conf")) + if errors.Is(err, afero.ErrFileNotFound) { + return nil + } else if err != nil { + return fmt.Errorf("failed to remove patroni.yaml: %w", err) + } + + return nil +} diff --git a/server/internal/orchestrator/common/pgbackrest_config.go b/server/internal/orchestrator/common/pgbackrest_config.go new file mode 100644 index 00000000..243f755f --- /dev/null +++ b/server/internal/orchestrator/common/pgbackrest_config.go @@ -0,0 +1,169 @@ +package common + +import ( + "bytes" + "context" + "errors" + "fmt" + "path/filepath" + + "github.com/samber/do" + "github.com/spf13/afero" + + "github.com/pgEdge/control-plane/server/internal/filesystem" + "github.com/pgEdge/control-plane/server/internal/pgbackrest" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +type PgBackRestConfigType string + +func (t PgBackRestConfigType) String() string { + return string(t) +} + +const ( + PgBackRestConfigTypeBackup PgBackRestConfigType = "backup" + PgBackRestConfigTypeRestore PgBackRestConfigType = "restore" +) + +var _ resource.Resource = (*PgBackRestConfig)(nil) + +const ResourceTypePgBackRestConfig resource.Type = "common.pgbackrest_config" + +func PgBackRestConfigIdentifier(instanceID string, configType PgBackRestConfigType) resource.Identifier { + return resource.Identifier{ + ID: instanceID + "-" + configType.String(), + Type: ResourceTypePgBackRestConfig, + } +} + +type PgBackRestConfig struct { + InstanceID string `json:"instance_id"` + HostID string `json:"host_id"` + DatabaseID string `json:"database_id"` + NodeName string `json:"node_name"` + Repositories []*pgbackrest.Repository `json:"repositories"` + ParentID string `json:"parent_id"` + Type PgBackRestConfigType `json:"type"` + OwnerUID int `json:"owner_uid"` + OwnerGID int `json:"owner_gid"` + Paths InstancePaths `json:"paths"` + Port int `json:"port"` +} + +func (c *PgBackRestConfig) ResourceVersion() string { + return "1" +} + +func (c *PgBackRestConfig) DiffIgnore() []string { + return nil +} + +func (c *PgBackRestConfig) Executor() resource.Executor { + return resource.HostExecutor(c.HostID) +} + +func (c *PgBackRestConfig) Identifier() resource.Identifier { + return PgBackRestConfigIdentifier(c.InstanceID, c.Type) +} + +func (c *PgBackRestConfig) Dependencies() []resource.Identifier { + return []resource.Identifier{ + filesystem.DirResourceIdentifier(c.ParentID), + } +} + +func (c *PgBackRestConfig) TypeDependencies() []resource.Type { + return nil +} + +func (c *PgBackRestConfig) Refresh(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + + hostPath, err := c.HostPath(rc) + if err != nil { + return err + } + + _, err = ReadResourceFile(fs, hostPath) + if err != nil { + return fmt.Errorf("failed to read pgbackrest config: %w", err) + } + + return nil +} + +func (c *PgBackRestConfig) Create(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + + var b bytes.Buffer + if err := pgbackrest.WriteConfig(&b, pgbackrest.ConfigOptions{ + Repositories: c.Repositories, + DatabaseID: c.DatabaseID, + NodeName: c.NodeName, + PgDataPath: c.Paths.Instance.PgData(), + HostUser: "pgedge", + User: "pgedge", + Port: c.Port, + }); err != nil { + return fmt.Errorf("failed to generate pgBackRest configuration: %w", err) + } + + hostPath, err := c.HostPath(rc) + if err != nil { + return err + } + + if err := afero.WriteFile(fs, hostPath, b.Bytes(), 0o600); err != nil { + return fmt.Errorf("failed to write %s: %w", hostPath, err) + } + if err := fs.Chown(hostPath, c.OwnerUID, c.OwnerGID); err != nil { + return fmt.Errorf("failed to change ownership for %s: %w", hostPath, err) + } + + return nil +} + +func (c *PgBackRestConfig) Update(ctx context.Context, rc *resource.Context) error { + return c.Create(ctx, rc) +} + +func (c *PgBackRestConfig) Delete(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + + hostPath, err := c.HostPath(rc) + if err != nil { + return err + } + + err = fs.Remove(hostPath) + if errors.Is(err, afero.ErrFileNotFound) { + return nil + } else if err != nil { + return fmt.Errorf("failed to remove pgbackrest config: %w", err) + } + + return nil +} + +func (c *PgBackRestConfig) BaseName() string { + return fmt.Sprintf("pgbackrest.%s.conf", c.Type) +} + +func (c *PgBackRestConfig) HostPath(rc *resource.Context) (string, error) { + parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID) + if err != nil { + return "", fmt.Errorf("failed to get parent full path: %w", err) + } + + return filepath.Join(parentFullPath, c.BaseName()), nil +} diff --git a/server/internal/orchestrator/common/pgbackrest_stanza.go b/server/internal/orchestrator/common/pgbackrest_stanza.go new file mode 100644 index 00000000..386334ca --- /dev/null +++ b/server/internal/orchestrator/common/pgbackrest_stanza.go @@ -0,0 +1,129 @@ +package common + +import ( + "bytes" + "context" + "fmt" + + "github.com/samber/do" + + "github.com/pgEdge/control-plane/server/internal/database" + "github.com/pgEdge/control-plane/server/internal/pgbackrest" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +var _ resource.Resource = (*PgBackRestStanza)(nil) + +const ResourceTypePgBackRestStanza resource.Type = "common.pgbackrest_stanza" + +func PgBackRestStanzaIdentifier(nodeName string) resource.Identifier { + return resource.Identifier{ + ID: nodeName, + Type: ResourceTypePgBackRestStanza, + } +} + +type PgBackRestStanza struct { + DatabaseID string `json:"database_id"` + NodeName string `json:"node_name"` + Paths InstancePaths `json:"paths"` +} + +func (p *PgBackRestStanza) ResourceVersion() string { + return "1" +} + +func (p *PgBackRestStanza) DiffIgnore() []string { + return nil +} + +func (p *PgBackRestStanza) Executor() resource.Executor { + return resource.PrimaryExecutor(p.NodeName) +} + +func (p *PgBackRestStanza) Identifier() resource.Identifier { + return PgBackRestStanzaIdentifier(p.NodeName) +} + +func (p *PgBackRestStanza) Dependencies() []resource.Identifier { + return []resource.Identifier{ + database.NodeResourceIdentifier(p.NodeName), + } +} + +func (p *PgBackRestStanza) TypeDependencies() []resource.Type { + return nil +} + +func (p *PgBackRestStanza) Refresh(ctx context.Context, rc *resource.Context) error { + orchestrator, err := do.Invoke[database.Orchestrator](rc.Injector) + if err != nil { + return err + } + node, err := resource.FromContext[*database.NodeResource](rc, database.NodeResourceIdentifier(p.NodeName)) + if err != nil { + return fmt.Errorf("failed to get node %q: %w", p.NodeName, err) + } + + var output bytes.Buffer + infoCmd := p.Paths.PgBackRestBackupCmd("info", "--output=json").StringSlice() + err = orchestrator.ExecuteInstanceCommand(ctx, &output, p.DatabaseID, node.PrimaryInstanceID, infoCmd...) + if err != nil { + // pgbackrest info returns a 0 exit code even if the stanza doesn't + // exist, so an error here means something else went wrong. + return fmt.Errorf("failed to exec pgbackrest info: %w, output: %s", err, output.String()) + } + info, err := pgbackrest.ParseInfoOutput(output.Bytes()) + if err != nil { + return fmt.Errorf("failed to parse pgbackrest info output: %w, output: %s", err, output.String()) + } + stanza := info.Stanza("db") + if stanza == nil { + // the stanza will be in the output even if it doesn't exist. + return fmt.Errorf("stanza %q not found in pgbackrest info output", "db") + } + // This status code will be non-zero if the repository is empty, even if + // it's otherwise configured correctly. + if stanza.Status.Code != 0 && stanza.Status.Message != "no valid backups" { + return resource.ErrNotFound + } + + return nil +} + +func (p *PgBackRestStanza) Create(ctx context.Context, rc *resource.Context) error { + orchestrator, err := do.Invoke[database.Orchestrator](rc.Injector) + if err != nil { + return err + } + node, err := resource.FromContext[*database.NodeResource](rc, database.NodeResourceIdentifier(p.NodeName)) + if err != nil { + return fmt.Errorf("failed to get node %q: %w", p.NodeName, err) + } + + var stanzaCreateOut bytes.Buffer + createCmd := p.Paths.PgBackRestBackupCmd("stanza-create", "--io-timeout=10s").StringSlice() + err = orchestrator.ExecuteInstanceCommand(ctx, &stanzaCreateOut, p.DatabaseID, node.PrimaryInstanceID, createCmd...) + if err != nil { + return fmt.Errorf("failed to exec pgbackrest stanza-create: %w, output: %s", err, stanzaCreateOut.String()) + } + var checkOut bytes.Buffer + checkCmd := p.Paths.PgBackRestBackupCmd("check").StringSlice() + err = orchestrator.ExecuteInstanceCommand(ctx, &checkOut, p.DatabaseID, node.PrimaryInstanceID, checkCmd...) + if err != nil { + return fmt.Errorf("failed to exec pgbackrest check: %w, output: %s", err, checkOut.String()) + } + + return nil +} + +func (p *PgBackRestStanza) Update(ctx context.Context, rc *resource.Context) error { + return p.Create(ctx, rc) +} + +func (p *PgBackRestStanza) Delete(ctx context.Context, rc *resource.Context) error { + // Removing the stanza will delete all backups, so we don't do this + // automatically. Users can delete the stanza manually once the database is + // deleted. + return nil +} diff --git a/server/internal/orchestrator/common/postgres_certs.go b/server/internal/orchestrator/common/postgres_certs.go new file mode 100644 index 00000000..05aecd97 --- /dev/null +++ b/server/internal/orchestrator/common/postgres_certs.go @@ -0,0 +1,251 @@ +package common + +import ( + "context" + "fmt" + "path/filepath" + "strings" + + "github.com/samber/do" + "github.com/spf13/afero" + + "github.com/pgEdge/control-plane/server/internal/certificates" + "github.com/pgEdge/control-plane/server/internal/ds" + "github.com/pgEdge/control-plane/server/internal/filesystem" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +const ( + postgresCaCertName = "ca.crt" + postgresServerCertName = "server.crt" + postgresServerKeyName = "server.key" + postgresSuperuserCertName = "superuser.crt" + postgresSuperuserKeyName = "superuser.key" + postgresReplicatorCertName = "replication.crt" + postgresReplicatorKeyName = "replication.key" +) + +var _ resource.Resource = (*PostgresCerts)(nil) + +const ResourceTypePostgresCerts resource.Type = "common.postgres_certs" + +func PostgresCertsIdentifier(instanceID string) resource.Identifier { + return resource.Identifier{ + ID: instanceID, + Type: ResourceTypePostgresCerts, + } +} + +type PostgresCerts struct { + InstanceID string `json:"instance_id"` + HostID string `json:"host_id"` + InstanceAddresses []string `json:"instance_addresses"` + ParentID string `json:"parent_id"` + OwnerUID int `json:"owner_uid"` + OwnerGID int `json:"owner_gid"` + CaCert []byte `json:"ca_cert"` + ServerCert []byte `json:"server_cert"` + ServerKey []byte `json:"server_key"` + SuperuserCert []byte `json:"superuser_cert"` + SuperuserKey []byte `json:"superuser_key"` + ReplicationCert []byte `json:"replication_cert"` + ReplicationKey []byte `json:"replication_key"` +} + +func (c *PostgresCerts) ResourceVersion() string { + return "1" +} + +func (c *PostgresCerts) DiffIgnore() []string { + return []string{ + "/ca_cert", + "/server_cert", + "/server_key", + "/superuser_cert", + "/superuser_key", + "/replication_cert", + "/replication_key", + } +} + +func (c *PostgresCerts) Executor() resource.Executor { + return resource.HostExecutor(c.HostID) +} + +func (c *PostgresCerts) Identifier() resource.Identifier { + return PostgresCertsIdentifier(c.InstanceID) +} + +func (c *PostgresCerts) Dependencies() []resource.Identifier { + return []resource.Identifier{ + filesystem.DirResourceIdentifier(c.ParentID), + } +} + +func (c *PostgresCerts) TypeDependencies() []resource.Type { + return nil +} + +func (c *PostgresCerts) Refresh(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + certsDir := filepath.Join(parentFullPath, "postgres") + + caCert, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresCaCertName)) + if err != nil { + return fmt.Errorf("failed to read CA cert: %w", err) + } + serverCert, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresServerCertName)) + if err != nil { + return fmt.Errorf("failed to read server cert: %w", err) + } + serverKey, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresServerKeyName)) + if err != nil { + return fmt.Errorf("failed to read server key: %w", err) + } + superuserCert, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresSuperuserCertName)) + if err != nil { + return fmt.Errorf("failed to read superuser cert: %w", err) + } + superuserKey, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresSuperuserKeyName)) + if err != nil { + return fmt.Errorf("failed to read superuser key: %w", err) + } + replicationCert, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresReplicatorCertName)) + if err != nil { + return fmt.Errorf("failed to read replication cert: %w", err) + } + replicationKey, err := ReadResourceFile(fs, filepath.Join(certsDir, postgresReplicatorKeyName)) + if err != nil { + return fmt.Errorf("failed to read replication key: %w", err) + } + + c.CaCert = caCert + c.ServerCert = serverCert + c.ServerKey = serverKey + c.SuperuserCert = superuserCert + c.SuperuserKey = superuserKey + c.ReplicationCert = replicationCert + c.ReplicationKey = replicationKey + + return nil +} + +func (c *PostgresCerts) Create(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + certService, err := do.Invoke[*certificates.Service](rc.Injector) + if err != nil { + return err + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + certsDir := filepath.Join(parentFullPath, "postgres") + + // Ensure that localhost is included in the addresses + combined := ds.NewSet(c.InstanceAddresses...) + combined.Add("127.0.0.1", "localhost", "::1") + + pgServerPrincipal, err := certService.PostgresServer(ctx, + c.InstanceID, + combined.ToSortedSlice(strings.Compare), + ) + if err != nil { + return fmt.Errorf("failed to create postgres server principal: %w", err) + } + pgSuperuserPrincipal, err := certService.PostgresUser(ctx, c.InstanceID, "pgedge") + if err != nil { + return fmt.Errorf("failed to create pgedge postgres user principal: %w", err) + } + pgReplicatorPrincipal, err := certService.PostgresUser(ctx, c.InstanceID, "patroni_replicator") + if err != nil { + return fmt.Errorf("failed to create patroni_replicator postgres user principal: %w", err) + } + + c.CaCert = certService.CACert() + c.ServerCert = pgServerPrincipal.CertPEM + c.ServerKey = pgServerPrincipal.KeyPEM + c.SuperuserCert = pgSuperuserPrincipal.CertPEM + c.SuperuserKey = pgSuperuserPrincipal.KeyPEM + c.ReplicationCert = pgReplicatorPrincipal.CertPEM + c.ReplicationKey = pgReplicatorPrincipal.KeyPEM + + if err := fs.MkdirAll(certsDir, 0o700); err != nil { + return fmt.Errorf("failed to create postgres certificates directory: %w", err) + } + if err := fs.Chown(certsDir, c.OwnerUID, c.OwnerGID); err != nil { + return fmt.Errorf("failed to change ownership for certificates directory: %w", err) + } + + files := map[string][]byte{ + postgresCaCertName: c.CaCert, + postgresServerCertName: c.ServerCert, + postgresServerKeyName: c.ServerKey, + postgresSuperuserCertName: c.SuperuserCert, + postgresSuperuserKeyName: c.SuperuserKey, + postgresReplicatorCertName: c.ReplicationCert, + postgresReplicatorKeyName: c.ReplicationKey, + } + + for name, content := range files { + path := filepath.Join(certsDir, name) + + if err := afero.WriteFile(fs, path, content, 0o600); err != nil { + return fmt.Errorf("failed to write %s: %w", path, err) + } + if err := fs.Chown(path, c.OwnerUID, c.OwnerGID); err != nil { + return fmt.Errorf("failed to change ownership for %s: %w", path, err) + } + } + + return nil +} + +func (c *PostgresCerts) Update(ctx context.Context, rc *resource.Context) error { + return c.Create(ctx, rc) +} + +func (c *PostgresCerts) Delete(ctx context.Context, rc *resource.Context) error { + fs, err := do.Invoke[afero.Fs](rc.Injector) + if err != nil { + return err + } + certService, err := do.Invoke[*certificates.Service](rc.Injector) + if err != nil { + return err + } + + parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID) + if err != nil { + return fmt.Errorf("failed to get parent full path: %w", err) + } + postgresDir := filepath.Join(parentFullPath, "postgres") + + if err := fs.RemoveAll(postgresDir); err != nil { + return fmt.Errorf("failed to remove certificates directory: %w", err) + } + + if err := certService.RemovePostgresUser(ctx, c.InstanceID, "pgedge"); err != nil { + return fmt.Errorf("failed to remove pgedge postgres user principal: %w", err) + } + if err := certService.RemovePostgresUser(ctx, c.InstanceID, "patroni_replicator"); err != nil { + return fmt.Errorf("failed to remove patroni_replicator postgres user principal: %w", err) + } + if err := certService.RemovePostgresServer(ctx, c.InstanceID); err != nil { + return fmt.Errorf("failed to remove postgres server principal: %w", err) + } + + return nil +} diff --git a/server/internal/orchestrator/common/resources.go b/server/internal/orchestrator/common/resources.go new file mode 100644 index 00000000..d0da3cf7 --- /dev/null +++ b/server/internal/orchestrator/common/resources.go @@ -0,0 +1,13 @@ +package common + +import "github.com/pgEdge/control-plane/server/internal/resource" + +func RegisterResourceTypes(registry *resource.Registry) { + resource.RegisterResourceType[*EtcdCreds](registry, ResourceTypeEtcdCreds) + resource.RegisterResourceType[*PostgresCerts](registry, ResourceTypePostgresCerts) + resource.RegisterResourceType[*PgBackRestConfig](registry, ResourceTypePgBackRestConfig) + resource.RegisterResourceType[*PgBackRestStanza](registry, ResourceTypePgBackRestStanza) + resource.RegisterResourceType[*PatroniCluster](registry, ResourceTypePatroniCluster) + resource.RegisterResourceType[*PatroniMember](registry, ResourceTypePatroniMember) + resource.RegisterResourceType[*PgServiceConf](registry, ResourceTypePgServiceConf) +} diff --git a/server/internal/orchestrator/swarm/orchestrator.go b/server/internal/orchestrator/swarm/orchestrator.go index afc4cc86..7e73f209 100644 --- a/server/internal/orchestrator/swarm/orchestrator.go +++ b/server/internal/orchestrator/swarm/orchestrator.go @@ -669,6 +669,10 @@ func (o *Orchestrator) CreatePgBackRestBackup(ctx context.Context, w io.Writer, return nil } +func (o *Orchestrator) ExecuteInstanceCommand(ctx context.Context, w io.Writer, databaseID, instanceID string, args ...string) error { + return PostgresContainerExec(ctx, w, o.docker, instanceID, args) +} + func (o *Orchestrator) ValidateInstanceSpecs(ctx context.Context, changes []*database.InstanceSpecChange) ([]*database.ValidationResult, error) { results := make([]*database.ValidationResult, 0, len(changes)*3) diff --git a/server/internal/orchestrator/swarm/patroni_config.go b/server/internal/orchestrator/swarm/patroni_config.go index d1f5bb02..1ca7b073 100644 --- a/server/internal/orchestrator/swarm/patroni_config.go +++ b/server/internal/orchestrator/swarm/patroni_config.go @@ -253,11 +253,7 @@ func generatePatroniConfig( "archive_command": PgBackRestBackupCmd("archive-push", `"%p"`).String(), }) } - snowflakeLolorGUCs, err := postgres.SnowflakeLolorGUCs(spec.NodeOrdinal) - if err != nil { - return nil, fmt.Errorf("failed to generate snowflake/lolor GUCs: %w", err) - } - maps.Copy(parameters, snowflakeLolorGUCs) + maps.Copy(parameters, postgres.SnowflakeLolorGUCs(spec.NodeOrdinal)) maps.Copy(parameters, spec.PostgreSQLConf) dcsParameters := patroni.ExtractPatroniControlledGUCs(parameters) diff --git a/server/internal/patroni/utils.go b/server/internal/patroni/utils.go new file mode 100644 index 00000000..bd2e7959 --- /dev/null +++ b/server/internal/patroni/utils.go @@ -0,0 +1,28 @@ +package patroni + +import ( + "context" + "fmt" + "net/url" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +func EtcdHosts(ctx context.Context, client *clientv3.Client) ([]string, error) { + members, err := client.MemberList(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list etcd cluster members: %w", err) + } + var hosts []string + for _, member := range members.Members { + for _, endpoint := range member.GetClientURLs() { + u, err := url.Parse(endpoint) + if err != nil { + return nil, fmt.Errorf("failed to parse etcd client url '%s': %w", endpoint, err) + } + hosts = append(hosts, u.Host) + } + } + + return hosts, nil +} diff --git a/server/internal/pgbackrest/config.go b/server/internal/pgbackrest/config.go index cff7d8f2..f7e3c040 100644 --- a/server/internal/pgbackrest/config.go +++ b/server/internal/pgbackrest/config.go @@ -186,6 +186,7 @@ type ConfigOptions struct { HostUser string User string SocketPath string + Port int Repositories []*Repository } @@ -234,6 +235,9 @@ func WriteConfig(w io.Writer, opts ConfigOptions) error { if opts.SocketPath != "" { db["pg1-socket-path"] = opts.SocketPath } + if opts.Port > 0 { + db["pg1-port"] = fmt.Sprintf("%d", opts.Port) + } file := ini.Empty() diff --git a/server/internal/postgres/gucs.go b/server/internal/postgres/gucs.go index 3382ba4a..d1191431 100644 --- a/server/internal/postgres/gucs.go +++ b/server/internal/postgres/gucs.go @@ -33,11 +33,11 @@ func Spock4DefaultGUCs() map[string]any { } } -func SnowflakeLolorGUCs(nodeOrdinal int) (map[string]any, error) { +func SnowflakeLolorGUCs(nodeOrdinal int) map[string]any { return map[string]any{ "snowflake.node": nodeOrdinal, "lolor.node": nodeOrdinal, - }, nil + } } func DefaultTunableGUCs(memBytes uint64, cpus float64, clusterSize int) map[string]any { diff --git a/server/internal/postgres/pg_service_conf.go b/server/internal/postgres/pg_service_conf.go new file mode 100644 index 00000000..e3930e30 --- /dev/null +++ b/server/internal/postgres/pg_service_conf.go @@ -0,0 +1,42 @@ +package postgres + +import ( + "maps" + "slices" + "strings" +) + +type PgServiceConf struct { + Services map[string]*DSN +} + +func NewPgServiceConf() *PgServiceConf { + return &PgServiceConf{ + Services: map[string]*DSN{}, + } +} + +func (c *PgServiceConf) String() string { + var buf strings.Builder + + // Always write in a consistent order + keys := slices.Sorted(maps.Keys(c.Services)) + + // Not using gopkg.in/ini here because Postgres does not like pretty spaces + // and the ini library uses a global variable to configure pretty spaces, so + // it would interfere with the pgBackRest conf. + for _, key := range keys { + buf.WriteString("[") + buf.WriteString(key) + buf.WriteString("]\n") + + for _, field := range c.Services[key].Fields() { + buf.WriteString(field) + buf.WriteString("\n") + } + + buf.WriteString("\n") + } + + return buf.String() +}