Spark基础学习精髓——第一篇

Spark基础学习精髓

1 Spark与大数据

1.1 大数据基础

1.1.1 大数据特点

  • 存储空间大
  • 数据量大
  • 计算量大

1.1.2 大数据开发通用步骤及其对应的技术

大数据采集->大数据预处理->大数据存储->大数据处理->大数据可视化

(1)大数据采集技术

 分布式架构、多种采集技术混合使用

 web数据采集:shell编程、爬虫工具、爬虫程序开发、HTTP协议、TCP/IP基本原理及Socket程序接口、编程语言、数据格式转换、分布式存储的命令和接口(HDFS、HBase等)、分布式应用开发

 日志数据采集:采集工具(Flume、Fluentd等)、接入工具(Kafka).日志采集程序(Java、Python等)开发、Shell编程、TCP/IP基本原理以及网络编程接口、编程语言、数据格式转换、分布式存储系统的命令和接口(HDFS、HBase等)、分布式应用开发。

 数据库数据采集:Shell编程、采集工具(Flume、Fluentd等)、接入工具(Kafka)、数据库采集程序(Java、Python等)开发、SQL查询语言及编程接口、关系型数据、库连接如JDBC等的使用、TCP/IP基本原理以及Socket编程接口、编程语言、数据格式转换、分布式存储系统的命令和接口(HDFS、HBase等)、分布式应用开发。

(2)大数据存储技术

 分布式海量文件存储:HDFS CEPH Moosefs GlusterFS。

 NoSQL数据库:Hbase Cassandra。

 NewSQL数据库:VoltDB、Spanner、TiDB等。

(3)大数据处理技术

 Hadoop框架、Spark框架

1.2 认识Spark

Spark是一个统一的大规模数据处理分析引擎。

技术特点:高性能(基于内存)、支持多种语言、通用(提供SQL操作、流数据处理、图数据处理、机器学习算法库)、多平台运行、分布式开发更容易

1.3 Spark技术栈

1.4 Scala与Spark关系

Spark框架是用scala开发的。Scala语言特点有如下:

  • 面向对象、函数式编程
  • 是强类型语言,不支持类型的隐式转换
  • 静态类型语言
  • 在JVM虚拟机上运行,可以利用JAVA资源
  • 支持交互式解释器REPL

1.5 Spark快速学习路线

虚拟机基础:定制虚拟机、能安装centos7

Linux基础:实现host和guest网络连接、完成基本文件操作、会用vim

Scala编程:能编写、编译、打包、调试、运行Scala程序,会用Scala编写简单的串行处理程序,能看懂简单的Spark Scala API接口

Spark基础:能说出Spark程序运行时架构、能提交Spark程序分布式运行、能解释Spark相关概念:RDD、Application、Job、DAG、Stage、Task,能说出Spark程序的运行过程和代码执行过程

Spark核心编程:能使用IDEA来编写、编译、打包、调试、运行Spark程序;能使用RDD、DataFrame/Dataset的基础API编写Spark程序

(具体学习资源请参见《Spark大数据编程实用教程》以及《艾叔》网易云系列,能帮助初学者快速入门,少踩坑)

1.6 使用软件和版本推荐

VMware workstation15、Centos7.2、jdk-8u162-linux-x64.tar.gz、hadoop-2.7.6.tar.gz、spark-2.3.0-bin-hadoop2.7.tgz、scala-2.11.12、ideaIC-2018.1.4.tar.gz

2 Spark运行环境的搭建

构建一个Spark运行环境,除Spark自身框架外,还要有集群管理器和存储系统用来存储输入和输出数据。

2.1 Spark程序运行时架构

定义:Spark程序运行后程序的各个组成部分。

三种角色:

  • Client(客户端):负责提交Spark程序,提交的对象可以是集群管理器、也可以没有提交对象从而在本地运行;
  • Driver(驱动程序):负责此次Spark程序运行的管理和状态监控,从程序开始到程序结束都由Driver全程负责;
  • Executor(执行器):负责执行具体任务,Executor可能有多个,所有executor合并共同完成整个任务。Executor中具体任务是Task,每个Task是一个线程(每个Task不一定只占一个CPU,可以占多个CPU),一个executor中可能有多个Task,一个Task的处理逻辑相同,处理数据不一样;

   Client向集群管理器发出申请,集群管理器接收请求,并为其分配合适的资源。具体选择哪种管理器,可以在Client提交时通过参数指定。每种资源管理器运行Spark程序时机制可能不一样,但不管怎样,Spark程序运行时的架构是不变的。其他细节,如Excutor、Task的分配、资源调度、不同资源管理器上Spak执行机制等。

