Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pgEdge/control-plane/server/internal/migrate"
"github.com/pgEdge/control-plane/server/internal/monitor"
"github.com/pgEdge/control-plane/server/internal/orchestrator"
"github.com/pgEdge/control-plane/server/internal/orchestrator/common"
"github.com/pgEdge/control-plane/server/internal/orchestrator/swarm"
"github.com/pgEdge/control-plane/server/internal/ports"
"github.com/pgEdge/control-plane/server/internal/resource"
Expand Down Expand Up @@ -95,6 +96,7 @@ func newRootCmd(i *do.Injector) *cobra.Command {
filesystem.RegisterResourceTypes(registry)
monitor.RegisterResourceTypes(registry)
scheduler.RegisterResourceTypes(registry)
common.RegisterResourceTypes(registry)
swarm.RegisterResourceTypes(registry)

if err := orchestrator.Provide(i); err != nil {
Expand Down
1 change: 1 addition & 0 deletions server/internal/database/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ type Orchestrator interface {
GetInstanceConnectionInfo(ctx context.Context, databaseID, instanceID string) (*ConnectionInfo, error)
GetServiceInstanceStatus(ctx context.Context, serviceInstanceID string) (*ServiceInstanceStatus, error)
CreatePgBackRestBackup(ctx context.Context, w io.Writer, instanceID string, options *pgbackrest.BackupOptions) error
ExecuteInstanceCommand(ctx context.Context, w io.Writer, databaseID, instanceID string, args ...string) error
ValidateInstanceSpecs(ctx context.Context, changes []*InstanceSpecChange) ([]*ValidationResult, error)
StopInstance(ctx context.Context, instanceID string) error
StartInstance(ctx context.Context, instanceID string) error
Expand Down
222 changes: 222 additions & 0 deletions server/internal/orchestrator/common/etcd_creds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package common

import (
"context"
"errors"
"fmt"
"path/filepath"

"github.com/samber/do"
"github.com/spf13/afero"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/pgEdge/control-plane/server/internal/certificates"
"github.com/pgEdge/control-plane/server/internal/etcd"
"github.com/pgEdge/control-plane/server/internal/filesystem"
"github.com/pgEdge/control-plane/server/internal/patroni"
"github.com/pgEdge/control-plane/server/internal/resource"
)

const (
etcdCaCertName = "ca.crt"
etcdClientCertName = "client.crt"
etcdClientKeyName = "client.key"
)

var _ resource.Resource = (*EtcdCreds)(nil)

const ResourceTypeEtcdCreds resource.Type = "common.etcd_creds"

func EtcdCredsIdentifier(instanceID string) resource.Identifier {
return resource.Identifier{
ID: instanceID,
Type: ResourceTypeEtcdCreds,
}
}

type EtcdCreds struct {
InstanceID string `json:"instance_id"`
DatabaseID string `json:"database_id"`
HostID string `json:"host_id"`
NodeName string `json:"node_name"`
ParentID string `json:"parent_id"`
OwnerUID int `json:"owner_uid"`
OwnerGID int `json:"owner_gid"`
Username string `json:"username"`
Password string `json:"password"`
CaCert []byte `json:"ca_cert"`
ClientCert []byte `json:"client_cert"`
ClientKey []byte `json:"client_key"`
}

func (c *EtcdCreds) ResourceVersion() string {
return "1"
}

func (c *EtcdCreds) DiffIgnore() []string {
return []string{
"/username",
"/password",
"/ca_cert",
"/client_cert",
"/client_key",
}
}

func (c *EtcdCreds) Executor() resource.Executor {
return resource.HostExecutor(c.HostID)
}

func (c *EtcdCreds) Identifier() resource.Identifier {
return EtcdCredsIdentifier(c.InstanceID)
}

func (c *EtcdCreds) Dependencies() []resource.Identifier {
return []resource.Identifier{
filesystem.DirResourceIdentifier(c.ParentID),
}
}

func (c *EtcdCreds) TypeDependencies() []resource.Type {
return nil
}

func (c *EtcdCreds) Refresh(ctx context.Context, rc *resource.Context) error {
fs, err := do.Invoke[afero.Fs](rc.Injector)
if err != nil {
return err
}

parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID)
if err != nil {
return fmt.Errorf("failed to get parent full path: %w", err)
}
certsDir := filepath.Join(parentFullPath, "etcd")

caCert, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdCaCertName))
if err != nil {
return fmt.Errorf("failed to read CA cert: %w", err)
}
clientCert, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdClientCertName))
if err != nil {
return fmt.Errorf("failed to read client cert: %w", err)
}
clientKey, err := ReadResourceFile(fs, filepath.Join(certsDir, etcdClientKeyName))
if err != nil {
return fmt.Errorf("failed to read client key: %w", err)
}

