Skip to content

Commit f9dd602

Browse files
committed
Improve the json parameter
1 parent 88c24e7 commit f9dd602

File tree

6 files changed

+52
-51
lines changed

6 files changed

+52
-51
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -427,21 +427,42 @@ object EventMessage extends DefaultJsonProtocol {
427427
def parse(msg: String) = Try(format.read(msg.parseJson))
428428
}
429429

430-
case class ByteSizeMessage(userMemory: ByteSize) extends Message {
431-
override def serialize = ByteSizeMessage.serdes.write(this).compactPrint
430+
case class UserMemoryMessage(userMemory: ByteSize) extends Message {
431+
override def serialize = UserMemoryMessage.serdes.write(this).compactPrint
432432
}
433433

434-
object ByteSizeMessage extends DefaultJsonProtocol {
435-
implicit val serdes = new RootJsonFormat[ByteSizeMessage] {
436-
override def write(message: ByteSizeMessage): JsValue = {
434+
object UserMemoryMessage extends DefaultJsonProtocol {
435+
implicit val serdes = new RootJsonFormat[UserMemoryMessage] {
436+
override def write(message: UserMemoryMessage): JsValue = {
437437
JsObject("userMemory" -> JsString(message.userMemory.toString))
438438
}
439439

440-
override def read(json: JsValue): ByteSizeMessage = {
440+
override def read(json: JsValue): UserMemoryMessage = {
441441
val userMemory = fromField[String](json, "userMemory")
442-
new ByteSizeMessage(ByteSize.fromString(userMemory))
442+
new UserMemoryMessage(ByteSize.fromString(userMemory))
443443
}
444444
}
445445

446446
def parse(msg: String) = Try(serdes.read(msg.parseJson))
447447
}
448+
449+
case class ConfigMemory(invoker: Int, memory: ByteSize)
450+
case class ConfigMemoryList(items: List[ConfigMemory])
451+
452+
object ConfigMemoryProtocol extends DefaultJsonProtocol {
453+
implicit val serdes = new RootJsonFormat[ByteSize] {
454+
override def write(obj: ByteSize): JsValue = JsObject("memory" -> JsString(obj.toString))
455+
456+
override def read(json: JsValue): ByteSize = {
457+
json match {
458+
case JsString(memory) => ByteSize.fromString(memory)
459+
case _ => throw new DeserializationException("Could not deserialize ByteSize")
460+
}
461+
}
462+
}
463+
implicit val configMemoryFormat = jsonFormat2(ConfigMemory)
464+
implicit object configMemoryListJsonFormat extends RootJsonFormat[ConfigMemoryList] {
465+
def read(value: JsValue) = ConfigMemoryList(value.convertTo[List[ConfigMemory]])
466+
def write(f: ConfigMemoryList) = ???
467+
}
468+
}

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import org.apache.openwhisk.common.{
4141
TransactionId
4242
}
4343
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
44-
import org.apache.openwhisk.core.connector.{ByteSizeMessage, MessagingProvider}
44+
import org.apache.openwhisk.core.connector.{ConfigMemoryList, MessagingProvider, UserMemoryMessage}
4545
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
4646
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
4747
import org.apache.openwhisk.core.entitlement._
@@ -190,48 +190,28 @@ class Controller(val instance: ControllerInstanceId,
190190
/**
191191
* config user memory of ContainerPool
192192
*/
193+
import org.apache.openwhisk.core.connector.ConfigMemoryProtocol._
193194
private val configMemory = {
194195
implicit val executionContext = actorSystem.dispatcher
195196
(path("config" / "memory") & post) {
196197
extractCredentials {
197198
case Some(BasicHttpCredentials(username, password)) =>
198199
if (username == controllerCredentials.username && password == controllerCredentials.password) {
199200
entity(as[String]) { memory =>
200-
try {
201-
val userMemoryMessage = ByteSizeMessage(ByteSize.fromString(memory))
202-
if (userMemoryMessage.userMemory.size == 0) {
203-
complete(StatusCodes.BadRequest, "user memory must be positive")
204-
} else {
205-
parameter('limit.?) { limit =>
206-
limit match {
207-
case Some(targetValue) =>
208-
val pattern = """\d+:\d"""
209-
if (targetValue.matches(pattern)) {
210-
val invokerArray = targetValue.split(":")
211-
val beginIndex = invokerArray(0).toInt
212-
val finishIndex = invokerArray(1).toInt
213-
if (finishIndex < beginIndex) {
214-
complete(StatusCodes.BadRequest, "finishIndex can't be less than beginIndex")
215-
} else {
216-
val targetInvokers = (beginIndex to finishIndex).toList
217-
loadBalancer.sendUserMemoryToInvokers(userMemoryMessage, Some(targetInvokers))
218-
logging.info(this, "config user memory request is already sent to target invokers")
219-
complete(StatusCodes.Accepted)
220-
}
221-
} else {
222-
complete(StatusCodes.BadRequest, "limit value can't match [beginIndex:finishIndex]")
223-
}
224-
case None =>
225-
loadBalancer.sendUserMemoryToInvokers(userMemoryMessage, None)
226-
logging.info(this, "config user memory request is already sent to all invokers")
227-
complete(StatusCodes.Accepted)
228-
}
229-
}
201+
val configMemoryList = memory.parseJson.convertTo[ConfigMemoryList]
202+
203+
val existIllegalUserMemory = configMemoryList.items.exists { configMemory =>
204+
MemoryLimit.MIN_MEMORY.compare(configMemory.memory) > 0
205+
}
206+
if (existIllegalUserMemory) {
207+
complete(StatusCodes.BadRequest, s"user memory can't be less than ${MemoryLimit.MIN_MEMORY}")
208+
} else {
209+
configMemoryList.items.foreach { configMemory =>
210+
val invoker = configMemory.invoker
211+
val userMemoryMessage = UserMemoryMessage(configMemory.memory)
212+
loadBalancer.sendUserMemoryToInvoker(userMemoryMessage, invoker)
230213
}
231-
} catch {
232-
case ex: IllegalArgumentException =>
233-
logging.info(this, s"error message: ${ex.getMessage}")
234-
complete(StatusCodes.BadRequest, ex.getMessage)
214+
complete(StatusCodes.Accepted)
235215
}
236216
}
237217
} else {

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ trait LoadBalancer {
6666
* @param userMemory
6767
* @param targetInvokers
6868
*/
69-
def sendUserMemoryToInvokers(userMemoryMessage: ByteSizeMessage, targetInvokers: Option[List[Int]]): Unit = {}
69+
def sendUserMemoryToInvoker(userMemoryMessage: UserMemoryMessage, targetInvoker: Int): Unit = {}
7070

7171
/**
7272
* Returns a message indicating the health of the containers and/or container pool in general.

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,9 @@ class ShardingContainerPoolBalancer(
318318
}
319319

320320
/** send user memory to invokers */
321-
override def sendUserMemoryToInvokers(userMemoryMessage: ByteSizeMessage, targetInvokers: Option[List[Int]]): Unit = {
321+
override def sendUserMemoryToInvoker(userMemoryMessage: UserMemoryMessage, targetInvoker: Int): Unit = {
322322
schedulingState.invokers.filter { invoker =>
323-
targetInvokers.getOrElse(schedulingState.invokers.map(_.id.instance)).contains(invoker.id.instance)
323+
invoker.id.instance == targetInvoker
324324
} foreach { invokerHealth =>
325325
val topic = s"invoker${invokerHealth.id.toInt}"
326326
messageProducer.send(topic, userMemoryMessage).andThen {

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.openwhisk.core.containerpool
1919

2020
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
2121
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
22-
import org.apache.openwhisk.core.connector.{ByteSizeMessage, MessageFeed}
22+
import org.apache.openwhisk.core.connector.{MessageFeed, UserMemoryMessage}
2323
import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
2424
import org.apache.openwhisk.core.entity._
2525
import org.apache.openwhisk.core.entity.size._
@@ -305,11 +305,11 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
305305
case RescheduleJob =>
306306
freePool = freePool - sender()
307307
busyPool = busyPool - sender()
308-
case message: ByteSizeMessage =>
308+
case userMemoryMessage: UserMemoryMessage =>
309309
logging.info(
310310
this,
311-
s"user memory is reconfigured from ${latestUserMemory.toString} to ${message.userMemory.toString}")
312-
latestUserMemory = message.userMemory
311+
s"user memory is reconfigured from ${latestUserMemory.toString} to ${userMemoryMessage.userMemory.toString}")
312+
latestUserMemory = userMemoryMessage.userMemory
313313
case UserMemoryQuery =>
314314
sender() ! latestUserMemory.toString
315315
case EmitMetrics =>

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ class InvokerReactive(
173173
Future(
174174
ActivationMessage
175175
.parse(new String(bytes, StandardCharsets.UTF_8))
176-
.orElse(ByteSizeMessage.parse(new String(bytes, StandardCharsets.UTF_8))))
176+
.orElse(UserMemoryMessage.parse(new String(bytes, StandardCharsets.UTF_8))))
177177
.flatMap(Future.fromTry)
178178
.flatMap {
179179
case msg: ActivationMessage =>
@@ -259,7 +259,7 @@ class InvokerReactive(
259259
logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.")
260260
Future.successful(())
261261
}
262-
case msg: ByteSizeMessage =>
262+
case msg: UserMemoryMessage =>
263263
pool ! msg
264264
activationFeed ! MessageFeed.Processed
265265
Future.successful(())

0 commit comments

Comments
 (0)