Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;

public class ThinClientStoreModelTest {
Expand Down Expand Up @@ -192,6 +193,39 @@ public void testNullAdditionalHeadersThinClientStoreModel() throws Exception {
.isNull();
}

@Test(groups = "unit")
public void getRootUriThrowsWhenThinClientEndpointIsUnavailable() throws Exception {
DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class);
Mockito.doReturn(new DiagnosticsClientContext.DiagnosticsClientConfig()).when(clientContext).getConfig();

ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
GlobalEndpointManager globalEndpointManager = Mockito.mock(GlobalEndpointManager.class);
RegionalRoutingContext regionalRoutingContext = new RegionalRoutingContext(new URI("https://localhost:8080"));
Mockito.doReturn(regionalRoutingContext).when(globalEndpointManager).resolveServiceEndpoint(any());

ThinClientStoreModel storeModel = new ThinClientStoreModel(
clientContext,
sessionContainer,
ConsistencyLevel.SESSION,
new UserAgentContainer(),
globalEndpointManager,
Mockito.mock(HttpClient.class),
null);

RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName(
clientContext,
OperationType.Query,
"/dbs/db/colls/col/docs",
ResourceType.Document);

assertThatThrownBy(() -> storeModel.getRootUri(request))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Thin client endpoint is not available for resolved gateway endpoint https://localhost:8080")
.hasMessageContaining("operation type Query")
.hasMessageContaining("resource type Document")
.hasMessageContaining("activity id " + request.getActivityId());
}

@Test(groups = "unit")
public void thinClientDoesNotAddNoRetry449Header() throws Exception {
DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,23 @@

import com.azure.cosmos.CosmosExcludedRegions;
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.LifeCycleUtils;
import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.DatabaseAccount;
import com.azure.cosmos.implementation.DatabaseAccountLocation;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.DatabaseAccountManagerInternal;
import com.azure.cosmos.implementation.DatabaseAccountLocation;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.LifeCycleUtils;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.models.ModelBridgeUtils;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
import com.azure.cosmos.implementation.guava25.collect.Iterables;
import com.azure.cosmos.models.ModelBridgeUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -59,6 +60,10 @@ public class LocationCacheTest {
private final static URI Location2Endpoint = createUrl("https://location2.documents.azure.com");
private final static URI Location3Endpoint = createUrl("https://location3.documents.azure.com");
private final static URI Location4Endpoint = createUrl("https://location4.documents.azure.com");
private final static URI Location1ThinClientEndpoint = createUrl("https://location1-thin.documents.azure.com");
private final static URI Location2ThinClientEndpoint = createUrl("https://location2-thin.documents.azure.com");
private final static URI Location3ThinClientEndpoint = createUrl("https://location3-thin.documents.azure.com");
private final static URI Location4ThinClientEndpoint = createUrl("https://location4-thin.documents.azure.com");

private static HashMap<String, URI> EndpointByLocation = new HashMap<>();

Expand Down Expand Up @@ -568,6 +573,120 @@ private static DatabaseAccount createDatabaseAccount(boolean useMultipleWriteLoc
return databaseAccount;
}

private static DatabaseAccount createDatabaseAccountWithThinClientLocations(boolean useMultipleWriteLocations) {
DatabaseAccount databaseAccount = createDatabaseAccount(useMultipleWriteLocations);

databaseAccount.set(
Constants.Properties.THINCLIENT_READABLE_LOCATIONS,
ImmutableList.of(
createDatabaseAccountLocation("location1", LocationCacheTest.Location1ThinClientEndpoint.toString()),
createDatabaseAccountLocation("location2", LocationCacheTest.Location2ThinClientEndpoint.toString()),
createDatabaseAccountLocation("location3", LocationCacheTest.Location3ThinClientEndpoint.toString()),
createDatabaseAccountLocation("location4", LocationCacheTest.Location4ThinClientEndpoint.toString())));

databaseAccount.set(
Constants.Properties.THINCLIENT_WRITABLE_LOCATIONS,
ImmutableList.of(
createDatabaseAccountLocation("location1", LocationCacheTest.Location1ThinClientEndpoint.toString()),
createDatabaseAccountLocation("location2", LocationCacheTest.Location2ThinClientEndpoint.toString()),
createDatabaseAccountLocation("location3", LocationCacheTest.Location3ThinClientEndpoint.toString())));

return databaseAccount;
}

private static DatabaseAccount createDatabaseAccountWithThinClientReadableLocationsOnly(boolean useMultipleWriteLocations) {
DatabaseAccount databaseAccount = createDatabaseAccount(useMultipleWriteLocations);

databaseAccount.set(
Constants.Properties.THINCLIENT_READABLE_LOCATIONS,
ImmutableList.of(
createDatabaseAccountLocation("location1", LocationCacheTest.Location1ThinClientEndpoint.toString()),
createDatabaseAccountLocation("location2", LocationCacheTest.Location2ThinClientEndpoint.toString()),
createDatabaseAccountLocation("location3", LocationCacheTest.Location3ThinClientEndpoint.toString()),
createDatabaseAccountLocation("location4", LocationCacheTest.Location4ThinClientEndpoint.toString())));

return databaseAccount;
}

@Test(groups = "unit")
public void unmatchedPreferredRegionShouldResolveThinClientEndpoint() {
ConnectionPolicy connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig());
connectionPolicy.setEndpointDiscoveryEnabled(true);
connectionPolicy.setMultipleWriteRegionsEnabled(true);
connectionPolicy.setPreferredRegions(Collections.singletonList("East US 2"));

LocationCache locationCache = new LocationCache(
connectionPolicy,
LocationCacheTest.DefaultEndpoint,
configs);
Comment thread
arnabnandy7 marked this conversation as resolved.

locationCache.onDatabaseAccountRead(createDatabaseAccountWithThinClientLocations(true));

RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
mockDiagnosticsClientContext(),
OperationType.Read,
ResourceType.Document);
request.useThinClientMode = true;

RegionalRoutingContext resolvedRoutingContext = locationCache.resolveServiceEndpoint(request);

assertThat(resolvedRoutingContext.getGatewayRegionalEndpoint()).isEqualTo(LocationCacheTest.Location1Endpoint);
assertThat(resolvedRoutingContext.getThinclientRegionalEndpoint()).isEqualTo(LocationCacheTest.Location1ThinClientEndpoint);
}