2.2 首先构建最简Spark大数据运行环境

最简单的Spark运行环境由HDFS、Yarn、Spark三个部分组成。

部署图如下:

 2.2.1 构建HDFS

1.什么是hdfs

HDFS(Hadoop Distributed File System Hadoop分布式文件系统),是一个分布式文件系统。Spark处理数据时的数据源和处理结果都存储在HDFS上。

2.重要特征

 (1)HDFS中的文件在物理上是分块存储(block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在hadoop2.x版本中是128M,老版本中是64M

 (2)HDFS文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data

 (3)目录结构及文件分块信息(元数据)的管理由namenode节点承担——namenode是HDFS集群主节点,负责维护整个hdfs文件系统的目录树,以及每一个路径(文件)所对应的block块信息(block的id,及所在的datanode服务器)

 (4)文件的各个block的存储管理由datanode节点承担---- datanode是HDFS集群从节点,每一个block都可以在多个datanode上存储多个副本(副本数量也可以通过参数设置dfs.replication)

 (5)HDFS是设计成适应一次写入,多次读出的场景,且不支持文件的修改

3.hdfs命令行

    (1)查看帮助
        hdfs dfs -help 
    (2)查看当前目录信息
        hdfs dfs -ls /
    (3)上传文件
        hdfs dfs -put /本地路径 /hdfs路径
    (4)剪切文件
        hdfs dfs -moveFromLocal a.txt /aa.txt
    (5)下载文件到本地
        hdfs dfs -get /hdfs路径 /本地路径
    (6)合并下载
        hdfs dfs -getmerge /hdfs路径文件夹 /合并后的文件
    (7)创建文件夹
        hdfs dfs -mkdir /hello
    (8)创建多级文件夹
        hdfs dfs -mkdir -p /hello/world
    (9)移动hdfs文件
        hdfs dfs -mv /hdfs路径 /hdfs路径
    (10)复制hdfs文件
        hdfs dfs -cp /hdfs路径 /hdfs路径
    (11)删除hdfs文件
        hdfs dfs -rm /aa.txt
    (12)删除hdfs文件夹
        hdfs dfs -rm -r /hello
    (13)查看hdfs中的文件
        hdfs dfs -cat /文件
        hdfs dfs -tail -f /文件
    (14)查看文件夹中有多少个文件
        hdfs dfs -count /文件夹
    (15)查看hdfs的总空间
        hdfs dfs -df /
        hdfs dfs -df -h /
    (16)修改副本数    
        hdfs dfs -setrep 1 /a.txt
4.hdfs工作机制

 

(1)概述

  • HDFS集群分为两大角色:NameNode、DataNode
  • NameNode负责管理整个文件系统的元数据
  • DataNode 负责管理用户的文件数据块
  • 文件会按照固定的大小(blocksize)切成若干块后分布式存储在若干台datanode上
  • 每一个文件块可以有多个副本,并存放在不同的datanode上
  • Datanode会定期向Namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量
  • HDFS的内部工作机制对客户端保持透明,客户端请求访问HDFS都是通过向namenode申请来进

(2)HDFS写工作原理

有一个文件FileA,100M大小。Client将FileA写入到HDFS上。

HDFS按默认配置。

HDFS分布在三个机架上Rack1,Rack2,Rack3。

a. Client将FileA按64M分块。分成两块,block1和Block2;

b. Client向nameNode发送写数据请求,如图蓝色虚线①------>。

c. NameNode节点,记录block信息。并返回可用的DataNode,如粉色虚线②--------->。

    Block1: host2,host1,host3

    Block2: host7,host8,host4

    原理:

        NameNode具有RackAware机架感知功能,这个可以配置。

        若client为DataNode节点,那存储block时,规则为:副本1,同client的节点上;副本2,不同机架节点上;副本3,同第二个副本机架的另一个节点上;其他副本随机挑选。

        若client不为DataNode节点,那存储block时,规则为:副本1,随机选择一个节点上;副本2,不同副本1,机架上;副本3,同副本2相同的另一个节点上;其他副本随机挑选。

d. client向DataNode发送block1;发送过程是以流式写入。

    流式写入过程,//逐个传输 host2-->host1--host3>

        1>将64M的block1按64k的package划分;

        2>然后将第一个package发送给host2;

        3>host2接收完后,将第一个package发送给host1,同时client想host2发送第二个package;

        4>host1接收完第一个package后,发送给host3,同时接收host2发来的第二个package。

        5>以此类推,如图红线实线所示,直到将block1发送完毕。

        6>host2,host1,host3向NameNode,host2向Client发送通知,说“消息发送完了”。如图粉红颜色实线所示。

        7>client收到host2发来的消息后,向namenode发送消息,说我写完了。这样就真完成了。如图×××粗实线

        8>发送完block1后,再向host7,host8,host4发送block2,如图蓝色实线所示。

        9>发送完block2后,host7,host8,host4向NameNode,host7向Client发送通知,如图浅绿色实线所示。

        10>client向NameNode发送消息,说我写完了,如图×××粗实线。。。这样就完毕了。

分析,通过写过程,我们可以了解到:

    写1T文件,我们需要3T的存储,3T的网络流量贷款。

    在执行读或写的过程中,NameNode和DataNode通过HeartBeat进行保存通信,确定DataNode活着。如果发现DataNode死掉了,就将死掉的DataNode上的数据,放到其他节点去。读取时,要读其他节点去。

    挂掉一个节点,没关系,还有其他节点可以备份;甚至,挂掉某一个机架,也没关系;其他机架上,也有备份。

(3)HDFS读工作原理

读操作就简单一些了,如图所示,client要从datanode上,读取FileA。而FileA由block1和block2组成。 

那么,读操作流程为:

a. client向namenode发送读请求。

b. namenode查看Metadata信息,返回fileA的block的位置。

    block1:host2,host1,host3

    block2:host7,host8,host4

c. block的位置是有先后顺序的,先读block1,再读block2。而且block1去host2上读取;然后block2,去host7上读取;

上面例子中,client位于机架外,那么如果client位于机架内某个DataNode上,例如,client是host6。那么读取的时候,遵循的规律是:

  优选读取本机架上的数据

5.HDFS的构建

(1)定制虚拟机(取名scala_dev)

在VMware 15.x上安装centos7.x,在第一部分已经介绍过具体的安装包版本,可以在对应的官网或者国内源下载,也可以联系我的邮箱zhangv_chian@163.com。

具体安装步骤可参见:

https://www.cnblogs.com/gebilaoqin/p/12817510.html
https://www.cnblogs.com/gebilaoqin/p/12817510.html

建议安装图形化界面,并且配置好网络,使得host和guest能够相互Ping通。本人使用的是NAT模式,并且给网卡配置固定的IP地址,防止重启后IP有变化,影响后续的配置和ssh登录。

NAT配置可参见:

https://blog.csdn.net/sdyb_yueding/article/details/78216135?utm_source=blogxgwz8
https://blog.csdn.net/sdyb_yueding/article/details/78216135?utm_source=blogxgwz8

关闭防火墙可参见:

https://www.cnblogs.com/yyxq/p/10551274.html
https://www.cnblogs.com/yyxq/p/10551274.html

安装vmtools设置共享文件夹,将windows下载的安装包通过共享文件夹传递到centos中:

https://www.cnblogs.com/Jankin-Wen/p/10157244.html
https://www.cnblogs.com/Jankin-Wen/p/10157244.html

修改主机名:

a. 在root用户下:vi  /etc/hosts

b. 在root用户下: vi /etc/hostname

修改完主机名就可以输入:reboot 重启centos

可以ping scaladev看看是否能解析出IP地址

(2)scala_dev无密码登录自己

  因为搭建的是最简单的HDFS,NameNode 和 DataNode 都在 scala_dev 上,因此,需要做 scala_dev 无密码登录自己,操作如下。:

  解释:上述命令会 1)自动创建~/.ssh 目录; 2)在~/.ssh 下自动生成:id_dsa 和 id_dsa.pub 两个文件,其中,id_dsa 是私钥,保存在 NameNode 节点,id_dsa.pub 是公钥,要放置在 DataNode 节点,id_dsa.pub 相当于 NameNode 的身份信息,一旦在 DataNode 节点登记,就相当于 DataNode 节点已认可 NameNode,这样, NameNode 就可以无密码登录 DataNode 了; 3)-P 后面的 '' 是 2 个单引号,不是双引号;

  将公钥 id_dsa.pub 加入到 scala_dev 的 authorized_keys 中,实现 scala_dev 对 scala_dev 自身的认证。

 

   修改 authorized_keys 的权限

   验证,如果不需要密码就可以登录,则说明操作成功

