当前位置 > CPDA数据分析师 > “数”业专攻 > Hadoop动态调整Map Task内存资源大小

Hadoop动态调整Map Task内存资源大小

来源:数据分析师 CPDA | 时间:2015-11-26 | 作者:admin

前言

我们都知道,在Hadoop中,一个Job的执行需要转化成1个个的Task去执行,在Task中,有会有2个类型,一个为Map Task,另一个就是Reduce Task.当然,这不是最底层的级别,在Task内部,还可以再分为TaskAttempt,叫做任务尝试,任务尝试姑且不在本篇文章的论述范围内.OK,针对每个Task,他当然会有他的资源使用量,广义的来讲,资源分为2个概念,1个是Memory 内存,另一个是Vcores,虚拟核数.这些资源的分配情况非常的关键,因为资源分少了,可能空闲集群资源浪费了,又可能会导致oom内存不够用的问题,假设你内存分小了,既然这样,那我们把资源调大了是不就行了,当然会导致一个狼多羊少的问题,毕竟资源有限,你用的多了,别人就会用的少了.所以这里就会衍生出一个问题,对于Job中的每个Task,我该如何去设置可使用的资源量呢,采用默认统一的map.memory.mb这样的配置显然不是一个好的解决办法,其实让人可以马上联想到的办法就是能够数据量的大小动态调整分配的资源量,这无疑是最棒的方案,下面就来简单的聊聊这个方案.

资源调配的指标

什么是资源调配的指标,通俗的讲就是一个资源分配的参考值,依照这个值,我可以进行资源的动态分配,这也是非常符合正常逻辑思维的方式的.在这里的标准值就是处理数据量的大小,所以调整的目标对象就是Map Task而不是Reduce Task.那么这个数据可以怎么拿到呢,稍微了解过Hadoop的人一定知道map的过程是如何拿到数据的,简单的就是从inputSplit中拿到数据,而这个inputSplit当然会被保留在Map Task中.就是下面所示的代码中:

