Skip to content

Commit ca1dfd3

Browse files
committed
SOLR-15045: DistributedZkUpdateProcessor should issue commits to local shards and remote shards in parallel (#545)
NOTE: base for this backport commit was cherry-picked from d195e4c; but the commit was amended as appropriate to include updates to the relevant files on `main` through b39bd76
1 parent 5c188b5 commit ca1dfd3

File tree

4 files changed

+216
-5
lines changed

4 files changed

+216
-5
lines changed

solr/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ Bug Fixes
3434

3535
* SOLR-16112: DefaultSolrHighlighter.doHighlighting to Query#rewrite multiple times if necessary. (Christine Poerschke)
3636

37+
* SOLR-15045: `DistributedZkUpdateProcessor` now issues commits to local shards and remote shards in parallel,
38+
halving the latency of synchronous commits (Michael Gibney)
39+
3740
Other Changes
3841
---------------------
3942
* SOLR-15897: Remove <jmx/> from all unit test solrconfig.xml files. (Eric Pugh)

solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,8 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
207207
// zk
208208
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
209209

210+
// TODO: revisit the need for tracking `issuedDistribCommit` -- see below, and SOLR-15045
211+
boolean issuedDistribCommit = false;
210212
List<SolrCmdDistributor.Node> useNodes = null;
211213
if (req.getParams().get(COMMIT_END_POINT) == null) {
212214
useNodes = nodes;
@@ -217,11 +219,17 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
217219
DISTRIB_FROM,
218220
ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
219221
cmdDistrib.distribCommit(cmd, useNodes, params);
220-
cmdDistrib.blockAndDoRetries();
222+
issuedDistribCommit = true;
221223
}
222224
}
223225

224226
if (isLeader) {
227+
if (issuedDistribCommit) {
228+
// defensive copy of params, which was passed into distribCommit(...) above; will
229+
// unconditionally replace DISTRIB_UPDATE_PARAM, COMMIT_END_POINT, and DISTRIB_FROM if the
230+
// new `params` val will actually be used
231+
params = new ModifiableSolrParams(params);
232+
}
225233
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
226234

227235
params.set(COMMIT_END_POINT, "replicas");
@@ -233,14 +241,22 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
233241
DISTRIB_FROM,
234242
ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
235243

244+
// NOTE: distribCommit(...) internally calls `blockAndDoRetries()`, flushing any TOLEADER
245+
// distrib commits
236246
cmdDistrib.distribCommit(cmd, useNodes, params);
247+
issuedDistribCommit = true;
237248
}
238249

