From 0ea614f26e438f2b0ed37bb7c9c346d525f1c4f9 Mon Sep 17 00:00:00 2001 From: Aditya Tiwari Date: Sun, 26 Dec 2021 19:19:02 +0530 Subject: [PATCH 1/3] Adding SparkPulsarReliableReceiver --- pom.xml | 6 +- pulsar-spark/pom.xml | 2 +- .../pulsar/spark/SparkPulsarMessage.scala | 7 + ...SparkStreamingReliablePulsarReceiver.scala | 158 ++++++++++++++++++ tests/pulsar-spark-test/pom.xml | 2 +- 5 files changed, 170 insertions(+), 5 deletions(-) create mode 100644 pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkPulsarMessage.scala create mode 100644 pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkStreamingReliablePulsarReceiver.scala diff --git a/pom.xml b/pom.xml index c69107e..ad96eb6 100644 --- a/pom.xml +++ b/pom.xml @@ -184,7 +184,7 @@ 2.6.2 0.9.0 0.7.3 - 2.1.0 + 2.4.4 3.18.1 1.18.20 1.3.2 @@ -1006,8 +1006,8 @@ org.apache.spark - spark-streaming_2.10 - ${spark-streaming_2.10.version} + spark-streaming_2.11 + ${spark-streaming_2.11.version} com.google.guava diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml index 3736168..ce42f30 100644 --- a/pulsar-spark/pom.xml +++ b/pulsar-spark/pom.xml @@ -53,7 +53,7 @@ org.apache.spark - spark-streaming_2.10 + spark-streaming_2.11 jackson-annotations diff --git a/pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkPulsarMessage.scala b/pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkPulsarMessage.scala new file mode 100644 index 0000000..6098b42 --- /dev/null +++ b/pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkPulsarMessage.scala @@ -0,0 +1,7 @@ +package org.apache.pulsar.spark + +import org.apache.pulsar.client.api.MessageId + +// Wrapper class for PubSub Message since Message is not Serializable +case class SparkPulsarMessage(data: Array[Byte], key: String, messageId: MessageId, publishTime: Long, eventTime: Long, topicName: String, properties: Map[String, String]) extends Serializable + diff --git 
a/pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkStreamingReliablePulsarReceiver.scala b/pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkStreamingReliablePulsarReceiver.scala new file mode 100644 index 0000000..63dce90 --- /dev/null +++ b/pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkStreamingReliablePulsarReceiver.scala @@ -0,0 +1,158 @@ +package org.apache.pulsar.spark + +import com.google.common.util.concurrent.RateLimiter +import org.apache.pulsar.client.api._ +import org.apache.pulsar.client.impl.PulsarClientImpl +import org.apache.spark.SparkConf +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import org.slf4j.LoggerFactory + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +/** + * Custom spark receiver to pull messages from Pulsar topic and push into reliable store. + * If backpressure is enabled, the message ingestion rate for this receiver will be managed by Spark. 
+ * + * Following spark configurations can be used to control rates and block size + * spark.streaming.backpressure.enabled + * spark.streaming.backpressure.initialRate + * spark.streaming.receiver.maxRate + * spark.streaming.blockQueueSize: Controlling block size + * spark.streaming.backpressure.pid.minRate + * + * See Spark streaming configurations doc + * pay._1 -> pay._2.asInstanceOf[AnyRef])) + .topics(topics).receiverQueueSize(maxPollSize).consumerName(consumerName) + .subscribe() + } catch { + case e: Exception => + SparkStreamingReliablePulsarReceiver.LOG.error("Failed to start subscription : {}", e.getMessage) + e.printStackTrace() + restart("Restart a consumer") + } + latestStorePushTime = System.currentTimeMillis() + new Thread() { + override def run() { + receive() + } + }.start() + + } + + override def onStop(): Unit = try { + if (consumer != null) consumer.close() + if (pulsarClient != null) pulsarClient.close() + } catch { + case e: PulsarClientException => + SparkStreamingReliablePulsarReceiver.LOG.error("Failed to close client : {}", e.getMessage) + } + + // Function that continuously keeps polling records from pulsar and store them. 
+ def receive(): Unit = { + while(!isStopped()){ + try { + val messages= consumer.batchReceive().toList + + // Update rate limit if necessary + updateRateLimit + + if(messages.size() > 0){ + buffer ++= messages.map(msg => { + SparkPulsarMessage(msg.getData, msg.getKey, msg.getMessageId, msg.getPublishTime, msg.getEventTime, msg.getTopicName, msg.getProperties.asScala.toMap) + }) + + } + + val timeSinceLastStorePush = System.currentTimeMillis() - latestStorePushTime + + // Push messages to spark store if `blockIntervalMs` has passed or we have more messages than blockSize + if(timeSinceLastStorePush >= blockIntervalMs || buffer.length >= blockSize) { + val (completeBlocks, partialBlock) = buffer.grouped(blockSize) + .partition(block => block.size == blockSize) + val blocksToStore = if (completeBlocks.hasNext) completeBlocks else partialBlock + + buffer.clear() + + if(completeBlocks.hasNext && partialBlock.hasNext) + buffer.appendAll(partialBlock.next()) + + while (blocksToStore.hasNext){ + val groupedMessages = blocksToStore.next().toList + SparkStreamingReliablePulsarReceiver.LOG.debug("Pushing " + groupedMessages.size + " messages in store.") + rateLimiter.acquire(groupedMessages.size) + store(groupedMessages.toIterator) + if(autoAcknowledge) + consumer.acknowledge(groupedMessages.map(_.messageId)) + } + latestStorePushTime = System.currentTimeMillis() + } + + } + catch { + case e: Exception => reportError("Failed to get messages", e) + e.printStackTrace() + } + } + } + + def updateRateLimit(): Unit = { + val newRateLimit = rateMultiplierFactor * supervisor.getCurrentRateLimit.min(maxRateLimit) + SparkStreamingReliablePulsarReceiver.LOG.info("New rate limit: " + newRateLimit) + if (rateLimiter.getRate != newRateLimit) { + rateLimiter.setRate(newRateLimit) + } + } + +} + +object SparkStreamingReliablePulsarReceiver { + private val LOG = LoggerFactory.getLogger(classOf[SparkStreamingReliablePulsarReceiver]) +} \ No newline at end of file diff --git 
a/tests/pulsar-spark-test/pom.xml b/tests/pulsar-spark-test/pom.xml index 50b6d7d..a7faf10 100644 --- a/tests/pulsar-spark-test/pom.xml +++ b/tests/pulsar-spark-test/pom.xml @@ -57,7 +57,7 @@ org.apache.spark - spark-streaming_2.10 + spark-streaming_2.11 test From f8917f660432a5c991a8a356e245c405470b6f2b Mon Sep 17 00:00:00 2001 From: Aditya Tiwari Date: Wed, 29 Dec 2021 15:41:27 +0530 Subject: [PATCH 2/3] Adding receiver thread close check and repackaging --- .../pulsar/spark/SparkPulsarMessage.scala | 7 ----- .../streaming/pulsar/SparkPulsarMessage.scala | 25 ++++++++++++++++ ...SparkStreamingReliablePulsarReceiver.scala | 29 +++++++++++++++++-- 3 files changed, 51 insertions(+), 10 deletions(-) delete mode 100644 pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkPulsarMessage.scala create mode 100644 pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkPulsarMessage.scala rename pulsar-spark/src/main/scala/org/apache/{pulsar/spark => spark/streaming/pulsar}/SparkStreamingReliablePulsarReceiver.scala (86%) diff --git a/pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkPulsarMessage.scala b/pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkPulsarMessage.scala deleted file mode 100644 index 6098b42..0000000 --- a/pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkPulsarMessage.scala +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.pulsar.spark - -import org.apache.pulsar.client.api.MessageId - -// Wrapper class for PubSub Message since Message is not Serializable -case class SparkPulsarMessage(data: Array[Byte], key: String, messageId: MessageId, publishTime: Long, eventTime: Long, topicName: String, properties: Map[String, String]) extends Serializable - diff --git a/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkPulsarMessage.scala b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkPulsarMessage.scala new file mode 100644 index 0000000..49292b5 --- /dev/null +++ 
b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkPulsarMessage.scala @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.streaming.pulsar + +import org.apache.pulsar.client.api.MessageId + +// Wrapper class for Pulsar Message since Message is not Serializable +case class SparkPulsarMessage(data: Array[Byte], key: String, messageId: MessageId, publishTime: Long, eventTime: Long, topicName: String, properties: Map[String, String]) extends Serializable + diff --git a/pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkStreamingReliablePulsarReceiver.scala b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala similarity index 86% rename from pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkStreamingReliablePulsarReceiver.scala rename to pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala index 63dce90..eaf6de5 100644 --- a/pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkStreamingReliablePulsarReceiver.scala +++ b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala @@ -1,4 
+1,22 @@ -package org.apache.pulsar.spark +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.streaming.pulsar import com.google.common.util.concurrent.RateLimiter import org.apache.pulsar.client.api._ @@ -61,6 +79,7 @@ class SparkStreamingReliablePulsarReceiver(override val storageLevel: StorageLev private var pulsarClient: PulsarClient = null private var consumer: Consumer[Array[Byte]] = null + private var consumerThread: Thread = null var latestStorePushTime: Long = System.currentTimeMillis() @@ -79,15 +98,19 @@ class SparkStreamingReliablePulsarReceiver(override val storageLevel: StorageLev restart("Restart a consumer") } latestStorePushTime = System.currentTimeMillis() - new Thread() { + + consumerThread = new Thread() { override def run() { receive() } - }.start() + } + + consumerThread.start() } override def onStop(): Unit = try { + if (consumerThread != null && consumerThread.isAlive) consumerThread.stop() // Ideally consumrThread should be closed beforee calling onStop. 
if (consumer != null) consumer.close() if (pulsarClient != null) pulsarClient.close() } catch { From 3a831623fde508964c9ef3288cd1a2d3a8a102cd Mon Sep 17 00:00:00 2001 From: Aditya Tiwari Date: Tue, 25 Jan 2022 17:32:48 +0530 Subject: [PATCH 3/3] Adding plugin for scala and some minor changes --- pulsar-spark/pom.xml | 16 ++++++++++++++++ .../SparkStreamingReliablePulsarReceiver.scala | 18 ++++++++++++------ 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml index ce42f30..cffc168 100644 --- a/pulsar-spark/pom.xml +++ b/pulsar-spark/pom.xml @@ -69,6 +69,22 @@ + + net.alchim31.maven + scala-maven-plugin + 4.3.0 + + + + compile + add-source + + + + + -target:jvm-1.8 + + org.apache.maven.plugins maven-shade-plugin diff --git a/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala index eaf6de5..d4a02ac 100644 --- a/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala +++ b/pulsar-spark/src/main/scala/org/apache/spark/streaming/pulsar/SparkStreamingReliablePulsarReceiver.scala @@ -110,7 +110,7 @@ class SparkStreamingReliablePulsarReceiver(override val storageLevel: StorageLev } override def onStop(): Unit = try { - if (consumerThread != null && consumerThread.isAlive) consumerThread.stop() // Ideally consumrThread should be closed beforee calling onStop. 
+ consumerThread.join(30000L) if (consumer != null) consumer.close() if (pulsarClient != null) pulsarClient.close() } catch { @@ -122,17 +122,23 @@ class SparkStreamingReliablePulsarReceiver(override val storageLevel: StorageLev def receive(): Unit = { while(!isStopped()){ try { - val messages= consumer.batchReceive().toList - // Update rate limit if necessary updateRateLimit - if(messages.size() > 0){ + val messages = consumer.batchReceive() + + if (messages != null && messages.size() > 0) { buffer ++= messages.map(msg => { SparkPulsarMessage(msg.getData, msg.getKey, msg.getMessageId, msg.getPublishTime, msg.getEventTime, msg.getTopicName, msg.getProperties.asScala.toMap) }) } + } + catch { + case e: Exception => reportError("Failed to get messages", e) + } + + try { val timeSinceLastStorePush = System.currentTimeMillis() - latestStorePushTime @@ -160,7 +166,7 @@ class SparkStreamingReliablePulsarReceiver(override val storageLevel: StorageLev } catch { - case e: Exception => reportError("Failed to get messages", e) + case e: Exception => reportError("Failed to store messages", e) e.printStackTrace() } } @@ -168,8 +174,8 @@ class SparkStreamingReliablePulsarReceiver(override val storageLevel: StorageLev def updateRateLimit(): Unit = { val newRateLimit = rateMultiplierFactor * supervisor.getCurrentRateLimit.min(maxRateLimit) - SparkStreamingReliablePulsarReceiver.LOG.info("New rate limit: " + newRateLimit) if (rateLimiter.getRate != newRateLimit) { + SparkStreamingReliablePulsarReceiver.LOG.info("New rate limit: " + newRateLimit) rateLimiter.setRate(newRateLimit) } }