19、Executor原理剖析与源码分析


一、原理图解



二、源码分析

1、Executor注册机制

worker中为Application启动的executor,实际上是启动了这个CoarseGrainedExecutorBackend进程;


Executor注册机制:
###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala

/**
    * 在actor的初始化方法中
    */
  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    // 获取了driver的executor
    driver = context.actorSelection(driverUrl)
    // 向driver发送RegisterExecutor消息,driver是CoarseGrainedSchedulerBackend的一个内部类
    // driver注册executor成功之后,会发送回来RegisteredExecutor消息
    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }





###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala

override def receiveWithLogging = {
    // driver注册executor成功之后,会发送回来RegisteredExecutor消息
    // 此时,CoarseGrainedExecutorBackend会创建Executor对象,作为执行句柄
    // 其实它的大部分功能,都是通过Executor实现的
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      val (hostname, _) = Utils.parseHostPort(hostPort)
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)


3、启动Task

###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala

    // 启动task
    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        // 反序列化task
        val ser = env.closureSerializer.newInstance()
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // 用内部的执行句柄,Executor的launchTask()方法来启动一个task
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }





###org.apache.spark.executor/Executor.scala

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer) {
    // 对于每一个task,都会创建一个TaskRunner
    // TaskRunner继承的是Java多线程中的Runnable接口
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    // 将TaskRunner放入内存缓存
    runningTasks.put(taskId, tr)
    // Executor内部有一个Java线程池,这里其实将task封装在一个线程中(TaskRunner),直接将线程丢入线程池,进行执行
    // 线程池是自动实现了排队机制的,也就是说,如果线程池内的线程暂时没有空闲的,那么丢进去的线程都是要排队的
    threadPool.execute(tr)
  }
优质内容筛选与推荐>>
1、设计模式之依赖倒转原则
2、VMware workstation Windows 10虚拟机安装步骤
3、链队列的基本算法
4、【LaTeX-4】Greek Letters
5、a 标签添加 onclick 事件


长按二维码向我转账

受苹果公司新规定影响,微信 iOS 版的赞赏功能被关闭,可通过二维码转账支持公众号。

    阅读
    好看
    已推荐到看一看
    你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
    已取消,“好看”想法已同步删除
    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号





    联系我们

    欢迎来到TinyMind。

    关于TinyMind的内容或商务合作、网站建议,举报不良信息等均可联系我们。

    TinyMind客服邮箱:support@tinymind.net.cn