Skip to content

Commit 7f20b36

Browse files
authored
AWS: Do not create signer refresh executor when refresh is disabled (#7046)
1 parent 4fc9671 commit 7f20b36

File tree

1 file changed

+35
-19
lines changed

1 file changed

+35
-19
lines changed

aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Map;
2727
import java.util.concurrent.ScheduledExecutorService;
2828
import java.util.concurrent.TimeUnit;
29-
import java.util.concurrent.atomic.AtomicReference;
3029
import java.util.function.Consumer;
3130
import java.util.function.Supplier;
3231
import javax.annotation.Nullable;
@@ -70,10 +69,13 @@ public abstract class S3V4RestSignerClient
7069
private static final Cache<Key, SignedComponent> SIGNED_COMPONENT_CACHE =
7170
Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.SECONDS).maximumSize(100).build();
7271

73-
private static final ScheduledExecutorService TOKEN_REFRESH_EXECUTOR =
74-
ThreadPools.newScheduledPool("s3-signer-token-refresh", 1);
7572
private static final String SCOPE = "sign";
76-
private static final AtomicReference<RESTClient> HTTP_CLIENT_REF = new AtomicReference<>();
73+
74+
@SuppressWarnings("immutables:incompat")
75+
private static volatile ScheduledExecutorService tokenRefreshExecutor;
76+
77+
@SuppressWarnings("immutables:incompat")
78+
private static volatile RESTClient httpClient;
7779

7880
public abstract Map<String, String> properties();
7981

@@ -113,26 +115,44 @@ boolean keepTokenRefreshed() {
113115
OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT);
114116
}
115117

118+
private ScheduledExecutorService tokenRefreshExecutor() {
119+
if (!keepTokenRefreshed()) {
120+
return null;
121+
}
122+
123+
if (null == tokenRefreshExecutor) {
124+
synchronized (S3V4RestSignerClient.class) {
125+
if (null == tokenRefreshExecutor) {
126+
tokenRefreshExecutor = ThreadPools.newScheduledPool("s3-signer-token-refresh", 1);
127+
}
128+
}
129+
}
130+
131+
return tokenRefreshExecutor;
132+
}
133+
116134
private RESTClient httpClient() {
117-
if (null == HTTP_CLIENT_REF.get()) {
118-
// TODO: should be closed
119-
HTTP_CLIENT_REF.compareAndSet(
120-
null,
121-
HTTPClient.builder()
122-
.uri(baseSignerUri())
123-
.withObjectMapper(S3ObjectMapper.mapper())
124-
.build());
135+
if (null == httpClient) {
136+
synchronized (S3V4RestSignerClient.class) {
137+
if (null == httpClient) {
138+
httpClient =
139+
HTTPClient.builder()
140+
.uri(baseSignerUri())
141+
.withObjectMapper(S3ObjectMapper.mapper())
142+
.build();
143+
}
144+
}
125145
}
126146

127-
return HTTP_CLIENT_REF.get();
147+
return httpClient;
128148
}
129149

130150
private AuthSession authSession() {
131151
String token = token().get();
132152
if (null != token) {
133153
return AuthSession.fromAccessToken(
134154
httpClient(),
135-
keepTokenRefreshed() ? TOKEN_REFRESH_EXECUTOR : null,
155+
tokenRefreshExecutor(),
136156
token,
137157
expiresAtMillis(properties()),
138158
new AuthSession(ImmutableMap.of(), token, null, credential(), SCOPE));
@@ -144,11 +164,7 @@ private AuthSession authSession() {
144164
OAuthTokenResponse authResponse =
145165
OAuth2Util.fetchToken(httpClient(), session.headers(), credential(), SCOPE);
146166
return AuthSession.fromTokenResponse(
147-
httpClient(),
148-
keepTokenRefreshed() ? TOKEN_REFRESH_EXECUTOR : null,
149-
authResponse,
150-
startTimeMillis,
151-
session);
167+
httpClient(), tokenRefreshExecutor(), authResponse, startTimeMillis, session);
152168
}
153169

154170
return AuthSession.empty();

0 commit comments

Comments
 (0)