
distributedShell是Yarn自帶的應用程序,和MR類(lèi)似,當前工具可以用來(lái)對Yarn進(jìn)行壓測。
參考命令如下:
./bin/hadoop jar ./share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.4.1.jar \
-jar ./share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.4.1.jar -shell_command \
'/bin/date' -num_containers 5
可以提交一個(gè)樣例作業(yè)到Yarn上面。
當前樣例的入口類(lèi)是org.apache.hadoop.yarn.applications.distributedshell.Client ,在pom文件里面默認定義了當前類(lèi)為主類(lèi)。所以在提交的時(shí)候可以不用指定主類(lèi)。
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<!-- 省略部分參數 -->
</executions>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.yarn.applications.distributedshell.Client</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
核心流程主要包含下面3個(gè):
其中前面兩個(gè)主要在客戶(hù)端,第3個(gè)主要是在yarn上面。
初始化階段包括下面兩部分:
下面是初始化Client對象的核心代碼。
Client(String appMasterMainClass, Configuration conf) {
this.conf = conf;
this.conf.setBoolean(
YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER, true);
this.appMasterMainClass = appMasterMainClass;
// 創(chuàng )建和RM的連接
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
opts = new Options();
// 初始化支持的參數列表
stopSignalReceived = new AtomicBoolean(false);
isRunning = new AtomicBoolean(false);
}
初始化Client,在初始化Client階段主要是讀取命令行參數。
// 初始化Client函數入口
boolean doRun = client.init(args);
首先還是建立和Yarn服務(wù)端的連接,為作業(yè)提交做準備。
isRunning.set(true);
yarnClient.start();
在連接建立之后會(huì )查詢(xún)并且在控制臺打印Yarn服務(wù)端的一些信息。主要包含下面內容:
yarnClient.getYarnClusterMetrics() 查詢(xún)到并且顯示。yarnClient.getNodeReports(NodeState.RUNNING)查詢(xún)到。yarnClient.getQueueInfo(this.amQueue)查詢(xún)到。yarnClient.getQueueAclsInfo()查詢(xún)。yarnClient.getResourceProfiles()查詢(xún)。在打印完集群信息之后才是作業(yè)提交的開(kāi)始。
提交作業(yè)之前,是需要先向RM申請AppId的。AppId可以通過(guò)YarnClientApplication app = yarnClient.createApplication();獲取。作業(yè)提交信息一般都在A(yíng)pplicationSubmissionContext里面,包含下面信息:
AM申請資源的請求。通過(guò)appContext.setAMContainerResourceRequests(amResourceRequests);設置。
AM的上下文信息:
App名稱(chēng)。通過(guò)appContext.setApplicationName(appName);設置。
app tag信息。
資源標簽信息。
作業(yè)的優(yōu)先級。
作業(yè)提交的隊列信息。
日志聚合相關(guān)配置。主要是和日志歸集的Rolling模式有關(guān)系??梢栽O置需要通過(guò)rolling的方式歸集哪些日志。通過(guò)appContext.setLogAggregationContext(logAggregationContext);設置。
作業(yè)真正提交的代碼只有一行:
yarnClient.submitApplication(appContext);
當前樣例做到了作業(yè)所需要的信息可配置。是一個(gè)比較適合開(kāi)發(fā)作業(yè)的樣例。
AM的核心代碼是在A(yíng)pplicationMaster.java里面的。在啟動(dòng)AM的時(shí)候會(huì )調用到當前函數的main函數。
在構造函數里面和init函數里面,主要是加載配置項以及命令行參數。真正運行的函數是run,核心在run函數里面,
首先需要創(chuàng )建和RM以及NM的連接。
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
startTimelineClient(conf);
在A(yíng)M啟動(dòng)OK了第一件事就是需要去RM上面注冊,證明當前AM已經(jīng)啟動(dòng)完成了。
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl, placementConstraintMap);
普通Container的申請是在A(yíng)M里面處理的,類(lèi)似下面代碼,下面代碼是異步申請的。
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
當Container申請好之后,可以通過(guò)下面代碼獲取,在樣例中觸發(fā)onContainerAllocated事件。
List<Container> allocated = response.getAllocatedContainers();
if (!allocated.isEmpty()) {
handler.onContainersAllocated(allocated);
}
通過(guò)下面代碼啟動(dòng)Container.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, myShellEnv, commands, null, allTokens.duplicate(),
null, containerRetryContext);
nmClientAsync.startContainerAsync(container, ctx);
在作業(yè)結束的時(shí)候,AM需要做下面事:
nmClientAsync.stop();
try {
amRMClient.unregisterApplicationMaster(appStatus, message, null);
} catch (YarnException | IOException ex) {
LOG.error("Failed to unregister application", ex);
}
amRMClient.stop();