(填自己的IP地址)查询系统自带的java文件,根据不同的系统版本,输入rpm -qa | grep jdk或者rpm -qa | grep java

(3)配置JDK

1)查看系统自带的jdk

2)查询系统自带的java文件,根据不同的系统版本,输入rpm -qa | grep jdk或者rpm -qa | grep java

3)删除noarch文件以外的其他文件,输入rpm -e --nodeps 需要卸载的安装文件名

4)查看是否已经删除完毕

 

5)解压jdk

  在设置好共享目录的前提下,共享目录一般都在/mnt/hgfs/<共享文件名>/, 在windows上把安装包都放在此目录下

然后解压到/home/user/ ,命令为:tar xf /mnt/hgfs/sharefile/jdk-8u162-linux-x64.tar.gz  /home/user/

6)配置环境变量

  切换到root, 然后vi /etc/profile,输入以下内容(路径根据自己实际路径来,不要照搬)

 然后切换的普通用户:su user

 配置完环境变量后都要source /etc/profile

 验证:java -version

(4) 配置HDFS

1)解压hdfs

tar xf /mnt/hgfs/sharefile/hadoop-2.7.6.tar.gz   /home/user/

2)配置环境变量

切换到root然后编辑/etc/profile(路径根据自己实际路径来,不要照搬)

  配置完环境变量后都要source /etc/profile

 验证,退回到普通用户,输入 hd,看能否用 tab 键补全 hdfs,如果可以,说明 profile 设 置成功,如果不行,则要检查,或者运行 source/etc/profile 再试。 

