From bfbab08f99bfcbcd7aee50ca443cbb6d6bc9a586 Mon Sep 17 00:00:00 2001 From: pedroSG94 Date: Fri, 7 Nov 2025 09:49:22 +0100 Subject: [PATCH 1/2] use ktor 2.3.13 --- .../pedro/common/socket/ktor/TcpStreamSocketKtorBase.kt | 9 ++++++--- .../com/pedro/common/socket/ktor/UdpStreamSocketKtor.kt | 9 ++++----- gradle/libs.versions.toml | 2 +- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/common/src/main/java/com/pedro/common/socket/ktor/TcpStreamSocketKtorBase.kt b/common/src/main/java/com/pedro/common/socket/ktor/TcpStreamSocketKtorBase.kt index d2089bb50..c125adc5d 100644 --- a/common/src/main/java/com/pedro/common/socket/ktor/TcpStreamSocketKtorBase.kt +++ b/common/src/main/java/com/pedro/common/socket/ktor/TcpStreamSocketKtorBase.kt @@ -8,9 +8,9 @@ import io.ktor.network.sockets.openReadChannel import io.ktor.network.sockets.openWriteChannel import io.ktor.utils.io.ByteReadChannel import io.ktor.utils.io.ByteWriteChannel +import io.ktor.utils.io.close import io.ktor.utils.io.readFully import io.ktor.utils.io.readUTF8Line -import io.ktor.utils.io.writeByte import io.ktor.utils.io.writeFully import kotlinx.coroutines.Dispatchers import java.net.InetAddress @@ -37,7 +37,10 @@ abstract class TcpStreamSocketKtorBase( } override suspend fun close() { - runCatching { output?.flushAndClose() } + runCatching { + output?.flush() + output?.close() + } runCatching { address = null input = null @@ -52,7 +55,7 @@ abstract class TcpStreamSocketKtorBase( } override suspend fun write(bytes: ByteArray, offset: Int, size: Int) { - output?.writeFully(bytes, offset, offset + size) + output?.writeFully(bytes, offset, size) } override suspend fun write(b: Int) { diff --git a/common/src/main/java/com/pedro/common/socket/ktor/UdpStreamSocketKtor.kt b/common/src/main/java/com/pedro/common/socket/ktor/UdpStreamSocketKtor.kt index d19cc7952..7f782c8d0 100644 --- a/common/src/main/java/com/pedro/common/socket/ktor/UdpStreamSocketKtor.kt +++ b/common/src/main/java/com/pedro/common/socket/ktor/UdpStreamSocketKtor.kt @@ -8,10 +8,9 @@ import io.ktor.network.sockets.Datagram import io.ktor.network.sockets.InetSocketAddress import io.ktor.network.sockets.aSocket import io.ktor.network.sockets.isClosed -import io.ktor.utils.io.core.remaining +import io.ktor.utils.io.core.ByteReadPacket +import io.ktor.utils.io.core.readBytes import kotlinx.coroutines.Dispatchers -import kotlinx.io.Buffer -import kotlinx.io.readByteArray import java.net.ConnectException import java.net.InetAddress @@ -51,7 +50,7 @@ class UdpStreamSocketKtor( } override suspend fun write(bytes: ByteArray) { - val datagram = Datagram(Buffer().apply { write(bytes, 0, bytes.size) }, address) + val datagram = Datagram(ByteReadPacket(bytes), address) socket?.send(datagram) } @@ -59,7 +58,7 @@ class UdpStreamSocketKtor( val socket = socket ?: throw ConnectException("Read with socket closed, broken pipe") val packet = socket.receive().packet val length = packet.remaining.toInt() - return packet.readByteArray().sliceArray(0 until length) + return packet.readBytes().sliceArray(0 until length) } override fun isConnected(): Boolean = socket?.isClosed != true diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c431e5a07..2d1915637 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -9,7 +9,7 @@ dokka = "2.0.0" appcompat = "1.6.1" #noinspection GradleDependency, version 2.2.0 need min sdk 21 constraintlayout = "2.1.4" -ktor = "3.3.2" +ktor = "2.3.13" camerax = "1.5.1" multidex = "2.0.1" annotation = "1.9.1" From 55c61788f8f1533d9d3d57d0401fde3809e75003 Mon Sep 17 00:00:00 2001 From: pedroSG94 Date: Tue, 11 Nov 2025 10:54:23 +0100 Subject: [PATCH 2/2] revert StreamBlockingQueue --- .../main/java/com/pedro/common/base/BaseSender.kt | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/com/pedro/common/base/BaseSender.kt b/common/src/main/java/com/pedro/common/base/BaseSender.kt index 154eab3e4..eab1cad6c 100644 --- a/common/src/main/java/com/pedro/common/base/BaseSender.kt +++ b/common/src/main/java/com/pedro/common/base/BaseSender.kt @@ -5,6 +5,7 @@ import com.pedro.common.BitrateManager import com.pedro.common.ConnectChecker import com.pedro.common.StreamBlockingQueue import com.pedro.common.frame.MediaFrame +import com.pedro.common.trySend import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -14,6 +15,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import java.nio.ByteBuffer +import java.util.concurrent.LinkedBlockingQueue abstract class BaseSender( protected val connectChecker: ConnectChecker, @@ -24,7 +26,7 @@ abstract class BaseSender( protected var running = false private var cacheSize = 400 @Volatile - protected var queue = StreamBlockingQueue(cacheSize) + protected var queue = LinkedBlockingQueue(cacheSize) protected var audioFramesSent: Long = 0 protected var videoFramesSent: Long = 0 var droppedAudioFrames: Long = 0 @@ -94,24 +96,24 @@ abstract class BaseSender( @Throws(IllegalArgumentException::class) fun hasCongestion(percentUsed: Float = 20f): Boolean { if (percentUsed < 0 || percentUsed > 100) throw IllegalArgumentException("the value must be in range 0 to 100") - val size = queue.getSize().toFloat() + val size = queue.size.toFloat() val remaining = queue.remainingCapacity().toFloat() val capacity = size + remaining return size >= capacity * (percentUsed / 100f) } fun resizeCache(newSize: Int) { - if (newSize < queue.getSize() - queue.remainingCapacity()) { + if (newSize < queue.size - queue.remainingCapacity()) { throw RuntimeException("Can't fit current cache inside new cache size") } - val tempQueue = StreamBlockingQueue(newSize) + val tempQueue = LinkedBlockingQueue(newSize) queue.drainTo(tempQueue) queue = tempQueue } fun getCacheSize(): Int = cacheSize - fun getItemsInCache(): Int = queue.getSize() + fun getItemsInCache(): Int = queue.size fun clearCache() { queue.clear() @@ -148,7 +150,7 @@ abstract class BaseSender( fun getBitrateExponentialFactor() = bitrateManager.exponentialFactor fun setDelay(delay: Long) { - queue.setCacheTime(delay) +// queue.setCacheTime(delay) } fun resetBytesSend() {