Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions charts/dynamic-prefix-operator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ helm install dynamic-prefix-operator ./charts/dynamic-prefix-operator \
--set serviceMonitor.enabled=true
```

#### Limit the Service informer cache

For large clusters, label HA-managed Services and restrict the Service informer
cache to those opt-in Services:

```bash
helm install dynamic-prefix-operator ./charts/dynamic-prefix-operator \
--set 'config.serviceSync.cacheLabelSelector=dynamic-prefix.io/name'
```

Services still use the `dynamic-prefix.io/name` annotation for configuration;
the matching label is only used to narrow the informer cache.

#### High availability setup

```bash
Expand Down Expand Up @@ -123,6 +136,7 @@ helm install dynamic-prefix-operator ./charts/dynamic-prefix-operator \
| `config.logLevel` | Log level | `info` |
| `config.leaderElection.enabled` | Enable leader election | `true` |
| `config.metrics.enabled` | Enable metrics endpoint | `true` |
| `config.serviceSync.cacheLabelSelector` | Optional label selector for the Service informer cache | `""` |

### Monitoring

Expand Down
3 changes: 3 additions & 0 deletions charts/dynamic-prefix-operator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ spec:
{{- end }}
- --health-probe-bind-address={{ .Values.config.health.bindAddress }}
- --zap-log-level={{ .Values.config.logLevel }}
{{- with .Values.config.serviceSync.cacheLabelSelector }}
- --service-cache-label-selector={{ . | quote }}
{{- end }}
{{- with .Values.securityContext }}
securityContext:
{{- toYaml . | nindent 12 }}
Expand Down
7 changes: 7 additions & 0 deletions charts/dynamic-prefix-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ config:
# -- Health probe bind address
bindAddress: ":8081"

# -- ServiceSync controller configuration
serviceSync:
# -- Optional label selector limiting the Service informer cache.
# Leave empty to preserve backwards-compatible cluster-wide Service cache behavior.
# Example: dynamic-prefix.io/name caches only Services labeled with that key.
cacheLabelSelector: ""