3)设置hostname

 

 

 此处scaladev不等加下划线_,否则会出错,然后重启;

4)添加hosts信息

 

验证,如果 pingscaladev 能自动解析出 IP,则说明修改成功。 

5)修改 hadoop-env.sh (在前面修改了环境变量/etc/profile是不够的,必须在每台节点上加入java的路径)

 

 

 

 

 6)修改slaves,它存储的是所有DataNode的主机名

 

 

 后续扩展DataNode,只需要在slaves里面加主机名

7)修改 hdfs-site.xml,先复制模板文件 

 

 

 编辑模板

 

 

 

 

 

 

 

 

 

 

 

 8)修改core-site.xml 复制模板

 

 

 

  配置了 defaultFS 和 fs.default.name 后,会自动在路径前面加上 hdfs://scaladev:9001/前缀,这样,默 认路径就是 hdfs 上的路径,之前的 file:///前缀,表示的是本地文件系统。按照目前的配置,/表示 hdfs://scaladev:9001/,表示 HDFS 上的/目录,而 file:///则表示本地文件系统的/目录。

 

 

 9)格式化并启动 HDFS 

a.格式化

 

 

 b.启动HDFS

 

 

 c.验证

 

 

 

 

 

 2.2.2 构建YARN

1.yarn简介

Yarn是hadoop的集群管理器,Spark和Mapreduce程序都可以运行在Yarn上。

2.Yarn配置步骤

