Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo;
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.ChannelPoolMetricsTracer;
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DirectPathCompatibleTracer;
import io.grpc.ManagedChannelBuilder;
import java.io.Closeable;
import java.io.IOException;
Expand All @@ -31,6 +32,9 @@ public interface Metrics extends Closeable {
@Nullable
ChannelPoolMetricsTracer getChannelPoolMetricsTracer();

@Nullable
DirectPathCompatibleTracer getDirectPathCompatibleTracer();

void start();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.BuiltinMetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.ChannelPoolMetricsTracer;
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DefaultDirectPathCompatibleTracer;
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DirectPathCompatibleTracer;
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.Pacemaker;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -67,6 +69,7 @@ public class MetricsImpl implements Metrics, Closeable {

@Nullable private final GrpcOpenTelemetry grpcOtel;
@Nullable private final ChannelPoolMetricsTracer channelPoolMetricsTracer;
@Nullable private final DirectPathCompatibleTracer directPathCompatibleTracer;
@Nullable private final Pacemaker pacemaker;
private final List<ScheduledFuture<?>> tasks = new ArrayList<>();

Expand Down Expand Up @@ -94,6 +97,8 @@ public MetricsImpl(
this.internalRecorder = metricRegistry.newRecorderRegistry(internalOtel.getMeterProvider());
this.pacemaker = new Pacemaker(internalRecorder, clientInfo, "background");
this.channelPoolMetricsTracer = new ChannelPoolMetricsTracer(internalRecorder, clientInfo);
this.directPathCompatibleTracer =
new DefaultDirectPathCompatibleTracer(clientInfo, internalRecorder);
this.grpcOtel =
GrpcOpenTelemetry.newBuilder()
.sdk(internalOtel)
Expand All @@ -109,6 +114,7 @@ public MetricsImpl(
this.grpcOtel = null;
this.pacemaker = null;
this.channelPoolMetricsTracer = null;
this.directPathCompatibleTracer = null;
}

if (userOtel != null) {
Expand Down Expand Up @@ -171,6 +177,12 @@ public ChannelPoolMetricsTracer getChannelPoolMetricsTracer() {
return channelPoolMetricsTracer;
}

@Nullable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont make this nullable, instead add a NoopTracer in NoopMetrics

@Override
public DirectPathCompatibleTracer getDirectPathCompatibleTracer() {
return directPathCompatibleTracer;
}

public static OpenTelemetrySdk createBuiltinOtel(
MetricRegistry metricRegistry,
ClientInfo clientInfo,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.internal.csm.tracers;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.data.v2.internal.csm.MetricRegistry;
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo;

@InternalApi
public class DefaultDirectPathCompatibleTracer implements DirectPathCompatibleTracer {
private final ClientInfo clientInfo;
private final MetricRegistry.RecorderRegistry recorder;

public DefaultDirectPathCompatibleTracer(
ClientInfo clientInfo, MetricRegistry.RecorderRegistry recorder) {
this.clientInfo = clientInfo;
this.recorder = recorder;
}

@Override
public void recordSuccess(String ipPreference) {
recorder.dpCompatGuage.recordSuccess(clientInfo, ipPreference);
}

@Override
public void recordFailure(String reason) {
recorder.dpCompatGuage.recordFailure(clientInfo, reason);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.internal.csm.tracers;

import com.google.api.core.InternalApi;

/** Interface for recording DirectPath/DirectAccess eligibility metrics. */
@InternalApi
public interface DirectPathCompatibleTracer {

/**
* Records that the environment is eligible and successfully connected via DirectPath.
*
* @param ipPreference The IP preference used (e.g., "ipv6").
*/
void recordSuccess(String ipPreference);

/**
* Records that the environment is not eligible or failed to connect via DirectPath.
*
* @param reason The reason for the failure (e.g., "routing_check_failed").
*/
void recordFailure(String reason);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.InternalApi;
import io.grpc.ManagedChannel;
import java.io.IOException;

@InternalApi
public interface BigtableChannelFactory {
ManagedChannel createSingleChannel() throws IOException;
Comment on lines +23 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use Supplier

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
Expand Down Expand Up @@ -88,21 +88,21 @@ static BigtableChannelPrimer create(
}

@Override
public void primeChannel(ManagedChannel managedChannel) {
public void primeChannel(Channel channel) {
try {
primeChannelUnsafe(managedChannel);
primeChannelUnsafe(channel);
} catch (IOException | RuntimeException e) {
LOG.log(Level.WARNING, "Unexpected error while trying to prime a channel", e);
}
}

private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException {
sendPrimeRequestsBlocking(managedChannel);
private void primeChannelUnsafe(Channel channel) throws IOException {
sendPrimeRequestsBlocking(channel);
}

private void sendPrimeRequestsBlocking(ManagedChannel managedChannel) {
private void sendPrimeRequestsBlocking(Channel channel) {
try {
sendPrimeRequestsAsync(managedChannel).get(1, TimeUnit.MINUTES);
sendPrimeRequestsAsync(channel).get(1, TimeUnit.MINUTES);
} catch (Throwable e) {
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
// channels if the new
Expand All @@ -111,7 +111,7 @@ private void sendPrimeRequestsBlocking(ManagedChannel managedChannel) {
}
}

public ApiFuture<PingAndWarmResponse> sendPrimeRequestsAsync(ManagedChannel managedChannel) {
public ApiFuture<PingAndWarmResponse> sendPrimeRequestsAsync(Channel managedChannel) {
ClientCall<PingAndWarmRequest, PingAndWarmResponse> clientCall =
managedChannel.newCall(
BigtableGrpc.getPingAndWarmMethod(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ public static BigtableClientContext create(
transportProvider.build(),
channelPrimer,
metrics.getChannelPoolMetricsTracer(),
backgroundExecutor);
backgroundExecutor,
metrics.getDirectPathCompatibleTracer());

builder.setTransportChannelProvider(btTransportProvider);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is unary/jetstream agnostic, then it should live in data.v2.internal...maybe create a pacakge for dp?


import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DirectPathCompatibleTracer;
import javax.annotation.Nullable;

@InternalApi
/* Evaluates whether a given channel supports Direct Access. */
public interface DirectAccessChecker {
/**
* Evaluates if Direct Access is available by creating a test channel.
*
* @param channelFactory A factory to create the test channel
* @return true if the channel is eligible for Direct Access
*/
boolean check(BigtableChannelFactory channelFactory, @Nullable DirectPathCompatibleTracer tracer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,25 @@ public ClientOperationSettings getPerOpSettings() {
return perOpSettings;
}

/** Applies common pool, message size, and keep-alive settings to the provided builder. */
private static InstantiatingGrpcChannelProvider.Builder commonTraits(
InstantiatingGrpcChannelProvider.Builder builder) {
return builder
.setChannelPoolSettings(
ChannelPoolSettings.builder()
.setInitialChannelCount(10)
.setMinRpcsPerChannel(1)
// Keep it conservative as we scale the channel size every 1min
// and delta is 2 channels.
.setMaxRpcsPerChannel(25)
.setPreemptiveRefreshEnabled(true)
.build())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval
.setKeepAliveTimeout(
Duration.ofSeconds(10)); // wait this long before considering the connection dead
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
InstantiatingGrpcChannelProvider.Builder grpcTransportProviderBuilder =
Expand All @@ -261,20 +280,24 @@ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProvi
Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS));
}
}
return grpcTransportProviderBuilder
.setChannelPoolSettings(
ChannelPoolSettings.builder()
.setInitialChannelCount(10)
.setMinRpcsPerChannel(1)
// Keep it conservative as we scale the channel size every 1min
// and delta is 2 channels.
.setMaxRpcsPerChannel(25)
.setPreemptiveRefreshEnabled(true)
.build())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval
.setKeepAliveTimeout(
Duration.ofSeconds(10)); // wait this long before considering the connection dead
return commonTraits(grpcTransportProviderBuilder);
}

/** Applies Direct Access traits (DirectPath & ALTS) to an existing builder. */
public static InstantiatingGrpcChannelProvider.Builder applyDirectAccessTraits(
InstantiatingGrpcChannelProvider.Builder builder) {

builder
.setAttemptDirectPathXds()
.setAttemptDirectPath(true)
.setAllowNonDefaultServiceAccount(true);

if (!DIRECT_PATH_BOUND_TOKEN_DISABLED) {
builder.setAllowHardBoundTokenTypes(
Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS));
}

return builder;
}

@SuppressWarnings("WeakerAccess")
Expand Down Expand Up @@ -609,12 +632,16 @@ private Builder() {

perOpSettings = new ClientOperationSettings.Builder();

// Note: RouteLookup evaluates and returns directpath targets
// only if Traffic Director sends the request (with grpc as target type)
// For GFE/CFE, sending setDirectAccessRequested
// is fine as GFE/CFE sends with gslb target type
featureFlags =
FeatureFlags.newBuilder()
.setReverseScans(true)
.setLastScannedRowResponses(true)
.setDirectAccessRequested(DIRECT_PATH_ENABLED)
.setTrafficDirectorEnabled(DIRECT_PATH_ENABLED)
.setDirectAccessRequested(true)
.setTrafficDirectorEnabled(true)
Comment on lines +643 to +644
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why you ignoring the env var?

.setPeerInfo(true);
}

Expand Down
Loading
Loading