diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java index f657036d81f5..47720f3bc67d 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java @@ -20,6 +20,7 @@ import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; public class ThinClientStoreModelTest { @@ -192,6 +193,39 @@ public void testNullAdditionalHeadersThinClientStoreModel() throws Exception { .isNull(); } + @Test(groups = "unit") + public void getRootUriThrowsWhenThinClientEndpointIsUnavailable() throws Exception { + DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class); + Mockito.doReturn(new DiagnosticsClientContext.DiagnosticsClientConfig()).when(clientContext).getConfig(); + + ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class); + GlobalEndpointManager globalEndpointManager = Mockito.mock(GlobalEndpointManager.class); + RegionalRoutingContext regionalRoutingContext = new RegionalRoutingContext(new URI("https://localhost:8080")); + Mockito.doReturn(regionalRoutingContext).when(globalEndpointManager).resolveServiceEndpoint(any()); + + ThinClientStoreModel storeModel = new ThinClientStoreModel( + clientContext, + sessionContainer, + ConsistencyLevel.SESSION, + new UserAgentContainer(), + globalEndpointManager, + Mockito.mock(HttpClient.class), + null); + + RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName( + clientContext, + OperationType.Query, + "/dbs/db/colls/col/docs", + ResourceType.Document); + + assertThatThrownBy(() -> storeModel.getRootUri(request)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Thin client endpoint is not available for resolved gateway endpoint https://localhost:8080") + .hasMessageContaining("operation type Query") + .hasMessageContaining("resource type Document") + .hasMessageContaining("activity id " + request.getActivityId()); + } + @Test(groups = "unit") public void thinClientDoesNotAddNoRetry449Header() throws Exception { DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/LocationCacheTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/LocationCacheTest.java index d5d25cbcdfbb..5a27301313e3 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/LocationCacheTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/LocationCacheTest.java @@ -5,22 +5,23 @@ import com.azure.cosmos.CosmosExcludedRegions; import com.azure.cosmos.DirectConnectionConfig; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ConnectionPolicy; -import com.azure.cosmos.implementation.LifeCycleUtils; -import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; +import com.azure.cosmos.implementation.Constants; import com.azure.cosmos.implementation.DatabaseAccount; -import com.azure.cosmos.implementation.DatabaseAccountLocation; -import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.DatabaseAccountManagerInternal; +import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.LifeCycleUtils; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; -import com.azure.cosmos.models.ModelBridgeUtils; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; import com.azure.cosmos.implementation.guava25.collect.Iterables; +import com.azure.cosmos.models.ModelBridgeUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -59,6 +60,10 @@ public class LocationCacheTest { private final static URI Location2Endpoint = createUrl("https://location2.documents.azure.com"); private final static URI Location3Endpoint = createUrl("https://location3.documents.azure.com"); private final static URI Location4Endpoint = createUrl("https://location4.documents.azure.com"); + private final static URI Location1ThinClientEndpoint = createUrl("https://location1-thin.documents.azure.com"); + private final static URI Location2ThinClientEndpoint = createUrl("https://location2-thin.documents.azure.com"); + private final static URI Location3ThinClientEndpoint = createUrl("https://location3-thin.documents.azure.com"); + private final static URI Location4ThinClientEndpoint = createUrl("https://location4-thin.documents.azure.com"); private static HashMap EndpointByLocation = new HashMap<>(); @@ -568,6 +573,120 @@ private static DatabaseAccount createDatabaseAccount(boolean useMultipleWriteLoc return databaseAccount; } + private static DatabaseAccount createDatabaseAccountWithThinClientLocations(boolean useMultipleWriteLocations) { + DatabaseAccount databaseAccount = createDatabaseAccount(useMultipleWriteLocations); + + databaseAccount.set( + Constants.Properties.THINCLIENT_READABLE_LOCATIONS, + ImmutableList.of( + createDatabaseAccountLocation("location1", LocationCacheTest.Location1ThinClientEndpoint.toString()), + createDatabaseAccountLocation("location2", LocationCacheTest.Location2ThinClientEndpoint.toString()), + createDatabaseAccountLocation("location3", LocationCacheTest.Location3ThinClientEndpoint.toString()), + createDatabaseAccountLocation("location4", LocationCacheTest.Location4ThinClientEndpoint.toString()))); + + databaseAccount.set( + Constants.Properties.THINCLIENT_WRITABLE_LOCATIONS, + ImmutableList.of( + createDatabaseAccountLocation("location1", LocationCacheTest.Location1ThinClientEndpoint.toString()), + createDatabaseAccountLocation("location2", LocationCacheTest.Location2ThinClientEndpoint.toString()), + createDatabaseAccountLocation("location3", LocationCacheTest.Location3ThinClientEndpoint.toString()))); + + return databaseAccount; + } + + private static DatabaseAccount createDatabaseAccountWithThinClientReadableLocationsOnly(boolean useMultipleWriteLocations) { + DatabaseAccount databaseAccount = createDatabaseAccount(useMultipleWriteLocations); + + databaseAccount.set( + Constants.Properties.THINCLIENT_READABLE_LOCATIONS, + ImmutableList.of( + createDatabaseAccountLocation("location1", LocationCacheTest.Location1ThinClientEndpoint.toString()), + createDatabaseAccountLocation("location2", LocationCacheTest.Location2ThinClientEndpoint.toString()), + createDatabaseAccountLocation("location3", LocationCacheTest.Location3ThinClientEndpoint.toString()), + createDatabaseAccountLocation("location4", LocationCacheTest.Location4ThinClientEndpoint.toString()))); + + return databaseAccount; + } + + @Test(groups = "unit") + public void unmatchedPreferredRegionShouldResolveThinClientEndpoint() { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig()); + connectionPolicy.setEndpointDiscoveryEnabled(true); + connectionPolicy.setMultipleWriteRegionsEnabled(true); + connectionPolicy.setPreferredRegions(Collections.singletonList("East US 2")); + + LocationCache locationCache = new LocationCache( + connectionPolicy, + LocationCacheTest.DefaultEndpoint, + configs); + + locationCache.onDatabaseAccountRead(createDatabaseAccountWithThinClientLocations(true)); + + RxDocumentServiceRequest request = RxDocumentServiceRequest.create( + mockDiagnosticsClientContext(), + OperationType.Read, + ResourceType.Document); + request.useThinClientMode = true; + + RegionalRoutingContext resolvedRoutingContext = locationCache.resolveServiceEndpoint(request); + + assertThat(resolvedRoutingContext.getGatewayRegionalEndpoint()).isEqualTo(LocationCacheTest.Location1Endpoint); + assertThat(resolvedRoutingContext.getThinclientRegionalEndpoint()).isEqualTo(LocationCacheTest.Location1ThinClientEndpoint); + } + + @Test(groups = "unit") + public void explicitGatewayOnlyRoutingContextShouldResolveThinClientEndpoint() { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig()); + connectionPolicy.setEndpointDiscoveryEnabled(true); + connectionPolicy.setMultipleWriteRegionsEnabled(true); + connectionPolicy.setPreferredRegions(Collections.emptyList()); + + LocationCache locationCache = new LocationCache( + connectionPolicy, + LocationCacheTest.DefaultEndpoint, + configs); + + locationCache.onDatabaseAccountRead(createDatabaseAccountWithThinClientLocations(true)); + + RxDocumentServiceRequest request = RxDocumentServiceRequest.create( + mockDiagnosticsClientContext(), + OperationType.Read, + ResourceType.Document); + request.useThinClientMode = true; + request.requestContext.routeToLocation(new RegionalRoutingContext(LocationCacheTest.Location2Endpoint)); + + RegionalRoutingContext resolvedRoutingContext = locationCache.resolveServiceEndpoint(request); + + assertThat(resolvedRoutingContext.getGatewayRegionalEndpoint()).isEqualTo(LocationCacheTest.Location2Endpoint); + assertThat(resolvedRoutingContext.getThinclientRegionalEndpoint()).isEqualTo(LocationCacheTest.Location2ThinClientEndpoint); + } + + @Test(groups = "unit") + public void writeRequestShouldNotResolveToThinClientReadEndpoint() { + ConnectionPolicy connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig()); + connectionPolicy.setEndpointDiscoveryEnabled(true); + connectionPolicy.setMultipleWriteRegionsEnabled(true); + connectionPolicy.setPreferredRegions(Collections.singletonList("East US 2")); + + LocationCache locationCache = new LocationCache( + connectionPolicy, + LocationCacheTest.DefaultEndpoint, + configs); + + locationCache.onDatabaseAccountRead(createDatabaseAccountWithThinClientReadableLocationsOnly(true)); + + RxDocumentServiceRequest request = RxDocumentServiceRequest.create( + mockDiagnosticsClientContext(), + OperationType.Batch, + ResourceType.Document); + request.useThinClientMode = true; + + RegionalRoutingContext resolvedRoutingContext = locationCache.resolveServiceEndpoint(request); + + assertThat(resolvedRoutingContext.getGatewayRegionalEndpoint()).isEqualTo(LocationCacheTest.DefaultEndpoint); + assertThat(resolvedRoutingContext.getThinclientRegionalEndpoint()).isNull(); + } + private void initialize( boolean useMultipleWriteLocations, boolean enableEndpointDiscovery, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index 3a849b4c5baa..9d3d9950aa84 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -4,6 +4,7 @@ import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.implementation.caches.RxClientCollectionCache; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConstants; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdFramer; @@ -13,9 +14,9 @@ import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.http.HttpHeaders; import com.azure.cosmos.implementation.http.HttpRequest; -import com.azure.cosmos.implementation.caches.RxClientCollectionCache; import com.azure.cosmos.implementation.routing.HexConvert; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; +import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -99,8 +100,19 @@ protected Map getDefaultHeaders( @Override public URI getRootUri(RxDocumentServiceRequest request) { - // need to have thin client endpoint here - return this.globalEndpointManager.resolveServiceEndpoint(request).getThinclientRegionalEndpoint(); + RegionalRoutingContext regionalRoutingContext = this.globalEndpointManager.resolveServiceEndpoint(request); + URI thinClientEndpoint = regionalRoutingContext.getThinclientRegionalEndpoint(); + + if (thinClientEndpoint == null) { + throw new IllegalStateException( + "Thin client endpoint is not available for resolved gateway endpoint " + + regionalRoutingContext.getGatewayRegionalEndpoint() + + ", operation type " + request.getOperationType() + + ", resource type " + request.getResourceType() + + ", activity id " + request.getActivityId()); + } + + return thinClientEndpoint; } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java index fe09800f1773..e5e7b3bdd77b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java @@ -196,10 +196,11 @@ public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest re "RxDocumentServiceRequest.requestContext is required and cannot be null."); if (request.requestContext.regionalRoutingContextToRoute != null) { - return request.requestContext.regionalRoutingContextToRoute; + return this.resolveThinClientRoutingContextIfNeeded(request, request.requestContext.regionalRoutingContextToRoute); } int locationIndex = Utils.getValueOrDefault(request.requestContext.locationIndexToRoute, 0); + RegionalRoutingContext resolvedRoutingContext; boolean usePreferredLocations = request.requestContext.usePreferredLocations != null ? request.requestContext.usePreferredLocations : true; if (!usePreferredLocations || (request.getOperationType().isWriteOperation() && !this.canUseMultipleWriteLocations(request))) { @@ -211,15 +212,92 @@ public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest re if (this.enableEndpointDiscovery && !currentLocationInfo.availableWriteLocations.isEmpty()) { locationIndex = Math.min(locationIndex%2, currentLocationInfo.availableWriteLocations.size()-1); String writeLocation = currentLocationInfo.availableWriteLocations.get(locationIndex); - return currentLocationInfo.availableWriteRegionalRoutingContextsByRegionName.get(writeLocation); + resolvedRoutingContext = currentLocationInfo.availableWriteRegionalRoutingContextsByRegionName.get(writeLocation); } else { - return this.defaultRoutingContext; + resolvedRoutingContext = this.defaultRoutingContext; } } else { UnmodifiableList endpoints = request.getOperationType().isWriteOperation()? this.getApplicableWriteRegionRoutingContexts(request) : this.getApplicableReadRegionRoutingContexts(request); - return endpoints.get(locationIndex % endpoints.size()); + resolvedRoutingContext = endpoints.get(locationIndex % endpoints.size()); } + + return this.resolveThinClientRoutingContextIfNeeded(request, resolvedRoutingContext); + } + + private RegionalRoutingContext resolveThinClientRoutingContextIfNeeded( + RxDocumentServiceRequest request, + RegionalRoutingContext regionalRoutingContext) { + + if (!request.useThinClientMode + || regionalRoutingContext == null + || regionalRoutingContext.getThinclientRegionalEndpoint() != null) { + + return regionalRoutingContext; + } + + DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; + Iterable primaryRoutingContexts = request.getOperationType().isWriteOperation() + ? currentLocationInfo.availableWriteRegionalRoutingContexts + : currentLocationInfo.availableReadRegionalRoutingContexts; + Iterable secondaryRoutingContexts = request.getOperationType().isWriteOperation() + ? null + : currentLocationInfo.availableWriteRegionalRoutingContexts; + + RegionalRoutingContext thinClientRoutingContext = this.getThinClientRoutingContextForGatewayEndpoint( + regionalRoutingContext, + primaryRoutingContexts); + + if (thinClientRoutingContext == null) { + thinClientRoutingContext = this.getThinClientRoutingContextForGatewayEndpoint( + regionalRoutingContext, + secondaryRoutingContexts); + } + + if (thinClientRoutingContext == null + && regionalRoutingContext.getGatewayRegionalEndpoint().equals(this.defaultRoutingContext.getGatewayRegionalEndpoint())) { + + thinClientRoutingContext = this.getFirstThinClientRoutingContext(primaryRoutingContexts); + + if (thinClientRoutingContext == null) { + thinClientRoutingContext = this.getFirstThinClientRoutingContext(secondaryRoutingContexts); + } + } + + return thinClientRoutingContext != null ? thinClientRoutingContext : regionalRoutingContext; + } + + private RegionalRoutingContext getThinClientRoutingContextForGatewayEndpoint( + RegionalRoutingContext regionalRoutingContext, + Iterable routingContexts) { + + if (routingContexts == null) { + return null; + } + + for (RegionalRoutingContext candidateRoutingContext : routingContexts) { + if (candidateRoutingContext.getThinclientRegionalEndpoint() != null + && candidateRoutingContext.getGatewayRegionalEndpoint().equals(regionalRoutingContext.getGatewayRegionalEndpoint())) { + + return candidateRoutingContext; + } + } + + return null; + } + + private RegionalRoutingContext getFirstThinClientRoutingContext(Iterable routingContexts) { + if (routingContexts == null) { + return null; + } + + for (RegionalRoutingContext candidateRoutingContext : routingContexts) { + if (candidateRoutingContext.getThinclientRegionalEndpoint() != null) { + return candidateRoutingContext; + } + } + + return null; } public UnmodifiableList getApplicableWriteRegionRoutingContexts(RxDocumentServiceRequest request) { @@ -1026,7 +1104,20 @@ private void addRoutingContexts( URI endpoint = new URI(thinclientDbAccountLocation.getEndpoint().toLowerCase(Locale.ROOT)); RegionalRoutingContext regionalRoutingContext = endpointsByLocation.get(location); - regionalRoutingContext.setThinclientRegionalEndpoint(endpoint); + if (regionalRoutingContext != null) { + regionalRoutingContext.setThinclientRegionalEndpoint(endpoint); + + if (this.defaultRoutingContext.getGatewayRegionalEndpoint() + .equals(regionalRoutingContext.getGatewayRegionalEndpoint())) { + + this.defaultRoutingContext.setThinclientRegionalEndpoint(endpoint); + } + } else { + logger.warn( + "Skipping thin client endpoint add for location = [{}] and endpoint = [{}] because no matching gateway endpoint was found.", + thinclientDbAccountLocation.getName(), + thinclientDbAccountLocation.getEndpoint()); + } } catch (Exception e) { logger.warn("Skipping add for location = [{}] and endpoint = [{}] due to exception [{}]", thinclientDbAccountLocation.getName(),