登录 |  注册 |  繁體中文


Spark实例TopN

分类: Spark 颜色:橙色 默认  字号: 阅读(2148) | 评论(0)

TopN操作

网上经常见到的问题: 给定一个大文件,求里面Ip出现最多次数的前N个Ip地址和出现次数。

 bin/spark-shell  #进入spark-shell
scala> val data = sc.textFile("/dw/spark/log.txt") #加载文件
14/05/14 17:23:33 INFO MemoryStore: ensureFreeSpace(73490) called with curMem=0, maxMem=308713881
14/05/14 17:23:33 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 71.8 KB, free 294.3 MB)
data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

我们Step By Step

1. 找出每行出现的ip地址

找出IP地址有很多方法,可以按分隔符分割,也可以用正则。

这里我用正则找出IP地址了,因为只有1列是IP。

scala> data.map(line=>"""d+.d+.d+.d+""".r.findAllIn(line).mkString) take 10
14/05/14 17:41:14 INFO SparkContext: Starting job: take at <console>:15
14/05/14 17:41:14 INFO DAGScheduler: Got job 2 (take at <console>:15) with 1 output partitions (allowLocal=true)
14/05/14 17:41:14 INFO DAGScheduler: Final stage: Stage 2 (take at <console>:15)
14/05/14 17:41:14 INFO DAGScheduler: Parents of final stage: List()
14/05/14 17:41:14 INFO DAGScheduler: Missing parents: List()
14/05/14 17:41:14 INFO DAGScheduler: Computing the requested partition locally
14/05/14 17:41:14 INFO HadoopRDD: Input split: hdfs://web02.dw:9000/dw/spark/mobile.txt:0+134217728
14/05/14 17:41:14 INFO SparkContext: Job finished: take at <console>:15, took 0.072784535 s
res2: Array[String] = Array("", 61.172.242.36, "", 61.172.242.36, "", 127.0.0.1, 10.129.20.108, 10.129.20.109, 10.129.20.98, 127.0.0.1)

这里发现有的IP是空的,这些是我们不需要的。

2. 过滤掉非IP地址

scala> data.map(line=>"""d+.d+.d+.d+""".r.findAllIn(line).mkString).filter(_!="") take 10
14/05/14 17:42:26 INFO SparkContext: Starting job: take at <console>:15
14/05/14 17:42:26 INFO DAGScheduler: Got job 3 (take at <console>:15) with 1 output partitions (allowLocal=true)
14/05/14 17:42:26 INFO DAGScheduler: Final stage: Stage 3 (take at <console>:15)
14/05/14 17:42:26 INFO DAGScheduler: Parents of final stage: List()
14/05/14 17:42:26 INFO DAGScheduler: Missing parents: List()
14/05/14 17:42:26 INFO DAGScheduler: Computing the requested partition locally
14/05/14 17:42:26 INFO HadoopRDD: Input split: hdfs://web02.dw:9000/dw/spark/mobile.txt:0+134217728
14/05/14 17:42:26 INFO SparkContext: Job finished: take at <console>:15, took 0.04304932 s
res3: Array[String] = Array(61.172.242.36, 61.172.242.36, 127.0.0.1, 10.129.20.108, 10.129.20.109, 10.129.20.98, 127.0.0.1, 61.172.242.36, 127.0.0.1, 10.129.20.111)

3. 最经典的word count

map阶段
形成(IP,1) 这样的key,value的map结构。
scala> data.map(line=>"""d+.d+.d+.d+""".r.findAllIn(line).mkString).filter(_!="").map(word=>(word,1)) take 10
14/05/14 17:43:29 INFO SparkContext: Starting job: take at <console>:15
14/05/14 17:43:29 INFO DAGScheduler: Got job 5 (take at <console>:15) with 1 output partitions (allowLocal=true)
14/05/14 17:43:29 INFO DAGScheduler: Final stage: Stage 5 (take at <console>:15)
14/05/14 17:43:29 INFO DAGScheduler: Parents of final stage: List()
14/05/14 17:43:29 INFO DAGScheduler: Missing parents: List()
14/05/14 17:43:29 INFO DAGScheduler: Computing the requested partition locally
14/05/14 17:43:29 INFO HadoopRDD: Input split: hdfs://web02.dw:9000/dw/spark/mobile.txt:0+134217728
14/05/14 17:43:29 INFO SparkContext: Job finished: take at <console>:15, took 0.017397074 s
res5: Array[(String, Int)] = Array((61.172.242.36,1), (61.172.242.36,1), (127.0.0.1,1), (10.129.20.108,1), (10.129.20.109,1), (10.129.20.98,1), (127.0.0.1,1), (61.172.242.36,1), (127.0.0.1,1), (10.129.20.111,1))

reduce阶段

累加IP出现的次数。

data.map(line=>"""d+.d+.d+.d+""".r.findAllIn(line).mkString).filter(_!="").map(word=>(word,1)).reduceByKey(_+_)

这里仅仅是统计出了IP次数。

逆转map,排序

将map的k,v互换,变成(value, IP) 这样才能利用sortByKey排序找出top的IPs。

data.map(line=>"""d+.d+.d+.d+""".r.findAllIn(line).mkString).filter(_!="").map(word=>(word,1)).reduceByKey(_+_).map(word=>(word._2,word._1)).sortByKey(false).map(word=>(word._2,word._1)) take 50

 

结果:

topIps: Array[(String, Int)] = Array((127.0.0.1,3517348), (10.129.41.91,519688), (10.129.41.92,515434), (10.129.41.93,503120), (10.129.41.94,484359), 
(10.129.41.95,466621), (10.129.41.96,441576), (10.129.41.97,397698), (10.129.41.98,393082), (10.66.63.21,255484), (61.172.242.36,151109), (10.66.63.22,97582), 
(10.129.20.109,90529), (10.129.20.111,90472), (10.129.20.110,89783), (10.129.20.108,89411), (10.129.20.99,88445), (61.160.212.242,84373), (10.129.20.98,82245), 
(114.80.132.9,71766), (10.127.41.31,46312), (10.129.20.102,33377), (61.172.254.185,26054), (218.206.68.237,24529), (10.129.8.22,21361), (10.129.8.21,21136), 
(10.127.41.32,19309), (10.150.9.241,12631), (61.172.251.20,12496), (10.150.9.240,12340), (61.172.254.152,6509), (10.131.28.25,5908), (61.172.254.9,5566), 
(10.131.28.26,494...



上一篇:推荐算法分析   下一篇:Spark的Map和Reduce

姓 名: *
邮 箱:
内 容: *
验证码: 点击刷新 *   

回到顶部