Skip to content

Commit 70b15ac

Browse files
committed
Make so RootEncoder delivers bandwidth info needed for bitrate changing
1 parent c066dc9 commit 70b15ac

File tree

8 files changed

+169
-12
lines changed

8 files changed

+169
-12
lines changed

app/src/main/java/com/pedro/streamer/file/FromFileActivity.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import androidx.activity.result.contract.ActivityResultContracts
3131
import androidx.annotation.RequiresApi
3232
import androidx.appcompat.app.AppCompatActivity
3333
import com.pedro.common.ConnectChecker
34+
import com.pedro.common.Throughput
3435
import com.pedro.encoder.input.decoder.AudioDecoderInterface
3536
import com.pedro.encoder.input.decoder.VideoDecoderInterface
3637
import com.pedro.library.base.recording.RecordController
@@ -194,6 +195,13 @@ class FromFileActivity : AppCompatActivity(), ConnectChecker,
194195

195196
override fun onNewBitrate(bitrate: Long) {}
196197

198+
override fun onStreamingStats(
199+
bitrate: Long,
200+
bytesSent: Long,
201+
bytesQueued: Long,
202+
throughput: Throughput
203+
) {
204+
}
197205
override fun onDisconnect() {
198206
toast("Disconnected")
199207
}

app/src/main/java/com/pedro/streamer/oldapi/OldApiActivity.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import android.widget.ImageView
2525
import android.widget.Toast
2626
import androidx.appcompat.app.AppCompatActivity
2727
import com.pedro.common.ConnectChecker
28+
import com.pedro.common.Throughput
2829
import com.pedro.encoder.input.video.CameraHelper
2930
import com.pedro.encoder.input.video.CameraOpenException
3031
import com.pedro.library.base.recording.RecordController
@@ -151,6 +152,13 @@ class OldApiActivity : AppCompatActivity(), ConnectChecker, TextureView.SurfaceT
151152

152153
override fun onNewBitrate(bitrate: Long) {}
153154

155+
override fun onStreamingStats(
156+
bitrate: Long,
157+
bytesSent: Long,
158+
bytesQueued: Long,
159+
throughput: Throughput
160+
) {
161+
}
154162
override fun onDisconnect() {
155163
toast("Disconnected")
156164
}

common/src/main/java/com/pedro/common/BitrateChecker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@
2121
*/
2222
public interface BitrateChecker {
2323
default void onNewBitrate(long bitrate) {}
24+
25+
default void onStreamingStats(long bitrate, long bytesSent, long bytesQueued, Throughput throughput) {}
2426
}

