Gracefully Stopping Spark Streaming
Published: 2019-05-26


1. Watch for a marker key in Redis

The idea is simple: while the StreamingContext is running, the driver periodically checks whether a marker key exists in Redis. As soon as the key appears, the context is stopped gracefully, so the batch currently being processed is allowed to finish before the application exits.

/**
  * Stop the Streaming application gracefully.
  *
  * @param ssc
  */
def stopByMarkKey(ssc: StreamingContext): Unit = {
  val intervalMills = 10 * 1000 // check for the stop flag every 10 seconds
  var isStop = false
  while (!isStop) {
    isStop = ssc.awaitTerminationOrTimeout(intervalMills)
    if (!isStop && isExists(STOP_FLAG)) {
      LOG.warn("Stopping the Spark Streaming application in 2 seconds...")
      Thread.sleep(2000)
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}

/**
  * Check whether a key exists in Redis.
  *
  * @param key
  * @return
  */
def isExists(key: String): Boolean = {
  val jedis = InternalRedisClient.getPool.getResource
  val flag = jedis.exists(key)
  jedis.close()
  flag
}
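The InternalRedisClient used above is a small connection-pool wrapper that the post does not show. A minimal sketch based on Jedis and commons-pool2, where the method names (makePool, getPool) match how the code calls them and everything else is an assumption, could look like this:

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

// Hypothetical pool wrapper; only makePool/getPool are implied by the post.
object InternalRedisClient extends Serializable {

  @transient private var pool: JedisPool = _

  // Create the pool once; subsequent calls are no-ops
  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
               maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
    if (pool == null) {
      val poolConfig = new GenericObjectPoolConfig()
      poolConfig.setMaxTotal(maxTotal)
      poolConfig.setMaxIdle(maxIdle)
      poolConfig.setMinIdle(minIdle)
      poolConfig.setTestOnBorrow(true)
      pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)

      // Destroy the pool when the JVM exits
      sys.addShutdownHook {
        if (pool != null) pool.destroy()
      }
    }
  }

  def getPool: JedisPool = {
    assert(pool != null)
    pool
  }
}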

2. The complete example: KafkaRedisStreaming

The full program below reads from Kafka with manually managed offsets stored in Redis, and plugs the graceful-stop loop in after ssc.start():

package me.jinkun.stream

import me.jinkun.scala.util.InternalRedisClient
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object KafkaRedisStreaming {
  private val LOG = LoggerFactory.getLogger("KafkaRedisStreaming")

  private val STOP_FLAG = "TEST_STOP_FLAG"

  def initRedisPool() = {
    // Redis configurations
    val maxTotal = 20
    val maxIdle = 10
    val minIdle = 1
    val redisHost = "47.98.119.122"
    val redisPort = 6379
    val redisTimeout = 30000
    InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
  }

  /**
    * Read the last committed offsets for a topic from Redis.
    *
    * @param topicName
    * @param partitions
    * @return
    */
  def getLastCommittedOffsets(topicName: String, partitions: Int): Map[TopicPartition, Long] = {
    if (LOG.isInfoEnabled())
      LOG.info("||--Topic:{},getLastCommittedOffsets from Redis--||", topicName)

    // Fetch the offsets saved by the previous run
    val jedis = InternalRedisClient.getPool.getResource
    val fromOffsets = collection.mutable.HashMap.empty[TopicPartition, Long]
    for (partition <- 0 until partitions) {
      val topic_partition_key = topicName + "_" + partition
      val lastSavedOffset = jedis.get(topic_partition_key)
      val lastOffset = if (lastSavedOffset == null) 0L else lastSavedOffset.toLong
      fromOffsets += (new TopicPartition(topicName, partition) -> lastOffset)
    }
    jedis.close()
    fromOffsets.toMap
  }

  def main(args: Array[String]): Unit = {
    // Initialize the Redis pool
    initRedisPool()

    val conf = new SparkConf()
      .setAppName("ScalaKafkaStream")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc, Seconds(3))

    val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val groupId = "kafka-test-group"
    val topicName = "Test"
    val maxPoll = 20000

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // The total number of partitions of the topic is specified here
    val fromOffsets = getLastCommittedOffsets(topicName, 3)

    // Create the direct Kafka stream, starting from the saved offsets
    val kafkaTopicDS = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))

    kafkaTopicDS.foreachRDD(rdd => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Only process non-empty batches
      if (!rdd.isEmpty()) {
        val jedis = InternalRedisClient.getPool.getResource
        val p = jedis.pipelined()
        p.multi() // open a transaction

        // Process the data: word count sorted by frequency
        rdd
          .map(_.value)
          .flatMap(_.split(" "))
          .map(x => (x, 1L))
          .reduceByKey(_ + _)
          .sortBy(_._2, false)
          .foreach(println)

        // Save the new offsets back to Redis
        offsetRanges.foreach { offsetRange =>
          println("partition : " + offsetRange.partition + " fromOffset: " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
          val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
          p.set(topic_partition_key, offsetRange.untilOffset + "")
        }

        p.exec() // commit the transaction
        p.sync   // flush the pipeline and read the responses
        jedis.close()
      }
    })

    ssc.start()

    // Graceful shutdown
    stopByMarkKey(ssc)

    ssc.awaitTermination()
  }

  /**
    * Stop the Streaming application gracefully.
    *
    * @param ssc
    */
  def stopByMarkKey(ssc: StreamingContext): Unit = {
    val intervalMills = 10 * 1000 // check for the stop flag every 10 seconds
    var isStop = false
    while (!isStop) {
      isStop = ssc.awaitTerminationOrTimeout(intervalMills)
      if (!isStop && isExists(STOP_FLAG)) {
        LOG.warn("Stopping the Spark Streaming application in 2 seconds...")
        Thread.sleep(2000)
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
  }

  /**
    * Check whether a key exists in Redis.
    *
    * @param key
    * @return
    */
  def isExists(key: String): Boolean = {
    val jedis = InternalRedisClient.getPool.getResource
    val flag = jedis.exists(key)
    jedis.close()
    flag
  }
}
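To compile and run the program you need the Spark Streaming Kafka 0.10 integration and a Jedis client on the classpath. A possible build.sbt fragment, with version numbers that are assumptions and should be aligned with your cluster:

// build.sbt (versions are assumptions, not from the original post)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.4.0" % Provided,
  "org.apache.spark" %% "spark-streaming"            % "2.4.0" % Provided,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0",
  "redis.clients"    %  "jedis"                      % "2.9.0"
)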

With this in place, stopping the job only requires creating the TEST_STOP_FLAG key in Redis:

set "TEST_STOP_FLAG" 1

Running result: (the original post shows a screenshot of the console output here)

With this approach, stopping the Spark Streaming program no longer causes data loss: the graceful stop lets the in-flight batch finish, and its offsets are committed to Redis before the context shuts down.

