Skip to content

Conversation

@minal-kyada
Copy link

Summary

Implements replica-side write threshold warnings to detect and warn about writes to large partitions or partitions with many tombstones. This follows the same architecture as the existing read threshold warnings but is warning-only (no blocking behavior).

Related Work

This implementation follows the pattern established by read threshold warnings but adapts it for write operations with a warning-only approach.

patch by Minal Kyada reviewed by TBD for CASSANDRA-17258


public volatile boolean write_thresholds_enabled = false;
public volatile DataStorageSpec.LongBytesBound write_size_warn_threshold = null;
public volatile int write_tombstone_warn_threshold = 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use -1 for disabled. The reason that

    public volatile int tombstone_warn_threshold = 1000;
    public volatile int tombstone_failure_threshold = 100000;

have values is that we retro fit those old behaviors into read_thresholds, which fixed their behavior to work better. They are the only configs enabled by default and have been enabled for years.

read thresholds has been around for awhile, so I might send a patch to enable things by default, but that still won't change this patch as this patch adds a new feature; so should be off by default.

{
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
totalTombstones += update.affectedRowCount();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method doesn't have anything to do with tombstones

{
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
totalSize += update.dataSize();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the metric we actually want? this is how large a single mutation is, but we won't warn that its writing to a partition thats "too large"?

Comment on lines 661 to 662
maybeWarnWriteSize(mutations, options);
maybeWarnWriteTombstones(mutations, options);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this logic here rather than in the write coordination like read is? Wasn't the assumption that we check replica state which wouldn't be possible here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pre-flight check at the co-ordinator side, it's tracking large write operations, I added this because today we do have a pre-check on large read operations, and we take actions against those calls, should we have a similar implementation for write too?

Additionally, I have implemented replica side write threshold check too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do have a pre-check on large read operations

are those not different checks? There is read_size which is replica, and coordinator_read_size which is how much the coordinator used; this check has 2 very different definitions and won't be clear from client point of view what happens if these checks actually return a warn

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the co-ordinator side checks can be guardrailed for read and write both. Removing these changes from this PR,a s it needs another one.

Comment on lines 40 to 42
Message<NoPayload> reply = respondTo.emptyResponse();
reply = MessageParams.addToMessage(reply);
MessagingService.instance().send(reply, respondToAddress);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can make this a single line

MessagingService.instance().send(MessageParams.addToMessage(respondTo.emptyResponse()), respondToAddress);

DataStorageSpec.LongBytesBound sizeWarnThreshold = DatabaseDescriptor.getWriteSizeWarnThreshold();
int tombstoneWarnThreshold = DatabaseDescriptor.getWriteTombstoneWarnThreshold();

if (sizeWarnThreshold == null && tombstoneWarnThreshold == 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tombstoneWarnThreshold == -1

Long currentSize = org.apache.cassandra.db.MessageParams.get(ParamType.WRITE_SIZE_WARN);
if (currentSize == null || currentSize < estimatedSize)
{
org.apache.cassandra.db.MessageParams.add(ParamType.WRITE_SIZE_WARN, estimatedSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we import this class to make this easier to read?

long estimatedSize = cfs.topPartitions.topSizes().getEstimate(key);
long estimatedTombstones = cfs.topPartitions.topTombstones().getEstimate(key);

if (sizeWarnBytes > 0 && estimatedSize > sizeWarnBytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (sizeWarnBytes > 0 && estimatedSize > sizeWarnBytes)
if (sizeWarnBytes != -1 && estimatedSize > sizeWarnBytes)

Should be consistent. -1 is the "not enabled" case, so should check for that

org.apache.cassandra.db.MessageParams.add(ParamType.WRITE_SIZE_WARN, estimatedSize);

TableMetadata meta = update.metadata();
String pk = meta.partitionKeyType.getString(key.getKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think toCQLString is better than getString.


TableMetadata meta = update.metadata();
String pk = meta.partitionKeyType.getString(key.getKey());
logger.warn("Write to {}.{} partition {} triggered size warning; " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could spam, can we use a NoSpamLogger and only log once a minute?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thanks for highlighting this!


TableMetadata meta = update.metadata();
String pk = meta.partitionKeyType.getString(key.getKey());
logger.warn("Write to {}.{} partition {} triggered tombstone warning; " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could spam, can we use a NoSpamLogger and log once a minute?

org.apache.cassandra.db.MessageParams.add(ParamType.WRITE_TOMBSTONE_WARN, (int) estimatedTombstones);

TableMetadata meta = update.metadata();
String pk = meta.partitionKeyType.getString(key.getKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above, toCQLString is prob best here

String pk = meta.partitionKeyType.getString(key.getKey());
logger.warn("Write to {}.{} partition {} triggered size warning; " +
"estimated size is {} bytes, threshold is {} bytes (see write_size_warn_threshold)",
meta.keyspace, meta.name, pk, estimatedSize, sizeWarnBytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI the toString for TableMetadata is keyspace.name so you could simplify this and remove {}.{} and just do {} and pass in meta. It will also do the correct quoting if needed

String pk = meta.partitionKeyType.getString(key.getKey());
logger.warn("Write to {}.{} partition {} triggered tombstone warning; " +
"estimated tombstone count is {}, threshold is {} (see write_tombstone_warn_threshold)",
meta.keyspace, meta.name, pk, estimatedTombstones, tombstoneWarnThreshold);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI the toString for TableMetadata is keyspace.name so you could simplify this and remove {}.{} and just do {} and pass in meta. It will also do the correct quoting if needed

DataStorageSpec.LongBytesBound sizeWarnThreshold = DatabaseDescriptor.getWriteSizeWarnThreshold();
int tombstoneWarnThreshold = DatabaseDescriptor.getWriteTombstoneWarnThreshold();

if (sizeWarnThreshold == null && tombstoneWarnThreshold == 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (sizeWarnThreshold == null && tombstoneWarnThreshold == 0)
if (sizeWarnThreshold == null && tombstoneWarnThreshold == -1)

{
top.pollLast();
TopPartition p = top.pollLast();
lookup.remove(p.key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you remove from the lookup but you don't add?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, missed the lookup.put() call. Thanks!

if (!Paxos.isInRangeAndShouldProcess(from, agreed.update.partitionKey(), agreed.update.metadata(), false))
return null;

WriteThresholds.checkWriteThresholds(agreed.update, agreed.update.partitionKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im not sure if this would actually work in practice

if (executeOnSelf)
        {
            ExecutorPlus executor = PAXOS_COMMIT_REQ.stage.executor();
            if (async) executor.execute(this::executeOnSelf);
            else executor.maybeExecuteImmediately(this::executeOnSelf);
        }

in some code paths we do a blocking execute, in others we do it async and ignore when it completes

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, we hold onto making these changes in this patch.

* Accumulates warnings from multiple write operations in a single client request,
* then sends them to the client and updates metrics.
*/
public class CoordinatorWriteWarnings
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a lot of overlap with CoordinatorWarnings, can we refactor so read/write can use 80% the same logic and only the read/write related changes need to be custom?

Comment on lines +34 to +37
maxWarningValue.accumulateAndGet(value, Math::max);
warnings.add(from);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't thread safe... what is the concurrency expecations for this method?

Copy link
Author

@minal-kyada minal-kyada Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It follows the same concurrency pattern as the read flow's present WarnAbortCounter.

Concurrency expectation:

  • Writers: Multiple n/w callback threads calling it concurrently.
  • Reader: Single coordinator thread calling concurrently with writers.

According to me the thread safety approach would be like update value first and add instance second.

This would basically ensure the ThresholdCounter handles the race condition correctly for read and write flows both, as the comment says:
call add last so concurrent reads see empty even if values > 0; if done in different order then size=1 could have values == 0

I can add this comment for Write flow to make this ordering explicit. Would that address your concern, or did you have a different concurrency scenario in mind ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add the following comment, which is from the read side

        // call add last so concurrent reads see empty even if values > 0; if done in different order then
        // size=1 could have values == 0

When someone reads this code it can come off as a bug; without a comment explaining then future readers may break this without understanding it.

@minal-kyada minal-kyada force-pushed the cassandra-17258 branch 2 times, most recently from 803344e to d49ca1c Compare January 16, 2026 23:11
/**
* Utility class for checking write threshold warnings on replicas.
*/
public class WriteThresholds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paxos / accord were left out of this patch intentionally. Can you leave a comment in this class to denote that?

something like

// CASSANDRA-17258: paxos and accord do complex thread hand off and custom write logic which makes this patch complex, so was deferred

}
else
{
logger.warn("Cannot propagate write warnings to coordinator because hintOnFailure is null. " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you use grep or intellij indexes to find where this comes from you will see BatchlogResponseHandler makes this null; which isn't a programming error. The field also annotates

    private @Nullable final Supplier<Mutation> hintOnFailure;

denoted that this is in fact expected behavior

  1. logger.warn can spam and flood operator logs, be very careful of this in the hot path
  2. null looks like expected behavior for batch log, so not being able to update is fine

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will keep an eye on it hence forth and update this as well.

long estimatedSize = cfs.topPartitions.topSizes().getEstimate(key);
long estimatedTombstones = cfs.topPartitions.topTombstones().getEstimate(key);
TableMetadata meta = update.metadata();
String pk = meta.partitionKeyType.toCQLString(key.getKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a costly operation and happens for every update, we should avoid creating this string unless we need to; which means at least one warning happened

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a common pattern for this is to have String pk = null and w/e you enter the if block you check if null and compute it, that way you do it once and only do it if needed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, let me change that!

@dcapwell
Copy link
Contributor

Minor Issues & Nitpicks

🔵 Style: Missing newline at end of several files

Severity: Minor
Location: Multiple files including CoordinatorWriteWarnings.java, WarnCounter.java, WriteWarningContext.java, etc.

The diff shows \ No newline at end of file for many new files. This should be fixed for consistency with the rest of the codebase.

local_read_size_fail_threshold: 8192KiB
row_index_read_size_warn_threshold: 4096KiB
row_index_read_size_fail_threshold: 8192KiB
write_size_warn_threshold: 4096KiB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't you also need write_thresholds_enabled: true?

Comment on lines 92 to 93
Pair<TableId, DecoratedKey> key = Pair.create(update.metadata().id, update.partitionKey());
warnings.merge(key, snapshot);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Pair<TableId, DecoratedKey> key = Pair.create(update.metadata().id, update.partitionKey());
warnings.merge(key, snapshot);
warnings.merge(update.metadata().id, update.partitionKey(), snapshot);

code is easier to read if you push creating the Pair into the merge function. This is also a leaky detail as your API exposes how the internal data is structured.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure!

}

TableMetadata metadata = cfs.metadata();
String partitionKey = metadata.partitionKeyType.getString(key.right.getKey());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
String partitionKey = metadata.partitionKeyType.getString(key.right.getKey());
String partitionKey = metadata.partitionKeyType.toCQLString(key.right.getKey());

getString is almost never what you want sadly; toCQLString is.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, will make sure I use CQLString hence forth!


public volatile boolean write_thresholds_enabled = false;
public volatile DataStorageSpec.LongBytesBound write_size_warn_threshold = null;
public volatile int write_tombstone_warn_threshold = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't see this validated in DatabaseDescriptor.apply can you make sure we validate? -1, 0+ seem to be the only valid values, but where is this enforced?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

take a look at applyReadThresholdsValidations. Prob best to just rename that method and put the check there =)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For write_size_warn_threshold:
The validation is enforced by the DataStorageSpec type itself and the constructor validates that any non-null value must be >= 0. For this type, null represents "disabled" (equivalent to -1 for int fields). So technically the valid values are either null OR >= 0

I think it will be a redundant validation if I add soemthing similar to read thresholds, what do you think ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants