博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析
阅读量:6599 次
发布时间:2019-06-24

本文共 6885 字,大约阅读时间需要 22 分钟。

欢迎转载,转载请注明出处,徽沪一郎.

概要

WEB UI和Metrics子系统为外部观察监测Spark内部运行情况提供了必要的窗口,本文将简略的过一下其内部代码实现。

WEB UI

先上图感受一下spark webui 假设当前已经在本机运行standalone cluster模式,输入http://127.0.0.1:8080将会看到如下页面

  driver application默认会打开4040端口进行http监听,可以看到application相关的详细信息

显示每个stage的详细信息

启动过程

本节要讨论的重点是http server是如何启动的,页面中的数据是从哪里获取到的?Spark中用到的http server是jetty, jetty采用java编写,是非常轻巧的servlet engine和http server。能够嵌入到用户程序中执行,不用像tomcat或jboss那样需要自己独立的jvm进程。

SparkUI在SparkContext初始化的时候创建

// Initialize the Spark UI , registering allassociated listenersprivate [spark] val ui = new SparkUI (this)ui.bind ()

initialize的主要工作是注册页面处理句柄,WebUI的子类需要实现自己的initialize函数

bind将真正启动jetty server.

def bind () {assert (! serverInfo .isDefined , " Attempted to bind %s more than once!". format ( className ))try {// 启 动 JettyServerserverInfo = Some( startJettyServer (" 0.0.0.0 ",port , handlers , conf))logInfo (" Started %s at http ://%s:%d". format (className , publicHostName , boundPort ))} catch {case e: Exception =>logError (" Failed to bind %s". format ( className ), e)System .exit (1)}}

在startJettyServer函数中将JettyServer运行起来的关键处理函数是connect

def connect(currentPort: Int): (Server, Int) = {      val server = new Server(new InetSocketAddress(hostName, currentPort))      val pool = new QueuedThreadPool      pool.setDaemon(true)      server.setThreadPool(pool)      server.setHandler(collection)      Try {        server.start()      } match {        case s: Success[_] =>          (server, server.getConnectors.head.getLocalPort)        case f: Failure[_] =>          val nextPort = (currentPort + 1) % 65536          server.stop()          pool.stop()          val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."          if (f.toString.contains("Address already in use")) {            logWarning(s"$msg - $f")          } else {            logError(msg, f.exception)          }          connect(nextPort)      }    }    val (server, boundPort) = connect(port)    ServerInfo(server, boundPort, collection)  }

数据获取

页面中的数据是如何获取的呢,这就要归功于SparkListener了,典型的观察者设计模式。当有与stage及task相关的事件发生时,这些Listener都将收到通知,并进行数据更新。

需要指出的是,数据尽管得以自动更新,但页面并没有,还是需要手工刷新才能得到最新的数据。

 

上图显示的是SparkUI中注册了哪些SparkListener子类。来看一看这些子类是在什么时候注册进去的, 注意研究一下SparkUI.initialize函

def initialize() {    listenerBus.addListener(storageStatusListener)    val jobProgressTab = new JobProgressTab(this)    attachTab(jobProgressTab)    attachTab(new StorageTab(this))    attachTab(new EnvironmentTab(this))    attachTab(new ExecutorsTab(this))    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))    attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))    attachHandler(      createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))    if (live) {      sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)    }  }

举一个实际例子来看看Notifier发送Event的时刻,比如有任务提交的时 resourceOffer->taskStarted->handleBeginEvent

private [ scheduler ] def handleBeginEvent (task: Task[_], taskInfo : TaskInfo ) {listenerBus .post( SparkListenerTaskStart (task.stageId , taskInfo ))submitWaitingStages ()}

post其实是向listenerBus的消息队列中添加一个消息,真正将消息发送 出去的时另一个处理线程listenerThread

override def run (): Unit = Utils.logUncaughtExceptions {while (true) {eventLock . acquire ()// Atomically remove and process this eventLiveListenerBus .this. synchronized {val event = eventQueue .pollif (event == SparkListenerShutdown ) {// Get out of the while loop and shutdownthe daemon threadreturn}Option (event). foreach ( postToAll )}}}

Option(event).foreach(postToAll)负责将事件通知给各个Observer.postToAll的函数实现如下

def postToAll(event: SparkListenerEvent) {    event match {      case stageSubmitted: SparkListenerStageSubmitted =>        foreachListener(_.onStageSubmitted(stageSubmitted))      case stageCompleted: SparkListenerStageCompleted =>        foreachListener(_.onStageCompleted(stageCompleted))      case jobStart: SparkListenerJobStart =>        foreachListener(_.onJobStart(jobStart))      case jobEnd: SparkListenerJobEnd =>        foreachListener(_.onJobEnd(jobEnd))      case taskStart: SparkListenerTaskStart =>        foreachListener(_.onTaskStart(taskStart))      case taskGettingResult: SparkListenerTaskGettingResult =>        foreachListener(_.onTaskGettingResult(taskGettingResult))      case taskEnd: SparkListenerTaskEnd =>        foreachListener(_.onTaskEnd(taskEnd))      case environmentUpdate: SparkListenerEnvironmentUpdate =>        foreachListener(_.onEnvironmentUpdate(environmentUpdate))      case blockManagerAdded: SparkListenerBlockManagerAdded =>        foreachListener(_.onBlockManagerAdded(blockManagerAdded))      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>        foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))      case unpersistRDD: SparkListenerUnpersistRDD =>        foreachListener(_.onUnpersistRDD(unpersistRDD))      case applicationStart: SparkListenerApplicationStart =>        foreachListener(_.onApplicationStart(applicationStart))      case applicationEnd: SparkListenerApplicationEnd =>        foreachListener(_.onApplicationEnd(applicationEnd))      case SparkListenerShutdown =>    }  }

Metrics

在系统设计中,测量模块是不可或缺的组成部分。通过这些测量数据来感知系统的运行情况。

在Spark中,测量模块由MetricsSystem来担任,MetricsSystem中有三个重要的概念,分述如下。

  • instance 表示谁在使用metrics system, 目前已知的有master, worker, executor和client driver会创建metrics system用以测量
  • source 表示数据源,从哪里获取数据
  • sinks 数据目的地,将从source获取的数据发送到哪

Spark目前支持将测量数据保存或发送到如下目的地

  • ConsoleSink 输出到console
  • CSVSink 定期保存成为CSV文件
  • JmxSink 注册到JMX,以通过JMXConsole来查看
  • MetricsServlet 在SparkUI中添加MetricsServlet用以查看Task运行时的测量数据
  • GraphiteSink 发送给Graphite以对整个系统(不仅仅包括spark)进行监控

下面从MetricsSystem的创建,数据源的添加,数据更新与发送几个方面来跟踪一下源码。

初始化过程

MetricsSystem依赖于由codahale提供的第三方库Metrics,可以在metrics.codahale.com找到更为详细的介绍。

以Driver Application为例,driver application首先会初始化SparkContext,在SparkContext的初始化过程中就会创建MetricsSystem,具体调用关系如下。 SparkContext.init->SparkEnv.init->MetricsSystem.createMetricsSystem

注册数据源,继续以SparkContext为例

private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)  private def initDriverMetrics() {    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)  }initDriverMetrics()

数据读取

数据读取由Sink来完成,在Spark中创建的Sink子类如下图所示

读取最新的数据,以CsvSink为例,最主要的就是创建CsvReporter,启动之后会定期更新最近的数据到console。不同类型的Sink所使用的Reporter是不一样的。

val reporter: CsvReporter = CsvReporter.forRegistry(registry)      .formatFor(Locale.US)      .convertDurationsTo(TimeUnit.MILLISECONDS)      .convertRatesTo(TimeUnit.SECONDS)      .build(new File(pollDir))        override def start() {    reporter.start(pollPeriod, pollUnit)  }

Spark中关于metrics子系统的配置文件详见conf/metrics.properties. 默认的Sink是MetricsServlet,在任务提交执行之后,输入http://127.0.0.1:4040/metrics/json会得到以json格式保存的metrics信息。

转载于:https://www.cnblogs.com/hseagle/p/3872003.html

你可能感兴趣的文章
五款免费的磁盘空间使用情况报告软件
查看>>
JAVA线程7 - 终止线程
查看>>
网卡绑定(服务器&&交换机),缓存服务器Squid架构配置
查看>>
linux下CPU、内存、IO、网络的压力测试,硬盘读写速度测试,Linux三个系统资源监控工具...
查看>>
Linux的lvm逻辑卷管理
查看>>
web网站加速之CDN(Content Delivery Network)技术原理
查看>>
Redis 数据结构-字符串源码分析
查看>>
关于linux load average的深入了解
查看>>
RRDtool监控绘图调研(优化版)
查看>>
时间区间选择插件daterangepicker
查看>>
2010-09-15 09:42 IAR扩展外部文本编辑器UltraEdit
查看>>
打算写一款框架来提高自己 写个结构吧
查看>>
这世界就是,一些人总在昼夜不停地运转,而另外一些人,起床就发现世界已经变了。...
查看>>
php递归函数
查看>>
学习Backbone中的一些疑惑
查看>>
应用程序是如何去访问所需的硬件资源的?
查看>>
h3c交换机备份
查看>>
FreeNAS如何配置LACP(链路聚合和故障)
查看>>
网页设置
查看>>
colorAccent,colorPrimary,colorPrimaryDark……来这里你就明白了
查看>>