Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -26,15 +28,16 @@
* Managed wrapper around VectorSchemaRoot that handles state transitions
* and provides thread-safe access for the ACTIVE/FROZEN lifecycle.
*/
public class ManagedVSR implements AutoCloseable {
public class ManagedVSR implements Closeable {

private static final Logger logger = LogManager.getLogger(ManagedVSR.class);

private final String id;
private final VectorSchemaRoot vsr;
private final BufferAllocator allocator;
private final AtomicReference<VSRState> state;
private final ReadWriteLock lock;
private final Lock readLock;
private final Lock writeLock;
private final long createdTime;
private final Map<String, Field> fields = new HashMap<>();

Expand All @@ -44,7 +47,9 @@ public ManagedVSR(String id, VectorSchemaRoot vsr, BufferAllocator allocator) {
this.vsr = vsr;
this.allocator = allocator;
this.state = new AtomicReference<>(VSRState.ACTIVE);
this.lock = new ReentrantReadWriteLock();
ReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
this.createdTime = System.currentTimeMillis();
for (Field field : vsr.getSchema().getFields()) {
fields.put(field.getName(), field);
Expand All @@ -68,11 +73,11 @@ public VectorSchemaRoot getVSR() {
* @return Number of rows currently in the VSR
*/
public int getRowCount() {
    // Shared (read) side of the instance's read/write lock: concurrent readers may
    // proceed together, while setRowCount() and close(), which take the write side,
    // are excluded for the duration of the read.
    readLock.lock();
    try {
        return vsr.getRowCount();
    } finally {
        // Always release, even if the underlying VSR throws.
        readLock.unlock();
    }
}

Expand All @@ -84,14 +89,14 @@ public int getRowCount() {
* @throws IllegalStateException if VSR is not active or is immutable
*/
public void setRowCount(int rowCount) {
    // Exclusive (write) side of the read/write lock: no reader observes a
    // half-applied row count.
    writeLock.lock();
    try {
        // Read the state once so the guard and the error message always agree;
        // setState() does not take this lock, so two reads could differ.
        VSRState current = state.get();
        if (current != VSRState.ACTIVE) {
            throw new IllegalStateException("Cannot modify VSR in state: " + current);
        }
        vsr.setRowCount(rowCount);
    } finally {
        writeLock.unlock();
    }
}

Expand All @@ -103,24 +108,20 @@ public void setRowCount(int rowCount) {
* @return FieldVector for the field, or null if not found
*/
public FieldVector getVector(String fieldName) {
    readLock.lock();
    try {
        // The name -> Field map is filled once in the constructor from the VSR schema.
        // Make the documented "null if not found" contract explicit instead of
        // handing a null Field to the underlying VectorSchemaRoot.
        if (!fields.containsKey(fieldName)) {
            return null;
        }
        return vsr.getVector(fields.get(fieldName));
    } finally {
        readLock.unlock();
    }
}

/**
 * Changes the state of this VSR.
 *
 * <p>The move is validated against the current state and applied with a
 * compare-and-set; if another thread changes the state in between, the
 * validation is repeated against the state that thread installed.
 *
 * @param newState New state to transition to
 * @throws IllegalStateException if the current state does not allow moving to {@code newState}
 */
public void setState(VSRState newState) {
    VSRState oldState;
    do {
        oldState = state.get();
        // Throws before anything is written, so a rejected transition leaves the state untouched.
        oldState.validateTransition(newState);
    } while (!state.compareAndSet(oldState, newState));

    // Log only after the transition has actually been applied.
    logger.debug("State transition: {} -> {} for VSR {}", oldState, newState, id);
}

/**
Expand All @@ -146,7 +147,7 @@ public ArrowExport exportToArrow() {
throw new IllegalStateException("Cannot export VSR in state: " + currentState);
}

lock.readLock().lock();
readLock.lock();
try {
ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
Expand All @@ -156,12 +157,12 @@ public ArrowExport exportToArrow() {

return new ArrowExport(arrowArray, arrowSchema);
} finally {
lock.readLock().unlock();
readLock.unlock();
}
}

public ArrowExport exportSchema() {
lock.readLock().lock();
readLock.lock();
try {
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);

Expand All @@ -170,7 +171,7 @@ public ArrowExport exportSchema() {

return new ArrowExport(null, arrowSchema);
} finally {
lock.readLock().unlock();
readLock.unlock();
}
}

Expand Down Expand Up @@ -217,15 +218,15 @@ public BufferAllocator getAllocator() {
*/
@Override
public void close() {
    writeLock.lock();
    try {
        // getAndSet makes "first closer wins" a single atomic step; setState() does not
        // take this lock, so a separate get() followed by set() could race with it.
        if (state.getAndSet(VSRState.CLOSED) == VSRState.CLOSED) {
            return; // already closed: closing again is a no-op
        }
        try {
            vsr.close();
        } catch (RuntimeException e) {
            // Still release the allocator so it is not leaked, but keep the original
            // failure as the primary exception.
            try {
                allocator.close();
            } catch (RuntimeException ex) {
                e.addSuppressed(ex);
            }
            throw e;
        }
        allocator.close();
    } finally {
        writeLock.unlock();
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public VSRPool(String poolId, Schema schema, ArrowBufferPool arrowBufferPool) {
* @return Active ManagedVSR for writing, or null if none exists
*/
public ManagedVSR getActiveVSR() {
    // maybeRotateActiveVSR() freezes the current VSR and installs its replacement while
    // holding this same monitor, so taking it here makes callers wait out an in-progress
    // rotation instead of receiving the VSR that is being frozen.
    synchronized (this) {
        return activeVSR.get();
    }
}

/**
Expand All @@ -69,8 +71,6 @@ public ManagedVSR getActiveVSR() {
*/
public boolean maybeRotateActiveVSR() throws IOException {
ManagedVSR current = activeVSR.get();

// Check if rotation is needed
if (current == null || !shouldRotateVSR(current)) {
return false; // No rotation needed
}
Expand All @@ -82,29 +82,16 @@ public boolean maybeRotateActiveVSR() throws IOException {
"system bottleneck or processing failure.");
}

// Safe to rotate - perform the rotation
synchronized (this) {
// Double-check conditions under lock
current = activeVSR.get();
if (current == null || !shouldRotateVSR(current)) {
return false; // Conditions changed while acquiring lock
if (current == null || !shouldRotateVSR(current) || frozenVSR.get() != null) {
return false;
}

// Check frozen slot again under lock
if (frozenVSR.get() != null) {
throw new IOException("Cannot rotate VSR: frozen slot became occupied during rotation");
}

// Freeze current VSR if it exists and has data
if (current != null && current.getRowCount() > 0) {
if (current.getRowCount() > 0) {
freezeVSR(current);
}

// Create new active VSR
ManagedVSR newActive = createNewVSR();
activeVSR.set(newActive);

return true; // Rotation occurred
activeVSR.set(createNewVSR());
return true;
}
}

Expand Down Expand Up @@ -143,13 +130,16 @@ public ManagedVSR getFrozenVSR() {
}

public void unsetFrozenVSR() throws IOException {
if (frozenVSR.get() == null) {
ManagedVSR vsr = frozenVSR.get();
if (vsr == null) {
throw new IOException("unsetFrozenVSR called when frozen VSR is not set");
}
if (!VSRState.CLOSED.equals(frozenVSR.get().getState())) {
throw new IOException("frozenVSR cannot be unset, state is " + frozenVSR.get().getState());
if (vsr.getState() != VSRState.CLOSED) {
throw new IOException("frozenVSR cannot be unset, state is " + vsr.getState());
}
if (!frozenVSR.compareAndSet(vsr, null)) {
throw new IOException("frozenVSR changed during unset");
}
frozenVSR.set(null);
}

/**
Expand Down Expand Up @@ -233,20 +223,16 @@ public void close() {
}

/**
 * Installs the first active VSR for this pool.
 */
private void initializeActiveVSR() {
    // Create exactly one VSR: createNewVSR() registers every instance it builds in
    // allVSRs, so a second call here would leave an extra VSR that never becomes active.
    activeVSR.set(createNewVSR());
}

private ManagedVSR createNewVSR() {

String vsrId = poolId + "-vsr-" + vsrCounter.incrementAndGet();
BufferAllocator allocator = null;
VectorSchemaRoot vsr = null;

try {
allocator = bufferPool.createChildAllocator(vsrId);
vsr = VectorSchemaRoot.create(schema, allocator);

ManagedVSR managedVSR = new ManagedVSR(vsrId, vsr, allocator);
allVSRs.put(vsrId, managedVSR);

Expand All @@ -255,48 +241,20 @@ private ManagedVSR createNewVSR() {
} catch (Exception e) {
// Clean up resources on failure since ManagedVSR couldn't take ownership
if (vsr != null) {
try {
vsr.close();
} catch (Exception closeEx) {
e.addSuppressed(closeEx);
}
try { vsr.close(); } catch (Exception ex) { e.addSuppressed(ex); }
}
if (allocator != null) {
try {
allocator.close();
} catch (Exception closeEx) {
e.addSuppressed(closeEx);
}
try { allocator.close(); } catch (Exception ex) { e.addSuppressed(ex); }
}
throw new RuntimeException("Failed to create new VSR", e);
}
}

private void freezeVSR(ManagedVSR vsr) {
vsr.setState(VSRState.FROZEN);

// CRITICAL FIX: Check if frozen slot is already occupied
ManagedVSR previousFrozen = frozenVSR.get();
if (previousFrozen != null) {
// NEVER blindly overwrite a frozen VSR - this would cause data loss
logger.error("Attempting to freeze VSR when frozen slot is occupied! " +
"Previous VSR: {} ({} rows), New VSR: {} ({} rows). " +
"This indicates a logic error - frozen VSR should be consumed before replacement.",
previousFrozen.getId(), previousFrozen.getRowCount(),
vsr.getId(), vsr.getRowCount());

// Return VSR to ACTIVE state to prevent state corruption
vsr.setState(VSRState.ACTIVE);
throw new IllegalStateException("Cannot freeze VSR: frozen slot is occupied by unprocessed VSR " +
previousFrozen.getId() + ". This would cause data loss.");
}

// Safe to set frozen VSR since slot is empty
boolean success = frozenVSR.compareAndSet(null, vsr);
if (!success) {
// Race condition: another thread set frozen VSR between our check and set
if (!frozenVSR.compareAndSet(null, vsr)) {
vsr.setState(VSRState.ACTIVE);
throw new IllegalStateException("Race condition detected: frozen slot was occupied during freeze operation");
throw new IllegalStateException("Frozen slot occupied during freeze");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
package com.parquet.parquetdataformat.vsr;

import java.util.EnumSet;
import java.util.Set;

/**
 * Represents the lifecycle states of a VectorSchemaRoot.
 *
 * <p>The normal path is ACTIVE -> FROZEN -> FLUSHING -> CLOSED. ACTIVE and FROZEN
 * may also move directly to CLOSED. CLOSED is terminal.
 */
public enum VSRState {
    /**
     * Currently accepting writes - the VSR is active and can be modified.
     */
    ACTIVE,

    /**
     * Read-only state - VSR is frozen and queued for flush to Rust.
     * No further modifications are allowed in this state.
     */
    FROZEN,

    /**
     * Currently being processed by Rust - VSR is in the handoff process.
     */
    FLUSHING,

    /**
     * Completed and cleaned up - VSR processing is complete and resources freed.
     */
    CLOSED;

    // Allowed successor states, one set per source state.
    private static final Set<VSRState> ACTIVE_VALID_NEXT = EnumSet.of(FROZEN, CLOSED);
    private static final Set<VSRState> FROZEN_VALID_NEXT = EnumSet.of(FLUSHING, CLOSED);
    private static final Set<VSRState> FLUSHING_VALID_NEXT = EnumSet.of(CLOSED);
    private static final Set<VSRState> CLOSED_VALID_NEXT = EnumSet.noneOf(VSRState.class);

    /**
     * Reports whether this state may be followed by {@code next}.
     *
     * @param next the candidate successor state
     * @return {@code true} if {@code next} is an allowed successor of this state;
     *         {@code false} otherwise, including for a transition to the same state
     */
    public boolean canTransitionTo(VSRState next) {
        switch (this) {
            case ACTIVE:
                return ACTIVE_VALID_NEXT.contains(next);
            case FROZEN:
                return FROZEN_VALID_NEXT.contains(next);
            case FLUSHING:
                return FLUSHING_VALID_NEXT.contains(next);
            case CLOSED:
                return CLOSED_VALID_NEXT.contains(next);
            default:
                // Not reachable with the four constants above; required by the compiler.
                return false;
        }
    }

    /**
     * Checks that this state may be followed by {@code next}.
     *
     * @param next the candidate successor state
     * @throws IllegalStateException if the transition is not allowed
     */
    public void validateTransition(VSRState next) {
        if (!canTransitionTo(next)) {
            throw new IllegalStateException(
                String.format("Invalid state transition: %s -> %s", this, next)
            );
        }
    }
}
Loading