diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/ManagedVSR.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/ManagedVSR.java index 0d486ba7e8f32..4771977dbf705 100644 --- a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/ManagedVSR.java +++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/ManagedVSR.java @@ -14,9 +14,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.Closeable; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -26,7 +28,7 @@ * Managed wrapper around VectorSchemaRoot that handles state transitions * and provides thread-safe access for the ACTIVE/FROZEN lifecycle. */ -public class ManagedVSR implements AutoCloseable { +public class ManagedVSR implements Closeable { private static final Logger logger = LogManager.getLogger(ManagedVSR.class); @@ -34,7 +36,8 @@ public class ManagedVSR implements AutoCloseable { private final VectorSchemaRoot vsr; private final BufferAllocator allocator; private final AtomicReference state; - private final ReadWriteLock lock; + private final Lock readLock; + private final Lock writeLock; private final long createdTime; private final Map fields = new HashMap<>(); @@ -44,7 +47,9 @@ public ManagedVSR(String id, VectorSchemaRoot vsr, BufferAllocator allocator) { this.vsr = vsr; this.allocator = allocator; this.state = new AtomicReference<>(VSRState.ACTIVE); - this.lock = new ReentrantReadWriteLock(); + ReadWriteLock rwLock = new ReentrantReadWriteLock(); + this.readLock = rwLock.readLock(); + this.writeLock = rwLock.writeLock(); this.createdTime = System.currentTimeMillis(); for (Field field : vsr.getSchema().getFields()) { fields.put(field.getName(), field); @@ -68,11 +73,11 @@ public VectorSchemaRoot getVSR() { * @return Number of rows currently in the VSR */ public int getRowCount() { - lock.readLock().lock(); + readLock.lock(); try { return vsr.getRowCount(); } finally { - lock.readLock().unlock(); + readLock.unlock(); } } @@ -84,14 +89,14 @@ public int getRowCount() { * @throws IllegalStateException if VSR is not active or is immutable */ public void setRowCount(int rowCount) { - lock.writeLock().lock(); + writeLock.lock(); try { if (state.get() != VSRState.ACTIVE) { throw new IllegalStateException("Cannot modify VSR in state: " + state.get()); } vsr.setRowCount(rowCount); } finally { - lock.writeLock().unlock(); + writeLock.unlock(); } } @@ -103,24 +108,20 @@ public void setRowCount(int rowCount) { * @return FieldVector for the field, or null if not found */ public FieldVector getVector(String fieldName) { - lock.readLock().lock(); + readLock.lock(); try { return vsr.getVector(fields.get(fieldName)); } finally { - lock.readLock().unlock(); + readLock.unlock(); } } - /** - * Changes the state of this VSR. - * Handles state transition logic and immutability. - * - * @param newState New state to transition to - */ public void setState(VSRState newState) { - VSRState oldState = state.getAndSet(newState); - - logger.debug("State transition: {} -> {} for VSR {}", oldState, newState, id); + VSRState oldState; + do { + oldState = state.get(); + oldState.validateTransition(newState); + } while (!state.compareAndSet(oldState, newState)); } /** @@ -146,7 +147,7 @@ public ArrowExport exportToArrow() { throw new IllegalStateException("Cannot export VSR in state: " + currentState); } - lock.readLock().lock(); + readLock.lock(); try { ArrowArray arrowArray = ArrowArray.allocateNew(allocator); ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator); @@ -156,12 +157,12 @@ public ArrowExport exportToArrow() { return new ArrowExport(arrowArray, arrowSchema); } finally { - lock.readLock().unlock(); + readLock.unlock(); } } public ArrowExport exportSchema() { - lock.readLock().lock(); + readLock.lock(); try { ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator); @@ -170,7 +171,7 @@ public ArrowExport exportSchema() { return new ArrowExport(null, arrowSchema); } finally { - lock.readLock().unlock(); + readLock.unlock(); } } @@ -217,7 +218,7 @@ public BufferAllocator getAllocator() { */ @Override public void close() { - lock.writeLock().lock(); + writeLock.lock(); try { if (state.get() != VSRState.CLOSED) { state.set(VSRState.CLOSED); @@ -225,7 +226,7 @@ public void close() { allocator.close(); } } finally { - lock.writeLock().unlock(); + writeLock.unlock(); } } diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRPool.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRPool.java index 4c3317200b712..765dd78683b72 100644 --- a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRPool.java +++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRPool.java @@ -57,7 +57,9 @@ public VSRPool(String poolId, Schema schema, ArrowBufferPool arrowBufferPool) { * @return Active ManagedVSR for writing, or null if none exists */ public ManagedVSR getActiveVSR() { - return activeVSR.get(); + synchronized (this) { + return activeVSR.get(); + } } /** @@ -69,8 +71,6 @@ public ManagedVSR getActiveVSR() { */ public boolean maybeRotateActiveVSR() throws IOException { ManagedVSR current = activeVSR.get(); - - // Check if rotation is needed if (current == null || !shouldRotateVSR(current)) { return false; // No rotation needed } @@ -82,29 +82,16 @@ public boolean maybeRotateActiveVSR() throws IOException { "system bottleneck or processing failure."); } - // Safe to rotate - perform the rotation synchronized (this) { - // Double-check conditions under lock current = activeVSR.get(); - if (current == null || !shouldRotateVSR(current)) { - return false; // Conditions changed while acquiring lock + if (current == null || !shouldRotateVSR(current) || frozenVSR.get() != null) { + return false; } - - // Check frozen slot again under lock - if (frozenVSR.get() != null) { - throw new IOException("Cannot rotate VSR: frozen slot became occupied during rotation"); - } - - // Freeze current VSR if it exists and has data - if (current != null && current.getRowCount() > 0) { + if (current.getRowCount() > 0) { freezeVSR(current); } - - // Create new active VSR - ManagedVSR newActive = createNewVSR(); - activeVSR.set(newActive); - - return true; // Rotation occurred + activeVSR.set(createNewVSR()); + return true; } } @@ -143,13 +130,16 @@ public ManagedVSR getFrozenVSR() { } public void unsetFrozenVSR() throws IOException { - if (frozenVSR.get() == null) { + ManagedVSR vsr = frozenVSR.get(); + if (vsr == null) { throw new IOException("unsetFrozenVSR called when frozen VSR is not set"); } - if (!VSRState.CLOSED.equals(frozenVSR.get().getState())) { - throw new IOException("frozenVSR cannot be unset, state is " + frozenVSR.get().getState()); + if (vsr.getState() != VSRState.CLOSED) { + throw new IOException("frozenVSR cannot be unset, state is " + vsr.getState()); + } + if (!frozenVSR.compareAndSet(vsr, null)) { + throw new IOException("frozenVSR changed during unset"); } - frozenVSR.set(null); } /** @@ -233,20 +223,16 @@ public void close() { } private void initializeActiveVSR() { - ManagedVSR initial = createNewVSR(); - activeVSR.set(initial); + activeVSR.set(createNewVSR()); } private ManagedVSR createNewVSR() { - String vsrId = poolId + "-vsr-" + vsrCounter.incrementAndGet(); BufferAllocator allocator = null; VectorSchemaRoot vsr = null; - try { allocator = bufferPool.createChildAllocator(vsrId); vsr = VectorSchemaRoot.create(schema, allocator); - ManagedVSR managedVSR = new ManagedVSR(vsrId, vsr, allocator); allVSRs.put(vsrId, managedVSR); @@ -255,18 +241,10 @@ private ManagedVSR createNewVSR() { } catch (Exception e) { // Clean up resources on failure since ManagedVSR couldn't take ownership if (vsr != null) { - try { - vsr.close(); - } catch (Exception closeEx) { - e.addSuppressed(closeEx); - } + try { vsr.close(); } catch (Exception ex) { e.addSuppressed(ex); } } if (allocator != null) { - try { - allocator.close(); - } catch (Exception closeEx) { - e.addSuppressed(closeEx); - } + try { allocator.close(); } catch (Exception ex) { e.addSuppressed(ex); } } throw new RuntimeException("Failed to create new VSR", e); } @@ -274,29 +252,9 @@ private ManagedVSR createNewVSR() { private void freezeVSR(ManagedVSR vsr) { vsr.setState(VSRState.FROZEN); - - // CRITICAL FIX: Check if frozen slot is already occupied - ManagedVSR previousFrozen = frozenVSR.get(); - if (previousFrozen != null) { - // NEVER blindly overwrite a frozen VSR - this would cause data loss - logger.error("Attempting to freeze VSR when frozen slot is occupied! " + - "Previous VSR: {} ({} rows), New VSR: {} ({} rows). " + - "This indicates a logic error - frozen VSR should be consumed before replacement.", - previousFrozen.getId(), previousFrozen.getRowCount(), - vsr.getId(), vsr.getRowCount()); - - // Return VSR to ACTIVE state to prevent state corruption - vsr.setState(VSRState.ACTIVE); - throw new IllegalStateException("Cannot freeze VSR: frozen slot is occupied by unprocessed VSR " + - previousFrozen.getId() + ". This would cause data loss."); - } - - // Safe to set frozen VSR since slot is empty - boolean success = frozenVSR.compareAndSet(null, vsr); - if (!success) { - // Race condition: another thread set frozen VSR between our check and set + if (!frozenVSR.compareAndSet(null, vsr)) { vsr.setState(VSRState.ACTIVE); - throw new IllegalStateException("Race condition detected: frozen slot was occupied during freeze operation"); + throw new IllegalStateException("Frozen slot occupied during freeze"); } } diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRState.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRState.java index cd55f30ca24cc..638c86689f8a2 100644 --- a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRState.java +++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRState.java @@ -1,28 +1,38 @@ package com.parquet.parquetdataformat.vsr; +import java.util.EnumSet; +import java.util.Set; + /** - * Represents the lifecycle states of a VectorSchemaRoot in the Project Mustang - * Parquet Writer Plugin architecture. + * Represents the lifecycle states of a VectorSchemaRoot. + * Valid transitions: ACTIVE -> FROZEN -> FLUSHING -> CLOSED */ public enum VSRState { - /** - * Currently accepting writes - the VSR is active and can be modified. - */ ACTIVE, - - /** - * Read-only state - VSR is frozen and queued for flush to Rust. - * No further modifications are allowed in this state. - */ FROZEN, - - /** - * Currently being processed by Rust - VSR is in the handoff process. - */ FLUSHING, - - /** - * Completed and cleaned up - VSR processing is complete and resources freed. - */ - CLOSED + CLOSED; + + private static final Set ACTIVE_VALID_NEXT = EnumSet.of(FROZEN, CLOSED); + private static final Set FROZEN_VALID_NEXT = EnumSet.of(FLUSHING, CLOSED); + private static final Set FLUSHING_VALID_NEXT = EnumSet.of(CLOSED); + private static final Set CLOSED_VALID_NEXT = EnumSet.noneOf(VSRState.class); + + public boolean canTransitionTo(VSRState next) { + switch (this) { + case ACTIVE: return ACTIVE_VALID_NEXT.contains(next); + case FROZEN: return FROZEN_VALID_NEXT.contains(next); + case FLUSHING: return FLUSHING_VALID_NEXT.contains(next); + case CLOSED: return CLOSED_VALID_NEXT.contains(next); + default: return false; + } + } + + public void validateTransition(VSRState next) { + if (!canTransitionTo(next)) { + throw new IllegalStateException( + String.format("Invalid state transition: %s -> %s", this, next) + ); + } + } }