diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..572bd41 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,12 @@ +[target.aarch64-unknown-linux-gnu] +linker = "aarch64-linux-gnu-gcc" + +[env] +CC_aarch64_unknown_linux_gnu = "aarch64-linux-gnu-gcc" +CXX_aarch64_unknown_linux_gnu = "aarch64-linux-gnu-g++" +AR_aarch64_unknown_linux_gnu = "aarch64-linux-gnu-ar" +CARGO_TARGET_AARCH64_UNKNOWN_LINUX_GNU_LINKER = "aarch64-linux-gnu-gcc" + +# Use vendored OpenSSL to avoid cross-compilation issues +OPENSSL_STATIC = "true" +OPENSSL_VENDORED = "true" \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 283ba8e..19618ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ panic = "abort" # Remove panic unwinding code strip = true # Strip symbols from binary debug = false # No debug symbols + # Use faster memory allocator [profile.release.package."*"] codegen-units = 1 diff --git a/build-docker/alpine/Dockerfile b/build-docker/alpine/Dockerfile index b7e2547..1628e7e 100644 --- a/build-docker/alpine/Dockerfile +++ b/build-docker/alpine/Dockerfile @@ -2,21 +2,26 @@ FROM rust:alpine AS builder WORKDIR /app -# Install build dependencies +# Better build environment for Rust + musl RUN apk add --no-cache \ build-base \ cmake \ - musl-dev\ + musl-dev \ + pkgconfig \ perl \ - perl-utils \ - perl-dev \ sqlite-dev \ - openssl-dev + openssl-dev \ + openssl-libs-static + +# Optimize Rust compilation for musl +ENV RUSTFLAGS="-C target-feature=-crt-static -C link-arg=-s" +ENV CARGO_BUILD_TARGET="x86_64-unknown-linux-musl" +RUN rustup target add x86_64-unknown-linux-musl COPY . . 
-RUN cargo build -p router-cli --release -RUN cargo build -p router-core --release -RUN cargo build -p router-api --release +RUN cargo build -p router-cli --release --target x86_64-unknown-linux-musl +RUN cargo build -p router-core --release --target x86_64-unknown-linux-musl +RUN cargo build -p router-api --release --target x86_64-unknown-linux-musl # Runtime image stage FROM alpine:latest diff --git a/build-docker/debian/Dockerfile b/build-docker/debian/Dockerfile index a984c5f..2701f9d 100644 --- a/build-docker/debian/Dockerfile +++ b/build-docker/debian/Dockerfile @@ -1,12 +1,20 @@ # Build stage FROM rust:bookworm AS builder WORKDIR /app + # Install build dependencies RUN apt-get update && apt-get install -y \ + build-essential \ cmake \ + perl \ + libperl-dev \ + libsqlite3-dev \ + libssl-dev \ + pkg-config \ && rm -rf /var/lib/apt/lists/* COPY . . +RUN cargo build -p router-cli --release RUN cargo build -p router-core --release RUN cargo build -p router-api --release @@ -14,43 +22,26 @@ RUN cargo build -p router-api --release FROM debian:bookworm-slim WORKDIR /app -# Install systemd and other dependencies +# Install minimal runtime dependencies RUN apt-get update && apt-get install -y \ - systemd \ - systemd-sysv \ ca-certificates \ + bash \ + libsqlite3-0 \ + libssl3 \ procps \ + util-linux \ + coreutils \ && rm -rf /var/lib/apt/lists/* # Copy binaries from builder COPY --from=builder /app/target/release/router-core /usr/local/bin/router-core COPY --from=builder /app/target/release/router-api /usr/local/bin/router-api +COPY --from=builder /app/target/release/router-cli /usr/local/bin/gwrs -# Create necessary directories -RUN mkdir -p /opt/gwrs/bin && \ - mkdir -p /opt/gwrs/conf && \ - mkdir -p /tmp/gwrs/log - -# Create symlinks to binaries -RUN ln -sf /usr/local/bin/router-core /opt/gwrs/bin/router-core && \ - ln -sf /usr/local/bin/router-api /opt/gwrs/bin/router-api - -# Create systemd service files -RUN mkdir -p /etc/systemd/system -COPY 
build-docker/debian/router-core.service /etc/systemd/system/gwrs-core.service -COPY build-docker/debian/router-api.service /etc/systemd/system/gwrs-api.service - -# Enable services -RUN systemctl enable gwrs-core.service -RUN systemctl enable gwrs-api.service - -# Set up entrypoint script -COPY build-docker/debian/entrypoint.sh /entrypoint.sh -RUN chmod +x /entrypoint.sh +COPY build-docker/debian/entrypoint.sh /usr/local/bin/entrypoint.sh -# Use systemd as command, with a fallback path -ENTRYPOINT ["/entrypoint.sh"] -CMD ["/lib/systemd/systemd"] +# Make everything executable +RUN chmod +x /usr/local/bin/* -# Expose API -EXPOSE 24042 \ No newline at end of file +# Set entrypoint +ENTRYPOINT ["/usr/local/bin/entrypoint.sh"] \ No newline at end of file diff --git a/build-docker/debian/entrypoint.sh b/build-docker/debian/entrypoint.sh index 4391186..03f5062 100644 --- a/build-docker/debian/entrypoint.sh +++ b/build-docker/debian/entrypoint.sh @@ -1,29 +1,133 @@ #!/bin/bash set -e -# Prepare systemd for container environment -if [ ! -d /run/systemd/system ]; then - mkdir -p /run/systemd/system -fi - -# Check for the correct init path -INIT_PATH="" -for path in /sbin/init /lib/systemd/systemd /usr/lib/systemd/systemd /bin/systemd; do - if [ -x "$path" ]; then - INIT_PATH="$path" - break +# Simple configuration +LOG_DIR="/tmp/gwrs/log" +PID_DIR="/tmp/gwrs/pids" +CHECK_INTERVAL=5 + +# Create directories +mkdir -p "$LOG_DIR" "$PID_DIR" + +# Logging +log() { + echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_DIR/manager.log" +} + +# Start core service +start_core() { + log "Starting router-core..." + nohup /usr/local/bin/router-core > "$LOG_DIR/core.log" 2> "$LOG_DIR/core.error" & + echo $! > "$PID_DIR/core.pid" + log "router-core started (PID: $!)" +} + +# Start API service +start_api() { + log "Starting router-api..." + nohup /usr/local/bin/router-api > "$LOG_DIR/api.log" 2> "$LOG_DIR/api.error" & + echo $! 
> "$PID_DIR/api.pid" + log "router-api started (PID: $!)" +} + +# Stop a service +stop_service() { + local service=$1 + local pid_file="$PID_DIR/${service}.pid" + + if [ -f "$pid_file" ]; then + local pid=$(cat "$pid_file") + if kill -0 $pid 2>/dev/null; then + log "Stopping $service (PID: $pid)" + kill $pid + sleep 2 + # Force kill if still running + if kill -0 $pid 2>/dev/null; then + log "Force killing $service (PID: $pid)" + kill -9 $pid + fi + fi + rm -f "$pid_file" + fi +} + +# Check if service is running +is_running() { + local service=$1 + local pid_file="$PID_DIR/${service}.pid" + + if [ -f "$pid_file" ]; then + local pid=$(cat "$pid_file") + if kill -0 $pid 2>/dev/null; then + return 0 # Running + else + rm -f "$pid_file" # Clean up stale PID + fi fi -done - -# If we're supposed to run init but couldn't find it -if [ "$1" = "/sbin/init" ] && [ -z "$INIT_PATH" ]; then - echo "Error: Could not find systemd init binary. Please check your installation." - exit 1 -elif [ "$1" = "/sbin/init" ] && [ -n "$INIT_PATH" ]; then - # Use the found init path instead of the specified one - shift - exec "$INIT_PATH" "$@" -else - # Run command as specified - exec "$@" -fi \ No newline at end of file + return 1 # Not running +} + +# Restart core (and then API) +restart_core() { + log "Core is down! Restarting core and API..." + + # Stop both services + stop_service "api" + stop_service "core" + + sleep 2 + + # Start core first + start_core + sleep 3 + + # Then start API + start_api +} + +# Restart API only +restart_api() { + log "API is down! Restarting API..." + stop_service "api" + sleep 2 + start_api +} + +# Monitor services +monitor() { + log "Starting service monitor..." + + while true; do + # Check core + if ! is_running "core"; then + restart_core + # Check API (only if core is running) + elif ! is_running "api"; then + restart_api + fi + + sleep $CHECK_INTERVAL + done +} + +# Cleanup on exit +cleanup() { + log "Shutting down services..." 
+ stop_service "api" + stop_service "core" + exit 0 +} + +# Handle signals +trap cleanup SIGTERM SIGINT + +# Main execution +log "=== Router Process Manager Starting ===" + +# Start both services +start_core +sleep 3 # Give core time to start +start_api + +# Monitor forever +monitor \ No newline at end of file diff --git a/build-docker/debian/router-api.service b/build-docker/debian/router-api.service deleted file mode 100644 index e8072a4..0000000 --- a/build-docker/debian/router-api.service +++ /dev/null @@ -1,27 +0,0 @@ -[Unit] -Description=GWRS Mini-Gateway API -After=network.target gwrs-core.service -Wants=network-online.target gwrs-core.service - -[Service] -Type=simple -User=root -Group=root -ExecStart=/usr/local/bin/router-api --ip 0.0.0.0 -Restart=on-failure -RestartSec=5 -StandardOutput=append:/tmp/gwrs/log/api.log -StandardError=append:/tmp/gwrs/log/api.error.log -# Log rotation settings -LogRateLimitBurst=10000 -LogsDirectory=gwrs -LogsDirectoryMode=0755 -RuntimeMaxUse=50M -RuntimeMaxFileSize=20M -RuntimeMaxFiles=10 -SystemMaxUse=200M -SystemMaxFileSize=50M -SystemMaxFiles=10 - -[Install] -WantedBy=multi-user.target \ No newline at end of file diff --git a/build-docker/debian/router-core.service b/build-docker/debian/router-core.service deleted file mode 100644 index d9542c5..0000000 --- a/build-docker/debian/router-core.service +++ /dev/null @@ -1,27 +0,0 @@ -[Unit] -Description=GWRS Mini-Gateway Core -After=network.target -Wants=network-online.target - -[Service] -Type=simple -User=root -Group=root -ExecStart=/usr/local/bin/router-core -Restart=on-failure -RestartSec=5 -StandardOutput=append:/tmp/gwrs/log/core.log -StandardError=append:/tmp/gwrs/log/core.error.log -# Log rotation settings -LogRateLimitBurst=10000 -LogsDirectory=gwrs -LogsDirectoryMode=0755 -RuntimeMaxUse=50M -RuntimeMaxFileSize=20M -RuntimeMaxFiles=10 -SystemMaxUse=200M -SystemMaxFileSize=50M -SystemMaxFiles=10 - -[Install] -WantedBy=multi-user.target \ No newline at end of file diff 
--git a/router-api/src/main.rs b/router-api/src/main.rs index 21b0427..a8df642 100644 --- a/router-api/src/main.rs +++ b/router-api/src/main.rs @@ -189,7 +189,7 @@ async fn main() -> Result<(), Box> { // Bind server to the specified address and port .bind(&bind_address)? - // Set number of worker threads to 2 for handling concurrent requests - .workers(2) + // Set number of worker threads to 1 for handling concurrent requests + .workers(1) // Start the HTTP server and keep it running until terminated .run() .await?; diff --git a/router-api/src/module/memory_log/core.rs b/router-api/src/module/memory_log/core.rs index 272cda9..752b01e 100644 --- a/router-api/src/module/memory_log/core.rs +++ b/router-api/src/module/memory_log/core.rs @@ -4,6 +4,12 @@ use std::mem; use std::ptr; use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; +// Architecture detection +#[cfg(target_arch = "x86_64")] +const ARCH_NAME: &str = "x86_64"; +#[cfg(target_arch = "aarch64")] +const ARCH_NAME: &str = "aarch64"; + // Constants for shared memory pub const MAX_MEMORY_SIZE: usize = 50 * 1024 * 1024; // 50MB max memory (for larger buffer) pub const ENTRY_MAX_SIZE: usize = 4096; // Maximum 4KB per entry @@ -11,6 +17,33 @@ pub const SHM_METADATA_SIZE: usize = 2048; // Space for metadata at the beginnin pub const PROXY_LOGGER_NAME: &str = "/gwrs-proxy"; pub const GATEWAY_LOGGER_NAME: &str = "/gwrs-gateway"; +// Architecture-specific memory ordering helpers +#[inline(always)] +fn acquire_ordering() -> Ordering { + // Both architectures support Acquire ordering efficiently + Ordering::Acquire +} + +#[inline(always)] +fn release_ordering() -> Ordering { + // Both architectures support Release ordering efficiently + Ordering::Release +} + +#[inline(always)] +fn memory_fence_release() { + // On ARM64, this compiles to DMB ISH instruction + // On x86_64, this is often a no-op due to strong memory model + std::sync::atomic::fence(Ordering::Release); +} + +#[inline(always)] +fn memory_fence_acquire() { + // On ARM64, this compiles to DMB ISH instruction + // On x86_64, 
this is often a no-op due to strong memory model + std::sync::atomic::fence(Ordering::Acquire); +} + // Control structure at the beginning of shared memory #[repr(C, align(64))] pub struct QueueControl { @@ -21,6 +54,8 @@ pub struct QueueControl { read_index: AtomicUsize, count: AtomicUsize, capacity: AtomicUsize, + // Overflow tracking + overflow_count: AtomicUsize, // Slots for future metadata _reserved: [u8; 2048], // Increased to 2KB for future expansion } @@ -35,30 +70,59 @@ impl QueueControl { read_index: AtomicUsize::new(0), count: AtomicUsize::new(0), capacity: AtomicUsize::new(capacity), + overflow_count: AtomicUsize::new(0), _reserved: [0; 2048], } } - pub fn lock(&self) { - // Simple spin lock + pub fn lock(&self) -> Result<(), io::Error> { + // Add a timeout to prevent indefinite spinning + let start = std::time::Instant::now(); + let timeout = std::time::Duration::from_millis(500); // 500ms max wait + + // Use Acquire ordering for the lock to ensure all subsequent reads + // see values written before the lock was released while self .lock - .compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed) + .compare_exchange_weak(0, 1, acquire_ordering(), Ordering::Relaxed) .is_err() { + // Check for timeout + if start.elapsed() > timeout { + return Err(Error::new( + ErrorKind::TimedOut, + "Failed to acquire lock within timeout", + )); + } + // This is architecture-aware and will use appropriate pause instruction std::hint::spin_loop(); } + + // Memory fence to ensure lock acquisition is visible + memory_fence_acquire(); + + Ok(()) // Successfully acquired lock } pub fn unlock(&self) { - self.lock.store(0, Ordering::Release); + // Memory fence before unlock to ensure all writes are visible + memory_fence_release(); + + // Use Release ordering to ensure all previous writes are visible + // to the next thread that acquires the lock + self.lock.store(0, release_ordering()); } + pub fn dequeue_item(&self, read_idx: usize, capacity: usize) { // Update read 
index with Release ordering self.read_index - .store((read_idx + 1) % capacity, Ordering::Release); + .store((read_idx + 1) % capacity, release_ordering()); + + // Memory fence to ensure index update is visible before count update + memory_fence_release(); + // Update count with Release ordering - self.count.fetch_sub(1, Ordering::Release); + self.count.fetch_sub(1, release_ordering()); } } @@ -72,21 +136,45 @@ pub struct SharedMemoryConsumer { _shm_name: CString, } +// Safety: SharedMemoryConsumer operations are not thread-safe by default +// Users must ensure proper synchronization when sharing between threads +unsafe impl Send for SharedMemoryConsumer {} + // Implementing consumer impl SharedMemoryConsumer { // Open existing shared memory pub fn open(name: &str, expected_size: usize) -> io::Result { + // Log architecture for debugging + eprintln!("[-LO-] Opening shared memory consumer on {} architecture", ARCH_NAME); + // Create a C-style string for the name let c_name = CString::new(name).map_err(|_| Error::new(ErrorKind::InvalidInput, "Invalid name"))?; // Open shared memory object let fd = unsafe { - libc::shm_open( - c_name.as_ptr(), - libc::O_RDWR, // We need write access for the control structure - 0o600, - ) + let mut attempts = 0; + let max_attempts = 3; + + loop { + let result = libc::shm_open( + c_name.as_ptr(), + libc::O_RDWR, // We need write access for the control structure + 0o600, + ); + + if result >= 0 { + break result; + } + + attempts += 1; + if attempts >= max_attempts { + break -1; + } + + // Small delay between attempts + std::thread::sleep(std::time::Duration::from_millis(10)); + } }; if fd < 0 { @@ -114,6 +202,22 @@ impl SharedMemoryConsumer { let control_ptr = ptr as *mut QueueControl; let data_start = unsafe { (ptr as *mut u8).add(SHM_METADATA_SIZE) }; + // Verify the control structure looks valid + unsafe { + // Memory fence to ensure we see the latest values + memory_fence_acquire(); + + let capacity = 
(*control_ptr).capacity.load(acquire_ordering()); + if capacity == 0 { + libc::munmap(ptr, expected_size); + libc::close(fd); + return Err(Error::new( + ErrorKind::InvalidData, + "Shared memory appears uninitialized (capacity is 0)", + )); + } + } + Ok(SharedMemoryConsumer { ptr: ptr as *mut u8, size: expected_size, @@ -128,85 +232,99 @@ impl SharedMemoryConsumer { pub fn dequeue(&self) -> io::Result>> { unsafe { // Lock the queue - (*self.control).lock(); + match (*self.control).lock() { + Ok(()) => { + // We got the lock, now use a defer-like pattern to ensure unlock + struct LockGuard<'a> { + control: &'a QueueControl, + } + + impl<'a> Drop for LockGuard<'a> { + fn drop(&mut self) { + self.control.unlock(); + } + } + + // Create a guard that will automatically unlock when it goes out of scope + let _guard = LockGuard { control: &*self.control }; + + // Use explicit Acquire ordering for cross-process visibility + let count = (*self.control).count.load(acquire_ordering()); + if count == 0 { + return Ok(None); + } - // Use explicit Acquire ordering for cross-process visibility - let count = (*self.control).count.load(Ordering::Acquire); - if count == 0 { - (*self.control).unlock(); - return Ok(None); - } + // Get current read position with explicit Acquire ordering + let read_idx = (*self.control).read_index.load(acquire_ordering()); + let capacity = (*self.control).capacity.load(acquire_ordering()); - // Get current read position with explicit Acquire ordering - let read_idx = (*self.control).read_index.load(Ordering::Acquire); - let capacity = (*self.control).capacity.load(Ordering::Acquire); + // Safety check - if read index is out of bounds, something is wrong + if read_idx >= capacity { + return Err(Error::new( + ErrorKind::InvalidData, + format!("Invalid read index: {} (capacity: {})", read_idx, capacity), + )); + } - // Safety check - if read index is out of bounds, something is wrong - if read_idx >= capacity { - (*self.control).unlock(); - return 
Err(Error::new( - ErrorKind::InvalidData, - format!("Invalid read index: {} (capacity: {})", read_idx, capacity), - )); - } + // Calculate offset in buffer + let offset = read_idx * ENTRY_MAX_SIZE; + + // Verify that offset is within bounds of allocated memory + if offset >= self.size - SHM_METADATA_SIZE { + return Err(Error::new( + ErrorKind::InvalidData, + format!( + "Buffer offset out of bounds: {} (max: {})", + offset, + self.size - SHM_METADATA_SIZE + ), + )); + } - // Calculate offset in buffer - let offset = read_idx * ENTRY_MAX_SIZE; + // Get pointer to position + let entry_ptr = self.data_start.add(offset); - // Verify that offset is within bounds of allocated memory - if offset >= self.size - SHM_METADATA_SIZE { - (*self.control).unlock(); - return Err(Error::new( - ErrorKind::InvalidData, - format!( - "Buffer offset out of bounds: {} (max: {})", - offset, - self.size - SHM_METADATA_SIZE - ), - )); - } + // Memory fence to ensure we see the latest data + memory_fence_acquire(); - // Get pointer to position - let entry_ptr = self.data_start.add(offset); + // Read entry size first + let entry_size = ptr::read(entry_ptr as *const usize); - // Read entry size first - let entry_size = *(entry_ptr as *const usize); + // Check entry size is sensible + if entry_size == 0 || entry_size > ENTRY_MAX_SIZE - mem::size_of::() { + // Skip this entry by advancing read index + (*self.control).dequeue_item(read_idx, capacity); - // Check entry size is sensible - if entry_size == 0 || entry_size > ENTRY_MAX_SIZE - mem::size_of::() { - // Skip this entry by advancing read index - let capacity = (*self.control).capacity.load(Ordering::Acquire); - (*self.control) - .read_index - .store((read_idx + 1) % capacity, Ordering::Release); - (*self.control).count.fetch_sub(1, Ordering::Release); + return Err(Error::new( + ErrorKind::InvalidData, + format!( + "Invalid entry size: {} (max: {}), skipping entry", + entry_size, + ENTRY_MAX_SIZE - mem::size_of::() + ), + )); + } - 
(*self.control).unlock(); - return Err(Error::new( - ErrorKind::InvalidData, - format!( - "Invalid entry size: {} (max: {}), skipping entry", - entry_size, - ENTRY_MAX_SIZE - mem::size_of::() - ), - )); - } + // Memory fence before reading data to ensure size is read before data + memory_fence_acquire(); - // Read the actual data - let mut data = vec![0u8; entry_size]; - ptr::copy_nonoverlapping( - entry_ptr.add(mem::size_of::()), - data.as_mut_ptr(), - entry_size, - ); + // Read the actual data + let mut data = vec![0u8; entry_size]; + ptr::copy_nonoverlapping( + entry_ptr.add(mem::size_of::()), + data.as_mut_ptr(), + entry_size, + ); - // Update read index and count - (*self.control).dequeue_item(read_idx, capacity); + // Update read index and count + (*self.control).dequeue_item(read_idx, capacity); - // Unlock - (*self.control).unlock(); + // Note: Unlock happens automatically via LockGuard drop - Ok(Some(data)) + Ok(Some(data)) + }, + Err(e) => Err(e), + } } } @@ -220,10 +338,15 @@ impl SharedMemoryConsumer { Ok(Some(data)) => return Ok(Some(data)), Ok(None) => std::thread::sleep(std::time::Duration::from_millis(10)), Err(e) => { - // Log the error - // log::error!("Error in dequeue: {}", e); + // Log the error if it's not a timeout + if e.kind() != ErrorKind::TimedOut { + eprintln!("[-LO-] Error in dequeue on {}: {}", ARCH_NAME, e); + } std::thread::sleep(std::time::Duration::from_millis(10)); - return Err(e); + // Continue trying unless it's a fatal error + if e.kind() == ErrorKind::InvalidData { + return Err(e); + } } } } @@ -233,16 +356,22 @@ impl SharedMemoryConsumer { // Get number of items in queue pub fn queue_size(&self) -> usize { - unsafe { (*self.control).count.load(Ordering::Relaxed) } + unsafe { (*self.control).count.load(acquire_ordering()) } } // Get maximum capacity of queue pub fn capacity(&self) -> usize { - unsafe { (*self.control).capacity.load(Ordering::Relaxed) } + unsafe { (*self.control).capacity.load(acquire_ordering()) } + } + + // 
Get overflow count (if available) + pub fn overflow_count(&self) -> usize { + unsafe { (*self.control).overflow_count.load(Ordering::Relaxed) } } // Clean up shared memory resources but don't unlink (the producer/router-core owns the shared memory) pub fn cleanup(&self) -> io::Result<()> { + eprintln!("[-LO-] Cleaning up consumer on {}...", ARCH_NAME); // Consumer should not call shm_unlink as it would remove the shared memory // that may still be in use by other processes. // Only unmap memory and close file descriptor in Drop implementation. @@ -254,9 +383,16 @@ impl Drop for SharedMemoryConsumer { fn drop(&mut self) { unsafe { // Unmap memory - libc::munmap(self.ptr as *mut libc::c_void, self.size); + let unmap_result = libc::munmap(self.ptr as *mut libc::c_void, self.size); + if unmap_result != 0 { + eprintln!("[-LO-] Failed to unmap memory: {}", Error::last_os_error()); + } + // Close file descriptor - libc::close(self.shm_fd); + let close_result = libc::close(self.shm_fd); + if close_result != 0 { + eprintln!("[-LO-] Failed to close file descriptor: {}", Error::last_os_error()); + } // Note: We don't unlink here unless explicitly requested } } @@ -277,8 +413,13 @@ pub struct LogConsumer { shm: SharedMemoryConsumer, } +// Safety: LogConsumer operations are not thread-safe by default +// Users must ensure proper synchronization when sharing between threads +unsafe impl Send for LogConsumer {} + impl LogConsumer { pub fn new(name: &str, size: usize) -> io::Result { + eprintln!("[-LO-] Creating log consumer for {} on {}", name, ARCH_NAME); let shm = SharedMemoryConsumer::open(name, size)?; Ok(LogConsumer { shm }) } @@ -355,6 +496,11 @@ impl LogConsumer { self.shm.capacity() } + #[allow(dead_code)] + pub fn overflow_count(&self) -> usize { + self.shm.overflow_count() + } + #[allow(dead_code)] pub fn cleanup(&self) -> io::Result<()> { self.shm.cleanup() diff --git a/router-api/src/module/memory_log/logging/gateway.rs 
b/router-api/src/module/memory_log/logging/gateway.rs index 6868dc0..c311c5e 100644 --- a/router-api/src/module/memory_log/logging/gateway.rs +++ b/router-api/src/module/memory_log/logging/gateway.rs @@ -35,6 +35,7 @@ pub fn listen() { // If we haven't received anything in a while, try to reconnect if consecutive_empty > 2000 { + batch.shrink_to_fit(); // Force capacity reduction log::warn!( "Too many consecutive empty results ({}), attempting to recreate consumer", consecutive_empty @@ -68,16 +69,18 @@ pub fn listen() { // Process full batch if batch.len() >= BATCH_SIZE { - process_batch(&batch); + // process_batch(&batch); batch.clear(); + batch.shrink_to_fit(); } } Ok(None) => { // Process any remaining logs if !batch.is_empty() { consecutive_empty = 0; - process_batch(&batch); + // process_batch(&batch); batch.clear(); + batch.shrink_to_fit(); } // Exponential backoff with max cap diff --git a/router-api/src/module/memory_log/logging/proxy.rs b/router-api/src/module/memory_log/logging/proxy.rs index 560ca85..d63073f 100644 --- a/router-api/src/module/memory_log/logging/proxy.rs +++ b/router-api/src/module/memory_log/logging/proxy.rs @@ -31,6 +31,7 @@ pub fn listen() { if last_health_check.elapsed() >= health_check_interval { // If we haven't received anything in a while, try to reconnect if consecutive_empty > 2000 { + batch.shrink_to_fit(); // Force capacity reduction log::warn!( "Too many consecutive empty results ({}), attempting to recreate consumer", consecutive_empty @@ -64,7 +65,7 @@ pub fn listen() { // Process full batch if batch.len() >= BATCH_SIZE { - process_batch(&batch); + // process_batch(&batch); batch.clear(); } } @@ -72,7 +73,7 @@ pub fn listen() { // Process any remaining logs first before incrementing consecutive_empty if !batch.is_empty() { consecutive_empty = 0; // Reset counter when we process logs - process_batch(&batch); + // process_batch(&batch); batch.clear(); } diff --git a/router-core/Cargo.toml b/router-core/Cargo.toml index 
4a57db7..4fe6f5d 100644 --- a/router-core/Cargo.toml +++ b/router-core/Cargo.toml @@ -24,6 +24,7 @@ lazy_static = "1.5.0" regex-automata = "0.4.9" lru = "0.14.0" num_cpus = "1.16.0" +openssl = { version = "*", features = ["vendored"] } [target.'cfg(target_os = "macos")'.dependencies] dirs = "6.0.0" diff --git a/router-core/src/system/memory_log/mod.rs b/router-core/src/system/memory_log/mod.rs index 9de091d..62bb3c8 100644 --- a/router-core/src/system/memory_log/mod.rs +++ b/router-core/src/system/memory_log/mod.rs @@ -1,4 +1,5 @@ // A raw implementation of shared memory in Rust using direct system calls +// Now with support for both x86_64 and ARM64 (aarch64) architectures pub mod sender; use std::ffi::CString; @@ -8,6 +9,12 @@ use std::ptr; use std::slice; use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; +// Architecture detection +#[cfg(target_arch = "x86_64")] +const ARCH_NAME: &str = "x86_64"; +#[cfg(target_arch = "aarch64")] +const ARCH_NAME: &str = "aarch64"; + // Constants for shared memory - adjusting for more efficient memory usage pub const MAX_MEMORY_SIZE: usize = 50 * 1024 * 1024; // 50MB max memory const ENTRY_MAX_SIZE: usize = 4096; // Reduce from 4KB to 2048 bytes since most logs are small @@ -19,6 +26,7 @@ pub const LEVEL_WARN: u8 = 3; // Warning messages, potential issues pub const LEVEL_ERROR: u8 = 4; // Error conditions, but application can continue // Control structure at the beginning of shared memory +// The 64-byte alignment is good for cache line optimization on both x86_64 and ARM64 #[repr(C, align(64))] pub struct QueueControl { // Mutex for synchronization @@ -41,6 +49,33 @@ pub enum OverflowPolicy { Overwrite, // Overwrite oldest entries when queue is full } +// Architecture-specific memory ordering helpers +#[inline(always)] +fn acquire_ordering() -> Ordering { + // Both architectures support Acquire ordering efficiently + Ordering::Acquire +} + +#[inline(always)] +fn release_ordering() -> Ordering { + // Both architectures 
support Release ordering efficiently + Ordering::Release +} + +#[inline(always)] +fn memory_fence_release() { + // On ARM64, this compiles to DMB ISH instruction + // On x86_64, this is often a no-op due to strong memory model + std::sync::atomic::fence(Ordering::Release); +} + +#[inline(always)] +fn memory_fence_acquire() { + // On ARM64, this compiles to DMB ISH instruction + // On x86_64, this is often a no-op due to strong memory model + std::sync::atomic::fence(Ordering::Acquire); +} + // A simple mutex implementation using an atomic impl QueueControl { pub fn new(capacity: usize) -> Self { @@ -54,14 +89,17 @@ impl QueueControl { _reserved: [0; 2048], } } + pub fn lock(&self) -> Result<(), io::Error> { // Add a timeout to prevent indefinite spinning let start = std::time::Instant::now(); let timeout = std::time::Duration::from_millis(500); // 500ms max wait + // Use Acquire ordering for the lock to ensure all subsequent reads + // see values written before the lock was released while self .lock - .compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed) + .compare_exchange_weak(0, 1, acquire_ordering(), Ordering::Relaxed) .is_err() { // Check for timeout @@ -71,17 +109,26 @@ impl QueueControl { "Failed to acquire lock within timeout", )); } + // This is architecture-aware and will use appropriate pause instruction std::hint::spin_loop(); } + // Memory fence to ensure lock acquisition is visible + memory_fence_acquire(); + Ok(()) // Successfully acquired lock } pub fn unlock(&self) { + // Memory fence before unlock to ensure all writes are visible + memory_fence_release(); + // Only unlock if currently locked + // Use Release ordering to ensure all previous writes are visible + // to the next thread that acquires the lock let was_locked = self .lock - .compare_exchange(1, 0, Ordering::Release, Ordering::Relaxed) + .compare_exchange(1, 0, release_ordering(), Ordering::Relaxed) .is_ok(); if !was_locked { // Optionally log this situation for debugging @@ 
-93,19 +140,19 @@ impl QueueControl { pub fn enqueue_item(&self, write_idx: usize, capacity: usize) { // Update write index with Release ordering to make changes visible self.write_index - .store((write_idx + 1) % capacity, Ordering::Release); + .store((write_idx + 1) % capacity, release_ordering()); // Check current count before updating to prevent overflow - let current_count = self.count.load(Ordering::Acquire); + let current_count = self.count.load(acquire_ordering()); // If count is suspiciously high, reset it to prevent overflow if current_count > capacity * 2 || current_count == usize::MAX - 1 { - self.count.store(capacity, Ordering::Release); + self.count.store(capacity, release_ordering()); } else { // Update count with Release ordering - use saturating_add to prevent overflow let _ = self .count - .fetch_update(Ordering::Release, Ordering::Relaxed, |c| { + .fetch_update(release_ordering(), Ordering::Relaxed, |c| { // Only increment if not suspiciously large if c < capacity * 2 { Some(c + 1) @@ -120,18 +167,18 @@ impl QueueControl { // Add a method to safely decrement count (for consumer implementations) #[allow(dead_code)] pub fn dequeue_item(&self) { - let current = self.count.load(Ordering::Acquire); + let current = self.count.load(acquire_ordering()); if current > 0 { - self.count.fetch_sub(1, Ordering::Release); + self.count.fetch_sub(1, release_ordering()); } } // Enhanced validation with more diagnostics pub fn validate_and_fix(&self, capacity: usize) -> bool { - let count = self.count.load(Ordering::Acquire); - let current_capacity = self.capacity.load(Ordering::Acquire); - let write_idx = self.write_index.load(Ordering::Acquire); - let read_idx = self.read_index.load(Ordering::Acquire); + let count = self.count.load(acquire_ordering()); + let current_capacity = self.capacity.load(acquire_ordering()); + let write_idx = self.write_index.load(acquire_ordering()); + let read_idx = self.read_index.load(acquire_ordering()); // Perform more thorough 
validation let corrupted = count > capacity * 2 @@ -151,16 +198,19 @@ impl QueueControl { // Enhanced reset with better diagnostics pub fn force_reset(&self, capacity: usize) { // Capture original values for debugging - let _old_count = self.count.load(Ordering::Acquire); - let _old_write_idx = self.write_index.load(Ordering::Acquire); - let _old_read_idx = self.read_index.load(Ordering::Acquire); - - // Reset all fields - self.write_index.store(0, Ordering::Release); - self.read_index.store(0, Ordering::Release); - self.count.store(0, Ordering::Release); - self.capacity.store(capacity, Ordering::Release); - self.overflow_count.store(0, Ordering::Release); + let _old_count = self.count.load(acquire_ordering()); + let _old_write_idx = self.write_index.load(acquire_ordering()); + let _old_read_idx = self.read_index.load(acquire_ordering()); + + // Reset all fields with Release ordering to ensure visibility + self.write_index.store(0, release_ordering()); + self.read_index.store(0, release_ordering()); + self.count.store(0, release_ordering()); + self.capacity.store(capacity, release_ordering()); + self.overflow_count.store(0, release_ordering()); + + // Ensure all stores are visible + memory_fence_release(); } } @@ -175,6 +225,10 @@ pub struct SharedMemoryProducer { overflow_policy: OverflowPolicy, } +// Safety: SharedMemoryProducer operations are not thread-safe by default +// Users must ensure proper synchronization when sharing between threads +unsafe impl Send for SharedMemoryProducer {} + // Implementing producer impl SharedMemoryProducer { // Create a new shared memory region @@ -190,6 +244,9 @@ impl SharedMemoryProducer { fresh_start: bool, overflow_policy: OverflowPolicy, ) -> io::Result { + // Log architecture for debugging + eprintln!("[-LO-] Creating shared memory on {} architecture", ARCH_NAME); + // Calculate capacity based on total size and entry size let data_size = total_size.saturating_sub(SHM_METADATA_SIZE); let capacity = data_size / 
ENTRY_MAX_SIZE; @@ -297,8 +354,11 @@ impl SharedMemoryProducer { let mut control_initialized = false; unsafe { + // Memory fence to ensure we see the latest values + memory_fence_acquire(); + // Check if we can read the capacity field to determine if memory was already initialized - let existing_capacity = (*control_ptr).capacity.load(Ordering::Acquire); + let existing_capacity = (*control_ptr).capacity.load(acquire_ordering()); // If capacity seems to exist and has a reasonable value if existing_capacity > 0 && existing_capacity <= capacity * 2 { @@ -308,6 +368,7 @@ impl SharedMemoryProducer { let was_corrupted = (*control_ptr).validate_and_fix(capacity); if was_corrupted { + eprintln!("[-LO-] Detected and fixed corrupted control structure"); } else if fresh_start { // Even if not corrupted, if fresh_start was requested, reset the structure (*control_ptr).force_reset(capacity); @@ -323,6 +384,8 @@ impl SharedMemoryProducer { if !control_initialized { unsafe { ptr::write(control_ptr, QueueControl::new(capacity)); + // Ensure initialization is visible to other processes + memory_fence_release(); } } @@ -352,7 +415,7 @@ impl SharedMemoryProducer { let required_total_size = required_data_size + SHM_METADATA_SIZE; // Ensure we don't exceed reasonable limits (2GB for 32-bit compatibility) - let max_reasonable_size = 2 * 1024 * 1024 * 1024; + let max_reasonable_size = MAX_MEMORY_SIZE; let total_size = if required_total_size > max_reasonable_size { max_reasonable_size } else { @@ -378,10 +441,10 @@ impl SharedMemoryProducer { // Add more robust corruption detection pub fn check_and_reset_if_corrupted(&self) -> bool { unsafe { - let capacity = (*self.control).capacity.load(Ordering::Acquire); - let count = (*self.control).count.load(Ordering::Acquire); - let write_idx = (*self.control).write_index.load(Ordering::Acquire); - let read_idx = (*self.control).read_index.load(Ordering::Acquire); + let capacity = (*self.control).capacity.load(acquire_ordering()); + let count = 
(*self.control).count.load(acquire_ordering()); + let write_idx = (*self.control).write_index.load(acquire_ordering()); + let read_idx = (*self.control).read_index.load(acquire_ordering()); // Use more robust corruption checks let is_corrupted = count > capacity * 2 @@ -434,9 +497,8 @@ impl SharedMemoryProducer { let _guard = LockGuard { control: &*self.control }; // After getting the lock, run a full validation of the control structure - let count = (*self.control).count.load(Ordering::Acquire); - let capacity = (*self.control).capacity.load(Ordering::Acquire); - // let write_idx = (*self.control).write_index.load(Ordering::Acquire); + let count = (*self.control).count.load(acquire_ordering()); + let capacity = (*self.control).capacity.load(acquire_ordering()); // Double-check for corruption after acquiring the lock if count > capacity * 2 || count == usize::MAX { @@ -448,7 +510,7 @@ impl SharedMemoryProducer { if count >= capacity { match self.overflow_policy { OverflowPolicy::Block => { - // Record overflow event - directly access the atomic field + // Record overflow event (*self.control) .overflow_count .fetch_add(1, Ordering::Relaxed); @@ -458,7 +520,7 @@ impl SharedMemoryProducer { return Err(Error::new(ErrorKind::Other, "Queue is full")); } OverflowPolicy::Overwrite => { - // Record overflow event - directly access the atomic field + // Record overflow event (*self.control) .overflow_count .fetch_add(1, Ordering::Relaxed); @@ -467,7 +529,7 @@ impl SharedMemoryProducer { } // Get current write position with explicit Acquire ordering - let write_idx = (*self.control).write_index.load(Ordering::Acquire); + let write_idx = (*self.control).write_index.load(acquire_ordering()); // Calculate offset in buffer let offset = write_idx * ENTRY_MAX_SIZE; @@ -476,7 +538,10 @@ impl SharedMemoryProducer { let entry_ptr = self.data_start.add(offset); // Write entry size first - *(entry_ptr as *mut usize) = data.len(); + ptr::write(entry_ptr as *mut usize, data.len()); + + 
// Memory fence to ensure size is written before data + memory_fence_release(); // Then write the actual data ptr::copy_nonoverlapping( @@ -485,6 +550,9 @@ impl SharedMemoryProducer { data.len(), ); + // Memory fence to ensure all data is written before updating indices + memory_fence_release(); + // Verify count is reasonable before updating if count < capacity * 2 { (*self.control).enqueue_item(write_idx, capacity); @@ -492,12 +560,11 @@ impl SharedMemoryProducer { // Force reset counters if they appear corrupted (*self.control).force_reset(capacity); // Try again with reset counters - let new_write_idx = (*self.control).write_index.load(Ordering::Acquire); + let new_write_idx = (*self.control).write_index.load(acquire_ordering()); (*self.control).enqueue_item(new_write_idx, capacity); } - // Unlock - (*self.control).unlock(); + // Note: Unlock happens automatically via LockGuard drop }, Err(e) => { @@ -513,7 +580,7 @@ impl SharedMemoryProducer { #[allow(dead_code)] pub fn queue_size(&self) -> usize { unsafe { - let count = (*self.control).count.load(Ordering::Acquire); + let count = (*self.control).count.load(acquire_ordering()); count } } @@ -521,10 +588,10 @@ impl SharedMemoryProducer { // Get maximum capacity of the queue #[allow(dead_code)] pub fn capacity(&self) -> usize { - unsafe { (*self.control).capacity.load(Ordering::Acquire) } + unsafe { (*self.control).capacity.load(acquire_ordering()) } } - // Get overflow count - directly access the atomic field + // Get overflow count #[allow(dead_code)] pub fn overflow_count(&self) -> usize { unsafe { (*self.control).overflow_count.load(Ordering::Relaxed) } @@ -532,7 +599,7 @@ impl SharedMemoryProducer { // Clean up shared memory (call this when done with it) pub fn cleanup(&self) -> io::Result<()> { - eprintln!("[-LO-] Cleaning up log producer... 
x"); + eprintln!("[-LO-] Cleaning up log producer on {}...", ARCH_NAME); let result = unsafe { libc::shm_unlink(self.shm_name.as_ptr()) }; if result < 0 { let err = Error::last_os_error(); @@ -548,11 +615,15 @@ impl Drop for SharedMemoryProducer { unsafe { // Unmap memory let unmap_result = libc::munmap(self.ptr as *mut libc::c_void, self.size); - if unmap_result != 0 {} + if unmap_result != 0 { + eprintln!("[-LO-] Failed to unmap memory: {}", Error::last_os_error()); + } // Close file descriptor let close_result = libc::close(self.shm_fd); - if close_result != 0 {} + if close_result != 0 { + eprintln!("[-LO-] Failed to close file descriptor: {}", Error::last_os_error()); + } } } } @@ -571,6 +642,10 @@ pub struct LogProducer { shm: SharedMemoryProducer, } +// Safety: LogProducer operations are not thread-safe by default +// Users must ensure proper synchronization when sharing between threads +unsafe impl Send for LogProducer {} + impl LogProducer { // Standard constructor - creates with default options #[allow(dead_code)] @@ -598,8 +673,8 @@ impl LogProducer { fresh_start: bool, overflow_policy: OverflowPolicy, ) -> io::Result { - let actual_size = if size > 1024 * 1024 * 1024 { - 1024 * 1024 * 1024 // 1GB max + let actual_size = if size > MAX_MEMORY_SIZE { + MAX_MEMORY_SIZE } else { size }; @@ -612,14 +687,14 @@ impl LogProducer { ) { Ok(producer) => producer, Err(e) => { - if size > 100 * 1024 * 1024 + if size > MAX_MEMORY_SIZE && (e.kind() == ErrorKind::PermissionDenied || e.kind() == ErrorKind::AddrNotAvailable || e.kind() == ErrorKind::OutOfMemory) { return Self::new_with_options( name, - 100 * 1024 * 1024, + MAX_MEMORY_SIZE, fresh_start, overflow_policy, ); @@ -664,21 +739,21 @@ impl LogProducer { pub fn log(&self, level: u8, message: &str) -> io::Result<()> { let header_size = mem::size_of::(); - let total_size_of_payload = header_size + message.len(); // Total size of LogEntry struct + message bytes + let total_size_of_payload = header_size + 
message.len(); if message.is_empty() { println!("Empty message received"); return Ok(()); } + // Maximum size an entry can hold for its payload (LogEntry + message) - // This is distinct from ENTRY_MAX_SIZE which includes the initial 'data_len' field written by SharedMemoryProducer. let max_payload_in_shm_entry = ENTRY_MAX_SIZE - mem::size_of::(); if total_size_of_payload > max_payload_in_shm_entry { let suffix = "..."; let suffix_len = suffix.len(); - // Calculate available space for the message text itself, considering header and suffix + // Calculate available space for the message text itself let available_for_message_text = max_payload_in_shm_entry .saturating_sub(header_size) .saturating_sub(suffix_len); @@ -686,26 +761,22 @@ impl LogProducer { let final_message_to_log: String; if available_for_message_text == 0 { - // Not enough space for even one char + suffix. Try to log just the suffix if it fits. if header_size + suffix_len <= max_payload_in_shm_entry { final_message_to_log = suffix.to_string(); } else { - // Cannot even fit the suffix. This implies ENTRY_MAX_SIZE is extremely small or header is huge. return Err(Error::new( ErrorKind::InvalidInput, "Log entry too small for header and truncation suffix.", )); } } else { - // Ensure cut_at does not exceed message length and respects char boundaries. + // Ensure cut_at does not exceed message length and respects char boundaries let mut cut_at = std::cmp::min(message.len(), available_for_message_text); while !message.is_char_boundary(cut_at) && cut_at > 0 { cut_at -= 1; } if cut_at == 0 && !message.is_empty() { - // Not even a single character of the original message fits with the suffix. - // Log only "..." if it fits by itself. if header_size + suffix_len <= max_payload_in_shm_entry { final_message_to_log = suffix.to_string(); } else { @@ -719,9 +790,7 @@ impl LogProducer { } } - // Call self recursively with the truncated message. 
- // This recursive call needs to be careful; the new total_size_of_payload must be <= max_payload_in_shm_entry. - // The logic above should ensure final_message_to_log is short enough. + // Call self recursively with the truncated message return self.log(level, &final_message_to_log); } @@ -798,7 +867,7 @@ static mut GLOBAL_LOG_GATEWAY: Option = None; #[allow(static_mut_refs)] pub unsafe fn proxy_logger() -> io::Result<&'static LogProducer> { if GLOBAL_LOG_PROXY.is_none() { - eprintln!("[-LO-] Initializing proxy logger..."); + eprintln!("[-LO-] Initializing proxy logger on {}...", ARCH_NAME); // Request 10 million entries with smaller size let desired_capacity = 10_000_000; // 10 million entries @@ -814,8 +883,6 @@ pub unsafe fn proxy_logger() -> io::Result<&'static LogProducer> { } Err(_) => { // Fall back to using default approach if capacity-based approach fails - - // Use a larger size for the shared memory (1GB instead of 100MB) let logger_size = MAX_MEMORY_SIZE; // Use 1GB for more capacity match LogProducer::new_with_options( @@ -851,7 +918,7 @@ pub unsafe fn proxy_logger() -> io::Result<&'static LogProducer> { #[allow(static_mut_refs)] pub unsafe fn gateway_logger() -> io::Result<&'static LogProducer> { if GLOBAL_LOG_GATEWAY.is_none() { - eprintln!("[-LO-] Initializing gateway logger..."); + eprintln!("[-LO-] Initializing gateway logger on {}...", ARCH_NAME); // Request 10 million entries with smaller size let desired_capacity = 10_000_000; // 10 million entries @@ -867,7 +934,6 @@ pub unsafe fn gateway_logger() -> io::Result<&'static LogProducer> { } Err(_) => { // Fall back to using default approach if capacity-based approach fails - let logger_size = MAX_MEMORY_SIZE; // Use 1GB for more capacity match LogProducer::new_with_options( @@ -945,71 +1011,4 @@ pub fn log_cleanup() -> io::Result<()> { } result -} - -#[cfg(test)] -mod tests { - use super::*; - - // Example proxy.rs - #[allow(dead_code)] - pub fn proxy_example() { - // Create shared memory for 
logs - let log_producer = match LogProducer::new_with_options( - "/my-app-logs", - 100 * 1024 * 1024, - true, // Fresh start - OverflowPolicy::Overwrite, // Overwrite when full - ) { - Ok(producer) => producer, - Err(_) => { - return; - } - }; - - // Log some messages - for i in 0..10 { - if let Err(_) = log_producer.log(1, &format!("Request #{} processed", i)) {} - } - } - - #[test] - pub fn test_producer() { - // Create with fresh start and overwrite policy using capacity-based approach - let log_producer = match LogProducer::new_with_capacity( - PROXY_LOGGER_NAME, - 500_000, // Request 500,000 entries - true, // Fresh start - OverflowPolicy::Overwrite, // Overwrite when full - ) { - Ok(producer) => producer, - Err(_) => { - return; - } - }; - - let mut counter = 0; - loop { - // Create a message with timestamp and counter - let message = format!("[INFO] [PXY] Test message #{}", counter); - - // Log the message - match log_producer.log(2, &message) { - Ok(_) => {} - Err(_) => {} - } - - counter += 1; - - // Faster testing - if counter % 100 == 0 { - std::thread::sleep(std::time::Duration::from_millis(1)); - } - - // Optional: exit after some number of messages for testing - if counter >= 100000 { - break; - } - } - } -} +} \ No newline at end of file