Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 50 additions & 16 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"crypto/tls"
"flag"
"fmt"
"os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand All @@ -27,6 +28,7 @@ import (

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -182,6 +184,19 @@ func main() {
// Create the receiver factory for prefix acquisition
receiverFactory := prefix.NewReceiverFactory()

// Discover available Cilium API versions
restConfig := ctrl.GetConfigOrDie()
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
setupLog.Error(err, "unable to create kubernetes clientset")
os.Exit(1)
}

ciliumVersions, err := controller.DiscoverCiliumVersions(clientset.Discovery())
if err != nil {
setupLog.Info("Cilium API not available at startup, will poll in background", "reason", err.Error())
}

// Set up DynamicPrefix controller with receiver factory
dynamicPrefixReconciler := controller.NewDynamicPrefixReconciler(
mgr.GetClient(),
Expand All @@ -193,15 +208,6 @@ func main() {
os.Exit(1)
}

// Set up PoolSync controller for Cilium resource synchronization
if err := (&controller.PoolSyncReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PoolSync")
os.Exit(1)
}

// Set up ServiceSync controller for HA mode Service management
if err := (&controller.ServiceSyncReconciler{
Client: mgr.GetClient(),
Expand All @@ -211,13 +217,41 @@ func main() {
os.Exit(1)
}

// Set up BGPSync controller for BGP advertisement management
if err := (&controller.BGPSyncReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "BGPSync")
os.Exit(1)
// setupCiliumControllers registers PoolSync and BGPSync with the manager.
setupCiliumControllers := func(versions *controller.CiliumVersions) error {
if err := (&controller.PoolSyncReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
CiliumVersions: versions,
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to create PoolSync controller: %w", err)
}

if err := (&controller.BGPSyncReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
CiliumVersions: versions,
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to create BGPSync controller: %w", err)
}

return nil
}

// Set up Cilium-dependent controllers immediately or defer to background poller.
if ciliumVersions != nil {
if err := setupCiliumControllers(ciliumVersions); err != nil {
setupLog.Error(err, "unable to set up Cilium controllers")
os.Exit(1)
}
} else {
if err := mgr.Add(&controller.CiliumControllerStarter{
Discovery: clientset.Discovery(),
SetupControllers: setupCiliumControllers,
}); err != nil {
setupLog.Error(err, "unable to add Cilium controller starter")
os.Exit(1)
}
}
// +kubebuilder:scaffold:builder

Expand Down
92 changes: 45 additions & 47 deletions internal/controller/bgpsync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ import (
dynamicprefixiov1alpha1 "github.com/jr42/dynamic-prefix-operator/api/v1alpha1"
)

var (
// CiliumBGPAdvertisementGVK is the GroupVersionKind for CiliumBGPAdvertisement.
CiliumBGPAdvertisementGVK = schema.GroupVersionKind{
Group: "cilium.io",
Version: "v2alpha1",
Kind: "CiliumBGPAdvertisement",
}
)
// DefaultCiliumBGPAdvertisementGVK is the default GVK used when CiliumVersions is not injected.
var DefaultCiliumBGPAdvertisementGVK = schema.GroupVersionKind{
Group: "cilium.io",
Version: "v2",
Kind: "CiliumBGPAdvertisement",
}

const (
// LabelManagedBy identifies resources managed by this operator.
Expand All @@ -63,7 +61,22 @@ const (
// resources for subnets with BGP advertisement enabled.
type BGPSyncReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
CiliumVersions *CiliumVersions
}

func (r *BGPSyncReconciler) bgpAdvGVK() schema.GroupVersionKind {
if r.CiliumVersions != nil {
return r.CiliumVersions.BGPAdvertisement
}
return DefaultCiliumBGPAdvertisementGVK
}

func (r *BGPSyncReconciler) lbIPPoolGVK() schema.GroupVersionKind {
if r.CiliumVersions != nil {
return r.CiliumVersions.LoadBalancerIPPool
}
return DefaultCiliumLBIPPoolGVK
}

// +kubebuilder:rbac:groups=cilium.io,resources=ciliumbgpadvertisements,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -149,7 +162,7 @@ func (r *BGPSyncReconciler) reconcileAdvertisement(

// Create or update the advertisement
adv := &unstructured.Unstructured{}
adv.SetGroupVersionKind(CiliumBGPAdvertisementGVK)
adv.SetGroupVersionKind(r.bgpAdvGVK())
adv.SetName(advName)

// Check if it exists
Expand All @@ -162,7 +175,7 @@ func (r *BGPSyncReconciler) reconcileAdvertisement(
// Create new advertisement
adv = &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "cilium.io/v2alpha1",
"apiVersion": APIVersion(r.bgpAdvGVK()),
"kind": "CiliumBGPAdvertisement",
"metadata": map[string]interface{}{
"name": advName,
Expand Down Expand Up @@ -224,11 +237,7 @@ func (r *BGPSyncReconciler) getPoolServiceSelector(
) (map[string]interface{}, error) {
// List all CiliumLoadBalancerIPPools with matching annotations
poolList := &unstructured.UnstructuredList{}
poolList.SetGroupVersionKind(schema.GroupVersionKind{
Group: "cilium.io",
Version: "v2alpha1",
Kind: "CiliumLoadBalancerIPPoolList",
})
poolList.SetGroupVersionKind(ListGVK(r.lbIPPoolGVK()))

if err := r.List(ctx, poolList); err != nil {
return nil, fmt.Errorf("failed to list CiliumLoadBalancerIPPools: %w", err)
Expand Down Expand Up @@ -299,11 +308,7 @@ func (r *BGPSyncReconciler) deleteOrphanedAdvertisements(

// List all advertisements managed by this operator for this DynamicPrefix
advList := &unstructured.UnstructuredList{}
advList.SetGroupVersionKind(schema.GroupVersionKind{
Group: "cilium.io",
Version: "v2alpha1",
Kind: "CiliumBGPAdvertisementList",
})
advList.SetGroupVersionKind(ListGVK(r.bgpAdvGVK()))

if err := r.List(ctx, advList, client.MatchingLabels{
LabelManagedBy: LabelManagedByValue,
Expand Down Expand Up @@ -379,10 +384,11 @@ func (r *BGPSyncReconciler) buildBGPCondition(
) metav1.Condition {
if len(subnetsWithBGP) == 0 {
return metav1.Condition{
Type: dynamicprefixiov1alpha1.ConditionTypeBGPAdvertisementReady,
Status: metav1.ConditionFalse,
Reason: "NoBGPSubnets",
Message: "No subnets have BGP advertisement enabled",
Type: dynamicprefixiov1alpha1.ConditionTypeBGPAdvertisementReady,
Status: metav1.ConditionFalse,
Reason: "NoBGPSubnets",
Message: "No subnets have BGP advertisement enabled",
LastTransitionTime: metav1.Now(),
}
}

Expand All @@ -391,7 +397,7 @@ func (r *BGPSyncReconciler) buildBGPCondition(
for _, subnet := range subnetsWithBGP {
advName := r.advertisementName(dp.Name, subnet.Name)
adv := &unstructured.Unstructured{}
adv.SetGroupVersionKind(CiliumBGPAdvertisementGVK)
adv.SetGroupVersionKind(r.bgpAdvGVK())
if err := r.Get(ctx, types.NamespacedName{Name: advName}, adv); err != nil {
allReady = false
break
Expand All @@ -400,18 +406,20 @@ func (r *BGPSyncReconciler) buildBGPCondition(

if allReady {
return metav1.Condition{
Type: dynamicprefixiov1alpha1.ConditionTypeBGPAdvertisementReady,
Status: metav1.ConditionTrue,
Reason: "AdvertisementsReady",
Message: fmt.Sprintf("%d BGP advertisement(s) configured", len(subnetsWithBGP)),
Type: dynamicprefixiov1alpha1.ConditionTypeBGPAdvertisementReady,
Status: metav1.ConditionTrue,
Reason: "AdvertisementsReady",
Message: fmt.Sprintf("%d BGP advertisement(s) configured", len(subnetsWithBGP)),
LastTransitionTime: metav1.Now(),
}
}

return metav1.Condition{
Type: dynamicprefixiov1alpha1.ConditionTypeBGPAdvertisementReady,
Status: metav1.ConditionFalse,
Reason: "AdvertisementsPending",
Message: "Some BGP advertisements are not yet ready",
Type: dynamicprefixiov1alpha1.ConditionTypeBGPAdvertisementReady,
Status: metav1.ConditionFalse,
Reason: "AdvertisementsPending",
Message: "Some BGP advertisements are not yet ready",
LastTransitionTime: metav1.Now(),
}
}

Expand All @@ -425,40 +433,30 @@ func (r *BGPSyncReconciler) findCondition(conditions []metav1.Condition, conditi
return nil
}

// setCondition updates or adds a condition, preserving LastTransitionTime
// when the status has not changed (per Kubernetes convention).
// setCondition updates or adds a condition.
func (r *BGPSyncReconciler) setCondition(conditions *[]metav1.Condition, condition metav1.Condition) {
now := metav1.Now()
for i := range *conditions {
if (*conditions)[i].Type == condition.Type {
if (*conditions)[i].Status == condition.Status {
// Status unchanged — preserve the existing transition time
condition.LastTransitionTime = (*conditions)[i].LastTransitionTime
} else {
condition.LastTransitionTime = now
}
(*conditions)[i] = condition
return
}
}
// New condition
condition.LastTransitionTime = now
*conditions = append(*conditions, condition)
}

// SetupWithManager sets up the controller with the Manager.
func (r *BGPSyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Watch CiliumBGPAdvertisement for owned resources
bgpAdv := &unstructured.Unstructured{}
bgpAdv.SetGroupVersionKind(CiliumBGPAdvertisementGVK)
bgpAdv.SetGroupVersionKind(r.bgpAdvGVK())

return ctrl.NewControllerManagedBy(mgr).
Named("bgpsync").
For(&dynamicprefixiov1alpha1.DynamicPrefix{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(bgpAdv).
Watches(&unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "cilium.io/v2alpha1",
"apiVersion": APIVersion(r.lbIPPoolGVK()),
"kind": "CiliumLoadBalancerIPPool",
},
}, handler.EnqueueRequestsFromMapFunc(r.findDynamicPrefixForPool),
Expand Down
Loading
Loading