239250
doLocalCommit(cmd);
240-
241-
if (useNodes != null) {
242-
cmdDistrib.blockAndDoRetries();
243-
}
251+
}
252+
if (issuedDistribCommit) {
253+
// TODO: according to discussion on SOLR-15045, this call (and all tracking of
254+
// `issuedDistribCommit`) may well be superfluous, and can probably simply be removed. It is
255+
// left in place for now, intentionally punting on the question of whether this internal
256+
// `blockAndDoRetries()` is necessary. At worst, its presence is misleading; but it should
257+
// be harmless, and allows the change fixing SOLR-15045 to be as tightly scoped as possible,
258+
// leaving the behavior of the code otherwise functionally equivalent (for better or worse!)
259+
cmdDistrib.blockAndDoRetries();
244260
}
245261
}
246262
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?xml version="1.0" ?>
2+
3+
<!--
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
-->
19+
20+
<!--
21+
Test Config that routes all updates through the "ensure-parallel-commit" chain (used by ParallelCommitExecutionTest)
22+
-->
23+
<config>
24+
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
25+
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="solrconfig.snippet.randomindexconfig.xml"/>
26+
<requestHandler name="/select" class="solr.SearchHandler"></requestHandler>
27+
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
28+
<schemaFactory class="ClassicIndexSchemaFactory"/>
29+
30+
<updateHandler class="solr.DirectUpdateHandler2">
31+
<updateLog enable="${enable.update.log:true}">
32+
<str name="dir">${solr.ulog.dir:}</str>
33+
</updateLog>
34+
35+
<commitWithin>
36+
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
37+
</commitWithin>
38+
39+
</updateHandler>
40+
41+
<requestHandler name="/update" class="solr.UpdateRequestHandler">
42+
<lst name="invariants">
43+
<str name="update.chain">ensure-parallel-commit</str>
44+
</lst>
45+
</requestHandler>
46+
47+
<updateProcessor class="org.apache.solr.cloud.ParallelCommitExecutionTest$CheckFactory" name="check"/>
48+
49+
<updateRequestProcessorChain name="ensure-parallel-commit" post-processor="check">
50+
<processor class="solr.RunUpdateProcessorFactory"/>
51+
</updateRequestProcessorChain>
52+
</config>
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.solr.cloud;
18+
19+
import java.io.IOException;
20+
import java.lang.invoke.MethodHandles;
21+
import java.nio.file.Path;
22+
import java.nio.file.Paths;
23+
import java.util.LinkedHashMap;
24+
import java.util.Map;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
import org.apache.lucene.util.TestUtil;
29+
import org.apache.solr.client.solrj.impl.CloudSolrClient;
30+
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
31+
import org.apache.solr.common.cloud.ZkStateReader;
32+
import org.apache.solr.request.SolrQueryRequest;
33+
import org.apache.solr.response.SolrQueryResponse;
34+
import org.apache.solr.update.CommitUpdateCommand;
35+
import org.apache.solr.update.processor.UpdateRequestProcessor;
36+
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
37+
import org.junit.AfterClass;
38+
import org.junit.BeforeClass;
39+
import org.junit.Test;
40+
41+
public class ParallelCommitExecutionTest extends SolrCloudTestCase {
42+
43+
private static final String DEBUG_LABEL = MethodHandles.lookup().lookupClass().getName();
44+
private static final String COLLECTION_NAME = DEBUG_LABEL + "_collection";
45+
46+
/** A basic client for operations at the cloud level, default collection will be set */
47+
private static CloudSolrClient CLOUD_CLIENT;
48+
49+
private static int expectCount;
50+
51+
private static volatile CountDownLatch countdown;
52+
private static final AtomicInteger countup = new AtomicInteger();
53+
54+
@BeforeClass
55+
public static void beforeClass() throws Exception {
56+
// multi replicas matters; for the initial parallel commit execution tests, only consider
57+
// repFactor=1
58+
final int repFactor = 1; // random().nextBoolean() ? 1 : 2;
59+
final int numShards = TestUtil.nextInt(random(), 1, 4);
60+
final int numNodes = (numShards * repFactor);
61+
expectCount = numNodes;
62+
63+
final String configName = DEBUG_LABEL + "_config-set";
64+
final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");
65+
66+
configureCluster(numNodes).addConfig(configName, configDir).configure();
67+
68+
Map<String, String> collectionProperties = new LinkedHashMap<>();
69+
collectionProperties.put("config", "solrconfig-parallel-commit.xml");
70+
collectionProperties.put("schema", "schema_latest.xml");
71+
CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
72+
.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
73+
.setProperties(collectionProperties)
74+
.process(cluster.getSolrClient());
75+
76+
CLOUD_CLIENT = cluster.getSolrClient();
77+
CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
78+
waitForRecoveriesToFinish(CLOUD_CLIENT);
79+
}
80+
81+
@AfterClass
82+
private static void afterClass() throws Exception {
83+
if (null != CLOUD_CLIENT) {
84+
CLOUD_CLIENT.close();
85+
CLOUD_CLIENT = null;
86+
}
87+
}
88+
89+
private static void initSyncVars() {
90+
final int ct;
91+
ct = expectCount;
92+
countdown = new CountDownLatch(ct);
93+
countup.set(0);
94+
}
95+
96+
@Test
97+
public void testParallelOk() throws Exception {
98+
initSyncVars();
99+
CLOUD_CLIENT.commit(true, true);
100+
assertEquals(0, countdown.getCount());
101+
assertEquals(expectCount, countup.get());
102+
}
103+
104+
public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception {
105+
assert null != client.getDefaultCollection();
106+
AbstractDistribZkTestBase.waitForRecoveriesToFinish(
107+
client.getDefaultCollection(), ZkStateReader.from(client), true, true, 330);
108+
}
109+
110+
public static class CheckFactory extends UpdateRequestProcessorFactory {
111+
@Override
112+
public UpdateRequestProcessor getInstance(
113+
SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
114+
return new Check(next);
115+
}
116+
}
117+
118+
public static class Check extends UpdateRequestProcessor {
119+
120+
public Check(UpdateRequestProcessor next) {
121+
super(next);
122+
}
123+
124+
@Override
125+
public void processCommit(CommitUpdateCommand cmd) throws IOException {
126+
super.processCommit(cmd);
127+
countdown.countDown();
128+
try {
129+
// NOTE: this ensures that all commits are executed in parallel; no commit can complete
130+
// successfully until all commits have entered the `processCommit(...)` method.
131+
if (!countdown.await(5, TimeUnit.SECONDS)) {
132+
throw new RuntimeException("done waiting");
133+
}
134+
countup.incrementAndGet();
135+
} catch (InterruptedException ex) {
136+
throw new RuntimeException(ex);
137+
}
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)