(1)复制 yarn-site.xml 模板文件 

 

 

  编辑 yarn-site.xml  

 

 

 

 

 

 (2)复制 mapred-site.xml 模板文件 

 

 

 

 (3)启动Yarn

 

 

 (4)验证:Yarn上运行MapReduce程序

  MapReduce 和 Spark 一样,也是一个分布式处理框架,MapReduce 程序和 Spark 一样,可 以提交到 Yarn 上运行,在 Spark 出现之前,MapReduce 是主流的大数据处理平台。Hadoop 中自带了 MapReduce 的例子程序,如经典的 wordcount(单词计数)程序,如果 MapReduce 执行成功,说明 Yarn 配置成功,后续,我们将学习如何将 Spark 程序提交到 Yarn 上执行。

  提交 MapReduce 程序到 Yarn 上执行的步骤如下:

  1)复制文件到HDFS

  file:///表示本地文件系统,/表示 HDFS 的根目录,这是因为在 core-site.xml 中配置了 defaultFS 为 hdfs://scaladev:9001/ 

 

 

   2)验证

 

 

   3)在 HDFS 上准备输入目录 input,以及文件 core-site.xml 

 

 

   4)运行wordcount的例子

 

 

   5)验证

 

 

 (5)Yarn运行Mapreduce程序的过程

 

 

 1)当执行下面的命令后, Client 向 ResourceManager 发送运行 wordcount 程序(Application) 的请求;ResourceManager 响应请求,返回 Application ID 和一个 HDFS 地址;Client 将启动 Application 所需信息(要运行的 jar 包,资源要求等)上传到指定的 HDFS 路径下,并向 ResourceManager 发起启动 MRApplicationMaster(简称 AM)请求; hadoopjarshare/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.6.jarwordcount/input/output 

 2)ResourceManager 根据当前资源使用情况和调度策略,确定一个可用节点(例如 NodeManager01),向该节点的 NodeManager 发送命令,启动一个 Container 来运行 AM;

   AM 是此次 wordcount 程序运行的管理者,从 wordcount 的启动到结束都由此 AM 来负责。 AM 启动后,向 ResourceManager 注册,计算 Task 数(map 数(Split 数量决定,一个 Split 对 应一个 map)+reduce 数(mapreduce.job.reduces)),以此确定 Container 数(map 数+reduce 数),然后准备好每个任务 Container 请求,发送给 ResourceManager;ResourceManager 响应 请求,为其指定可用的节点 NodeManager02~NodeManagerXX;

 

 

 3) AM 依次和这些节点的 NodeManager 通信,在这些节点上启动 Container,并在 Container 执行 wordcount 中的 Task,wordcount 的 Task 分为 map 和 reduce 两种,先执行 mapTask,然 后执行 reduceTask(汇总操作),并向 AM 汇报任务状态;在执行过程中,Client 会和 AM 通 信,查询 Application 执行情况或者控制任务执行; 

 4)当某个 Container 上的任务执行完毕,可以退出时,AM 会和 ResourceManager 通信, 申请释放此 Container 及其资源。待总的 Application 结束,所有资源都释放完毕,AM 会向 ResourceManager 申请注销自己,最后,Client 退出。

(6)Yarn日志

  Yarn的日志分为两大类: 1.Yarn架构自身相关日志,包括ResourceManager和NodeManager 的日志;2. 在 Yarn 上执行的程序(Application)日志。

  第一类日志的 ResourceManager 日志位于 ResoureManager 的$HADOOP_HOME 下的 log 目录中,日志文件名是 yarn-user-resourcemanager-scaladev.log。

  第一类日志的 NodeManager 日志位于每个 NodeManager 的$HADOOP_HOME 下的 log 目 录中,日志文件名是 yarn-user-nodemanager-scaladev.log。

  第二类日志位于 ResoureManager 的$HADOOP_HOME 下的 log/userlogs 目录下,每个 Application 都会根据其 ID 号创建一个目录,例如:application_1533802263437_0005,在此目录下,会保存该 Application 所有 Container 的日志,示例如下,可以看到 wordcount 这个 Application有3个Container,其中尾号为1的container是AM,其它的container用来执行Task, 可能是 MapTask,也可能ReduceTask。每个 container 日志目录下又有 3 个文件:stdout、 stderr 和 syslog,其中 stdout 是 Task 执行过程中输出,例如 printfln 就会输出到 stdout 中, stderr 会保存报错信息,syslog 则会保存系统日志输出。 

2.2.3 构建Spark集群

(1)下载 Spark 软件包

Spark 所选择的版本是 2.3.0,软件包名:spark-2.3.0-bin-hadoop2.7.tgz,下载地址为: http://archive.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz 

(2)解压

 

 

(3)配置环境变量

 

 

(4)设置JAVA_HOME

 

 

2.3 运行Spark程序

本地运行、分布式运行

2.3.1本地运行方式

示例:SparkPi

Spark软件包中有一个spark-examples_2.11-2.3.0.jar 它是Spark自带示例的jar包,下面就以其中的SparkPi为例,介绍Spark程序的本地(local)运行方式

运行SparkPi具体命令如下:

在spark目录下执行:spark-submit --class org.apache.spark.examples.SparkPi --master local examples/jars/spark-examples_2.11-2.3.0.jar  10

SparkPi的程序参数说明如下:

  • --class org.apache.spark.examples.SparkPi , 指明此运行程序的Main Class
  • --master local ,表示此Spark程序Local运行
  • examples/jars/spark-examples_2.11-2.3.0.jar, 为Spark示例的jar包
  • 10,表示迭代10次。

