Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,13 @@ public enum Property {
"The amount of time a scan reference is unused before its deleted from metadata table.",
"2.1.0"),
@Experimental
SSERV_SCAN_ALLOWED_TABLES("sserver.scan.allowed.tables.group.", null, PropertyType.PREFIX,
"A regular expression that determines which tables are allowed to be scanned for"
+ " servers in the specified group. The property name should end with the scan server"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 4.0, with the changes in #5749 would not need to group name in the property.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct. There are probably other changes when merging in 4.0 as well.

+ " group and the property value should take into account the table namespace and name."
+ " The default value disallows scans on tables in the accumulo namespace.",
"2.1.5"),
@Experimental
SSERV_THREADCHECK("sserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
"The time between adjustments of the thrift server thread pool.", "2.1.0"),
// properties that are specific to tablet server behavior
Expand Down
133 changes: 122 additions & 11 deletions server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.accumulo.tserver;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCAN_SERVER_TABLET_METADATA_CACHE_POOL;

Expand All @@ -45,6 +46,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -54,6 +58,7 @@
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.cluster.ClusterConfigParser;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
Expand Down Expand Up @@ -88,6 +93,7 @@
import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.AbstractServer;
Expand All @@ -111,6 +117,7 @@
import org.apache.accumulo.tserver.tablet.SnapshotTablet;
import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.accumulo.tserver.tablet.TabletBase;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -177,6 +184,8 @@ private TabletMetadataLoader(Ample ample) {
}

private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
// Default pattern to allow scans on all tables not in accumulo namespace
private static final String DEFAULT_SCAN_ALLOWED_PATTERN = "^(?!accumulo\\.).*$";

