Top-N Operations
A question you see all over the web: given a large file, find the top N IP addresses by number of occurrences, along with their counts.
bin/spark-shell   # enter the spark-shell
scala> val data = sc.textFile("/dw/spark/log.txt")   # load the file
14/05/14 17:23:33 INFO MemoryStore: ensureFreeSpace(73490) called with curMem=0, maxMem=308713881
14/05/14 17:23:33 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 71.8 KB, free 294.3 MB)
data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
Let's work through it step by step.
1. Extract the IP address from each line
There are many ways to pull out an IP address: you can split on a delimiter, or use a regular expression.
Here I use a regex, since the IP is the only field of that shape.
scala> data.map(line => """\d+\.\d+\.\d+\.\d+""".r.findAllIn(line).mkString) take 10
14/05/14 17:41:14 INFO SparkContext: Job finished: take at <console>:15, took 0.072784535 s
res2: Array[String] = Array("", 61.172.242.36, "", 61.172.242.36, "", 127.0.0.1, 10.129.20.108, 10.129.20.109, 10.129.20.98, 127.0.0.1)
Notice that some entries are empty strings: these come from lines with no IP address, which we don't need.
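The extraction step can be tried out on a plain Scala collection, without Spark. This is a minimal sketch: the sample log lines are made up, but the regex and the `findAllIn(...).mkString` idiom are the same as above, and you can see where the empty strings come from.

```scala
// The same IP regex as in the Spark pipeline.
val ipPattern = """\d+\.\d+\.\d+\.\d+""".r

// Hypothetical sample lines standing in for the log file.
val sampleLines = Seq(
  "GET /index.html from 61.172.242.36",
  "no address on this line",
  "health check 127.0.0.1 ok"
)

// findAllIn returns an iterator of matches; mkString joins them,
// yielding "" for a line with no match -- hence the empty entries.
val extracted = sampleLines.map(line => ipPattern.findAllIn(line).mkString)

// Dropping the empty entries, exactly as the filter step below does.
val ips = extracted.filter(_ != "")

println(extracted)
println(ips)
```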
2. Filter out the non-IP entries
scala> data.map(line => """\d+\.\d+\.\d+\.\d+""".r.findAllIn(line).mkString).filter(_ != "") take 10
14/05/14 17:42:26 INFO SparkContext: Job finished: take at <console>:15, took 0.04304932 s
res3: Array[String] = Array(61.172.242.36, 61.172.242.36, 127.0.0.1, 10.129.20.108, 10.129.20.109, 10.129.20.98, 127.0.0.1, 61.172.242.36, 127.0.0.1, 10.129.20.111)
3. The classic word count
Map phase:
Emit (IP, 1) key-value pairs.
scala> data.map(line => """\d+\.\d+\.\d+\.\d+""".r.findAllIn(line).mkString).filter(_ != "").map(word => (word, 1)) take 10
14/05/14 17:43:29 INFO SparkContext: Job finished: take at <console>:15, took 0.017397074 s
res5: Array[(String, Int)] = Array((61.172.242.36,1), (61.172.242.36,1), (127.0.0.1,1), (10.129.20.108,1), (10.129.20.109,1), (10.129.20.98,1), (127.0.0.1,1), (61.172.242.36,1), (127.0.0.1,1), (10.129.20.111,1))
Reduce phase:
Sum up the occurrences of each IP.
data.map(line => """\d+\.\d+\.\d+\.\d+""".r.findAllIn(line).mkString).filter(_ != "").map(word => (word, 1)).reduceByKey(_ + _)
At this point we have only the per-IP counts; they are not yet sorted.
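The map/reduce pair can be simulated on a local Scala collection: `groupBy` plus a sum plays the role of `reduceByKey(_ + _)` here. A minimal sketch with made-up IPs:

```scala
// Word count on a local collection, mimicking the RDD pipeline:
// map(word => (word, 1)) followed by reduceByKey(_ + _).
val ips = Seq("127.0.0.1", "61.172.242.36", "127.0.0.1", "10.129.20.108", "127.0.0.1")

val counts = ips
  .map(ip => (ip, 1))   // the "map phase": (IP, 1) pairs
  .groupBy(_._1)        // bring all pairs for one IP together
  .map { case (ip, pairs) => (ip, pairs.map(_._2).sum) }  // sum the 1s, like reduceByKey(_ + _)

println(counts)
```

On an RDD, `reduceByKey` also combines values on each partition before shuffling, which is why it is preferred over a plain `groupByKey`-then-sum for large data.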
4. Invert the map and sort
Swap the key and value of each pair to get (count, IP); only then can sortByKey order the pairs by count so we can pick out the top IPs. A second swap restores the (IP, count) shape.
data.map(line => """\d+\.\d+\.\d+\.\d+""".r.findAllIn(line).mkString)
    .filter(_ != "")
    .map(word => (word, 1))
    .reduceByKey(_ + _)
    .map(word => (word._2, word._1))
    .sortByKey(false)
    .map(word => (word._2, word._1)) take 50
Result:
topIps: Array[(String, Int)] = Array((127.0.0.1,3517348), (10.129.41.91,519688), (10.129.41.92,515434), (10.129.41.93,503120), (10.129.41.94,484359), (10.129.41.95,466621), (10.129.41.96,441576), (10.129.41.97,397698), (10.129.41.98,393082), (10.66.63.21,255484), (61.172.242.36,151109), (10.66.63.22,97582), (10.129.20.109,90529), (10.129.20.111,90472), (10.129.20.110,89783), (10.129.20.108,89411), (10.129.20.99,88445), (61.160.212.242,84373), (10.129.20.98,82245), (114.80.132.9,71766), (10.127.41.31,46312), (10.129.20.102,33377), (61.172.254.185,26054), (218.206.68.237,24529), (10.129.8.22,21361), (10.129.8.21,21136), (10.127.41.32,19309), (10.150.9.241,12631), (61.172.251.20,12496), (10.150.9.240,12340), (61.172.254.152,6509), (10.131.28.25,5908), (61.172.254.9,5566), (10.131.28.26,494...
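The swap / sortByKey(false) / swap-back dance can also be sketched on a local Map to make the ordering logic obvious. The counts below are made up; the three transformations mirror the RDD pipeline above one-for-one:

```scala
// Top-N by count on a local Map, mirroring swap -> sortByKey(false) -> swap back.
val counts = Map(
  "127.0.0.1"     -> 3517348,
  "10.129.41.91"  -> 519688,
  "61.172.242.36" -> 151109
)

val topN = counts.toSeq
  .map { case (ip, n) => (n, ip) }   // swap to (count, IP)
  .sortBy(-_._1)                     // descending by count, like sortByKey(false)
  .map { case (n, ip) => (ip, n) }   // swap back to (IP, count)
  .take(2)                           // keep the top 2

println(topN)
```

As a side note: on an RDD you can avoid the double swap and the full sort entirely with `rdd.top(50)(Ordering.by(_._2))`, which returns the 50 largest pairs by count; whether that is faster depends on N versus the data size.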