YarnIsSimple

[TOC]

1.Hadoop Yarn is Simple

这个教程将手把手教你如何把一个hello world搬到yarn上运行,然后我们会实现一个简易的parameter server并把它搬到yarn上运行。最后我们会基于spark实现一个和parameter server交互的逻辑回归算法

2.What is Hadoop Yarn

http://hadoop.apache.org/ 不知道hadoop是什么的自己看吧

简单来说我们通常说的hadoop由三部分组成,即负责分布式存储的hdfs, 负责分布式计算的mapReduce,负责分布式资源调度的Yarn.

相比于单机系统,hdfs就相当于单机的disk,mapReduce是运行在系统中的app, 资源调度就是操作系统

3.Yarn Architecture

ya

我们先来看yarn的结构,yarn两个主要的任务是1. 资源管理 (resource management) 2. 调度监测任务

(Job scheduling&monitoring)

所以根据这个理念 我们主要有三个大的组件,即

  • ResourceManager(RM) 负责管理集群所有的可用资源
  • NodeManager(NM) 负责一台机器上的资源管理,主要是启动荣旗,监测(cpu,memory, disk, network)并把情况汇报给RM
  • ApplicationMaster(AM)负责一个application的运行,AM会向RM请求资源,然后和NM一起把具体运行任务的容器拉起来

所以yarn的framework的设计是非常清晰的,即

  • 针对整个集群的框架 RM per cluster
  • 针对单台机器的框架 NM per machine
  • 针对单个运行程序的框架 AM per application

了解清楚了Yarn的设计目的,我们再看这个架构图就更清晰了整个流程如下:

  1. 客户端机向RM提交任务(submit job)
  2. RM接到任务后,会查看集群资源是否足够运行该任务
  3. 如果资源充足,就命令NM启动运行AM的container
  4. AM向RM请求运行具体执行任务的资源
  5. RM命令NM启动容器执行具体任务
  6. AM负责监控所有容器的运行情况
  7. NM向RM报告节点的运行状况

这个操作看上去是不是很熟悉,我们想象下单机你启动游戏,操作系统(NM)要为游戏(AM)分配对应的内存和cpu资源,然后启动游戏(AM)。如果游戏内有多个操作在进行,由你(AM)负责管理这些操作(Container)

而分布式的区别就在于我们用RM+NM来管理所有的节点上的所有操作

ResourceManager

RM是集群最高领导人,统管集群所有的资源和监控运行情况。它主要有两部分组成即调取器scheduler和应用程序管理器application manager(注意不是AM,这个很容易搞混)

Scheduler的目的是合理分配集群资源

applicationManager的目的是为了监控所有app的运行状况

Scheduler

scheduler是纯粹的,高尚的,只负责调度资源,至于app挂了,节点挂了,统统不管。它只关心各个节点上的cpu,memory, network, disk, gpu的使用情况

scheduler的两个调度机制如下: capacityScheduler 和 FairScheduler

https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html

https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html

具体的机制不再细讲了,可以去参考上述两个链接,简单来说

capacityScheduler保证集群的资源要高可用(富人优先,穷人靠边的思路)

fairScheduler保证集群的资源要公平分配,大家不要打架(不患寡而患不均的思想)

Application Manager

ApplicationManager当然就负责其他的事情了,监测所有application的运行情况,命令NM启动AM,如果AM挂了就重新启动。然后由AM向scheduler请求资源,最后启动容器来运行任务

4. yarn 常见的命令

https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnCommands.html

参考上述链接,我们常用的有

yarn application -kill <application_id> kill掉一个yarn application

yarn logs -applicationId <application_id> 查看application运行的日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Usage: yarn [--config confdir] [COMMAND | CLASSNAME]
CLASSNAME run the class named CLASSNAME
or
where COMMAND is one of:
resourcemanager -format-state-store deletes the RMStateStore
resourcemanager run the ResourceManager
nodemanager run a nodemanager on each slave
timelineserver run the timeline server
rmadmin admin tools
sharedcachemanager run the SharedCacheManager daemon
scmadmin SharedCacheManager admin tools
version print the version
jar <jar> run a jar file
application prints application(s)
report/kill application
applicationattempt prints applicationattempt(s)
report
container prints container(s) report
node prints node report(s)
queue prints queue information
logs dump container logs
classpath prints the class path needed to
get the Hadoop jar and the
required libraries
cluster prints cluster information
daemonlog get/set the log level for each
daemon

5. Writing a Hello world on yarn

说了那么多,终于到正题了,现在我们来手把手写一个hello world on yarn。

我们将使用scala来完成这个程序,如果你不熟悉scala请看:https://www.tutorialspoint.com/scala/

再来回顾下一个app从提交到运行的整个流程:

  1. 客户端机向RM提交任务(submit job)
  2. RM接到任务后,会查看集群资源是否足够运行该任务(verify resources)
  3. 如果资源充足,就命令NM启动运行AM的container(start AM)
  4. AM向RM请求运行具体执行任务的资源(AM -> RM)
  5. RM命令NM启动容器执行具体任务(RM-> NM)
  6. AM负责监控所有执行该任务的容器的运行情况(AM reports)
  7. NM向RM报告节点的运行状况(NM reports)

这个流程一定要非常清楚,否则整个on yarn的程序对你来说会非常痛苦

针对这个流程,我们看到,我们需要完成三个主要的通信

1) submit job aka Client --- RM

客户端机向RM提交任务 通过YarnClient这个类来完成

  1. AM向RM请求资源

AMRMClientAsync使用这个类, events通过AMRMClientAsync.CallbackHandler来处理

3)AM和NM协作启动容器