protected ThriftScanClientHandler delegate;
private UUID serverLockUUID;
Expand Down Expand Up @@ -213,6 +222,9 @@ private TabletMetadataLoader(Ample ample) {

private final String groupName;

private final ConcurrentHashMap<TableId,Boolean> allowedTables = new ConcurrentHashMap<>();
private volatile String currentAllowedTableRegex;

public ScanServer(ScanServerOpts opts, String[] args) {
super("sserver", opts, args);

Expand Down Expand Up @@ -388,6 +400,7 @@ public void run() {
}

SecurityUtil.serverLogin(getConfiguration());
updateAllowedTables(false);

ServerAddress address = null;
try {
Expand Down Expand Up @@ -423,6 +436,7 @@ public void run() {
Thread.sleep(1000);
updateIdleStatus(sessionManager.getActiveScans().isEmpty()
&& tabletMetadataCache.estimatedSize() == 0);
updateAllowedTables(false);
} catch (InterruptedException e) {
LOG.info("Interrupt Exception received, shutting down");
gracefulShutdown(getContext().rpcCreds());
Expand Down Expand Up @@ -477,6 +491,106 @@ public void run() {
}
}

// Visible for testing
protected boolean isAllowed(TCredentials credentials, TableId tid)
throws ThriftSecurityException {
Boolean result = allowedTables.get(tid);
if (result == null) {

final Retry retry =
Retry.builder().maxRetries(10).retryAfter(1, SECONDS).incrementBy(0, SECONDS)
.maxWait(2, SECONDS).backOffFactor(1.0).logInterval(3, SECONDS).createRetry();

while (result == null && retry.canRetry()) {
try {
retry.waitForNextAttempt(LOG,
"Allowed tables mapping does not contain an entry for table: " + tid
+ ", refreshing table...");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while waiting for next retry", e);
break;
}
// Clear the cache and try again, maybe there
// is a race condition in table creation and scan
updateAllowedTables(true);
// validate that the table exists, else throw
delegate.getNamespaceId(credentials, tid);
result = allowedTables.get(tid);
retry.useRetry();
}

if (result == null) {
// Ran out of retries
throw new IllegalStateException(
"Unable to get allowed table mapping for table: " + tid + " within 10s");
}
}
return result;
}

private synchronized void updateAllowedTables(boolean clearCache) {

LOG.trace("Updating allowed tables for ScanServer");
if (clearCache) {
context.clearTableListCache();
}

// Remove tables that no longer exist
allowedTables.keySet().forEach(tid -> {
if (!getContext().getTableIdToNameMap().containsKey(tid)) {
LOG.trace("Removing table {} from allowed table map as it no longer exists", tid);
allowedTables.remove(tid);
}
});

final String propName = Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + groupName;
String allowedTableRegex = getConfiguration().get(propName);
if (allowedTableRegex == null) {
allowedTableRegex = DEFAULT_SCAN_ALLOWED_PATTERN;
}

if (currentAllowedTableRegex == null) {
LOG.trace("Property {} initial value: {}", propName, allowedTableRegex);
} else if (currentAllowedTableRegex.equals(allowedTableRegex)) {
// Property value has not changed, do nothing
} else {
LOG.info("Property {} has changed. Old value: {}, new value: {}", propName,
currentAllowedTableRegex, allowedTableRegex);
}

Pattern allowedTablePattern;
try {
allowedTablePattern = Pattern.compile(allowedTableRegex);
// Regex is valid, store it
currentAllowedTableRegex = allowedTableRegex;
} catch (PatternSyntaxException e) {
LOG.error(
"Property {} contains an invalid regular expression. Property value: {}. Disabling all tables.",
propName, allowedTableRegex);
allowedTablePattern = null;
}

Pattern p = allowedTablePattern;
context.getTableNameToIdMap().entrySet().forEach(e -> {
String tname = e.getKey();
TableId tid = e.getValue();
if (p == null) {
allowedTables.put(tid, Boolean.FALSE);
} else {
Matcher m = p.matcher(tname);
if (m.matches()) {
LOG.trace("Table {} can now be scanned via this ScanServer", tname);
allowedTables.put(tid, Boolean.TRUE);
} else {
LOG.trace("Table {} cannot be scanned via this ScanServer", tname);
allowedTables.put(tid, Boolean.FALSE);
}
}
});

}

@SuppressWarnings("unchecked")
private Map<KeyExtent,TabletMetadata> getTabletMetadata(Collection<KeyExtent> extents) {
if (tabletMetadataCache == null) {
Expand Down Expand Up @@ -945,11 +1059,6 @@ public void close() {
};
}

/* Exposed for testing */
protected boolean isSystemUser(TCredentials creds) {
return context.getSecurityOperation().isSystemUser(creds);
}

@Override
public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
Expand All @@ -966,9 +1075,10 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent t

KeyExtent extent = getKeyExtent(textent);

if (extent.isMeta() && !isSystemUser(credentials)) {
throw new TException(
"Only the system user can perform eventual consistency scans on the root and metadata tables");
if (!isAllowed(credentials, extent.tableId())) {
throw new TApplicationException(TApplicationException.INTERNAL_ERROR,
"Scan of table " + extent.tableId() + " disallowed by property: "
+ Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName);
}

try (ScanReservation reservation =
Expand Down Expand Up @@ -1038,9 +1148,10 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,
for (Entry<TKeyExtent,List<TRange>> entry : tbatch.entrySet()) {
KeyExtent extent = getKeyExtent(entry.getKey());

if (extent.isMeta() && !context.getSecurityOperation().isSystemUser(credentials)) {
throw new TException(
"Only the system user can perform eventual consistency scans on the root and metadata tables");
if (!isAllowed(credentials, extent.tableId())) {
throw new TApplicationException(TApplicationException.INTERNAL_ERROR,
"Scan of table " + extent.tableId() + " disallowed by property: "
+ Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName);
}

batch.put(extent, entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public ThriftScanClientHandler(TabletHostingServer server, WriteTracker writeTra
.getTimeInMillis(Property.TSERV_SCAN_RESULTS_MAX_TIMEOUT);
}

private NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
public NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
throws ThriftSecurityException {
try {
return server.getContext().getNamespaceId(tableId);
Expand Down
Loading