@Test(groups = "unit")
public void explicitGatewayOnlyRoutingContextShouldResolveThinClientEndpoint() {
ConnectionPolicy connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig());
connectionPolicy.setEndpointDiscoveryEnabled(true);
connectionPolicy.setMultipleWriteRegionsEnabled(true);
connectionPolicy.setPreferredRegions(Collections.emptyList());

LocationCache locationCache = new LocationCache(
connectionPolicy,
LocationCacheTest.DefaultEndpoint,
configs);

locationCache.onDatabaseAccountRead(createDatabaseAccountWithThinClientLocations(true));

RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
mockDiagnosticsClientContext(),
OperationType.Read,
ResourceType.Document);
request.useThinClientMode = true;
request.requestContext.routeToLocation(new RegionalRoutingContext(LocationCacheTest.Location2Endpoint));

RegionalRoutingContext resolvedRoutingContext = locationCache.resolveServiceEndpoint(request);

assertThat(resolvedRoutingContext.getGatewayRegionalEndpoint()).isEqualTo(LocationCacheTest.Location2Endpoint);
assertThat(resolvedRoutingContext.getThinclientRegionalEndpoint()).isEqualTo(LocationCacheTest.Location2ThinClientEndpoint);
}

@Test(groups = "unit")
public void writeRequestShouldNotResolveToThinClientReadEndpoint() {
ConnectionPolicy connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig());
connectionPolicy.setEndpointDiscoveryEnabled(true);
connectionPolicy.setMultipleWriteRegionsEnabled(true);
connectionPolicy.setPreferredRegions(Collections.singletonList("East US 2"));