@SuppressWarnings("rawtypes")
public class MapTaskAttemptImpl extends TaskAttemptImpl {
  private final TaskSplitMetaInfo splitInfo;
  public MapTaskAttemptImpl(TaskId taskId, int attempt, 
  EventHandler eventHandler, Path jobFile, 
  int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
  TaskAttemptListener taskAttemptListener, 
  Token<JobTokenIdentifier> jobToken,
  Credentials credentials, Clock clock,
  AppContext appContext) {
super(taskId, attempt, eventHandler, 
taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
jobToken, credentials, clock, appContext);
this.splitInfo = splitInfo;
  }

在这个TaskSplitMetaInfo中就会有输入数据长度的一个变量

/**
   * This represents the meta information about the task split that the 
   * JobTracker creates
   */
  public static class TaskSplitMetaInfo {
    private TaskSplitIndex splitIndex;
    private long inputDataLength;
    private String[] locations;
    public TaskSplitMetaInfo(){
      this.splitIndex = new TaskSplitIndex();
      this.locations = new String[0];
    }
...

后面我们就用到这个关键变量.

需要调整的资源变量

上文中已经提过,目标资源调整的维度有2个,1个是内存,还有1个核数,数据量的大小一般会直接与内存大小相关联,核数偏重于处理速度,所以我们应该调整的task的内存大小,也就是map.memory.mb这个配置项的值.这个配置项的默认值是1024M,就是下面这个配置:

public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
  public static final int DEFAULT_MAP_MEMORY_MB = 1024;

如果你在配置文件中没配的话,他走的就是默认值,个人感觉这个值还是有点偏大的,如果一个Job一不小心起了1000多个Task,那么1T的内存就用掉了.OK,下面看下这个变量是保存在哪个变量中的呢,进入到TaskAttemptImpl中你就可以发现了.

/**
 * Implementation of TaskAttempt interface.
 */
@SuppressWarnings({ "rawtypes" })
public abstract class TaskAttemptImpl implements
    org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
      EventHandler<TaskAttemptEvent> {

  static final Counters EMPTY_COUNTERS = new Counters();
  private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
  private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
  private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

  protected final JobConf conf;
  protected final Path jobFile;
  protected final int partition;
  protected EventHandler eventHandler;
  private final TaskAttemptId attemptId;
  private final Clock clock;
  private final org.apache.hadoop.mapred.JobID oldJobId;
  private final TaskAttemptListener taskAttemptListener;
  private final Resource resourceCapability;
  protected Set<String> dataLocalHosts;
...

就是上面这个resourceCapability,在这里就会有核数和内存2个资源指标.

@Public
@Stable
public static Resource newInstance(int memory, int vCores) {
  Resource resource = Records.newRecord(Resource.class);
  resource.setMemory(memory);
  resource.setVirtualCores(vCores);
  return resource;
}

/**
 * Get <em>memory</em> of the resource.
 * @return <em>memory</em> of the resource
 */
@Public
@Stable
public abstract int getMemory();
  
/**
 * Set <em>memory</em> of the resource.
 * @param memory <em>memory</em> of the resource
 */
@Public
@Stable
public abstract void setMemory(int memory);

到时就可以在这边进行设置.

动态调整Map TaskAttempt内存大小

刚刚上文中已经提到过,想要根据处理数据量的大小来调整Map的内存大小,首先你要有一个标准值,比如1个G的数据量对应1个G的内存值,然后如果你这次来了512M的数据,那么我就配512/1024(就是1个G)*1024M=512M,所以我最后分的内存就是512M,如果数据量大了,同理.这个标准值当然要做出可配,有使用方来决定.姑且用下面新的配置值来定义:

public static final String MAP_MEMORY_MB_AUTOSET_ENABLED = "map.memory-autoset.enabled";
  public static final String DEFAULT_MEMORY_MB_AUTOSET_ENABLED = "false";

  public static final String MAP_UNIT_INPUT_LENGTH = "map.unit-input.length";
  public static final int DEFAULT_MAP_UNIT_INPUT_LENGTH = 1024 * 1024 * 1024;

然后在Map TaskAttempt中加上动态调整方法,如果你开启了此项新功能,则会执行方法中的部分操作.

public class MapTaskAttemptImpl extends TaskAttemptImpl {
  private final TaskSplitMetaInfo splitInfo;
  public MapTaskAttemptImpl(TaskId taskId, int attempt, 
  EventHandler eventHandler, Path jobFile, 
  int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
  TaskAttemptListener taskAttemptListener, 
  Token<JobTokenIdentifier> jobToken,
  Credentials credentials, Clock clock,
  AppContext appContext) {
super(taskId, attempt, eventHandler, 
taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
jobToken, credentials, clock, appContext);
this.splitInfo = splitInfo;
autoSetMemorySize();
  }
...
  private void autoSetMemorySize() {
int memory;
int unitInputLength;
int unitMemorySize;
boolean isMemoryAutoSetEnabled;
Resource resourceCapacity;
isMemoryAutoSetEnabled =
Boolean.parseBoolean(conf.get(
MRJobConfig.MAP_MEMORY_MB_AUTOSET_ENABLED,
MRJobConfig.DEFAULT_MEMORY_MB_AUTOSET_ENABLED));
//判断是否开启动态调整内存功能
if (isMemoryAutoSetEnabled) {
  unitInputLength =
  conf.getInt(MRJobConfig.MAP_UNIT_INPUT_LENGTH,
  MRJobConfig.DEFAULT_MAP_UNIT_INPUT_LENGTH);
  unitMemorySize =
  conf.getInt(MRJobConfig.MAP_MEMORY_MB,
  MRJobConfig.DEFAULT_MAP_MEMORY_MB);
  memory =
  (int) (Math.ceil(1.0 * splitInfo.getInputDataLength()
  / unitInputLength) * unitMemorySize);
} else {
  memory =
  conf.getInt(MRJobConfig.MAP_MEMORY_MB,
  MRJobConfig.DEFAULT_MAP_MEMORY_MB);
}
//调整内存资源量
resourceCapacity = getResourceCapability();
resourceCapacity.setMemory(memory);
  }

在这里我做了特别处理,为了使分配的内存大小符合2的幂次方,我用了向上取整的方法算倍数,这样规范化一些.下面是单元测试案例

@Test
public void testMapTaskAttemptMemoryAutoSet() throws Exception {
  int memorySize;
  int adjustedMemorySize;
  Resource resourceCapacity;

  EventHandler eventHandler = mock(EventHandler.class);
  String[] hosts = new String[3];
  hosts[0] = "host1";
  hosts[1] = "host2";
  hosts[2] = "host3";
  TaskSplitMetaInfo splitInfo =
      new TaskSplitMetaInfo(hosts, 0, 2 * 1024 * 1024 * 1024l);

  TaskAttemptImpl mockTaskAttempt =
      createMapTaskAttemptImplForTest(eventHandler, splitInfo);

  resourceCapacity = mockTaskAttempt.getResourceCapability();
  memorySize = resourceCapacity.getMemory();

  // Disable the auto-set memorySize function
  // memorySize will be equal to default size
  assertEquals(MRJobConfig.DEFAULT_MAP_MEMORY_MB, memorySize);

  // Enable the auto-set memory function
  Clock clock = new SystemClock();
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  Path jobFile = mock(Path.class);
  JobConf jobConf = new JobConf();
  jobConf.set(MRJobConfig.MAP_MEMORY_MB_AUTOSET_ENABLED, "true");
  jobConf.set(MRJobConfig.MAP_MEMORY_MB,
      String.valueOf(MRJobConfig.DEFAULT_MAP_MEMORY_MB));
  jobConf.set(MRJobConfig.MAP_UNIT_INPUT_LENGTH,
      String.valueOf(MRJobConfig.DEFAULT_MAP_UNIT_INPUT_LENGTH));

  TaskAttemptImpl taImpl =
      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splitInfo,
          jobConf, taListener, null, null, clock, null);

  resourceCapacity = taImpl.getResourceCapability();
  memorySize = resourceCapacity.getMemory();
  adjustedMemorySize =
      (int) (Math.ceil(1.0 * splitInfo.getInputDataLength()
          / MRJobConfig.DEFAULT_MAP_UNIT_INPUT_LENGTH) * MRJobConfig.DEFAULT_MAP_MEMORY_MB);

  // Enable the auto-set function,the memorySize will be changed
  assertEquals(adjustedMemorySize, memorySize);
}