Spark shell提供了一个简单方式去学习API,它也是一个交互式分析数据的强大工具。 你既可以使用Scala(运行在JVM之上,所以可以使用众多的Java库),也可以使用Python。运行Spark文件夹下的的命令:
./bin/spark-shell
Spark最主要的一个抽象出来的概念就是分布式的数据集合, 也就是弹性分布式数据集Resilient Distributed Dataset (RDD). RDD可以从Hadoop InputFormats (比如HDFS文件)创建, 也可以通过其它RDD转换(transforming)得到
基本操作
在Spark中使用README 文件创建textFileRDD
scala> val textFile = sc.textFile("README.md")
获取textFile RDD的第一个元素
scala>textFile.first() // RDD 中的第一个 item,对于文本文件,就是第一行内容 res3: String = # Apache Spark
对textFile RDD中的数据进行过滤操作,返回所有包含“Spark”关键字的行,操作完成后会返回一个新的RDD,操作完成后可以对返回的RDD的行进行计数
筛选出包括Spark关键字的RDD然后进行行计数
scala>val linesWithSpark = textFile.filter(line => line.contains("Spark")) // 筛选出包含 Spark 的行 scala>linesWithSpark.count() // RDD 中的 item 数量,对于文本文件,就是总行数 res10: Long = 19
要找出RDD linesWithSpark单词出现最多的行,可以使用下列操作。使用map方法,将RDD中的各行映射成一个数,然后再使用reduce方法找出包含单词数最多的行。
找出RDD textFile 中包含单词数最多的行
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res11: Int = 14
返回结果表明第14行单词数最多。
代码首先将每一行内容 map 为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行 reduce 操作,找到最大的数。
也可以引入其它java包,例如 Math.max()方法,因为map和reduce方法接受scala函数字面量作为参数。
在scala shell中引入Java方法
scala>import java.lang.Math scala>textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res12: Int = 14
我们可以很容易地将数据缓存到内存当中。
将RDD linesWithSpark 缓存,然后进行行计数
Spark也支持将数据集放入集群的内存中缓存起来. 当数据重复访问时特别有用, 比如查询一个小的 “hot”数据集或者运行一个交互式算法PageRank. 看一个简单的例子, 我们把上面的linesWithSpark数据集缓存起来:
scala>linesWithSpark.cache() res13: linesWithSpark.type = MapPartitionsRDD[8] at filter at <console>:23 scala>linesWithSpark.count() res15: Long = 19
下面总结一下Spark从开始到结果的运行过程:
- 创建某种数据类型的RDD
- 对RDD中的数据进行转换操作,例如过滤操作
- 在需要重用的情况下,对转换后或过滤后的RDD进行缓存
- 在RDD上进行action操作,例如提取数据、计数、存储数据到Cassandra等。
关于RDD所有的操作清单和描述,可以参考 Spark documentation
独立应用
下面我们想说一下怎样使用Spark API编写一个独立的应用程序。
/* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) } }
这个程序统计Spark README文件中包含字符a和b的行数。 注意你需要用你实际的Spark路径替换 YOUR_SPARK_HOME。 不像上面的Spark shell的例子, 我们初始化一个SparkContext 作为程序的一部分.
编写Driver程序
import spark.SparkContext import SparkContext._ object WordCount { def main(args: Array[String]) { if (args.length ==0 ){ println("usage is org.test.WordCount ") } println("the args: ") args.foreach(println) val hdfsPath = "hdfs://hadoop1:8020" // create the SparkContext, args(0)由yarn传入appMaster地址 val sc = new SparkContext(args(0), "WrodCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile(hdfsPath + args(1)) val result = textFile.flatMap(line => line.split("s+")) .map(word => (word, 1)).reduceByKey(_ + _) result.saveAsTextFile(hdfsPath + args(2)) } }
- 在Spark中Spark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是一样的,不同的地方就是SparkContext需要自己创建。
使用下面给出的命令编译上述程序。这个命令应该在spark-application应用程序目录下执行。这里,/usr/local/spark /lib/spark-assembly-1.4.0-hadoop2.6.0.jar ,Spark 采用了 Hadoop 的 jar 支持程序。
scalac -classpath $CLASSPATH:/data/app/spark-1.5.2/lib/* SimpleApp.scala
jar -cvf wordcount.jar SimpleApp*.class
使用以下命令提交 spark 应用
spark-submit --class SparkWordCount --master local wordcount.jar