1919
2020package io .milvus .v2 .utils ;
2121
22- import io .grpc .ManagedChannel ;
23- import io .grpc .ManagedChannelBuilder ;
24- import io .grpc .Metadata ;
22+ import io .grpc .*;
2523import io .grpc .netty .shaded .io .grpc .netty .GrpcSslContexts ;
2624import io .grpc .netty .shaded .io .grpc .netty .NettyChannelBuilder ;
2725import io .grpc .netty .shaded .io .netty .handler .ssl .ApplicationProtocolConfig ;
3331import io .milvus .client .MilvusServiceClient ;
3432import io .milvus .grpc .*;
3533import io .milvus .v2 .client .ConnectConfig ;
36- import io .grpc .HttpConnectProxiedSocketAddress ;
37- import io .grpc .ProxiedSocketAddress ;
38- import io .grpc .ProxyDetector ;
3934import org .apache .commons .lang3 .StringUtils ;
40- import org .jetbrains .annotations .NotNull ;
4135import org .slf4j .Logger ;
4236import org .slf4j .LoggerFactory ;
4337
4741import java .net .UnknownHostException ;
4842import java .nio .charset .StandardCharsets ;
4943import java .time .LocalDateTime ;
44+ import java .util .ArrayList ;
5045import java .util .Base64 ;
46+ import java .util .List ;
5147import java .util .concurrent .TimeUnit ;
5248import java .net .InetSocketAddress ;
5349import java .net .SocketAddress ;
@@ -66,6 +62,30 @@ public ManagedChannel getChannel(ConnectConfig connectConfig){
6662 metadata .put (Metadata .Key .of ("dbname" , Metadata .ASCII_STRING_MARSHALLER ), connectConfig .getDbName ());
6763 }
6864
65+ List <ClientInterceptor > clientInterceptors = new ArrayList <>();
66+ clientInterceptors .add (MetadataUtils .newAttachHeadersInterceptor (metadata ));
67+ //client interceptor used to fetch client_request_id from threadlocal variable and set it for every grpc request
68+ clientInterceptors .add (new ClientInterceptor () {
69+ @ Override
70+ public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (MethodDescriptor <ReqT , RespT > method , CallOptions callOptions , Channel next ) {
71+ return new ForwardingClientCall
72+ .SimpleForwardingClientCall <ReqT , RespT >(next .newCall (method , callOptions )) {
73+ @ Override
74+ public void start (ClientCall .Listener <RespT > responseListener , Metadata headers ) {
75+ String currentMs = String .valueOf (System .currentTimeMillis ());
76+ headers .put (Metadata .Key .of ("client-request-unixmsec" , Metadata .ASCII_STRING_MARSHALLER ), currentMs );
77+ if (connectConfig .getClientRequestId () != null ) {
78+ String clientID = connectConfig .getClientRequestId ().get ();
79+ if (!StringUtils .isEmpty (clientID )) {
80+ headers .put (Metadata .Key .of ("client_request_id" , Metadata .ASCII_STRING_MARSHALLER ), clientID );
81+ }
82+ }
83+ super .start (responseListener , headers );
84+ }
85+ };
86+ }
87+ });
88+
6989 try {
7090 if (connectConfig .getSslContext () != null ) {
7191 // sslContext from connect config
@@ -77,7 +97,7 @@ public ManagedChannel getChannel(ConnectConfig connectConfig){
7797 .keepAliveTimeout (connectConfig .getKeepAliveTimeoutMs (), TimeUnit .MILLISECONDS )
7898 .keepAliveWithoutCalls (connectConfig .isKeepAliveWithoutCalls ())
7999 .idleTimeout (connectConfig .getIdleTimeoutMs (), TimeUnit .MILLISECONDS )
80- .intercept (MetadataUtils . newAttachHeadersInterceptor ( metadata ) );
100+ .intercept (clientInterceptors );
81101
82102 if (StringUtils .isNotEmpty (connectConfig .getProxyAddress ())) {
83103 configureProxy (builder , connectConfig .getProxyAddress ());
@@ -104,7 +124,7 @@ public ManagedChannel getChannel(ConnectConfig connectConfig){
104124 .keepAliveTimeout (connectConfig .getKeepAliveTimeoutMs (), TimeUnit .MILLISECONDS )
105125 .keepAliveWithoutCalls (connectConfig .isKeepAliveWithoutCalls ())
106126 .idleTimeout (connectConfig .getIdleTimeoutMs (), TimeUnit .MILLISECONDS )
107- .intercept (MetadataUtils . newAttachHeadersInterceptor ( metadata ) );
127+ .intercept (clientInterceptors );
108128
109129 if (StringUtils .isNotEmpty (connectConfig .getProxyAddress ())) {
110130 configureProxy (builder , connectConfig .getProxyAddress ());
@@ -130,7 +150,7 @@ public ManagedChannel getChannel(ConnectConfig connectConfig){
130150 .keepAliveTimeout (connectConfig .getKeepAliveTimeoutMs (), TimeUnit .MILLISECONDS )
131151 .keepAliveWithoutCalls (connectConfig .isKeepAliveWithoutCalls ())
132152 .idleTimeout (connectConfig .getIdleTimeoutMs (), TimeUnit .MILLISECONDS )
133- .intercept (MetadataUtils . newAttachHeadersInterceptor ( metadata ) );
153+ .intercept (clientInterceptors );
134154
135155 if (StringUtils .isNotEmpty (connectConfig .getProxyAddress ())) {
136156 configureProxy (builder , connectConfig .getProxyAddress ());
@@ -152,7 +172,7 @@ public ManagedChannel getChannel(ConnectConfig connectConfig){
152172 .keepAliveTimeout (connectConfig .getKeepAliveTimeoutMs (), TimeUnit .MILLISECONDS )
153173 .keepAliveWithoutCalls (connectConfig .isKeepAliveWithoutCalls ())
154174 .idleTimeout (connectConfig .getIdleTimeoutMs (), TimeUnit .MILLISECONDS )
155- .intercept (MetadataUtils . newAttachHeadersInterceptor ( metadata ) );
175+ .intercept (clientInterceptors );
156176 if (StringUtils .isNotEmpty (connectConfig .getProxyAddress ())) {
157177 configureProxy (builder , connectConfig .getProxyAddress ());
158178 }
0 commit comments