Flume + Kafka + Spark Streaming has matured into a widely used architecture for real-time log collection and computation. With Kafka in the middle, the same data stream can be delivered to HDFS for offline analysis while also being consumed in real time by multiple consumers, Spark Streaming among them.
Real-time statistics requirement
Every 60 seconds, compute the PV, the number of distinct IPs, and the UV for that 60-second window.
The final result takes the form: time:pv:ips:uv
Raw log format
2015-11-11T14:59:59|~|xxx|~|202.109.201.181|~|xxx|~|xxx|~|xxx|~|B5C96DCA0003DB546E7
2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|B1611D0E00003857808
2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|1555BD0100016F2E76F
2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E
2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E
2015-11-11T15:00:01|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|4E3512790001039FDB9
Each log record contains 7 fields separated by |~|; the 3rd field is the ip and the 7th field is the cookieid. Assume the raw logs have already been streamed into Kafka by Flume.
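One parsing detail worth calling out: Scala's String.split treats its argument as a regular expression, so the | characters in the separator must be escaped. A minimal standalone sketch of the field extraction, using a sample line from above (ParseDemo is just an illustrative name):

object ParseDemo {
  def main(args: Array[String]): Unit = {
    val line = "2015-11-11T14:59:59|~|xxx|~|202.109.201.181|~|xxx|~|xxx|~|xxx|~|B5C96DCA0003DB546E7"
    // split takes a regex, so | must be escaped; -1 keeps trailing empty fields
    val fields = line.split("\\|~\\|", -1)
    val day      = fields(0).substring(0, 10) // "2015-11-11"
    val ip       = fields(2)                  // 3rd field
    val cookieid = fields(6)                  // 7th field
    println(s"$day $ip $cookieid")
  }
}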
Program code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

// Fields extracted from each log record: date, ip, cookieid
case class DapLog(day: String, ip: String, cookieid: String)

// Lazily instantiated singleton SQLContext, shared across batches
object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

object LogStreaming {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("LogStreaming")
    // One batch every 60 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    // Read from Kafka; the topic is daplog and it has two partitions
    val kafkaStream = KafkaUtils.createStream(
      ssc,
      "bj11-65:2181",                  // ZooKeeper quorum used by the Kafka cluster
      "group_spark_streaming",         // group.id used by this consumer
      Map[String, Int]("daplog" -> 2), // topic -> number of consumer threads (one per partition)
      StorageLevel.MEMORY_AND_DISK_SER)
      .map(x => x._2.split("\\|~\\|", -1)) // logs are separated by |~|; escape it, since split expects a regex

    kafkaStream.foreachRDD((rdd: RDD[Array[String]], time: Time) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      // Build the case class DapLog, extracting the relevant fields from each record
      val logDataFrame = rdd.map(w => DapLog(w(0).substring(0, 10), w(2), w(6))).toDF()
      // Register as a temp table
      logDataFrame.registerTempTable("daplog")
      // Compute pv, distinct ip count and uv for this batch
      val logCountsDataFrame = sqlContext.sql(
        "select date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as time," +
          "count(1) as pv,count(distinct ip) as ips,count(distinct cookieid) as uv from daplog")
      // Print the query result
      logCountsDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
The example only prints the per-batch statistics to standard output; in a real scenario the results would normally be persisted to a database.
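As a rough sketch of what that persistence step might look like, the per-batch DataFrame can be written out with foreachPartition and plain JDBC. The JDBC URL, credentials, and the logstats table layout below are assumptions for illustration only, not part of the original example:

import java.sql.DriverManager
import org.apache.spark.sql.DataFrame

// Hypothetical sink: writes the batch result into a table named logstats(time, pv, ips, uv).
def saveToDb(logCountsDataFrame: DataFrame): Unit = {
  logCountsDataFrame.foreachPartition { rows =>
    // Assumed connection details; replace with the real database
    val conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/stats", "user", "password")
    val stmt = conn.prepareStatement("insert into logstats(time, pv, ips, uv) values (?, ?, ?, ?)")
    try {
      rows.foreach { row =>
        stmt.setString(1, row.getString(0)) // time
        stmt.setLong(2, row.getLong(1))     // pv
        stmt.setLong(3, row.getLong(2))     // ips
        stmt.setLong(4, row.getLong(3))     // uv
        stmt.executeUpdate()
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }
}

This would be called inside the foreachRDD block in place of (or alongside) logCountsDataFrame.show(); the JDBC driver jar would then also need to be on the executors' classpath.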
Running the program
Note: the required dependency jars (such as the Kafka integration) must be available to the job at run time.
Package the jar with sbt (see http://www.php3.cn/a/172.html for details on sbt)
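A minimal build.sbt sketch that could produce the assembly jar used below. The Spark version (a 1.6.x release built for Scala 2.10) and the use of the sbt-assembly plugin are assumptions inferred from the scala-2.10 path in the submit command, not taken from the original:

name := "LogStreaming"

version := "1.0"

scalaVersion := "2.10.5"

// Spark core/SQL/streaming are provided by the cluster at run time;
// the Kafka integration must be bundled into the assembly jar.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-sql"             % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1"
)

With the sbt-assembly plugin enabled, running sbt assembly produces target/scala-2.10/LogStreaming-assembly-1.0.jar, which is the jar passed to spark-submit below.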
./bin/spark-submit --class LogStreaming --master spark://hd1:7077 target/scala-2.10/LogStreaming-assembly-1.0.jar
Result
+-------------------+---+---+---+
|               time| pv|ips| uv|
+-------------------+---+---+---+
|2016-02-24 10:16:00|  6|  2|  5|
+-------------------+---+---+---+