NMClientAsync。 handling container events by NMClientAsync.CallbackHandler

三个通信协议

ApplicationClientProtocol, ApplicationMasterProtocol 和 ContainerManagementProtocol 被上述三个client包裹。大多数情况,所有的通信都通过上述三个client来进行,如果需要特别的定制化操作,可以使用这三个protocol来自己操作

好了话不多说,开始写"Hello world"

Hello world on Yarn

Step1 submit job

首先回忆第一步,我需要操作YarnClient这个类去提交一个application给RM。代码结构如下

1
2
3
4
5
val yarnClient = YarnClient.createYarnClient		
// 初始化
yarnClient.init(conf)
// 启动yarnClient
yarnClient.start()

当yarnClient启动后,我们需要创建一个新的application, 然后拿到它的response

1
2
3
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse
appId = newAppResponse.getApplicationId

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Client(conf: Configuration) {

private val log = LoggerFactory.getLogger(this.getClass)

// 创建一个yarnClient
private val yarnClient = YarnClient.createYarnClient

def submitApplication() : ApplicationId = {
var appId: ApplicationId = null

// 初始化
yarnClient.init(conf)
// 启动yarnClient
yarnClient.start()

log.info("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

// 创建一个新的app然后拿到appResponse和Id
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse
appId = newAppResponse.getApplicationId

appId

}

}

是不是非常简单

newAppResponse中包含了cluster中资源capacity信息,这是为下一步启动AM做准备(step2), 你马上就能看到

step2 verify resource capacity

为了验证集群资源能力,防止我们请求了不合理的资源,我们需要对cluster resource capacity做一次验证

为了验证我们需要引入四个新的常量,为了后面和spark交互,我们统一和spark命名一致

1
2
3
4
private val amMemory = 512
private val amCores = 1
private val executorMemory = 1024
private val executorCores = 1

默认值也和spark一致,我们给AM分配512m内存, 1个core. 给executor分配1024m内存,1个core

验证代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
val maxMem = newAppResponse.getMaximumResourceCapability.getMemory
if (executorMemory > maxMem) {
throw new IllegalArgumentException(s"Required executor memory ($executorMemory)," +
s"the max threshold ($maxMem MB) of this cluster! Please check the values of " +
s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
}
if (amMemory > maxMem) {
throw new IllegalArgumentException(s"Required AM memory $amMemory " +
s"is above the max threshold ($maxMem MB) of this cluster! " +
"Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
"'yarn.nodemanager.resource.memory-mb'.")
}
}

Step3 启动AM

如果你坚持看到了这里,那么你马上迎来最复杂的步骤,越过这座山,后面就是星辰大海。

我们知道ApplicationMaster是负责管理运行这个app的所有的容器的,所以我们首先要向RM请求一个容器用来启动AM.

我们要操作的类是 ApplicationSubmissionContext(后面简称ASC), 这个类定义了所有RM需要启动AM的信息。

作为客户端机我们主要需要set以下几个field

1 application info: 主要包括id 和name

  1. queue 提交的队列, priority 任务执行的优先级

  2. user: 谁在向RM submmit app

  3. ContainerLaunchContext:启动容器所需的上下文。我们后面简称是CLC

CLC 主要包含:

a) local resources(binaries, jars, files) 本地的资源

b) Environment Settings(CLASSPATH etc) 容器所需的环境变量

c) the command to be executed 需要执行的命令

d) security tokens 处理安全tokens

好我们看如何操作这个类

首先解决clc

1
2
3
4
5
6
7
// 创建一个clc
val clc = Records.newRecord(classOf[ContainerLaunchContext])
// 然后set我们需要的fields
clc.setCommands(null) // am里要执行的命令
clc.setLocalResources(null) // 需要的本地资源
clc.setEnvironment(null) // am里运行所需的环境变量
clc.setTokens(null) // 安全验证tokens

具体的implementations请看完整代码

然后ASC

1
2
3
4
val clc = createContainerLaunchContext(newAppResponse) // 创建一个clc
val asc = newApp.getApplicationSubmissionContext // 创建一个asc
asc.setResource(ascSetResource()) // 设置am所需的资源
asc.setAMContainerSpec(clc) // 设置clc

最后提交任务

1
yarnClient.submitApplication(asc)

没了,不要怀疑,on yarn看上去非常复杂,但其实就是set, set ,set就搞定了。关键是你要了解整个架构运行的原理和机制这样才能set好。所有的xxx on yarn本质上都是如此(tensorflow on yarn, pytorch on yarn, mxnet on yarn, spark on yarn ,flink on yarn) 之后有空的话,我们再手把手把tensorflow dnn训练mnist搬到yarn上来做分布式

具体set的内容可以参考完整代码

Writing an ApplicationMaster

下面该轮到AM了,AM我们再回顾下它的职责,

AM per application 每个应用一个

AM由RM来启动,通过client来提交

AM管理任务直到任务完成

我们看下官方解释

  • The AM is the actual owner of the job. It will be launched by the RM and via the client will be provided all the necessary information and resources about the job that it has been tasked with to oversee and complete.
  • As the AM is launched within a container that may (likely will) be sharing a physical host with other containers, given the multi-tenancy nature, amongst other issues, it cannot make any assumptions of things like pre-configured ports that it can listen on.
  • When the AM starts up, several parameters are made available to it via the environment. These include the ContainerId for the AM container, the application submission time and details about the NM (NodeManager) host running the ApplicationMaster. Ref ApplicationConstants for parameter names.
  • All interactions with the RM require an ApplicationAttemptId (there can be multiple attempts per application in case of failures). The ApplicationAttemptId can be obtained from the AM’s container id. There are helper APIs to convert the value obtained from the environment into objects.