From 15908f8e98b63c4d98aaadb13ebd861266813363 Mon Sep 17 00:00:00 2001 From: Aritra Basu Date: Wed, 20 May 2026 03:50:34 -0400 Subject: [PATCH] Remove bgp_conf_watcher dependency on pubsub Signed-off-by: Aritra Basu --- calico-vpp-agent/cmd/calico_vpp_dataplane.go | 2 +- calico-vpp-agent/common/pubsub.go | 1 - calico-vpp-agent/felix/felix_server.go | 11 +- .../watchers/bgp_configuration_watcher.go | 145 ++++++++++++------ 4 files changed, 103 insertions(+), 56 deletions(-) diff --git a/calico-vpp-agent/cmd/calico_vpp_dataplane.go b/calico-vpp-agent/cmd/calico_vpp_dataplane.go index 47d39f5b2..836bf6c67 100644 --- a/calico-vpp-agent/cmd/calico_vpp_dataplane.go +++ b/calico-vpp-agent/cmd/calico_vpp_dataplane.go @@ -140,7 +140,6 @@ func main() { */ routeWatcher := watchers.NewRouteWatcher(log.WithFields(logrus.Fields{"subcomponent": "host-route-watcher"})) linkWatcher := watchers.NewLinkWatcher(common.VppManagerInfo.UplinkStatuses, log.WithFields(logrus.Fields{"subcomponent": "host-link-watcher"})) - bgpConfigurationWatcher := watchers.NewBGPConfigurationWatcher(clientv3, log.WithFields(logrus.Fields{"subcomponent": "bgp-conf-watch"})) prefixWatcher := watchers.NewPrefixWatcher(clientv3, log.WithFields(logrus.Fields{"subcomponent": "prefix-watcher"})) peerWatcher := watchers.NewPeerWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "peer-watcher"})) bgpFilterWatcher := watchers.NewBGPFilterWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "BGPFilter-watcher"})) @@ -148,6 +147,7 @@ func main() { routingServer := routing.NewRoutingServer(vpp, bgpServer, log.WithFields(logrus.Fields{"component": "routing"})) localSIDWatcher := watchers.NewLocalSIDWatcher(vpp, clientv3, log.WithFields(logrus.Fields{"subcomponent": "localsid-watcher"})) felixServer := felix.NewFelixServer(vpp, clientv3, log.WithFields(logrus.Fields{"component": "policy"})) + bgpConfigurationWatcher := watchers.NewBGPConfigurationWatcher(clientv3, log.WithFields(logrus.Fields{"subcomponent": "bgp-conf-watch"}), felixServer.HandleBGPConfigurationChange) felixWatcher := watchers.NewFelixWatcher(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "felix watcher"})) cniServer := watchers.NewCNIServer(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "cni"})) serviceServer := watchers.NewServiceServer(felixServer.GetFelixServerEventChan(), k8sclient, log.WithFields(logrus.Fields{"component": "services"})) diff --git a/calico-vpp-agent/common/pubsub.go b/calico-vpp-agent/common/pubsub.go index f1af491ba..6d03dc199 100644 --- a/calico-vpp-agent/common/pubsub.go +++ b/calico-vpp-agent/common/pubsub.go @@ -28,7 +28,6 @@ const ( PeerNodeStateChanged CalicoVppEventType = "PeerNodeStateChanged" IpamConfChanged CalicoVppEventType = "IpamConfChanged" - BGPConfChanged CalicoVppEventType = "BGPConfChanged" ConnectivityAdded CalicoVppEventType = "ConnectivityAdded" ConnectivityDeleted CalicoVppEventType = "ConnectivityDeleted" diff --git a/calico-vpp-agent/felix/felix_server.go b/calico-vpp-agent/felix/felix_server.go index 7d3fcc18d..2881b0926 100644 --- a/calico-vpp-agent/felix/felix_server.go +++ b/calico-vpp-agent/felix/felix_server.go @@ -109,6 +109,14 @@ func (s *Server) SetBGPConf(bgpConf *calicov3.BGPConfigurationSpec) { s.cache.BGPConf = bgpConf } +// HandleBGPConfigurationChange is called when the BGPConfiguration changes. +// Handling of BGPConfiguration updates is not yet implemented, instead, +// we log and trigger a restart to ensure the system reloads configuration. +func (s *Server) HandleBGPConfigurationChange() error { + s.log.Error("BGPConf updated") + return errors.Errorf("BGPConf updated, restarting") +} + func (s *Server) getMainInterface() *config.UplinkStatus { for _, i := range common.VppManagerInfo.UplinkStatuses { if i.IsMain { @@ -296,9 +304,6 @@ func (s *Server) handleFelixServerEvents(msg interface{}) (err error) { s.log.Debugf("Ignoring NamespaceRemove") case *proto.GlobalBGPConfigUpdate: s.log.Infof("Got GlobalBGPConfigUpdate") - common.SendEvent(common.CalicoVppEvent{ - Type: common.BGPConfChanged, - }) case *proto.WireguardEndpointUpdate: err = s.connectivityHandler.OnWireguardEndpointUpdate(evt) case *proto.WireguardEndpointRemove: diff --git a/calico-vpp-agent/watchers/bgp_configuration_watcher.go b/calico-vpp-agent/watchers/bgp_configuration_watcher.go index 3aba61325..3a11c067a 100644 --- a/calico-vpp-agent/watchers/bgp_configuration_watcher.go +++ b/calico-vpp-agent/watchers/bgp_configuration_watcher.go @@ -25,28 +25,30 @@ import ( calicov3cli "github.com/projectcalico/calico/libcalico-go/lib/clientv3" calicoerr "github.com/projectcalico/calico/libcalico-go/lib/errors" "github.com/projectcalico/calico/libcalico-go/lib/options" + "github.com/projectcalico/calico/libcalico-go/lib/watch" "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" - "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common" "github.com/projectcalico/vpp-dataplane/v3/config" ) type BGPConfigurationWatcher struct { - log *logrus.Entry - clientv3 calicov3cli.Interface - BGPConfigurationWatcherEventChan chan any - BGPConf *calicov3.BGPConfigurationSpec + log *logrus.Entry + clientv3 calicov3cli.Interface + BGPConf *calicov3.BGPConfigurationSpec + // Watch interface for monitoring BGP configuration changes + watcher watch.Interface + currentWatchRevision string + // Callback function to handle BGP configuration changes + onBGPConfigChanged func() error } -func NewBGPConfigurationWatcher(clientv3 calicov3cli.Interface, log *logrus.Entry) *BGPConfigurationWatcher { +func NewBGPConfigurationWatcher(clientv3 calicov3cli.Interface, log *logrus.Entry, configChangeHandler func() error) *BGPConfigurationWatcher { w := BGPConfigurationWatcher{ - log: log, - clientv3: clientv3, - BGPConfigurationWatcherEventChan: make(chan any, common.ChanSize), + log: log, + clientv3: clientv3, + onBGPConfigChanged: configChangeHandler, } - reg := common.RegisterHandler(w.BGPConfigurationWatcherEventChan, "BGP Config watcher events") - reg.ExpectEvents(common.BGPConfChanged) return &w } @@ -126,52 +128,93 @@ func (w *BGPConfigurationWatcher) getDefaultBGPConfig() (*calicov3.BGPConfigurat } } -// bgpConfChanged compares only the fields consumed by vpp-dataplane. Fields not yet -// implemented (e.g. ServiceLoadBalancerAggregation) are intentionally excluded so that -// changes to them don't trigger a restart. Remove the exclusion once the field is used. -// Nil and empty slices are treated as equal to avoid spurious restarts when the API -// normalizes a missing field to an empty slice. -func bgpConfChanged(a, b *calicov3.BGPConfigurationSpec) bool { - if a == nil || b == nil { - return a != b - } - sliceChanged := func(lenA, lenB int, equal bool) bool { - return lenA != lenB || (lenA > 0 && !equal) - } - return a.LogSeverityScreen != b.LogSeverityScreen || - !reflect.DeepEqual(a.NodeToNodeMeshEnabled, b.NodeToNodeMeshEnabled) || - !reflect.DeepEqual(a.ASNumber, b.ASNumber) || - a.ListenPort != b.ListenPort || - sliceChanged(len(a.ServiceClusterIPs), len(b.ServiceClusterIPs), reflect.DeepEqual(a.ServiceClusterIPs, b.ServiceClusterIPs)) || - sliceChanged(len(a.ServiceExternalIPs), len(b.ServiceExternalIPs), reflect.DeepEqual(a.ServiceExternalIPs, b.ServiceExternalIPs)) || - sliceChanged(len(a.ServiceLoadBalancerIPs), len(b.ServiceLoadBalancerIPs), reflect.DeepEqual(a.ServiceLoadBalancerIPs, b.ServiceLoadBalancerIPs)) -} - func (w *BGPConfigurationWatcher) WatchBGPConfiguration(t *tomb.Tomb) error { + w.log.Info("BGP configuration watcher started") for t.Alive() { - select { - case <-t.Dying(): - w.log.Warn("BGPConf watcher stopped") - return nil - case msg := <-w.BGPConfigurationWatcherEventChan: - evt, ok := msg.(common.CalicoVppEvent) - if !ok { - continue - } - switch evt.Type { - case common.BGPConfChanged: - oldBGPConf := w.BGPConf - newBGPConf, err := w.GetBGPConf() - if err != nil { - return errors.Wrap(err, "error getting BGP configuration") + w.currentWatchRevision = "" + err := w.resyncAndCreateWatcher() + if err != nil { + w.log.WithError(err).Error("Failed to create BGP configuration watcher") + goto restart + } + for { + select { + case <-t.Dying(): + w.log.Info("BGP configuration watcher asked to stop") + w.cleanExistingWatcher() + return nil + case event, ok := <-w.watcher.ResultChan(): + if !ok { + w.log.Debug("BGP configuration watcher closed, restarting...") + goto restart } - if bgpConfChanged(oldBGPConf, newBGPConf) { - w.log.Error("BGPConf updated") - return errors.Errorf("BGPConf updated, restarting") + bgpConf, ok := event.Object.(*calicov3.BGPConfiguration) + if !ok { + w.log.Error("Unexpected object type in BGP configuration event") + goto restart + } + w.currentWatchRevision = bgpConf.GetResourceVersion() + switch event.Type { + case watch.Error: + w.log.Debug("BGP configuration watch returned error, restarting...") + goto restart + case watch.Added, watch.Modified: + w.handleBGPConfigurationUpdate() + case watch.Deleted: + w.log.Debug("BGP configuration deleted, using defaults") + w.handleBGPConfigurationUpdate() } - default: } } + restart: + w.cleanExistingWatcher() + w.log.Debug("Restarting BGP configuration watcher...") + } + return nil +} + +// resyncAndCreateWatcher creates a new watcher for BGP configurations +func (w *BGPConfigurationWatcher) resyncAndCreateWatcher() error { + w.cleanExistingWatcher() + + opts := options.ListOptions{ + ResourceVersion: w.currentWatchRevision, + } + + watcher, err := w.clientv3.BGPConfigurations().Watch(context.Background(), opts) + if err != nil { + return errors.Wrap(err, "failed to create BGP configuration watcher") } + w.watcher = watcher return nil } + +// cleanExistingWatcher closes the existing watcher if it exists +func (w *BGPConfigurationWatcher) cleanExistingWatcher() { + if w.watcher != nil { + w.watcher.Stop() + w.watcher = nil + } +} + +// handleBGPConfigurationUpdate handles BGP configuration update events +func (w *BGPConfigurationWatcher) handleBGPConfigurationUpdate() { + if w.onBGPConfigChanged == nil { + w.log.Debug("No BGP configuration change handler set") + return + } + + oldConf := w.BGPConf + newConf, err := w.GetBGPConf() + if err != nil { + w.log.WithError(err).Error("Failed to get updated BGP configuration") + return + } + + // Only call the callback if the config actually changed + if !reflect.DeepEqual(oldConf, newConf) { + if err := w.onBGPConfigChanged(); err != nil { + w.log.WithError(err).Error("BGP configuration change handler failed") + } + } +}