c.CaCert = caCert
c.ClientCert = clientCert
c.ClientKey = clientKey

return nil
}

func (c *EtcdCreds) Create(ctx context.Context, rc *resource.Context) error {
fs, err := do.Invoke[afero.Fs](rc.Injector)
if err != nil {
return err
}
certService, err := do.Invoke[*certificates.Service](rc.Injector)
if err != nil {
return err
}
etcdClient, err := do.Invoke[*clientv3.Client](rc.Injector)
if err != nil {
return err
}

parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID)
if err != nil {
return fmt.Errorf("failed to get parent full path: %w", err)
}
certsDir := filepath.Join(parentFullPath, "etcd")

etcdCreds, err := etcd.CreateInstanceEtcdUser(ctx,
etcdClient,
certService,
etcd.InstanceUserOptions{
InstanceID: c.InstanceID,
KeyPrefix: patroni.ClusterPrefix(c.DatabaseID, c.NodeName),
Password: c.Password,
},
)
if err != nil {
return fmt.Errorf("failed to create etcd user: %w", err)
}

c.Username = etcdCreds.Username
c.Password = etcdCreds.Password
c.CaCert = etcdCreds.CaCert
c.ClientCert = etcdCreds.ClientCert
c.ClientKey = etcdCreds.ClientKey

if err := fs.MkdirAll(certsDir, 0o700); err != nil {
return fmt.Errorf("failed to create etcd certificates directory: %w", err)
}
if err := fs.Chown(certsDir, c.OwnerUID, c.OwnerGID); err != nil {
return fmt.Errorf("failed to change ownership for certificates directory: %w", err)
}

files := map[string][]byte{
etcdCaCertName: c.CaCert,
etcdClientCertName: c.ClientCert,
etcdClientKeyName: c.ClientKey,
}

for name, content := range files {
if err := afero.WriteFile(fs, filepath.Join(certsDir, name), content, 0o600); err != nil {
return fmt.Errorf("failed to write %s: %w", name, err)
}
if err := fs.Chown(filepath.Join(certsDir, name), c.OwnerUID, c.OwnerGID); err != nil {
return fmt.Errorf("failed to change ownership for %s: %w", name, err)
}
}

return nil
}

func (c *EtcdCreds) Update(ctx context.Context, rc *resource.Context) error {
return c.Create(ctx, rc)
}

func (c *EtcdCreds) Delete(ctx context.Context, rc *resource.Context) error {
fs, err := do.Invoke[afero.Fs](rc.Injector)
if err != nil {
return err
}
certService, err := do.Invoke[*certificates.Service](rc.Injector)
if err != nil {
return err
}
etcdClient, err := do.Invoke[*clientv3.Client](rc.Injector)
if err != nil {
return err
}

parentFullPath, err := filesystem.DirResourceFullPath(rc, c.ParentID)
if err != nil {
return fmt.Errorf("failed to get parent full path: %w", err)
}
etcdDir := filepath.Join(parentFullPath, "etcd")

if err := fs.RemoveAll(etcdDir); err != nil {
return fmt.Errorf("failed to remove certificates directory: %w", err)
}
if err := etcd.RemoveInstanceEtcdUser(ctx, etcdClient, certService, c.InstanceID); err != nil {
return fmt.Errorf("failed to delete etcd user: %w", err)
}

return nil
}

