博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JobTracker对提交作业的初始化
阅读量:6268 次
发布时间:2019-06-22

本文共 5308 字,大约阅读时间需要 17 分钟。

  1. 通过EagerTaskInitializationListener类中的jobAdded(JobInProgress job)方法,将所提交的作业加入到待初始化的作业队列中,代码如下:
  2. public void jobAdded(JobInProgress job) {
        // Enqueue the newly submitted job for initialization. The add, the
        // re-sort and the notification all happen while holding the monitor
        // of jobInitQueue.
        synchronized (jobInitQueue) {
          jobInitQueue.add(job);
          resortInitQueue();// re-sort the queue (per the article's author: ascending by time)
          jobInitQueue.notifyAll();// wake every thread waiting on jobInitQueue's monitor
        }
      }
  3. 初始化队列jobInitQueue中存放的是等待初始化的作业。JobInitManager在一个独立的守护线程中运行,它从队列中取出作业,并交给线程池中的线程执行初始化。启动该线程的代码如下:
  4. public void start() throws IOException {// start() method of the EagerTaskInitializationListener class
        // Run jobInitManager on a dedicated thread named "jobInitManager".
        this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
        jobInitManagerThread.setDaemon(true);// mark it as a daemon thread (must be set before start())
        this.jobInitManagerThread.start();
      }
  5. JobInitManager类的具体代码如下:
  6. class JobInitManager implements Runnable {
        // Loops forever: blocks until jobInitQueue is non-empty, removes the
        // job at index 0 and hands it to threadPool wrapped in an InitJob.
        // An InterruptedException ends the loop and shuts the pool down.
        public void run() {
          JobInProgress job = null;
          while (true) {
            try {
              synchronized (jobInitQueue) {
                // Re-check the condition in a loop after every wake-up.
                while (jobInitQueue.isEmpty()) {
                  jobInitQueue.wait();
                }
                job = jobInitQueue.remove(0);// always take the first job in the queue
              }
              // Submitted outside the synchronized block, so the queue's
              // monitor is not held while the task is handed to the pool.
              threadPool.execute(new InitJob(job));
            } catch (InterruptedException t) {
              LOG.info("JobInitManagerThread interrupted.");
              break;
            }
          }
          LOG.info("Shutting down thread pool");
          threadPool.shutdownNow();
        }
      }
  7. InitJob类的代码如下:
  8. class InitJob implements Runnable {
        // Runnable wrapper that carries one job to be initialized.
        private JobInProgress job;
        public InitJob(JobInProgress job) {
          this.job = job;
        }
        public void run() {
          ttm.initJob(job);// initJob() is declared on the TaskTrackerManager interface and implemented in JobTracker (per the article's author)
        }
      }
  9. JobTracker中的initJob()方法如下:
  10. public void initJob(JobInProgress job) {
        // Initializes the given job's tasks and notifies listeners if the run
        // state changed. A null job is logged and ignored. Failures are caught
        // here: KillInterruptedException leads to killJob(job), any other
        // Throwable records the failure info and leads to failJob(job).
        if (null == job) {
          LOG.info("Init on null job is not valid");
          return;
        }
        try {
          // Snapshot the status before initialization so it can be compared
          // with the status afterwards.
          JobStatus prevStatus = (JobStatus)job.getStatus().clone();
          LOG.info("Initializing " + job.getJobID());
          job.initTasks();// calls JobInProgress.initTasks() to initialize all of the job's tasks
          // Inform the listeners if the job state has changed
          // Note : that the job will be in PREP state.
          JobStatus newStatus = (JobStatus)job.getStatus().clone();
          if (prevStatus.getRunState() != newStatus.getRunState()) {
            JobStatusChangeEvent event =
              new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
                  newStatus);
            // Listener notification is done while holding the JobTracker lock.
            synchronized (JobTracker.this) {
              updateJobInProgressListeners(event);
            }
          }
        } catch (KillInterruptedException kie) {
          //   If job was killed during initialization, job state will be KILLED
          LOG.error("Job initialization interrupted:\n" +
              StringUtils.stringifyException(kie));
          killJob(job);
        } catch (Throwable t) {
          String failureInfo =
            "Job initialization failed:\n" + StringUtils.stringifyException(t);
          // If the job initialization is failed, job state will be FAILED
          LOG.error(failureInfo);
          job.getStatus().setFailureInfo(failureInfo);
          failJob(job);
        }
         }
  11. JobInProgress中的initTasks()方法中主要代码如下:
  12.  public synchronized void initTasks()
      // NOTE(review): this is an excerpt. The article shows only the main part
      // of the method; the opening brace and the code that populates `splits`
      // are not included here.
      numMapTasks = splits.length;
        // Sanity check the locations so we don't create/initialize unnecessary tasks
        for (TaskSplitMetaInfo split : splits) {
          NetUtils.verifyHostnames(split.getLocations());
        }
        
        // Report the waiting map/reduce counts to both the JobTracker
        // instrumentation and the queue metrics.
        jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
        jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
        this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
        this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);
        // Create one map TaskInProgress per split, accumulating the total
        // input length along the way.
        maps = new TaskInProgress[numMapTasks];
        for(int i=0; i < numMapTasks; ++i) {
          inputLength += splits[i].getInputDataLength();
          maps[i] = new TaskInProgress(jobId, jobFile,
                                       splits[i],
                                       jobtracker, conf, this, i, numSlotsPerMap);
        }
        LOG.info("Input size for job " + jobId + " = " + inputLength
            + ". Number of splits = " + splits.length);
        // Set localityWaitFactor before creating cache
        localityWaitFactor =
          conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
        // The cache is only built when the job has at least one map task.
        if (numMapTasks > 0) {
          nonRunningMapCache = createCache(splits, maxLevel);
        }
        // set the launch time
        this.launchTime = jobtracker.getClock().getTime();
        //
        // Create reduce tasks
        //
        this.reduces = new TaskInProgress[numReduceTasks];
        for (int i = 0; i < numReduceTasks; i++) {
          reduces[i] = new TaskInProgress(jobId, jobFile,
                                          numMapTasks, i,
                                          jobtracker, conf, this, numSlotsPerReduce);
          // Every new reduce TIP is also added to nonRunningReduces.
          nonRunningReduces.add(reduces[i]);
        }
        // Calculate the minimum number of maps to be complete before
        // we should start scheduling reduces
        // (configured fraction times numMapTasks, rounded up)
        completedMapsForReduceSlowstart =
          (int)Math.ceil(
              (conf.getFloat("mapred.reduce.slowstart.completed.maps",
                             DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
               numMapTasks));
        
        // ... use the same for estimating the total output of all maps
        resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);
        
        // create two cleanup tips, one map and one reduce.
        cleanup = new TaskInProgress[2];
        // cleanup map tip. This map doesn't use any splits. Just assign an empty
        // split.
        // NOTE(review): the numMapTasks argument sits where regular maps pass
        // their loop index i, so it is presumably the first index after the
        // regular maps; confirm against the TaskInProgress constructor.
        TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
        cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                jobtracker, conf, this, numMapTasks, 1);
        cleanup[0].setJobCleanupTask();
        // cleanup reduce tip.
        cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                           numReduceTasks, jobtracker, conf, this, 1);
        cleanup[1].setJobCleanupTask();
        // create two setup tips, one map and one reduce.
        setup = new TaskInProgress[2];
        // setup map tip. This map doesn't use any split. Just assign an empty
        // split.
        // The setup tips pass numMapTasks + 1 / numReduceTasks + 1 where the
        // cleanup tips pass numMapTasks / numReduceTasks.
        setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                jobtracker, conf, this, numMapTasks + 1, 1);
        setup[0].setJobSetupTask();
        // setup reduce tip.
        setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                           numReduceTasks + 1, jobtracker, conf, this, 1);
        setup[1].setJobSetupTask();
      }

转载于:https://www.cnblogs.com/cxtblogs/p/5038516.html

你可能感兴趣的文章
Silverlight实用窍门系列:70.Silverlight的视觉状态组VisualStateGroup
查看>>
照片筛选与上传功能
查看>>
Hello ZED
查看>>
常见web攻击方式
查看>>
hdu 4472
查看>>
oracle存储过程中is和as区别
查看>>
windows 2003 群集
查看>>
几个gcc的扩展功能
查看>>
Spark一个简单案例
查看>>
关于结构体占用空间大小总结(#pragma pack的使用)
查看>>
通过浏览器查看nginx服务器状态配置方法
查看>>
shell简介
查看>>
android 使用WebView 支持播放优酷视频,土豆视频
查看>>
怎么用secureCRT连接Linux
查看>>
C# 使用WinRar命令压缩和解压缩
查看>>
linux学习笔记一----------文件相关操作
查看>>
Mono for Android 优势与劣势
查看>>
服务器端开发技术
查看>>
Python3中urllib详细使用方法(header,代理,超时,认证,异常处理)
查看>>
ajax提交多个对象,使用序列化表单和FormData
查看>>