如果输出结果,这说明成功。

程序运行时如果报了一个WARN提示:NativeCodeLoader:62-Unable to load native-hadoop library for your platform

解决办法是在/etc/profile中添加下面的内容

export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native

切换到普通用户,运行下面的命令,使得配置生效。

source  /etc/profile

2.4 运行Spark程序(分布式)

Spark程序分布式运行要依赖特定的集群管理器,最常用的有Yarn和Standalone。Client和Driver是否在一个进程里,可以分为client和cluster模式。

2.4.1 Spark on Yarn

1.client deploy mode

以DFSReadWriteTest为例,说明Spark on Yarn的client 的deploy mode。

DFSReadWriteTest是Spark-examples_2.11-2.3.0.jar自带的一个示例,它会读取本地文件进行单词计数,然后将本地文件上传到HDFS,从HDFS读取该文件,使用Spark进行计数,最后比较两次计数的结果。

(1)提交Spark程序 到Yarn上 ,以client mode运行

spark-submit --class org.apache.spark.examples.DFSReadWriteTest --master yarn /home/user/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar /etc/profile /output

运行改程序要保证Yarn和HDFS同时启动。

如果结果正确,会输出:Success!

运行报错:初次运行程序时,可能会有以下两个报错。

报错1的报错信息如下所示:
Exception in thread "main"java.lang Exception: When unning with master yam' either HADOOP_CONF DIR or YARN_CONF _DIR must be set in the environment.

报错原因:没有设置环境变量:HADOOP_CONF_DIR或YARN_CONF_DR.
解决办法:在/etc/profile中增加下面的内容。
HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

export HADOOP_CONF_DIR

切回到普通用户,使刚才的配置失效。

报错2的报错信息如下所示。

报错原因:Container(容器)的内存超出了虚拟内存限制,Container的虚拟内存为2.1GB,但使用了2.3GB.comtainer fpd-+22containeriD-container.15373079299 0002_02 0000l1]is runing ba

mual ncnoy ias CGma uage I64 SMB ofI GB plysical memory uscd 23 GB of 2.1 GB virual meaused Killing container

解决办法:

改变分配Container最小物理内存值,将yarn.scheduler.minimum-allocation-mb设置成2GB,重启Yarn,每个Container向RM申请的虚拟内存为2GB*2.1=4.2GB

(2)Spark程序在Yarn上的执行过程

1)client模式下,Client和Driver在一个进程内,向Resource Manager发出请求;

2)Resource Manager指定一个节点启动Container,用来运行AM;AM向resource manager申请container来执行程序,resource manager向AM返回可用节点;

3)AM同可用节点的NodeManager通信,在每个节点上启动Container,每个Container中运行 一个Spark的Excutor,Excutor再运行若干Tasks;

4)Driver与Executor通信,向其分配Task并运行,并检测其状态,直到整个任务完成;

5)总任务完成后,Driver清理Executor,通知AM,AM想ResouceManager请求释放Container,所有资源清理完毕后,AM注销并退出、client退出。

 

 2. Spark on Yarn (cluster deploy mode)

(1)提交DFSReadWriteTest到Yarn运行(cluster deploy mode)

命令:spark-submit --deploy-mode cluster --class org.apache.spark.examples.DFSReadWriteTest --master yarn /home/user/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-example_2.11-2.3.0.jar /etc/profile /outputSpark

(2)Spark程序在Yarn上执行过程

1)Client想ResourceManager提交Application请求;

2)ResourceManager指定一个节点,启动Container来运行AM和Spark Driver;AM根据任务情况向ResourceManager申请Container;ResourceManager返回可以运行Container NodeManager;

3)AM与这些NodeManager通信,启动Container,在Container中执行Executor;

4)Spark Driver与Executor通信,向它们分配Task,并监控Task执行状态;

5)所有Task执行完毕后,清理Executor,清理完毕后,Driver通知AM,AM请求Resource Manager,释放所有Container;Client收到Application FINISHED后退出。

 

 2.4.2 Spark on Standalone

Standalone是Spark自带的集群管理器,主/从式架构,包括Master和Worker两种角色,Master管理所有的Worker,Worker负责单个节点的管理。

优点:简单、方便、快速部署。

缺点:不通用、只支持Spark,功能没有Yarn强大

1.Spark on Standalone(client deployed mode)

(1)部署Standalone

1)配置slaves文件,改文件保存了整个集群中被管理节点的主机名。先复制模板文件;

