登录 |  注册 |  繁體中文


Spark 操作实例

分类: Spark 颜色:橙色 默认  字号: 阅读(2771) | 评论(0)

Spark shell提供了一个简单方式去学习API,它也是一个交互式分析数据的强大工具。 你既可以使用Scala(运行在JVM之上,所以可以使用众多的Java库),也可以使用Python。运行Spark文件夹下的的命令:

./bin/spark-shell

Spark最主要的一个抽象出来的概念就是分布式的数据集合, 也就是弹性分布式数据集Resilient Distributed Dataset (RDD). RDD可以从Hadoop InputFormats (比如HDFS文件)创建, 也可以通过其它RDD转换(transforming)得到

基本操作

在Spark中使用README 文件创建textFileRDD

scala> val textFile = sc.textFile("README.md")

获取textFile  RDD的第一个元素

scala>textFile.first()   // RDD 中的第一个 item,对于文本文件,就是第一行内容
res3: String = # Apache Spark

对textFile RDD中的数据进行过滤操作,返回所有包含“Spark”关键字的行,操作完成后会返回一个新的RDD,操作完成后可以对返回的RDD的行进行计数

筛选出包括Spark关键字的RDD然后进行行计数

scala>val linesWithSpark = textFile.filter(line => line.contains("Spark"))   // 筛选出包含 Spark 的行
scala>linesWithSpark.count()  // RDD 中的 item 数量,对于文本文件,就是总行数
res10: Long = 19

要找出RDD linesWithSpark单词出现最多的行,可以使用下列操作。使用map方法,将RDD中的各行映射成一个数,然后再使用reduce方法找出包含单词数最多的行。

找出RDD textFile 中包含单词数最多的行

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res11: Int = 14

返回结果表明第14行单词数最多。

代码首先将每一行内容 map 为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行 reduce 操作,找到最大的数。

也可以引入其它java包,例如 Math.max()方法,因为map和reduce方法接受scala函数字面量作为参数。

在scala shell中引入Java方法

scala>import java.lang.Math 
scala>textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res12: Int = 14

我们可以很容易地将数据缓存到内存当中。

将RDD linesWithSpark 缓存,然后进行行计数

Spark也支持将数据集放入集群的内存中缓存起来. 当数据重复访问时特别有用, 比如查询一个小的 “hot”数据集或者运行一个交互式算法PageRank. 看一个简单的例子, 我们把上面的linesWithSpark数据集缓存起来:

scala>linesWithSpark.cache()
  res13: linesWithSpark.type =   MapPartitionsRDD[8] at filter at <console>:23  
scala>linesWithSpark.count()
  res15: Long = 19

下面总结一下Spark从开始到结果的运行过程

  • 创建某种数据类型的RDD
  • 对RDD中的数据进行转换操作,例如过滤操作
  • 在需要重用的情况下,对转换后或过滤后的RDD进行缓存
  • 在RDD上进行action操作,例如提取数据、计数、存储数据到Cassandra等。

关于RDD所有的操作清单和描述,可以参考 Spark documentation

独立应用

下面我们想说一下怎样使用Spark API编写一个独立的应用程序。

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
 
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}

这个程序统计Spark README文件中包含字符a和b的行数。 注意你需要用你实际的Spark路径替换 YOUR_SPARK_HOME。 不像上面的Spark shell的例子, 我们初始化一个SparkContext 作为程序的一部分.

 

编写Driver程序

import spark.SparkContext
import SparkContext._

object WordCount {
  def main(args: Array[String]) {
    if (args.length ==0 ){
      println("usage is org.test.WordCount ")
    }
    println("the args: ")
    args.foreach(println)

    val hdfsPath = "hdfs://hadoop1:8020"

    // create the SparkContext, args(0)由yarn传入appMaster地址
    val sc = new SparkContext(args(0), "WrodCount",
    System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

    val textFile = sc.textFile(hdfsPath + args(1))

    val result = textFile.flatMap(line => line.split("s+"))
        .map(word => (word, 1)).reduceByKey(_ + _)

    result.saveAsTextFile(hdfsPath + args(2))
  }
} 
  • 在Spark中Spark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是一样的,不同的地方就是SparkContext需要自己创建。
编译程序

使用下面给出的命令编译上述程序。这个命令应该在spark-application应用程序目录下执行。这里,/usr/local/spark /lib/spark-assembly-1.4.0-hadoop2.6.0.jar ,Spark 采用了 Hadoop 的 jar 支持程序。

scalac  -classpath $CLASSPATH:/data/app/spark-1.5.2/lib/*  SimpleApp.scala 
创建 JAR
使用以下 spark 命令应用程序创建 jar 文件。这里,wordcount 为 jar 文件的文件名。
jar -cvf wordcount.jar SimpleApp*.class 
提交spark应用

使用以下命令提交 spark 应用

spark-submit --class SparkWordCount --master local wordcount.jar 



姓 名: *
邮 箱:
内 容: *
验证码: 点击刷新 *   

回到顶部