MapReduce中的计数器原理

1、MapReduce计数器是什么?

  计数器是用来记录job的执行进度和状态的。它的作用可以理解为日志。我们可以在程序的某个位置插入计数器,记录数据或者进度的变化情况。

2、MapReduce计数器能做什么?

  MapReduce 计数器(Counter)为我们提供一个窗口,用于观察 MapReduce Job 运行期的各种细节数据。对MapReduce性能调优很有帮助,MapReduce性能优化的评估大部分都是基于这些 Counter 的数值表现出来的。

MapReduce 任务计数器
文件系统计数器
FileInputFormat 计数器
FileOutputFormat 计数器
作业计数器

3、计数器的该如何使用?

定义计数器

1)枚举声明计数器

1
2
// 自定义枚举变量Enum 
Counter counter = context.getCounter(Enum enum)

2)自定义计数器

1
2
// 自己命名groupName和counterName 
Counter counter = context.getCounter(String groupName,String counterName)

3)初始化计数器

1
counter.setValue(long value);// 设置初始值

4)计数器自增

1
counter.increment(long incr);// 增加计数
  1. 获取枚举计数器的值
1
2
3
4
5
6
7
Configuration conf = new Configuration(); 
Job job = new Job(conf, "MyCounter");
job.waitForCompletion(true);
Counters counters=job.getCounters();
// 查找枚举计数器,假如Enum的变量为BAD_RECORDS_LONG
Counter counter=counters.findCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG);
long value=counter.getValue();//获取计数值
  1. 获取自定义计数器的值
1
2
3
4
5
6
7
Configuration conf = new Configuration(); 
Job job = new Job(conf, "MyCounter");
job.waitForCompletion(true);
Counters counters = job.getCounters();
// 假如groupName为ErrorCounter,counterName为toolong
Counter counter=counters.findCounter("ErrorCounter","toolong");
long value = counter.getValue();// 获取计数值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.buaa;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* @ProjectName CustomCounterDemo
* @PackageName com.buaa
* @ClassName MyCounter
* @Description 假如一个文件,规范的格式是3个字段,“\t”作为分隔符,其中有2条异常数据,一条数据是只有2个字段,一条数据是有4个字段
* @Author 刘吉超
* @Date 2016-05-23 20:10:14
*/
public class MyCounter {
// \t键
private static String TAB_SEPARATOR = "\t";

public static class MyCounterMap extends
Mapper<LongWritable, Text, Text, Text> {
// 定义枚举对象
public static enum LOG_PROCESSOR_COUNTER {
BAD_RECORDS_LONG, BAD_RECORDS_SHORT
};

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String arr_value[] = value.toString().split(TAB_SEPARATOR);
if (arr_value.length > 3) {
/* 自定义计数器 */
context.getCounter("ErrorCounter", "toolong").increment(1);
/* 枚举计数器 */
context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG).increment(1);
} else if (arr_value.length < 3) {
// 自定义计数器
context.getCounter("ErrorCounter", "tooshort").increment(1);
// 枚举计数器
context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_SHORT).increment(1);
}
}
}

@SuppressWarnings("deprecation")
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
String[] args0 = {
"hdfs://hadoop2:9000/buaa/counter/counter.txt",
"hdfs://hadoop2:9000/buaa/counter/out/"
};
// 读取配置文件
Configuration conf = new Configuration();

// 如果输出目录存在,则删除
Path mypath = new Path(args0[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}

// 新建一个任务
Job job = new Job(conf, "MyCounter");
// 主类
job.setJarByClass(MyCounter.class);
// Mapper
job.setMapperClass(MyCounterMap.class);

// 输入目录
FileInputFormat.addInputPath(job, new Path(args0[0]));
// 输出目录
FileOutputFormat.setOutputPath(job, new Path(args0[1]));

// 提交任务,并退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}