spark第一篇简介
spark第一篇简介
- spark概述
- spark的特性
- spark集群安装
- spark集群启动和停止
- spark的web管理界面
- 基于zk搭建一个sparkHA
- spark角色介绍
- 初识spark程序
- spark-shell 使用
- 通过IDEA编写spark wordcount程序
- 1 利用IDEA开发spark wodcount程序(本地运行)
- 2 利用IDEA开发spark wodcount程序(集群运行)
- 3 利用IDEA开发spark wordCount程序(java开发)
spark概述
- 什么是spark
- spark是一个基于内存计算的引擎,计算速度非常快,但是并没有涉及到数据的存储,后期想要处理数据,需要引入外部的数据源(比如hadoop中数据)
- 为什么要用spark
- 比mapreduce计算速度快很多。
spark的特性
- 1、速度快
- 比mapreduce在内存中快100x,在磁盘中快10x
- 快的原因
- 1、spark的中间处理结果数据可以保存在内存中,mapreduce中间结果数据保存在磁盘中。
- 2、mapreduce最终的任务是以进程中方式运行在集群中。比如有100个MapTask任务,1个ReduceTask任务。Spark任务是线程的方式运行在进程中。
- 2、易用性
- 可以快速的写一个spark应用程序通过4种语言(java/scala/Python/R)
- 3、通用性
- 可以使用spark sql /sparkStreaming/Mlib/Graphx
- 4、兼容性
- spark就是一个计算任务的程序,哪里可以给当前程序提供计算的资源,我们就可以把任务提交到哪里去运行
- yarn (资源的分配由resourceManager去分配资源)
- standAlone(资源的分配由Master去分配资源)
- mesos(apache下开源的资源调度框架)
- spark就是一个计算任务的程序,哪里可以给当前程序提供计算的资源,我们就可以把任务提交到哪里去运行
spark集群安装
1、下载spark安装包
- spark-2.0.2-bin-hadoop2.7.tgz
- 下载地址spark官网:http://spark.apache.org/downloads.html
2、规划安装目录
- /export/servers
3、上传安装包到服务器中
4、解压安装包到指定的安装目录
- tar -zxvf spark-2.0.2-bin-hadoop2.7.tgz -C /export/servers
5、重命名安装目录
- mv spark-2.0.2-bin-hadoop2.7 spark
6、修改配置文件
- 6.1 进入到conf目录,修改spark-env.sh( mv spark-env.sh.template spark-env.sh)
配置java环境变量
export JAVA_HOME=/export/servers/jdk
配置master的地址
export SPARK_MASTER_HOST=node1
配置master的端口
export SPARK_MASTER_PORT=70776.2 修改slaves (mv slaves.template slaves) .指定整个集群中worker节点
node2 node3
7、配置spark环境变量
- 修改 /etc/profile
export SPARK_HOME=/export/servers/spark export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
8、分发spark安装目录到其他节点
scp -r spark node2:/export/servers scp -r spark node3:/export/servers
9、分发spark环境变量到其他节点
scp /etc/profile node2:/etc scp /etc/profile node3:/etc
10、让所有节点的spark环境变量生效
- 在所有节点上执行
- source /etc/profile
- echo $SPARK_HOME 查看配置文件地址是否生效
- 在所有节点上执行
spark集群启动和停止
- 1、启动spark集群
- 在主节点上执行脚本
- $SPARK_HOME/sbin/start-all.sh
- 在主节点上执行脚本
- 2、停止spark集群
- 在主节点上执行脚本
- $SPARK_HOME/sbin/stop-all.sh
- 在主节点上执行脚本
spark的web管理界面
- 启动spark集群后
- 访问 http://master地址:8080 可以通过这样一个界面,来查看整个spark集群相关信息和对应任务运行的情况。
基于zk搭建一个sparkHA
1、修改配置文件
vi spark-env.sh
1.1 注释掉手动指定master参数 # export SPARK_MASTER_HOST=node1 1.2 引入zk 搭建sparkHA export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181 -Dspark.deploy.zookeeper.dir=/spark"
2、分发spark-env.sh到其他节点
scp spark-env.sh node2:$PWD scp spark-env.sh node3:$PWD
3、启动sparkHA集群
- 3.1 启动zk
- 3.2 在任意一台节点来启动 start-all.sh (保证每2台机器之间实现ssh免登陆)
- 1、会在当前机器上产生一个Master进程
- 2、worker进程通过slaves文件指定的节点上产生
- 3.3 在另一台机器上单独启动master
- sbin/start-master.sh
在整个spark集群中有一个活着的master,其他多个master都处于standby。此时,活着的master挂掉之后,zk会感知到,接下来会在所有的处于standby的master中进行选举,再次产生一个活着的master。此时这个活着的master会读取在zk中保存上次活着的master的相关信息。进行恢复!整个恢复过程需要1-2分钟。
在整个恢复的过程中:
之前运行的任务正常运行。只不过这里你再次提交新的任务到集群中的时候,此时没有活着的master去分配对应的资源,是无法提交到集群中去运行。
spark角色介绍
- 1、Driver
- 运行客户端的main方法,创建sparkContext对象
- 2、Application
- 一个应用程序,包括:driver的代码和任务计算所需要的资源
- 3、Master
- 整个spark集群中的主节点,负责资源的调度和任务的分配
- 4、ClusterManager
- 指的是在集群上获取资源的外部服务。目前有三种类型
- Standalone: spark原生的资源管理,由Master负责资源的分配
- Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架
- Hadoop Yarn: 主要是指Yarn中的ResourceManager
- 指的是在集群上获取资源的外部服务。目前有三种类型
- 5、Worker
- 就是具体干活的小弟
- 在Standalone模式中指的是通过slaves文件配置的Worker节点
- 在Spark on Yarn模式下就是NodeManager节点
- 6、executor
- 是一个进程,它会在worker节点上启动一个进程
- 7、task
- spark任务最后会以task线程的方式运行在worker节点的executor进程中
初识spark程序
1、普通模式提交(已经知道活着的master地址)
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://node1:7077 \ --executor-memory 1G \ --total-executor-cores 2 \ examples/jars/spark-examples_2.11-2.0.2.jar \ 10
2、高可用模式提交(不确定活着的master是哪一个)
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://node1:7077,node2:7077,node3:7077 \ --executor-memory 1G \ --total-executor-cores 2 \ examples/jars/spark-examples_2.11-2.0.2.jar \ 10 此时指定master为一个列表: --master spark://node1:7077,node2:7077,node3:7077 应用程序会轮询该master列表,找到活着的master,最后把任务提交给活着的master
spark-shell 使用
1、利用 spark-shell –master local[N] 读取本地数据文件实现单词统计
- local[N]
- local表示本地单机版运行程序,N表示一个正整数,表示本地采用几个线程去运行
- 它会产生SparkSubmit进程
sc.textFile("file:///root/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
- local[N]
2、利用 spark-shell –master local[N] 读取HDFS上数据文件实现单词统计
spark整合HDFS
vi spark-env.sh
export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop
sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
3、利用spark-shell 指定具体的master 读取HDFS上数据文件实现单词统计
提交脚本
spark-shell --master spark://node1:7077 --executor-memory 1g --total-executor-cores 2
sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
通过IDEA编写spark wordcount程序
1 利用IDEA开发spark wodcount程序(本地运行)
导包
org.scala-lang scala-library 2.11.8 org.apache.spark spark-core_2.11 2.0.2 org.apache.hadoop hadoop-client 2.7.4
开发实现
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD //todo:通过IDEA开发spark的wordcount程序 object WordCount { def main(args: Array[String]): Unit = { //1、创建SparkConf对象 设置appName和master地址 local[2] 表示本地采用2个线程运行任务 val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]") //2、创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler val sc = new SparkContext(sparkConf) //设置日志输出的级别 sc.setLogLevel("WARN") //3、读取数据文件 val data: RDD[String] = sc.textFile("D:\words.txt") //4、切分每一行,获取所有的单词 val words: RDD[String] = data.flatMap(_.split(" ")) //5、每个单词记为1 val wordAndOne: RDD[(String, Int)] = words.map((_,1)) //6、相同单词出现的次数进行累加 val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_) //按照单词出现的次数降序排列 val sortByRDD: RDD[(String, Int)] = result.sortBy(x=>x._2,false) //7、收集打印结果数据 val finalResult: Array[(String, Int)] = sortByRDD.collect() println(finalResult.toBuffer) //8、关闭sparkContext对象 sc.stop() } }
2 利用IDEA开发spark wodcount程序(集群运行)
- 代码开发
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
//todo:通过IDEA开发spark wordcount程序,打成jar包,提交到集群中去运行
object WordCount_Online {
def main(args: Array[String]): Unit = {
//1、创建SparkConf对象 设置appName
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount_Online")
//2、创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
val sc = new SparkContext(sparkConf)
//设置日志输出的级别
sc.setLogLevel("WARN")
//3、读取数据文件
val data: RDD[String] = sc.textFile(args(0))
//4、切分每一行,获取所有的单词
val words: RDD[String] = data.flatMap(_.split(" "))
//5、每个单词记为1
val wordAndOne: RDD[(String, Int)] = words.map((_,1))
//6、相同单词出现的次数进行累加
val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
//7、把结果数据保存到HDFS上
result.saveAsTextFile(args(1))
//8、关闭sparkContext对象
sc.stop()
}
}
打包运行
spark-submit --master spark://node1:7077 --class cn.包名.WordCount_Online --executor-memory 1g --total-executor-cores 2 original-spark_class06-2.0.jar /words.txt /out
3 利用IDEA开发spark wordCount程序(java开发)
开发实现
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; import java.util.List; //todo:利用java来实现spark的wordcount程序 public class WordCount_Java { public static void main(String[] args) { //1、创建SparkConf SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]"); //2、创建SparkContext JavaSparkContext jsc = new JavaSparkContext(sparkConf); //3、读取数据文件 JavaRDD data = jsc.textFile("d:\words.txt"); //4、切分每一行,获取所有的单词 JavaRDD wordsJavaRDD = data.flatMap(new FlatMapFunction() { public Iterator call(String line) throws Exception { String[] words = line.split(" "); return Arrays.asList(words).iterator(); } }); //5、每个单词记为1 JavaPairRDD wordAndOneJavaPairRDD = wordsJavaRDD.mapToPair(new PairFunction() { public Tuple2 call(String word) throws Exception { return new Tuple2(word, 1); } }); //6、相同单词出现的次数累加 (hadoop,List(1,1,1,1)) JavaPairRDD resultJavaPairRDD = wordAndOneJavaPairRDD.reduceByKey(new Function2() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //按照单词出现的次数降序排列 (单词,次数)----->(次数,单词).sortByKey(false)----->(单词,次数) JavaPairRDD reverseJavaPairRDD = resultJavaPairRDD.mapToPair(new PairFunction<Tuple2, Integer, String>() { public Tuple2 call(Tuple2 t) throws Exception { return new Tuple2(t._2, t._1); } }); JavaPairRDD sortedJavaPairRDD = reverseJavaPairRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2, String, Integer>() { public Tuple2 call(Tuple2 t) throws Exception { return new Tuple2(t._2, t._1); } }); //7、收集打印 List<Tuple2> finalResult = sortedJavaPairRDD.collect(); //遍历 快捷键 iter for (Tuple2 t : finalResult) { System.out.println("单词:"+t._1+" 次数:"+t._2); } //8、关闭jsc jsc.stop(); } }