common/src/main/java/com/pedro/common/BitrateManager.kt

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,63 @@ open class BitrateManager(private val bitrateChecker: BitrateChecker) {
3030
var exponentialFactor: Float = 1f
3131
private var timeStamp = TimeUtils.getCurrentTimeMillis()
3232

33+
private var byterate = 0L
34+
private var queuedBytes: Long = 0
35+
private val measureInterval = 3
36+
private val previousQueueBytesOut: MutableList<Long> = mutableListOf()
37+
38+
fun queueBytes(size: Long) {
39+
queuedBytes += size
40+
}
41+
42+
suspend fun calculateBitrateAndBandwidth(bytesSendPerSecond: Long, myQueueValue: Long) {
43+
// Track bitrate and queue management
44+
bitrate += (bytesSendPerSecond * 8)
45+
byterate += bytesSendPerSecond
46+
queuedBytes -= bytesSendPerSecond
47+
48+
val timeDiff = TimeUtils.getCurrentTimeMillis() - timeStamp
49+
if (timeDiff >= 1000) {
50+
// Calculate bitrate with exponential moving average
51+
val currentValue = (bitrate / (timeDiff / 1000f)).toLong()
52+
if (bitrateOld == 0L) { bitrateOld = currentValue }
53+
bitrateOld = (bitrateOld + exponentialFactor * (currentValue - bitrateOld)).toLong()
54+
55+
// Calculate bandwidth and throughput analysis
56+
var throughput: Throughput = Throughput.Unknown
57+
previousQueueBytesOut.add(queuedBytes)
58+
if (measureInterval <= previousQueueBytesOut.size) {
59+
var countQueuedBytesGrowing = 0
60+
for (i in 0 until previousQueueBytesOut.size - 1) {
61+
if (previousQueueBytesOut[i] < previousQueueBytesOut[i + 1]) {
62+
countQueuedBytesGrowing++
63+
}
64+
}
65+
if (countQueuedBytesGrowing == measureInterval - 1) {
66+
throughput = Throughput.Insufficient
67+
} else if (countQueuedBytesGrowing == 0) {
68+
throughput = Throughput.Sufficient
69+
}
70+
previousQueueBytesOut.removeAt(0)
71+
}
72+
73+
// Call both callbacks on main thread
74+
onMainThread {
75+
bitrateChecker.onNewBitrate(bitrateOld)
76+
bitrateChecker.onStreamingStats(
77+
((bytesSendPerSecond * 8) / (timeDiff / 1000f)).toLong(),
78+
bytesSendPerSecond,
79+
myQueueValue,
80+
throughput
81+
)
82+
}
83+
84+
// Reset all counters
85+
timeStamp = TimeUtils.getCurrentTimeMillis()
86+
bitrate = 0
87+
}
88+
}
89+
3390
suspend fun calculateBitrate(size: Long) {
3491
bitrate += size
3592
val timeDiff = TimeUtils.getCurrentTimeMillis() - timeStamp
@@ -44,7 +101,9 @@ open class BitrateManager(private val bitrateChecker: BitrateChecker) {
44101
}
45102

46103
fun reset() {
47-
bitrate = 0
104+
byterate = 0
48105
bitrateOld = 0
106+
queuedBytes = 0
107+
previousQueueBytesOut.clear()
49108
}
50109
}

common/src/main/java/com/pedro/common/StreamBlockingQueue.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,20 @@ class StreamBlockingQueue(size: Int) {
5959
}
6060

6161
fun getSize() = queue.size
62+
63+
64+
/**
65+
* Gets the total size of all items in the queue by summing the individual sizes.
66+
*
67+
* This method calculates the cumulative size of all [MediaFrame] objects currently
68+
* in the queue by summing their [MediaFrame.info.size] values. This is useful for
69+
* monitoring memory usage, bandwidth calculations, or determining the total data
70+
* volume in the queue.
71+
*
72+
* @return The total size in bytes as a [Long] value, representing the sum of all
73+
* individual frame sizes in the queue.
74+
*
75+
* @see getSize for getting the count of items in the queue
76+
*/
77+
fun getTotalSize(): Long = queue.sumOf { it.info.size.toLong() }
6278
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.pedro.common
2+
3+
public enum class Throughput {
4+
Unknown,
5+
Sufficient,
6+
Insufficient
7+
}

common/src/main/java/com/pedro/common/base/BaseSender.kt

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,21 @@ abstract class BaseSender(
4747
protected abstract suspend fun stopImp(clear: Boolean = true)
4848

4949
fun sendMediaFrame(mediaFrame: MediaFrame) {
50-
if (running && !queue.trySend(mediaFrame)) {
51-
when (mediaFrame.type) {
52-
MediaFrame.Type.VIDEO -> {
53-
Log.i(TAG, "Video frame discarded")
54-
droppedVideoFrames++
55-
}
56-
MediaFrame.Type.AUDIO -> {
57-
Log.i(TAG, "Audio frame discarded")
58-
droppedAudioFrames++
50+
if (running){
51+
if(!queue.trySend(mediaFrame)) {
52+
when (mediaFrame.type) {
53+
MediaFrame.Type.VIDEO -> {
54+
Log.i(TAG, "Video frame discarded")
55+
droppedVideoFrames++
56+
}
57+
58+
MediaFrame.Type.AUDIO -> {
59+
Log.i(TAG, "Audio frame discarded")
60+
droppedAudioFrames++
61+
}
5962
}
63+
} else {
64+
bitrateManager.queueBytes(mediaFrame.info.size.toLong())
6065
}
6166
}
6267
}
@@ -68,8 +73,7 @@ abstract class BaseSender(
6873
job = scope.launch {
6974
val bitrateTask = async {
7075
while (scope.isActive && running) {
71-
//bytes to bits
72-
bitrateManager.calculateBitrate(bytesSendPerSecond * 8)
76+
bitrateManager.calculateBitrateAndBandwidth(bytesSendPerSecond, queue.getTotalSize())
7377
bytesSendPerSecond = 0
7478
delay(timeMillis = 1000)
7579
}

common/src/test/java/com/pedro/common/BitrateManagerTest.kt

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import com.pedro.common.util.MainDispatcherRule
2020
import com.pedro.common.util.Utils
2121
import kotlinx.coroutines.test.runTest
2222
import org.junit.After
23+
import org.junit.Assert.assertEquals
2324
import org.junit.Assert.assertTrue
2425
import org.junit.Before
2526
import org.junit.Rule
@@ -76,4 +77,56 @@ class BitrateManagerTest {
7677
assertTrue(expectedResult - marginError <= resultValue.firstValue && resultValue.firstValue <= expectedResult + marginError)
7778
}
7879
}
80+
81+
@Test
82+
fun `WHEN calculateBitrateAndBandwidth called multiple times THEN return total bitrate and bandwidth stats`() = runTest {
83+
Utils.useStatics(listOf(timeUtilsMocked)) {
84+
val bitrateManager = BitrateManager(connectChecker)
85+
val fakeValues = arrayOf(100L, 200L, 300L, 400L, 500L)
86+
val queueValues = arrayOf(50L, 100L, 150L, 200L, 250L)
87+
var expectedBitrate = 0L
88+
89+
// Call calculateBitrateAndBandwidth multiple times within the same second
90+
fakeValues.forEachIndexed { index, value ->
91+
bitrateManager.calculateBitrateAndBandwidth(value, queueValues[index])
92+
expectedBitrate += (value * 8) // Convert bytes to bits
93+
}
94+
95+
// Advance time by 1 second to trigger the calculation
96+
fakeTime += 1000
97+
98+
// Call one more time to trigger the final calculation
99+
val finalValue = 100L
100+
val finalQueueValue = 300L
101+
bitrateManager.calculateBitrateAndBandwidth(finalValue, finalQueueValue)
102+
expectedBitrate += (finalValue * 8)
103+
104+
// Verify onNewBitrate was called
105+
val bitrateCaptor = argumentCaptor<Long>()
106+
verify(connectChecker, times(1)).onNewBitrate(bitrateCaptor.capture())
107+
108+
// Verify onStreamingStats was called
109+
val bitrateStatsCaptor = argumentCaptor<Long>()
110+
val bytesSentCaptor = argumentCaptor<Long>()
111+
val bytesQueuedCaptor = argumentCaptor<Long>()
112+
val throughputCaptor = argumentCaptor<Throughput>()
113+
114+
verify(connectChecker, times(1)).onStreamingStats(
115+
bitrateStatsCaptor.capture(),
116+
bytesSentCaptor.capture(),
117+
bytesQueuedCaptor.capture(),
118+
throughputCaptor.capture()
119+
)
120+
121+
// Check bitrate calculation (with margin for exponential moving average)
122+
val marginError = 100
123+
assertTrue(expectedBitrate - marginError <= bitrateCaptor.firstValue && bitrateCaptor.firstValue <= expectedBitrate + marginError)
124+
125+
// Check streaming stats
126+
assertTrue(bitrateStatsCaptor.firstValue > 0)
127+
assertTrue(bytesSentCaptor.firstValue > 0)
128+
assertEquals(finalQueueValue, bytesQueuedCaptor.firstValue)
129+
assertEquals(Throughput.Unknown, throughputCaptor.firstValue) // Should be Unknown for first measurement
130+
}
131+
}
79132
}

0 commit comments

Comments
 (0)