func ReadResourceFile(fs afero.Fs, path string) ([]byte, error) {
contents, err := afero.ReadFile(fs, path)
if errors.Is(err, afero.ErrFileNotFound) {
return nil, resource.ErrNotFound
} else if err != nil {
return nil, err
}
return contents, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
name: storefront-n1-689qacsi
namespace: /patroni/
scope: storefront:n1
log:
type: json
level: INFO
static_fields:
database_id: storefront
instance_id: storefront-n1-689qacsi
node_name: n1
bootstrap:
dcs:
loop_wait: 10
ttl: 30
retry_timeout: 10
postgresql:
parameters:
max_connections: 901
max_replication_slots: 16
max_wal_senders: 16
max_worker_processes: 12
track_commit_timestamp: "on"
wal_level: logical
ignore_slots:
- plugin: spock_output
initdb:
- data-checksums
etcd3:
hosts:
- i-0123456789abcdef.ec2.internal:2379
protocol: https
username: instance.storefront-n1-689qacsi
password: password
cacert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/etcd/ca.crt
cert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/etcd/client.crt
key: /var/lib/pgsql/storefront-n1-689qacsi/certificates/etcd/client.key
postgresql:
authentication:
superuser:
username: pgedge
sslmode: verify-full
sslkey: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/superuser.key
sslcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/superuser.crt
sslrootcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/ca.crt
replication:
username: patroni_replicator
sslmode: verify-full
sslkey: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/replication.key
sslcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/replication.crt
sslrootcert: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/ca.crt
connect_address: storefront-n1-689qacsi.storefront-database:5432
data_dir: /var/lib/pgsql/storefront-n1-689qacsi/data/pgdata
listen: "*:5432"
parameters:
archive_command: /bin/true
archive_mode: "on"
autovacuum_max_workers: 3
autovacuum_vacuum_cost_limit: 200
autovacuum_work_mem: 262144
checkpoint_completion_target: "0.9"
checkpoint_timeout: 15min
dynamic_shared_memory_type: posix
effective_cache_size: 524288
hot_standby_feedback: "on"
lolor.node: 1
maintenance_work_mem: 137518
max_parallel_workers: 8
password_encryption: md5
shared_buffers: 262144
shared_preload_libraries: pg_stat_statements,spock
snowflake.node: 1
spock.allow_ddl_from_functions: "on"
spock.conflict_log_level: DEBUG
spock.conflict_resolution: last_update_wins
spock.enable_ddl_replication: "on"
spock.include_ddl_repset: "on"
spock.save_resolutions: "on"
ssl: "on"
ssl_ca_file: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/ca.crt
ssl_cert_file: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/server.crt
ssl_key_file: /var/lib/pgsql/storefront-n1-689qacsi/certificates/postgres/server.key
track_io_timing: "on"
wal_log_hints: "on"
wal_sender_timeout: 5s
pg_hba:
- local all all trust
- host all all 127.0.0.1/32 trust
- host all all ::1/128 trust
- local replication all trust
- host replication all 127.0.0.1/32 trust
- host replication all ::1/128 trust
- hostssl all pgedge,patroni_replicator 10.10.0.2/32 cert clientcert=verify-full
- hostssl replication pgedge,patroni_replicator 10.10.0.2/32 cert clientcert=verify-full
- hostssl all pgedge,patroni_replicator 10.10.0.3/32 cert clientcert=verify-full
- hostssl replication pgedge,patroni_replicator 10.10.0.3/32 cert clientcert=verify-full
- hostssl all pgedge,patroni_replicator 10.10.0.4/32 cert clientcert=verify-full
- hostssl replication pgedge,patroni_replicator 10.10.0.4/32 cert clientcert=verify-full
- host all pgedge,patroni_replicator 0.0.0.0/0 reject
- host all pgedge,patroni_replicator ::0/0 reject
- host all all 0.0.0.0/0 md5
use_pg_rewind: true
remove_data_directory_on_rewind_failure: true
remove_data_directory_on_diverged_timelines: true
basebackup:
- checkpoint: fast
restapi:
connect_address: storefront-n1-689qacsi.storefront-database:8888
listen: 0.0.0.0:8888
allowlist:
- 10.10.0.2
- 10.10.0.3
- 10.10.0.4
- 127.0.0.1
- localhost
watchdog:
mode: "off"
Loading