Skip to content

Commit 1274fdf

Browse files
authored
Adjust prewarm container dynamically (#4871)
* Adjust prewarm container dynamically Co-authored-by: ning.yougang <[email protected]>
1 parent 97c9f7a commit 1274fdf

File tree

14 files changed

+653
-112
lines changed

14 files changed

+653
-112
lines changed

ansible/files/runtimes.json

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,15 @@
5757
},
5858
"stemCells": [
5959
{
60-
"count": 2,
61-
"memory": "256 MB"
60+
"initialCount": 2,
61+
"memory": "256 MB",
62+
"reactive": {
63+
"minCount": 1,
64+
"maxCount": 4,
65+
"ttl": "2 minutes",
66+
"threshold": 1,
67+
"increment": 1
68+
}
6269
}
6370
]
6471
},

ansible/roles/invoker/tasks/deploy.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@
278278
"CONFIG_whisk_invoker_https_keystorePassword": "{{ invoker.ssl.keystore.password }}"
279279
"CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}"
280280
"CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
281+
"CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
281282

282283
- name: extend invoker dns env
283284
set_fact:

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,12 @@ object LoggingMarkers {
511511
LogMarkerToken(containerPool, "prewarmSize", counter)(MeasurementUnit.information.megabytes)
512512
val CONTAINER_POOL_IDLES_COUNT =
513513
LogMarkerToken(containerPool, "idlesCount", counter)(MeasurementUnit.none)
514+
def CONTAINER_POOL_PREWARM_COLDSTART(memory: String, kind: String) =
515+
LogMarkerToken(containerPool, "prewarmColdstart", counter, None, Map("memory" -> memory, "kind" -> kind))(
516+
MeasurementUnit.none)
517+
def CONTAINER_POOL_PREWARM_EXPIRED(memory: String, kind: String) =
518+
LogMarkerToken(containerPool, "prewarmExpired", counter, None, Map("memory" -> memory, "kind" -> kind))(
519+
MeasurementUnit.none)
514520
val CONTAINER_POOL_IDLES_SIZE =
515521
LogMarkerToken(containerPool, "idlesSize", counter)(MeasurementUnit.information.megabytes)
516522

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, ExecutableWhisk
2424
import org.apache.openwhisk.spi.Spi
2525

2626
import scala.concurrent.Future
27+
import scala.concurrent.duration.FiniteDuration
2728
import scala.math.max
2829

2930
case class ContainerArgsConfig(network: String,
@@ -43,11 +44,16 @@ case class ContainerArgsConfig(network: String,
4344
}.toMap
4445
}
4546

46-
case class ContainerPoolConfig(userMemory: ByteSize, concurrentPeekFactor: Double, akkaClient: Boolean) {
47+
case class ContainerPoolConfig(userMemory: ByteSize,
48+
concurrentPeekFactor: Double,
49+
akkaClient: Boolean,
50+
prewarmExpirationCheckInterval: FiniteDuration) {
4751
require(
4852
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
4953
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
5054

55+
require(prewarmExpirationCheckInterval.toSeconds > 0, "prewarmExpirationCheckInterval must be > 0")
56+
5157
/**
5258
* The shareFactor indicates the number of containers that would share a single core, on average.
5359
* cpuShare is a docker option (-c) whereby a container's CPU access is limited.

common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ import spray.json.DefaultJsonProtocol._
2626
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
2727
import org.apache.openwhisk.core.entity.Attachments._
2828
import org.apache.openwhisk.core.entity.Attachments.Attached._
29-
import fastparse._, NoWhitespace._
29+
import fastparse._
30+
import NoWhitespace._
31+
32+
import scala.concurrent.duration.{Duration, FiniteDuration}
3033

3134
/**
3235
* Reads manifest of supported runtimes from configuration file and stores
@@ -135,11 +138,37 @@ protected[core] object ExecManifest {
135138
/**
136139
* A stemcell configuration read from the manifest for a container image to be initialized by the container pool.
137140
*
138-
* @param count the number of stemcell containers to create
141+
* @param initialCount the initial number of stemcell containers to create
139142
* @param memory the max memory this stemcell will allocate
143+
* @param reactive the reactive prewarming prewarmed config, which is disabled by default
140144
*/
141-
protected[entity] case class StemCell(count: Int, memory: ByteSize) {
142-
require(count > 0, "count must be positive")
145+
protected[entity] case class StemCell(initialCount: Int,
146+
memory: ByteSize,
147+
reactive: Option[ReactivePrewarmingConfig] = None) {
148+
require(initialCount > 0, "initialCount must be positive")
149+
}
150+
151+
/**
152+
* A stemcell's ReactivePrewarmingConfig configuration
153+
*
154+
* @param minCount the max number of stemcell containers to exist
155+
* @param maxCount the max number of stemcell containers to create
156+
* @param ttl time to live of the prewarmed container
157+
* @param threshold the executed activation number of cold start in previous one minute
158+
* @param increment increase per increment prewarmed number under per threshold activations
159+
*/
160+
protected[core] case class ReactivePrewarmingConfig(minCount: Int,
161+
maxCount: Int,
162+
ttl: FiniteDuration,
163+
threshold: Int,
164+
increment: Int) {
165+
require(
166+
minCount >= 0 && minCount <= maxCount,
167+
"minCount must be be greater than 0 and less than or equal to maxCount")
168+
require(maxCount > 0, "maxCount must be positive")
169+
require(ttl.toMillis > 0, "ttl must be positive")
170+
require(threshold > 0, "threshold must be positive")
171+
require(increment > 0 && increment <= maxCount, "increment must be positive and less than or equal to maxCount")
143172
}
144173

145174
/**
@@ -344,9 +373,45 @@ protected[core] object ExecManifest {
344373

345374
protected[entity] implicit val imageNameSerdes: RootJsonFormat[ImageName] = jsonFormat4(ImageName.apply)
346375

347-
protected[entity] implicit val stemCellSerdes: RootJsonFormat[StemCell] = {
376+
protected[entity] implicit val ttlSerdes: RootJsonFormat[FiniteDuration] = new RootJsonFormat[FiniteDuration] {
377+
override def write(finiteDuration: FiniteDuration): JsValue = JsString(finiteDuration.toString)
378+
379+
override def read(value: JsValue): FiniteDuration = value match {
380+
case JsString(s) =>
381+
val duration = Duration(s)
382+
FiniteDuration(duration.length, duration.unit)
383+
case _ =>
384+
deserializationError("time unit not supported. Only milliseconds, seconds, minutes, hours, days are supported")
385+
}
386+
}
387+
388+
protected[entity] implicit val reactivePrewarmingConfigSerdes: RootJsonFormat[ReactivePrewarmingConfig] = jsonFormat5(
389+
ReactivePrewarmingConfig.apply)
390+
391+
protected[entity] implicit val stemCellSerdes = new RootJsonFormat[StemCell] {
348392
import org.apache.openwhisk.core.entity.size.serdes
349-
jsonFormat2(StemCell.apply)
393+
val defaultSerdes = jsonFormat3(StemCell.apply)
394+
override def read(value: JsValue): StemCell = {
395+
val fields = value.asJsObject.fields
396+
val initialCount: Option[Int] =
397+
fields
398+
.get("initialCount")
399+
.orElse(fields.get("count"))
400+
.map(_.convertTo[Int])
401+
val memory: Option[ByteSize] = fields.get("memory").map(_.convertTo[ByteSize])
402+
val config = fields.get("reactive").map(_.convertTo[ReactivePrewarmingConfig])
403+
404+
(initialCount, memory) match {
405+
case (Some(c), Some(m)) => StemCell(c, m, config)
406+
case (Some(c), None) =>
407+
throw new IllegalArgumentException(s"memory is required, just provide initialCount: ${c}")
408+
case (None, Some(m)) =>
409+
throw new IllegalArgumentException(s"initialCount is required, just provide memory: ${m.toString}")
410+
case _ => throw new IllegalArgumentException("both initialCount and memory are required")
411+
}
412+
}
413+
414+
override def write(s: StemCell) = defaultSerdes.write(s)
350415
}
351416

352417
protected[entity] implicit val runtimeManifestSerdes: RootJsonFormat[RuntimeManifest] = jsonFormat8(RuntimeManifest)

core/invoker/src/main/resources/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ whisk {
6060
user-memory: 1024 m
6161
concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
6262
akka-client: false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
63+
prewarm-expiration-check-interval: 1 minute
6364
}
6465

6566
kubernetes {

0 commit comments

Comments
 (0)