-
Notifications
You must be signed in to change notification settings - Fork 2
feat: stable random ports #290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| package database | ||
|
|
||
| import ( | ||
| clientv3 "go.etcd.io/etcd/client/v3" | ||
|
|
||
| "github.com/pgEdge/control-plane/server/internal/storage" | ||
| ) | ||
|
|
||
| type StoredInstanceSpec struct { | ||
| storage.StoredValue | ||
| Spec *InstanceSpec `json:"spec"` | ||
| } | ||
|
|
||
| type InstanceSpecStore struct { | ||
| client *clientv3.Client | ||
| root string | ||
| } | ||
|
|
||
| func NewInstanceSpecStore(client *clientv3.Client, root string) *InstanceSpecStore { | ||
| return &InstanceSpecStore{ | ||
| client: client, | ||
| root: root, | ||
| } | ||
| } | ||
|
|
||
| func (s *InstanceSpecStore) Prefix() string { | ||
| return storage.Prefix("/", s.root, "instance_specs") | ||
| } | ||
|
|
||
| func (s *InstanceSpecStore) DatabasePrefix(databaseID string) string { | ||
| return storage.Prefix(s.Prefix(), databaseID) | ||
| } | ||
|
|
||
| func (s *InstanceSpecStore) Key(databaseID, instanceID string) string { | ||
| return storage.Key(s.DatabasePrefix(databaseID), instanceID) | ||
| } | ||
|
|
||
| func (s *InstanceSpecStore) GetByKey(databaseID, instanceID string) storage.GetOp[*StoredInstanceSpec] { | ||
| key := s.Key(databaseID, instanceID) | ||
| return storage.NewGetOp[*StoredInstanceSpec](s.client, key) | ||
| } | ||
|
|
||
| func (s *InstanceSpecStore) GetByDatabaseID(databaseID string) storage.GetMultipleOp[*StoredInstanceSpec] { | ||
| prefix := s.DatabasePrefix(databaseID) | ||
| return storage.NewGetPrefixOp[*StoredInstanceSpec](s.client, prefix) | ||
| } | ||
|
|
||
| func (s *InstanceSpecStore) GetAll() storage.GetMultipleOp[*StoredInstanceSpec] { | ||
| prefix := s.Prefix() | ||
| return storage.NewGetPrefixOp[*StoredInstanceSpec](s.client, prefix) | ||
| } | ||
|
|
||
| func (s *InstanceSpecStore) Update(item *StoredInstanceSpec) storage.PutOp[*StoredInstanceSpec] { | ||
| key := s.Key(item.Spec.DatabaseID, item.Spec.InstanceID) | ||
| return storage.NewUpdateOp(s.client, key, item) | ||
| } | ||
|
|
||
| func (s *InstanceSpecStore) DeleteByKey(databaseID, instanceID string) storage.DeleteOp { | ||
| key := s.Key(databaseID, instanceID) | ||
| return storage.NewDeleteKeyOp(s.client, key) | ||
| } | ||
|
|
||
| func (s *InstanceSpecStore) DeleteByDatabaseID(databaseID string) storage.DeleteOp { | ||
| prefix := s.DatabasePrefix(databaseID) | ||
| return storage.NewDeletePrefixOp(s.client, prefix) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,8 +8,11 @@ import ( | |
|
|
||
| "github.com/google/uuid" | ||
|
|
||
| "github.com/pgEdge/control-plane/server/internal/config" | ||
| "github.com/pgEdge/control-plane/server/internal/host" | ||
| "github.com/pgEdge/control-plane/server/internal/ports" | ||
| "github.com/pgEdge/control-plane/server/internal/storage" | ||
| "github.com/pgEdge/control-plane/server/internal/utils" | ||
| ) | ||
|
|
||
| var ( | ||
|
|
@@ -24,16 +27,26 @@ var ( | |
| ) | ||
|
|
||
| type Service struct { | ||
| cfg config.Config | ||
| orchestrator Orchestrator | ||
| store *Store | ||
| hostSvc *host.Service | ||
| portsSvc *ports.Service | ||
| } | ||
|
|
||
| func NewService(orchestrator Orchestrator, store *Store, hostSvc *host.Service) *Service { | ||
| func NewService( | ||
| cfg config.Config, | ||
| orchestrator Orchestrator, | ||
| store *Store, | ||
| hostSvc *host.Service, | ||
| portsSvc *ports.Service, | ||
| ) *Service { | ||
| return &Service{ | ||
| cfg: cfg, | ||
| orchestrator: orchestrator, | ||
| store: store, | ||
| hostSvc: hostSvc, | ||
| portsSvc: portsSvc, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -119,6 +132,18 @@ func (s *Service) UpdateDatabase(ctx context.Context, state DatabaseState, spec | |
| } | ||
|
|
||
| func (s *Service) DeleteDatabase(ctx context.Context, databaseID string) error { | ||
| specs, err := s.store.InstanceSpec. | ||
| GetByDatabaseID(databaseID). | ||
| Exec(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get instance specs: %w", err) | ||
| } | ||
| for _, spec := range specs { | ||
| if err := s.releaseInstancePorts(ctx, spec.Spec); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
Comment on lines
+141
to
+145
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should also be moved after completing the transaction and before returning? what do you think?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it makes more sense at the beginning because of the retry case. We're using the simplest form of deletion (just by key with no constraints) for all database entities, and I would only expect them to fail if Etcd were unhealthy. The release will also fail if Etcd is unhealthy, but it has an additional failure mode when many instances are being created and deleted simultaneously. So, as a whole, this method is more likely to fail during the release than during database entity deletes. With the release at the top here, the spec still exists in Etcd even if the release fails. So, on retry, we're still able to read the spec from storage and retry the release operation. If we did this the other way around and the entity deletes happened first, then on retry, the spec would no longer exist, so we wouldn't be able to retry the port release. That would have to happen thousands of times before you would run out of random ports in the default range, but still, since we can prevent it, I think it makes more sense to try the more error-prone operation at the beginning. |
||
|
|
||
| var ops []storage.TxnOperation | ||
|
|
||
| spec, err := s.store.Spec.GetByKey(databaseID).Exec(ctx) | ||
|
|
@@ -141,6 +166,7 @@ func (s *Service) DeleteDatabase(ctx context.Context, databaseID string) error { | |
|
|
||
| ops = append(ops, | ||
| s.store.Instance.DeleteByDatabaseID(databaseID), | ||
| s.store.InstanceSpec.DeleteByDatabaseID(databaseID), | ||
| s.store.InstanceStatus.DeleteByDatabaseID(databaseID), | ||
| ) | ||
|
|
||
|
|
@@ -279,7 +305,7 @@ func (s *Service) DeleteInstance(ctx context.Context, databaseID, instanceID str | |
| return fmt.Errorf("failed to delete stored instance status: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| return s.DeleteInstanceSpec(ctx, databaseID, instanceID) | ||
| } | ||
|
|
||
| func (s *Service) UpdateInstanceStatus( | ||
|
|
@@ -490,6 +516,134 @@ func (s *Service) PopulateSpecDefaults(ctx context.Context, spec *Spec) error { | |
| return nil | ||
| } | ||
|
|
||
| func (s *Service) ReconcileInstanceSpec(ctx context.Context, spec *InstanceSpec) (*InstanceSpec, error) { | ||
| if s.cfg.HostID != spec.HostID { | ||
| return nil, fmt.Errorf("this instance belongs to another host - this host='%s', instance host='%s'", s.cfg.HostID, spec.HostID) | ||
| } | ||
|
|
||
| var previous *InstanceSpec | ||
| stored, err := s.store.InstanceSpec. | ||
| GetByKey(spec.DatabaseID, spec.InstanceID). | ||
| Exec(ctx) | ||
| switch { | ||
| case err == nil: | ||
| previous = stored.Spec | ||
| spec.CopySettingsFrom(previous) | ||
| case errors.Is(err, storage.ErrNotFound): | ||
| stored = &StoredInstanceSpec{} | ||
| default: | ||
| return nil, fmt.Errorf("failed to get current spec for instance '%s': %w", spec.InstanceID, err) | ||
| } | ||
|
|
||
| var allocated []int | ||
| rollback := func(cause error) error { | ||
| rollbackCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) | ||
| defer cancel() | ||
|
|
||
| return errors.Join(cause, s.portsSvc.ReleasePort(rollbackCtx, spec.HostID, allocated...)) | ||
| } | ||
|
|
||
| if spec.Port != nil && *spec.Port == 0 { | ||
| port, err := s.portsSvc.AllocatePort(ctx, spec.HostID) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to allocate port: %w", err) | ||
| } | ||
| allocated = append(allocated, port) | ||
| spec.Port = utils.PointerTo(port) | ||
| } | ||
|
|
||
| if spec.PatroniPort != nil && *spec.PatroniPort == 0 { | ||
| port, err := s.portsSvc.AllocatePort(ctx, spec.HostID) | ||
| if err != nil { | ||
| return nil, rollback(fmt.Errorf("failed to allocate patroni port: %w", err)) | ||
| } | ||
| allocated = append(allocated, port) | ||
| spec.PatroniPort = utils.PointerTo(port) | ||
| } | ||
|
|
||
| stored.Spec = spec | ||
| err = s.store.InstanceSpec. | ||
| Update(stored). | ||
| Exec(ctx) | ||
| if err != nil { | ||
| return nil, rollback(fmt.Errorf("failed to persist updated instance spec: %w", err)) | ||
| } | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if err := s.releasePreviousSpecPorts(ctx, previous, spec); err != nil { | ||
| return nil, err | ||
| } | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| return spec, nil | ||
| } | ||
|
|
||
| func (s *Service) DeleteInstanceSpec(ctx context.Context, databaseID, instanceID string) error { | ||
| spec, err := s.store.InstanceSpec. | ||
| GetByKey(databaseID, instanceID). | ||
| Exec(ctx) | ||
| if errors.Is(err, storage.ErrNotFound) { | ||
| // Spec has already been deleted | ||
| return nil | ||
| } else if err != nil { | ||
| return fmt.Errorf("failed to check if instance spec exists: %w", err) | ||
| } | ||
|
|
||
| if err := s.releaseInstancePorts(ctx, spec.Spec); err != nil { | ||
| return err | ||
| } | ||
|
Comment on lines
+590
to
+592
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we move this logic after DeleteByKey?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same logic that I stated above applies here as well: if the release happens after the delete, it cannot be retried. This operation is more likely to fail during the release, so I think it makes more sense to attempt it first. |
||
|
|
||
| _, err = s.store.InstanceSpec. | ||
| DeleteByKey(databaseID, instanceID). | ||
| Exec(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to delete instance spec: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (s *Service) releaseInstancePorts(ctx context.Context, spec *InstanceSpec) error { | ||
| err := s.portsSvc.ReleasePortIfDefined(ctx, spec.HostID, spec.Port, spec.PatroniPort) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to release ports for instance '%s': %w", spec.InstanceID, err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (s *Service) releasePreviousSpecPorts(ctx context.Context, previous, new *InstanceSpec) error { | ||
| if previous == nil { | ||
| return nil | ||
| } | ||
| if portShouldBeReleased(previous.Port, new.Port) { | ||
| err := s.portsSvc.ReleasePortIfDefined(ctx, previous.HostID, previous.Port) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to release previous port: %w", err) | ||
| } | ||
| } | ||
| if portShouldBeReleased(previous.PatroniPort, new.PatroniPort) { | ||
| err := s.portsSvc.ReleasePortIfDefined(ctx, previous.HostID, previous.PatroniPort) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to release previous patroni port: %w", err) | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func portShouldBeReleased(current *int, new *int) bool { | ||
| if current == nil || *current == 0 { | ||
| // we didn't previously have an assigned port | ||
| return false | ||
| } | ||
| if new == nil || *current != *new { | ||
| // we had a previously assigned port and now the port is either nil or | ||
| // different | ||
| return true | ||
| } | ||
|
|
||
| // the current and new ports are equal, so it should not be released. | ||
| return false | ||
| } | ||
|
|
||
| func ValidateChangedSpec(current, updated *Spec) error { | ||
| var errs []error | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.