From d1f89c24039fca08c228fcbb0f4bc212e2170e50 Mon Sep 17 00:00:00 2001 From: Philipp Kolberg Date: Fri, 22 May 2026 02:13:25 +0200 Subject: [PATCH] fix: scope servicesync service watches --- charts/dynamic-prefix-operator/README.md | 14 +++ .../templates/deployment.yaml | 3 + charts/dynamic-prefix-operator/values.yaml | 7 ++ cmd/main.go | 23 ++++ internal/controller/servicesync_controller.go | 25 ++++- .../controller/servicesync_controller_test.go | 106 ++++++++++++++++++ 6 files changed, 176 insertions(+), 2 deletions(-) diff --git a/charts/dynamic-prefix-operator/README.md b/charts/dynamic-prefix-operator/README.md index 859f314..c3d1bfe 100644 --- a/charts/dynamic-prefix-operator/README.md +++ b/charts/dynamic-prefix-operator/README.md @@ -84,6 +84,19 @@ helm install dynamic-prefix-operator ./charts/dynamic-prefix-operator \ --set serviceMonitor.enabled=true ``` +#### Limit the Service informer cache + +For large clusters, label HA-managed Services and restrict the Service informer +cache to those opt-in Services: + +```bash +helm install dynamic-prefix-operator ./charts/dynamic-prefix-operator \ + --set 'config.serviceSync.cacheLabelSelector=dynamic-prefix.io/name' +``` + +Services still use the `dynamic-prefix.io/name` annotation for configuration; +the matching label is only used to narrow the informer cache. 
+ #### High availability setup ```bash @@ -123,6 +136,7 @@ helm install dynamic-prefix-operator ./charts/dynamic-prefix-operator \ | `config.logLevel` | Log level | `info` | | `config.leaderElection.enabled` | Enable leader election | `true` | | `config.metrics.enabled` | Enable metrics endpoint | `true` | +| `config.serviceSync.cacheLabelSelector` | Optional label selector for the Service informer cache | `""` | ### Monitoring diff --git a/charts/dynamic-prefix-operator/templates/deployment.yaml b/charts/dynamic-prefix-operator/templates/deployment.yaml index 6c708c3..e15dca9 100644 --- a/charts/dynamic-prefix-operator/templates/deployment.yaml +++ b/charts/dynamic-prefix-operator/templates/deployment.yaml @@ -56,6 +56,9 @@ spec: {{- end }} - --health-probe-bind-address={{ .Values.config.health.bindAddress }} - --zap-log-level={{ .Values.config.logLevel }} + {{- with .Values.config.serviceSync.cacheLabelSelector }} + - {{ printf "--service-cache-label-selector=%s" . | quote }}{{/* quote the whole argument: quoting only the value leaves literal double quotes in the flag value */}} + {{- end }} {{- with .Values.securityContext }} securityContext: {{- toYaml . | nindent 12 }} diff --git a/charts/dynamic-prefix-operator/values.yaml b/charts/dynamic-prefix-operator/values.yaml index 58d871f..942b42b 100644 --- a/charts/dynamic-prefix-operator/values.yaml +++ b/charts/dynamic-prefix-operator/values.yaml @@ -100,6 +100,13 @@ config: # -- Health probe bind address bindAddress: ":8081" + # -- ServiceSync controller configuration + serviceSync: + # -- Optional label selector limiting the Service informer cache. + # Leave empty to preserve backwards-compatible cluster-wide Service cache behavior. + # Example: dynamic-prefix.io/name caches only Services labeled with that key. 
+ cacheLabelSelector: "" + # ============================================================================== # Network Configuration # ============================================================================== diff --git a/cmd/main.go b/cmd/main.go index 1259898..4415913 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -21,6 +21,9 @@ import ( "flag" "os" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" @@ -29,6 +32,8 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" @@ -62,6 +67,7 @@ func main() { var probeAddr string var secureMetrics bool var enableHTTP2 bool + var serviceCacheLabelSelector string var tlsOpts []func(*tls.Config) flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. 
"+ "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") @@ -80,6 +86,8 @@ func main() { flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.") flag.BoolVar(&enableHTTP2, "enable-http2", false, "If set, HTTP/2 will be enabled for the metrics and webhook servers") + flag.StringVar(&serviceCacheLabelSelector, "service-cache-label-selector", "", + "Optional Kubernetes label selector limiting the Service informer cache, for example dynamic-prefix.io/name") opts := zap.Options{ Development: true, } @@ -155,8 +163,23 @@ func main() { metricsServerOptions.KeyName = metricsCertKey } + cacheOptions := cache.Options{} + if serviceCacheLabelSelector != "" { + selector, err := labels.Parse(serviceCacheLabelSelector) + if err != nil { + setupLog.Error(err, "invalid service cache label selector", "selector", serviceCacheLabelSelector) + os.Exit(1) + } + + cacheOptions.ByObject = map[client.Object]cache.ByObject{ + &corev1.Service{}: {Label: selector}, + } + setupLog.Info("restricting Service informer cache", "selector", serviceCacheLabelSelector) + } + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, + Cache: cacheOptions, Metrics: metricsServerOptions, WebhookServer: webhookServer, HealthProbeBindAddress: probeAddr, diff --git a/internal/controller/servicesync_controller.go b/internal/controller/servicesync_controller.go index 538bd48..41ef198 100644 --- a/internal/controller/servicesync_controller.go +++ b/internal/controller/servicesync_controller.go @@ -59,6 +59,9 @@ const ( // (and historical) prefix with this suffix, instead of inferring it from the // Service's currently assigned IP. AnnotationSuffix = "dynamic-prefix.io/suffix" + + // serviceDynamicPrefixIndex indexes Services by the DynamicPrefix they reference. 
+ serviceDynamicPrefixIndex = "metadata.annotations.dynamic-prefix.io/name" ) // ServiceSyncReconciler reconciles LoadBalancer Services for HA mode prefix transitions. @@ -557,6 +560,10 @@ func (r *ServiceSyncReconciler) applyIPOffset(base netip.Addr, offset [16]byte) // SetupWithManager sets up the controller with the Manager. func (r *ServiceSyncReconciler) SetupWithManager(mgr ctrl.Manager) error { + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Service{}, serviceDynamicPrefixIndex, indexServiceByDynamicPrefix); err != nil { + return fmt.Errorf("failed to index Services by DynamicPrefix annotation: %w", err) + } + // Create predicate for LoadBalancer Services with dynamic-prefix.io/name annotation hasAnnotation := predicate.NewPredicateFuncs(func(obj client.Object) bool { svc, ok := obj.(*corev1.Service) @@ -581,6 +588,20 @@ func (r *ServiceSyncReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } +func indexServiceByDynamicPrefix(obj client.Object) []string { + svc, ok := obj.(*corev1.Service) + if !ok || svc.Spec.Type != corev1.ServiceTypeLoadBalancer { + return nil + } + + annotations := svc.GetAnnotations() + if annotations == nil || annotations[AnnotationName] == "" { + return nil + } + + return []string{annotations[AnnotationName]} +} + // findReferencingServices finds all Services that reference the given DynamicPrefix. func (r *ServiceSyncReconciler) findReferencingServices(ctx context.Context, obj client.Object) []reconcile.Request { dp, ok := obj.(*dynamicprefixiov1alpha1.DynamicPrefix) @@ -596,9 +617,9 @@ func (r *ServiceSyncReconciler) findReferencingServices(ctx context.Context, obj log := logf.FromContext(ctx) var requests []reconcile.Request - // List all Services + // List only Services that the field index says reference this DynamicPrefix. 
var serviceList corev1.ServiceList - if err := r.List(ctx, &serviceList); err != nil { + if err := r.List(ctx, &serviceList, client.MatchingFields{serviceDynamicPrefixIndex: dp.Name}); err != nil { log.V(1).Info("Failed to list Services", "error", err) return nil } diff --git a/internal/controller/servicesync_controller_test.go b/internal/controller/servicesync_controller_test.go index 83ef5c8..2ef5e07 100644 --- a/internal/controller/servicesync_controller_test.go +++ b/internal/controller/servicesync_controller_test.go @@ -26,7 +26,10 @@ import ( . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" dynamicprefixiov1alpha1 "github.com/jr42/dynamic-prefix-operator/api/v1alpha1" @@ -822,3 +825,106 @@ func TestServiceSyncAnnotationConstants(t *testing.T) { }) } } + +func TestIndexServiceByDynamicPrefix(t *testing.T) { + tests := []struct { + name string + svc *corev1.Service + expected []string + }{ + { + name: "loadbalancer with DynamicPrefix annotation", + svc: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{AnnotationName: "home-ipv6"}}, + Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeLoadBalancer}, + }, + expected: []string{"home-ipv6"}, + }, + { + name: "clusterip ignored", + svc: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{AnnotationName: "home-ipv6"}}, + Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeClusterIP}, + }, + }, + { + name: "missing annotation ignored", + svc: &corev1.Service{Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeLoadBalancer}}, + }, + { + name: "empty annotation ignored", + svc: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{AnnotationName: ""}}, + Spec: 
corev1.ServiceSpec{Type: corev1.ServiceTypeLoadBalancer}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := indexServiceByDynamicPrefix(tt.svc) + if len(result) != len(tt.expected) { + t.Fatalf("indexServiceByDynamicPrefix() returned %v, want %v", result, tt.expected) + } + for i := range result { + if result[i] != tt.expected[i] { + t.Errorf("indexServiceByDynamicPrefix()[%d] = %q, want %q", i, result[i], tt.expected[i]) + } + } + }) + } +} + +func TestServiceSyncReconciler_findReferencingServicesUsesIndex(t *testing.T) { + ctx := context.Background() + scheme := runtime.NewScheme() + _ = clientgoscheme.AddToScheme(scheme) + _ = dynamicprefixiov1alpha1.AddToScheme(scheme) + + dp := &dynamicprefixiov1alpha1.DynamicPrefix{ + ObjectMeta: metav1.ObjectMeta{Name: "home-ipv6"}, + Spec: dynamicprefixiov1alpha1.DynamicPrefixSpec{ + Transition: &dynamicprefixiov1alpha1.TransitionSpec{Mode: dynamicprefixiov1alpha1.TransitionModeHA}, + }, + } + managed := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managed", + Namespace: "default", + Annotations: map[string]string{AnnotationName: "home-ipv6"}, + }, + Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeLoadBalancer}, + } + otherDP := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "other-dp", + Namespace: "default", + Annotations: map[string]string{AnnotationName: "other-ipv6"}, + }, + Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeLoadBalancer}, + } + clusterIP := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clusterip", + Namespace: "default", + Annotations: map[string]string{AnnotationName: "home-ipv6"}, + }, + Spec: corev1.ServiceSpec{Type: corev1.ServiceTypeClusterIP}, + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(dp, managed, otherDP, clusterIP). + WithIndex(&corev1.Service{}, serviceDynamicPrefixIndex, indexServiceByDynamicPrefix). 
+ Build() + + reconciler := &ServiceSyncReconciler{Client: fakeClient, Scheme: scheme} + requests := reconciler.findReferencingServices(ctx, dp) + + if len(requests) != 1 { + t.Fatalf("findReferencingServices() returned %d requests, want 1: %v", len(requests), requests) + } + if requests[0].NamespacedName != (types.NamespacedName{Name: "managed", Namespace: "default"}) { + t.Errorf("findReferencingServices()[0] = %v, want default/managed", requests[0].NamespacedName) + } +}