LocationCache locationCache = new LocationCache(
connectionPolicy,
LocationCacheTest.DefaultEndpoint,
configs);

locationCache.onDatabaseAccountRead(createDatabaseAccountWithThinClientReadableLocationsOnly(true));

RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
mockDiagnosticsClientContext(),
OperationType.Batch,
ResourceType.Document);
request.useThinClientMode = true;

RegionalRoutingContext resolvedRoutingContext = locationCache.resolveServiceEndpoint(request);

assertThat(resolvedRoutingContext.getGatewayRegionalEndpoint()).isEqualTo(LocationCacheTest.DefaultEndpoint);
assertThat(resolvedRoutingContext.getThinclientRegionalEndpoint()).isNull();
}

private void initialize(
boolean useMultipleWriteLocations,
boolean enableEndpointDiscovery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.implementation.caches.RxClientCollectionCache;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConstants;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdFramer;
Expand All @@ -13,9 +14,9 @@
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.caches.RxClientCollectionCache;
import com.azure.cosmos.implementation.routing.HexConvert;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -99,8 +100,19 @@ protected Map<String, String> getDefaultHeaders(

@Override
public URI getRootUri(RxDocumentServiceRequest request) {
// need to have thin client endpoint here
return this.globalEndpointManager.resolveServiceEndpoint(request).getThinclientRegionalEndpoint();
RegionalRoutingContext regionalRoutingContext = this.globalEndpointManager.resolveServiceEndpoint(request);
URI thinClientEndpoint = regionalRoutingContext.getThinclientRegionalEndpoint();

if (thinClientEndpoint == null) {
throw new IllegalStateException(
"Thin client endpoint is not available for resolved gateway endpoint "
+ regionalRoutingContext.getGatewayRegionalEndpoint()
+ ", operation type " + request.getOperationType()
+ ", resource type " + request.getResourceType()
+ ", activity id " + request.getActivityId());
}
Comment thread
arnabnandy7 marked this conversation as resolved.

return thinClientEndpoint;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,11 @@ public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest re
"RxDocumentServiceRequest.requestContext is required and cannot be null.");

if (request.requestContext.regionalRoutingContextToRoute != null) {
return request.requestContext.regionalRoutingContextToRoute;
return this.resolveThinClientRoutingContextIfNeeded(request, request.requestContext.regionalRoutingContextToRoute);
}

int locationIndex = Utils.getValueOrDefault(request.requestContext.locationIndexToRoute, 0);
RegionalRoutingContext resolvedRoutingContext;

boolean usePreferredLocations = request.requestContext.usePreferredLocations != null ? request.requestContext.usePreferredLocations : true;
if (!usePreferredLocations || (request.getOperationType().isWriteOperation() && !this.canUseMultipleWriteLocations(request))) {
Expand All @@ -211,15 +212,92 @@ public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest re
if (this.enableEndpointDiscovery && !currentLocationInfo.availableWriteLocations.isEmpty()) {
locationIndex = Math.min(locationIndex%2, currentLocationInfo.availableWriteLocations.size()-1);
String writeLocation = currentLocationInfo.availableWriteLocations.get(locationIndex);
return currentLocationInfo.availableWriteRegionalRoutingContextsByRegionName.get(writeLocation);
resolvedRoutingContext = currentLocationInfo.availableWriteRegionalRoutingContextsByRegionName.get(writeLocation);
} else {
return this.defaultRoutingContext;
resolvedRoutingContext = this.defaultRoutingContext;
}
} else {
UnmodifiableList<RegionalRoutingContext> endpoints =
request.getOperationType().isWriteOperation()? this.getApplicableWriteRegionRoutingContexts(request) : this.getApplicableReadRegionRoutingContexts(request);
return endpoints.get(locationIndex % endpoints.size());
resolvedRoutingContext = endpoints.get(locationIndex % endpoints.size());
}

return this.resolveThinClientRoutingContextIfNeeded(request, resolvedRoutingContext);
}

private RegionalRoutingContext resolveThinClientRoutingContextIfNeeded(
RxDocumentServiceRequest request,
RegionalRoutingContext regionalRoutingContext) {

if (!request.useThinClientMode
|| regionalRoutingContext == null
|| regionalRoutingContext.getThinclientRegionalEndpoint() != null) {

return regionalRoutingContext;
}

DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo;
Iterable<RegionalRoutingContext> primaryRoutingContexts = request.getOperationType().isWriteOperation()
? currentLocationInfo.availableWriteRegionalRoutingContexts
: currentLocationInfo.availableReadRegionalRoutingContexts;
Iterable<RegionalRoutingContext> secondaryRoutingContexts = request.getOperationType().isWriteOperation()
? null
: currentLocationInfo.availableWriteRegionalRoutingContexts;

RegionalRoutingContext thinClientRoutingContext = this.getThinClientRoutingContextForGatewayEndpoint(
regionalRoutingContext,
primaryRoutingContexts);

if (thinClientRoutingContext == null) {
thinClientRoutingContext = this.getThinClientRoutingContextForGatewayEndpoint(
regionalRoutingContext,
secondaryRoutingContexts);
}

if (thinClientRoutingContext == null
&& regionalRoutingContext.getGatewayRegionalEndpoint().equals(this.defaultRoutingContext.getGatewayRegionalEndpoint())) {

thinClientRoutingContext = this.getFirstThinClientRoutingContext(primaryRoutingContexts);

if (thinClientRoutingContext == null) {
thinClientRoutingContext = this.getFirstThinClientRoutingContext(secondaryRoutingContexts);
}
}

return thinClientRoutingContext != null ? thinClientRoutingContext : regionalRoutingContext;
}

private RegionalRoutingContext getThinClientRoutingContextForGatewayEndpoint(
RegionalRoutingContext regionalRoutingContext,
Iterable<RegionalRoutingContext> routingContexts) {

if (routingContexts == null) {
return null;
}

for (RegionalRoutingContext candidateRoutingContext : routingContexts) {
if (candidateRoutingContext.getThinclientRegionalEndpoint() != null
&& candidateRoutingContext.getGatewayRegionalEndpoint().equals(regionalRoutingContext.getGatewayRegionalEndpoint())) {

return candidateRoutingContext;
}
}

return null;
}

private RegionalRoutingContext getFirstThinClientRoutingContext(Iterable<RegionalRoutingContext> routingContexts) {
if (routingContexts == null) {
return null;
}

for (RegionalRoutingContext candidateRoutingContext : routingContexts) {
if (candidateRoutingContext.getThinclientRegionalEndpoint() != null) {
return candidateRoutingContext;
}
}

return null;
}

public UnmodifiableList<RegionalRoutingContext> getApplicableWriteRegionRoutingContexts(RxDocumentServiceRequest request) {
Expand Down Expand Up @@ -1026,7 +1104,20 @@ private void addRoutingContexts(
URI endpoint = new URI(thinclientDbAccountLocation.getEndpoint().toLowerCase(Locale.ROOT));

RegionalRoutingContext regionalRoutingContext = endpointsByLocation.get(location);
regionalRoutingContext.setThinclientRegionalEndpoint(endpoint);
if (regionalRoutingContext != null) {
regionalRoutingContext.setThinclientRegionalEndpoint(endpoint);

if (this.defaultRoutingContext.getGatewayRegionalEndpoint()
Comment thread
arnabnandy7 marked this conversation as resolved.
.equals(regionalRoutingContext.getGatewayRegionalEndpoint())) {

this.defaultRoutingContext.setThinclientRegionalEndpoint(endpoint);
}
} else {
logger.warn(
"Skipping thin client endpoint add for location = [{}] and endpoint = [{}] because no matching gateway endpoint was found.",
thinclientDbAccountLocation.getName(),
thinclientDbAccountLocation.getEndpoint());
}
} catch (Exception e) {
logger.warn("Skipping add for location = [{}] and endpoint = [{}] due to exception [{}]",
thinclientDbAccountLocation.getName(),
Expand Down