Skip to content

Commit dcdd1dc

Browse files
added active passive behavior for in cluster vs routing (#429)
1 parent f2c17b6 commit dcdd1dc

File tree

4 files changed

+837
-9
lines changed

4 files changed

+837
-9
lines changed

admiral/pkg/clusters/serviceentry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -843,7 +843,7 @@ func modifyServiceEntryForNewServiceOrPod(
843843
env,
844844
sourceClusterLocality,
845845
ingressDestinations,
846-
false)
846+
false, cname, sourceCluster)
847847
if err != nil {
848848
ctxLogger.Errorf(common.CtxLogFormat, "processGTPAndAddWeightsByCluster",
849849
deploymentOrRolloutName, eventNamespace, sourceCluster, err)
@@ -858,7 +858,7 @@ func modifyServiceEntryForNewServiceOrPod(
858858
env,
859859
sourceClusterLocality,
860860
inClusterDestinations,
861-
true)
861+
true, cname, sourceCluster)
862862
if err != nil {
863863
ctxLogger.Errorf(common.CtxLogFormat, "processGTPAndAddWeightsByCluster",
864864
deploymentOrRolloutName, eventNamespace, sourceCluster, err)

admiral/pkg/clusters/virtualservice_routing.go

Lines changed: 146 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -964,6 +964,15 @@ func performDRPinning(ctx context.Context,
964964
}
965965
newDR := cachedDR.DeepCopy()
966966

967+
if newDR.Spec.TrafficPolicy == nil ||
968+
newDR.Spec.TrafficPolicy.LoadBalancer == nil ||
969+
newDR.Spec.TrafficPolicy.LoadBalancer.LocalityLbSetting == nil {
970+
errs = append(errs, fmt.Errorf(
971+
"skipped pinning DR to remote region as TrafficPolicy or LoadBalancer or LocalityLbSetting is nil for DR %s in cluster %s",
972+
drName, sourceCluster))
973+
continue
974+
}
975+
967976
newDR.Spec.TrafficPolicy.LoadBalancer.LocalityLbSetting, err = getLocalityLBSettings(currentLocality)
968977
if err != nil {
969978
errs = append(errs, fmt.Errorf(
@@ -1385,14 +1394,26 @@ func processGTPAndAddWeightsByCluster(ctxLogger *log.Entry,
13851394
env string,
13861395
sourceClusterLocality string,
13871396
destinations map[string][]*vsrouting.RouteDestination,
1388-
updateWeights bool) error {
1397+
updateWeights bool, cname, sourceCluster string) error {
13891398
//update ingress gtp destination
13901399
// Get the global traffic policy for the env and identity
13911400
// and add the additional endpoints/hosts to the destination map
13921401
globalTrafficPolicy, err := remoteRegistry.AdmiralCache.GlobalTrafficCache.GetFromIdentity(sourceIdentity, env)
13931402
if err != nil {
13941403
return err
13951404
}
1405+
// If there is no GTP and active/passive is default and the func is called for
1406+
// in-cluster vs. (updateWeights signifies in-cluster vs and not ingress vs)
1407+
if globalTrafficPolicy == nil && updateWeights && common.EnableActivePassive() {
1408+
// doActivePassiveInClusterVS func returns a dummy GlobalTrafficPolicy
1409+
// that is used to perform active/passive routing for in-cluster VS.
1410+
globalTrafficPolicy, err = doActivePassiveInClusterVS(
1411+
remoteRegistry, cname, sourceCluster, sourceClusterLocality)
1412+
if err != nil {
1413+
ctxLogger.Warnf(common.CtxLogFormat, "doActivePassiveInClusterVS",
1414+
cname, "", sourceCluster, err.Error())
1415+
}
1416+
}
13961417
if globalTrafficPolicy != nil {
13971418
// Add the global traffic policy destinations to the destination map for ingress vs
13981419
gtpDestinations, err := getDestinationsForGTPDNSPrefixes(ctxLogger, globalTrafficPolicy, destinations, env, sourceClusterLocality, updateWeights)
@@ -1412,6 +1433,94 @@ func processGTPAndAddWeightsByCluster(ctxLogger *log.Entry,
14121433
return nil
14131434
}
14141435

1436+
// doActivePassiveInClusterVS is a helper function that creates a dummy GlobalTrafficPolicy
1437+
// to perform an active/passive routing for in-cluster VS.
1438+
// First we fetch the ServiceEntry from cache to find out if the identity is multi-region.
1439+
// If it is multi-region, we fetch the DestinationRule from cache to find out the locality that is passive.
1440+
// Once we identify that the current cluster is the passive region, then we create a dummy failover GTP to fail
1441+
// traffic to the active region mentioned in the DR.
1442+
func doActivePassiveInClusterVS(remoteRegistry *RemoteRegistry,
1443+
cname string,
1444+
sourceCluster string,
1445+
sourceClusterLocality string) (*v1alpha1.GlobalTrafficPolicy, error) {
1446+
1447+
if remoteRegistry == nil {
1448+
return nil, fmt.Errorf("remoteRegistry is nil")
1449+
}
1450+
rc := remoteRegistry.GetRemoteController(sourceCluster)
1451+
if rc == nil {
1452+
return nil, fmt.Errorf("remotecontroller is nil for cluster %s", sourceCluster)
1453+
}
1454+
if rc.DestinationRuleController == nil {
1455+
return nil, fmt.Errorf("destinationRuleController is nil for cluster %s", sourceCluster)
1456+
}
1457+
if rc.DestinationRuleController.Cache == nil {
1458+
return nil, fmt.Errorf("destinationRuleController.Cache is nil for cluster %s", sourceCluster)
1459+
}
1460+
if rc.ServiceEntryController == nil {
1461+
return nil, fmt.Errorf("serviceEntryController is nil for cluster %s", sourceCluster)
1462+
}
1463+
if rc.ServiceEntryController.Cache == nil {
1464+
return nil, fmt.Errorf("serviceEntryController.Cache is nil for cluster %s", sourceCluster)
1465+
}
1466+
seName := fmt.Sprintf("%s-se", cname)
1467+
seFromCache := rc.ServiceEntryController.Cache.Get(seName, sourceCluster)
1468+
if seFromCache == nil {
1469+
return nil, fmt.Errorf("no se found in cache for seName %s", seName)
1470+
}
1471+
if !isSEMultiRegion(&seFromCache.Spec) {
1472+
return nil, fmt.Errorf("skipped active passive for incluster as the SE is not multi-region %s", seName)
1473+
}
1474+
drName := fmt.Sprintf("%s-default-dr", cname)
1475+
drFromCache := rc.DestinationRuleController.Cache.Get(drName, common.GetSyncNamespace())
1476+
if drFromCache == nil {
1477+
return nil, fmt.Errorf("no dr found in cache for drName %s", drName)
1478+
}
1479+
if drFromCache.Spec.TrafficPolicy == nil ||
1480+
drFromCache.Spec.TrafficPolicy.LoadBalancer == nil ||
1481+
drFromCache.Spec.TrafficPolicy.LoadBalancer.LocalityLbSetting == nil ||
1482+
drFromCache.Spec.TrafficPolicy.LoadBalancer.LocalityLbSetting.Distribute == nil {
1483+
return nil, fmt.Errorf(
1484+
"skipped active passive for incluster as the DR has no localityLBSetting %s", drName)
1485+
}
1486+
distribution := drFromCache.Spec.TrafficPolicy.LoadBalancer.LocalityLbSetting.Distribute
1487+
if len(distribution) != 1 {
1488+
return nil, fmt.Errorf("distribution on the DR %s has a traffic split", drName)
1489+
}
1490+
if _, ok := distribution[0].To[sourceClusterLocality]; ok {
1491+
return nil, fmt.Errorf(
1492+
"the DR %s is pointing to the active cluster %s already", drName, sourceClusterLocality)
1493+
}
1494+
activeLocality := ""
1495+
for currentLocalityOnDR := range distribution[0].To {
1496+
activeLocality = currentLocalityOnDR
1497+
}
1498+
if activeLocality == "" {
1499+
return nil, fmt.Errorf("current locality is empty for dr %s", drName)
1500+
}
1501+
globalTrafficPolicy := &v1alpha1.GlobalTrafficPolicy{
1502+
Spec: model.GlobalTrafficPolicy{
1503+
Policy: []*model.TrafficPolicy{
1504+
{
1505+
DnsPrefix: common.Default,
1506+
LbType: model.TrafficPolicy_FAILOVER,
1507+
Target: []*model.TrafficGroup{
1508+
{
1509+
Region: sourceClusterLocality,
1510+
Weight: int32(0),
1511+
},
1512+
{
1513+
Region: activeLocality,
1514+
Weight: int32(100),
1515+
},
1516+
},
1517+
},
1518+
},
1519+
},
1520+
}
1521+
return globalTrafficPolicy, nil
1522+
}
1523+
14151524
// addWeightsToRouteDestinations ensures that the weights of route destinations in the provided map
14161525
// are correctly distributed to sum to 100 or 0.
14171526
func addWeightsToRouteDestinations(destinations map[string][]*vsrouting.RouteDestination) error {
@@ -1428,6 +1537,10 @@ func addWeightsToRouteDestinations(destinations map[string][]*vsrouting.RouteDes
14281537
if totalWeight == 100 {
14291538
continue
14301539
}
1540+
if totalWeight > 0 {
1541+
log.Warnf("total weight is %d, expected 100 or 0", totalWeight)
1542+
continue
1543+
}
14311544
weightSplits := getWeightSplits(len(routeDestinations))
14321545
for i, destination := range routeDestinations {
14331546
destination.Weight = weightSplits[i]
@@ -1605,6 +1718,21 @@ func addUpdateInClusterDestinationRule(
16051718
cname string,
16061719
env string) error {
16071720

1721+
if remoteRegistry == nil {
1722+
return fmt.Errorf("remoteRegistry is nil")
1723+
}
1724+
var clientConnectionSettings *v1alpha1.ClientConnectionConfig
1725+
var err error
1726+
if remoteRegistry.AdmiralCache != nil && remoteRegistry.AdmiralCache.ClientConnectionConfigCache != nil {
1727+
clientConnectionSettings, err =
1728+
remoteRegistry.AdmiralCache.ClientConnectionConfigCache.GetFromIdentity(sourceIdentity, env)
1729+
if err != nil {
1730+
ctxLogger.Warnf(common.CtxLogFormat, "addUpdateInClusterDestinationRule",
1731+
sourceIdentity, "", "",
1732+
fmt.Sprintf("no clientConnectionConfig found for identity %s env %s", sourceIdentity, env))
1733+
}
1734+
}
1735+
16081736
if sourceIdentity == "" {
16091737
return fmt.Errorf("sourceIdentity is empty")
16101738
}
@@ -1637,7 +1765,7 @@ func addUpdateInClusterDestinationRule(
16371765

16381766
err := addUpdateRoutingDestinationRule(
16391767
ctx, ctxLogger, remoteRegistry, drHosts, sourceCluster,
1640-
"incluster-dr", exportToNamespaces, clientTLSSettings)
1768+
common.InclusterDRSuffix, exportToNamespaces, clientTLSSettings, clientConnectionSettings)
16411769

16421770
if err != nil {
16431771
ctxLogger.Errorf(common.CtxLogFormat, "addUpdateDestinationRuleForSourceIngress",
@@ -1687,7 +1815,7 @@ func addUpdateDestinationRuleForSourceIngress(
16871815

16881816
err := addUpdateRoutingDestinationRule(
16891817
ctx, ctxLogger, remoteRegistry, drHosts, sourceCluster,
1690-
"routing-dr", common.GetIngressVSExportToNamespace(), clientTLSSettings)
1818+
common.RoutingDRSuffix, common.GetIngressVSExportToNamespace(), clientTLSSettings, nil)
16911819

16921820
if err != nil {
16931821
ctxLogger.Errorf(common.CtxLogFormat, "addUpdateDestinationRuleForSourceIngress",
@@ -1707,7 +1835,8 @@ func addUpdateRoutingDestinationRule(
17071835
sourceCluster string,
17081836
drNameSuffix string,
17091837
exportToNamespaces []string,
1710-
clientTLSSettings *networkingV1Alpha3.ClientTLSSettings) error {
1838+
clientTLSSettings *networkingV1Alpha3.ClientTLSSettings,
1839+
clientConnectionSettings *v1alpha1.ClientConnectionConfig) error {
17111840

17121841
if remoteRegistry == nil {
17131842
return fmt.Errorf("remoteRegistry is nil")
@@ -1755,6 +1884,19 @@ func addUpdateRoutingDestinationRule(
17551884
}
17561885
}
17571886

1887+
clientConnectionSettingsOverride := getClientConnectionPoolOverrides(clientConnectionSettings)
1888+
if clientConnectionSettingsOverride != nil {
1889+
drObj.TrafficPolicy.ConnectionPool = clientConnectionSettingsOverride
1890+
}
1891+
if common.DisableDefaultAutomaticFailover() {
1892+
// If automatic failover is disabled, we set the outlier detection settings to zero
1893+
// TODO: need add OOD processing similar to SE based routing
1894+
drObj.TrafficPolicy.OutlierDetection = &networkingV1Alpha3.OutlierDetection{
1895+
ConsecutiveGatewayErrors: &wrappers.UInt32Value{Value: 0},
1896+
Consecutive_5XxErrors: &wrappers.UInt32Value{Value: 0},
1897+
}
1898+
}
1899+
17581900
newDR := createDestinationRuleSkeleton(drObj, drName, util.IstioSystemNamespace)
17591901

17601902
newDR.Labels = map[string]string{

0 commit comments

Comments
 (0)