Flume + Kafka + Spark Streaming has matured into a widely used architecture for real-time log collection and computation. With Kafka in the middle, the same data stream can both be landed into HDFS for offline analysis and be consumed in real time by multiple consumers, including Spark Streaming.
Real-time statistics requirement
At 60-second intervals, compute the pv, the number of distinct IPs, and the uv within each 60-second window.
The final result takes the form: timestamp:pv:ips:uv
Raw log format
2015-11-11T14:59:59|~|xxx|~|202.109.201.181|~|xxx|~|xxx|~|xxx|~|B5C96DCA0003DB546E7
2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|B1611D0E00003857808
2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|1555BD0100016F2E76F
2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E
2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E
2015-11-11T15:00:01|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|4E3512790001039FDB9
Each log line contains 7 fields separated by |~|; the 3rd field is the ip and the 7th field is the cookieid. Assume the raw logs have already been streamed into Kafka by Flume.
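As a quick check of the format, here is a standalone Scala sketch (not part of the original article's job) that splits one sample line and pulls out the ip and cookieid. Note that String.split takes a regular expression, so the | characters in the delimiter must be escaped.

object ParseCheck {
  def main(args: Array[String]): Unit = {
    val line = "2015-11-11T14:59:59|~|xxx|~|202.109.201.181|~|xxx|~|xxx|~|xxx|~|B5C96DCA0003DB546E7"
    // split() takes a regex, so | must be escaped; -1 keeps trailing empty fields
    val fields = line.split("\\|~\\|", -1)
    val day      = fields(0).substring(0, 10) // "2015-11-11"
    val ip       = fields(2)                  // 3rd field
    val cookieid = fields(6)                  // 7th field
    println(s"$day $ip $cookieid")
  }
}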
Program code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

// One record per log line: date (day), ip, and cookieid
case class DapLog(day: String, ip: String, cookieid: String)

// Lazily instantiated singleton SQLContext, shared across batches
object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

object LogStreaming {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("LogStreaming")
    // One batch every 60 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    // Read from Kafka; the topic is daplog and it has two partitions
    val kafkaStream = KafkaUtils.createStream(
      ssc,
      "bj11-65:2181",                  // ZooKeeper quorum used by the Kafka cluster
      "group_spark_streaming",         // group.id used by this consumer
      Map[String, Int]("daplog" -> 2), // topic -> number of consumer threads (one per partition)
      StorageLevel.MEMORY_AND_DISK_SER)
      // Logs are delimited by |~|; escape it because split() takes a regex
      .map(x => x._2.split("\\|~\\|", -1))
    kafkaStream.foreachRDD((rdd: RDD[Array[String]], time: Time) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      // Build DapLog instances from the relevant log fields
      val logDataFrame = rdd.map(w => DapLog(w(0).substring(0, 10), w(2), w(6))).toDF()
      // Register as a temporary table
      logDataFrame.registerTempTable("daplog")
      // Query pv, number of distinct IPs, and uv for this batch
      val logCountsDataFrame =
        sqlContext.sql("select date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as time," +
          "count(1) as pv,count(distinct ip) as ips,count(distinct cookieid) as uv from daplog")
      // Print the query result
      logCountsDataFrame.show()
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
The example only prints the real-time statistics to standard output; in a real deployment the results would normally be persisted to a database.
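One way to do that (a minimal sketch, not from the original article) is a helper method added to the LogStreaming object that collects the single aggregated row on the driver and writes it out over plain JDBC; the MySQL URL, the credentials, and the daplog_stat(time, pv, ips, uv) table are hypothetical placeholders. Inside foreachRDD, logCountsDataFrame.show() would then be replaced with saveCounts(logCountsDataFrame).

import java.sql.DriverManager
import org.apache.spark.sql.DataFrame

// Hypothetical helper: writes the one-row batch summary to MySQL over JDBC.
// The table daplog_stat(time, pv, ips, uv) and the connection details are placeholders.
def saveCounts(counts: DataFrame): Unit = {
  // The aggregation produces a single row, so collecting on the driver is cheap.
  counts.collect().foreach { row =>
    val conn = DriverManager.getConnection(
      "jdbc:mysql://dbhost:3306/stats", "user", "password")
    try {
      val ps = conn.prepareStatement(
        "insert into daplog_stat(time, pv, ips, uv) values (?, ?, ?, ?)")
      ps.setString(1, row.getString(0)) // time
      ps.setLong(2, row.getLong(1))     // pv
      ps.setLong(3, row.getLong(2))     // ips
      ps.setLong(4, row.getLong(3))     // uv
      ps.executeUpdate()
      ps.close()
    } finally {
      conn.close()
    }
  }
}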
Running the program
Note: the job needs its dependency jars at runtime.
Package the jar with sbt (for details on sbt, see http://www.php3.cn/a/172.html).
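The original article does not show the build file; the sketch below is one possible build.sbt for sbt-assembly, with Scala and Spark versions chosen as assumptions to match the scala-2.10 assembly jar in the submit command below. Adjust them to your cluster, and add the sbt-assembly plugin in project/plugins.sbt.

// build.sbt -- a minimal sketch for sbt-assembly; versions are assumptions
name := "LogStreaming"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-sql"             % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.5.2" % "provided",
  // The Kafka integration is not bundled with Spark, so it must go into the assembly jar
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2"
)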
./bin/spark-submit --class LogStreaming --master spark://hd1:7077 target/scala-2.10/LogStreaming-assembly-1.0.jar
Result
+-------------------+---+---+---+
|               time| pv|ips| uv|
+-------------------+---+---+---+
|2016-02-24 10:16:00|  6|  2|  5|
+-------------------+---+---+---+