cp conf/slaves.template conf/slaves

2)编辑slaves文件

vi conf/slaves

3)将localhost修改为scaladev

scala_dev

4)添加JAVA_HOME

cp conf/spark-env.sh.template conf/spark-env.sh

5)编辑spark-env.sh文件

vi conf/spark-env.sh

6)在最后一行添加下面内容

export JAVA_HOME=/home/user/jdk1.8.0_162

7)启动Standalone集群

sbin/start-all.sh

8)验证 jps,查看是否有worker和master

9)查看Standalone的Web监控界面

(2)提交Spark程序到Standalone上,以client deploy mode运行

提交前却把HDFS已经启动,HDFS上/output目录下已经清空。具体命令如下:

spark-submit --class org.apache.spark.examples.DFSReadWriteTest --master spark://scaladev:7077  /home/user/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar /etc/profile /output

其中-master spark:// scaladev:7077 表示连接Standalone集群, scaladev是Master所在的主机名,没有指定--deploy-mode cluster,则部署模式默认为client

(3)Spark程序在Standalone上的运行过程(client deploy mode)

client部署模式下,Spark程序在Standalone的运行过程如图所示。

 

 1)Client初始化,内部启动Client模块和Driver模块,并向Master发送Application请求;

 

 2)Master接收请求,为其分配Worker,并通知Worker启动Executor;

 3)Executor向Driver注册,Driver向Executor发送Task,Executor执行Task,并反馈执行状态,Driver再根据Executor的当前情况,继续发送Task,直到整个Job完成。

 2.Spark on Standalone(cluster deploy mode)

(1)提交Spark程序到Standalone,以cluster deploy mode运行

具体命令如下:

spark-submit --class org.apache.spark.examples.DFSReadWriteTest --master spark://scaladev:6066 --deploy-mode cluster /home/user/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar /etc/profile hdfs://scaladev:9001/output

有4点要特别注意:

1)采用cluster deploy mode时,Driver需要一个处理器,后续Executor还需要另外的处理器,如果虚拟机scaladev只有1个处理器,就会出现资源不足的警告,导致程序运行失败,如下所示:

WARN TaskSchedulerImpl:66 - Initial job has not accepted any resource

解决办法:增加虚拟机的处理器为2个。

2)命令参数中,--master spark://scaladev:6066 用来指定Master的URL,cluster deploy mode下,Client会向Master提交Rest URL, Spark://scaladev:6066就是Spark的Rest URL;如果还是使用原来的参数--master spark://scaladev:7077,则会报下的错误;

WARN RestSubmissionClient:66 -Unable to connect to server spark://7077

3)HDFS的路径前面要加hdfs://,因为Cluster Mode下,core-site.xml中的defaultFS设置不起作用;

4)Client提交成功后就会退出,而不是等待Application结束后才退出。

(2)Spark程序在Standalone上的运行过程(cluster deploy mode)

cluster deploy mode下,Spark程序在Standalone的运行过程如图所示。

 

 1)Client初始化,内部启动Client模块,并向Master注册Driver模块,并等待Driver信息,待后续Driver模块正常运行,Client退出;

 2)Master接收请求,分配一个Worker,并通知这些Worker启动Executor;

 3)Master接受请求,分配Worker,并通知这些Worker启动Executor;

 4)Executor向Driver注册,Driver向Executor发送Task,Executor执行Task,并反馈执行状态,Driver再根据Executor的当前情况,继续发送Task,直到整个Job完成

3. Spark on Standalone日志

  standalone的日志分为两类:框架日志、应用日志。

  框架日志:指Master和Worker日志,Master日志位于Master的Spark目录下的logs目录下,文件名:spark-user-org.apache.spark.deploy.master.Master-1-scaladev.out;Worker位于每个Worker节点的Spark目录下的logs目录下,文件名为:spark-user-org.apache.spark.deploy.worker.Worker-1-scaladev.out。

  应用日志:指每个Spark程序运行的日志,因为一个Spark程序可能会启动多个Executor,每个Executor都会有一个日志文件,位于Executor所在的Worker节点的Spark目录的work目录下,每个Spark运行会分配一个ID,运行时在控制台会打印ID的值,如下所示:

  Connected to  Spark cluster with app ID app-2018081004758-0001

  列出woker目录下的内容,命令如下。

  ls work/

  然后就可以看目录下的内容。