

Real-Time Statistics with Kafka + Spark Streaming

Category: Spark

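Flume + Kafka + Spark Streaming has matured into a well-established architecture for real-time log collection and computation. With Kafka in the middle, the same data stream can be landed in HDFS for offline analysis while also being consumed in real time by multiple consumers, Spark Streaming among them.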

Real-time statistics requirements

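Every 60 seconds, compute the pv (page views), the number of distinct IPs, and the uv (unique visitors) for that 60-second window.
The final result contains: time, pv, ips, uv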
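To make the three metrics concrete, here is a minimal sketch in plain Scala (no Spark) of what is computed for one batch. The names `Hit` and `BatchStats` are hypothetical and introduced only for illustration; pv is the number of records, ips the number of distinct IPs, and uv the number of distinct cookie ids.

```scala
// One parsed log record: only the two fields the statistics need.
// `Hit` and `BatchStats` are illustrative names, not part of the original program.
case class Hit(ip: String, cookieId: String)

object BatchStats {
  // Returns (pv, ips, uv) for one batch of hits.
  def compute(hits: Seq[Hit]): (Long, Long, Long) = {
    val pv  = hits.size.toLong                          // every record is one page view
    val ips = hits.map(_.ip).distinct.size.toLong       // distinct client IPs
    val uv  = hits.map(_.cookieId).distinct.size.toLong // distinct cookie ids
    (pv, ips, uv)
  }
}
```

For example, three hits from two IPs carrying two different cookie ids give pv = 3, ips = 2, uv = 2.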

Raw log format

2015-11-11T14:59:59|~|xxx|~|202.109.201.181|~|xxx|~|xxx|~|xxx|~|B5C96DCA0003DB546E7
2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|B1611D0E00003857808
2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|1555BD0100016F2E76F
2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E
2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E
2015-11-11T15:00:01|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|4E3512790001039FDB9

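Each log line contains 7 fields separated by |~|; the 3rd field is the ip and the 7th is the cookieid. Assume the raw logs have already been delivered to Kafka by Flume.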
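One pitfall when splitting this format on the JVM is that `String.split` treats its argument as a regular expression, and `|` is the regex alternation operator. The sketch below quotes the delimiter so it is matched literally. `LogFields` and `LogParser` are hypothetical names used only for this illustration; lines that do not have exactly 7 fields are rejected rather than causing an exception.

```scala
import java.util.regex.Pattern

// Illustrative container for the three fields used by the statistics.
case class LogFields(day: String, ip: String, cookieId: String)

object LogParser {
  // "|" is a regex metacharacter, so the delimiter is quoted to match literally.
  private val Delimiter = Pattern.quote("|~|")

  // Returns None for lines that do not match the 7-field layout.
  def parse(line: String): Option[LogFields] = {
    val fields = line.split(Delimiter, -1) // -1 keeps trailing empty fields
    if (fields.length == 7 && fields(0).length >= 10)
      Some(LogFields(fields(0).substring(0, 10), fields(2), fields(6)))
    else
      None
  }
}
```

Returning an `Option` lets a stream drop malformed lines with `flatMap` instead of failing the whole batch.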

Program code:

import scala.reflect.runtime.universe
 
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.kafka.KafkaUtils

case class DapLog(day:String, ip:String, cookieid:String)
 
object SQLContextSingleton {
  @transient  private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

object LogStreaming {
  
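  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("LogStreaming")
    // One batch every 60 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    // Read from Kafka; the topic is daplog, which has two partitions
    val kafkaStream = KafkaUtils.createStream(
          ssc,
          "bj11-65:2181", // ZooKeeper used by the Kafka cluster
          "group_spark_streaming", // group.id used by this consumer
          Map[String, Int]("daplog" -> 2), // topic -> number of consumer threads (one per partition), not partition ids
          StorageLevel.MEMORY_AND_DISK_SER)
      // String.split takes a regex, so the "|" characters must be escaped
      .map(x => x._2.split("\\|~\\|", -1))  // fields are separated by |~|
      .filter(_.length >= 7)  // skip malformed lines with fewer than 7 fields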
    
    kafkaStream.foreachRDD((rdd: RDD[Array[String]], time: Time) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      // Build the DapLog case class from the relevant log fields
      val logDataFrame = rdd.map(w => DapLog(w(0).substring(0, 10),w(2),w(6))).toDF()
      // Register as a temporary table
      logDataFrame.registerTempTable("daplog")
      // Query this batch's pv, distinct IP count, and uv
      val logCountsDataFrame =
        sqlContext.sql("select date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as time,count(1) as pv,count(distinct ip) as ips,count(distinct cookieid) as uv from daplog")
      // Print the query result
      logCountsDataFrame.show()
    })    
    
    ssc.start()
    ssc.awaitTermination()    
  }   
}
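The query above stamps each row with `current_timestamp()`, which is the wall-clock time at which the query runs rather than the boundary of the batch. If the batch time is preferred, one option is to format the `time` argument that `foreachRDD` already receives. The helper below is a hypothetical sketch that works on plain epoch milliseconds so it can be tried without Spark; `BatchTime` is an illustrative name.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object BatchTime {
  // Formats epoch milliseconds as "yyyy-MM-dd HH:mm:ss" in the given time zone.
  def format(millis: Long, zone: TimeZone = TimeZone.getDefault): String = {
    // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    fmt.setTimeZone(zone)
    fmt.format(new Date(millis))
  }
}
```

Inside `foreachRDD` this could be called as `BatchTime.format(time.milliseconds)`, and the resulting string used in place of the `date_format(current_timestamp(), ...)` column.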

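This example only prints the real-time statistics to standard output; in a real deployment the results would usually be persisted to a database.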
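As a rough sketch of what persisting could look like, the helper below writes one result row through plain JDBC. The table name `realtime_stats`, its columns, and the `StatsSink` object are all assumptions made for illustration; the original post does not say which database or schema is used.

```scala
import java.sql.{Connection, Timestamp}

object StatsSink {
  // Hypothetical table: realtime_stats(stat_time, pv, ips, uv).
  val InsertSql =
    "INSERT INTO realtime_stats (stat_time, pv, ips, uv) VALUES (?, ?, ?, ?)"

  // Writes one result row and returns the number of rows inserted.
  // The caller owns the connection and is responsible for closing it.
  def save(conn: Connection, time: Timestamp, pv: Long, ips: Long, uv: Long): Int = {
    val stmt = conn.prepareStatement(InsertSql)
    try {
      stmt.setTimestamp(1, time)
      stmt.setLong(2, pv)
      stmt.setLong(3, ips)
      stmt.setLong(4, uv)
      stmt.executeUpdate()
    } finally {
      stmt.close()
    }
  }
}
```

Because the query produces a single row per batch, it is probably simplest to collect that row on the driver inside `foreachRDD` and call `save` there, rather than opening connections on the executors.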

Running the program

Note: at runtime the program depends on the corresponding jar packages.

Package the jar with sbt (for details on sbt, see http://www.php3.cn/a/172.html)
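The post does not show its build definition, so the following `build.sbt` is only a guess at a setup that would produce a jar named `LogStreaming-assembly-1.0.jar` under `target/scala-2.10`. The Scala and Spark versions are assumptions and should be matched to the cluster. The `-assembly` suffix suggests the sbt-assembly plugin, which would need to be declared separately in `project/plugins.sbt`.

```scala
// build.sbt: a sketch; all version numbers here are assumptions
name := "LogStreaming"

version := "1.0"

scalaVersion := "2.10.5"

val sparkVersion = "1.6.0"

libraryDependencies ++= Seq(
  // Supplied by the cluster at runtime, so kept out of the assembly jar
  "org.apache.spark" %% "spark-core"      % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"       % sparkVersion % "provided",
  // Not part of the Spark distribution, so it is bundled into the assembly jar
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)
```

Bundling the Kafka integration while marking the core Spark modules as provided keeps the jar small and avoids shipping a second copy of Spark to the cluster.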

./bin/spark-submit --class LogStreaming --master spark://hd1:7077 target/scala-2.10/LogStreaming-assembly-1.0.jar

Result

+-------------------+---+---+---+
|               time| pv|ips| uv|
+-------------------+---+---+---+
|2016-02-24 10:16:00|  6|  2|  5|
+-------------------+---+---+---+

 