# ==============================================================================
# Network Configuration
# ==============================================================================
Expand Down
23 changes: 23 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"flag"
"os"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand All @@ -29,6 +32,8 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
Expand Down Expand Up @@ -62,6 +67,7 @@ func main() {
var probeAddr string
var secureMetrics bool
var enableHTTP2 bool
var serviceCacheLabelSelector string
var tlsOpts []func(*tls.Config)
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
Expand All @@ -80,6 +86,8 @@ func main() {
flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.StringVar(&serviceCacheLabelSelector, "service-cache-label-selector", "",
"Optional Kubernetes label selector limiting the Service informer cache, for example dynamic-prefix.io/name")
opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -155,8 +163,23 @@ func main() {
metricsServerOptions.KeyName = metricsCertKey
}

cacheOptions := cache.Options{}
if serviceCacheLabelSelector != "" {
selector, err := labels.Parse(serviceCacheLabelSelector)
if err != nil {
setupLog.Error(err, "invalid service cache label selector", "selector", serviceCacheLabelSelector)
os.Exit(1)
}

cacheOptions.ByObject = map[client.Object]cache.ByObject{
&corev1.Service{}: {Label: selector},
}
setupLog.Info("restricting Service informer cache", "selector", serviceCacheLabelSelector)
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Cache: cacheOptions,
Metrics: metricsServerOptions,
WebhookServer: webhookServer,
HealthProbeBindAddress: probeAddr,
Expand Down
25 changes: 23 additions & 2 deletions internal/controller/servicesync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ const (
// (and historical) prefix with this suffix, instead of inferring it from the
// Service's currently assigned IP.
AnnotationSuffix = "dynamic-prefix.io/suffix"

// serviceDynamicPrefixIndex indexes Services by the DynamicPrefix they reference.
serviceDynamicPrefixIndex = "metadata.annotations.dynamic-prefix.io/name"
)

// ServiceSyncReconciler reconciles LoadBalancer Services for HA mode prefix transitions.
Expand Down Expand Up @@ -557,6 +560,10 @@ func (r *ServiceSyncReconciler) applyIPOffset(base netip.Addr, offset [16]byte)

// SetupWithManager sets up the controller with the Manager.
func (r *ServiceSyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Service{}, serviceDynamicPrefixIndex, indexServiceByDynamicPrefix); err != nil {
return fmt.Errorf("failed to index Services by DynamicPrefix annotation: %w", err)
}

// Create predicate for LoadBalancer Services with dynamic-prefix.io/name annotation
hasAnnotation := predicate.NewPredicateFuncs(func(obj client.Object) bool {
svc, ok := obj.(*corev1.Service)
Expand All @@ -581,6 +588,20 @@ func (r *ServiceSyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func indexServiceByDynamicPrefix(obj client.Object) []string {
svc, ok := obj.(*corev1.Service)
if !ok || svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
return nil
}

annotations := svc.GetAnnotations()
if annotations == nil || annotations[AnnotationName] == "" {
return nil
}

return []string{annotations[AnnotationName]}
}

// findReferencingServices finds all Services that reference the given DynamicPrefix.
func (r *ServiceSyncReconciler) findReferencingServices(ctx context.Context, obj client.Object) []reconcile.Request {
dp, ok := obj.(*dynamicprefixiov1alpha1.DynamicPrefix)
Expand All @@ -596,9 +617,9 @@ func (r *ServiceSyncReconciler) findReferencingServices(ctx context.Context, obj
log := logf.FromContext(ctx)
var requests []reconcile.Request

// List all Services
// List only Services that the field index says reference this DynamicPrefix.
var serviceList corev1.ServiceList
if err := r.List(ctx, &serviceList); err != nil {
if err := r.List(ctx, &serviceList, client.MatchingFields{serviceDynamicPrefixIndex: dp.Name}); err != nil {
log.V(1).Info("Failed to list Services", "error", err)
return nil
}
Expand Down
106 changes: 106 additions & 0 deletions internal/controller/servicesync_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ import (
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

dynamicprefixiov1alpha1 "github.com/jr42/dynamic-prefix-operator/api/v1alpha1"
Expand Down Expand Up @@ -822,3 +825,106 @@ func TestServiceSyncAnnotationConstants(t *testing.T) {
})
}
}

func TestIndexServiceByDynamicPrefix(t *testing.T) {
tests := []struct {
name string
svc *corev1.Service
expected []string
}{
{
name: "loadbalancer with DynamicPrefix annotation",
svc: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{AnnotationName: "home-ipv6"}},
Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeLoadBalancer},
},
expected: []string{"home-ipv6"},
},
{
name: "clusterip ignored",
svc: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{AnnotationName: "home-ipv6"}},
Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeClusterIP},
},
},
{
name: "missing annotation ignored",
svc: &corev1.Service{Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeLoadBalancer}},
},
{
name: "empty annotation ignored",
svc: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{AnnotationName: ""}},
Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeLoadBalancer},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := indexServiceByDynamicPrefix(tt.svc)
if len(result) != len(tt.expected) {
t.Fatalf("indexServiceByDynamicPrefix() returned %v, want %v", result, tt.expected)
}
for i := range result {
if result[i] != tt.expected[i] {
t.Errorf("indexServiceByDynamicPrefix()[%d] = %q, want %q", i, result[i], tt.expected[i])
}
}
})
}
}

func TestServiceSyncReconciler_findReferencingServicesUsesIndex(t *testing.T) {
ctx := context.Background()
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
_ = dynamicprefixiov1alpha1.AddToScheme(scheme)

dp := &dynamicprefixiov1alpha1.DynamicPrefix{
ObjectMeta: metav1.ObjectMeta{Name: "home-ipv6"},
Spec: dynamicprefixiov1alpha1.DynamicPrefixSpec{
Transition: &dynamicprefixiov1alpha1.TransitionSpec{Mode: dynamicprefixiov1alpha1.TransitionModeHA},
},
}
managed := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "managed",
Namespace: "default",
Annotations: map[string]string{AnnotationName: "home-ipv6"},
},
Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeLoadBalancer},
}
otherDP := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "other-dp",
Namespace: "default",
Annotations: map[string]string{AnnotationName: "other-ipv6"},
},
Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeLoadBalancer},
}
clusterIP := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "clusterip",
Namespace: "default",
Annotations: map[string]string{AnnotationName: "home-ipv6"},
},
Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeClusterIP},
}

fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(dp, managed, otherDP, clusterIP).
WithIndex(&corev1.Service{}, serviceDynamicPrefixIndex, indexServiceByDynamicPrefix).
Build()

reconciler := &ServiceSyncReconciler{Client: fakeClient, Scheme: scheme}
requests := reconciler.findReferencingServices(ctx, dp)

if len(requests) != 1 {
t.Fatalf("findReferencingServices() returned %d requests, want 1: %v", len(requests), requests)
}
if requests[0].NamespacedName != (types.NamespacedName{Name: "managed", Namespace: "default"}) {
t.Errorf("findReferencingServices()[0] = %v, want default/managed", requests[0].NamespacedName)
}
}
Loading