1、MapReduce计数器是什么?
计数器是用来记录job的执行进度和状态的。它的作用可以理解为日志。我们可以在程序的某个位置插入计数器,记录数据或者进度的变化情况。
2、MapReduce计数器能做什么?
MapReduce 计数器(Counter)为我们提供一个窗口,用于观察 MapReduce Job 运行期的各种细节数据。对MapReduce性能调优很有帮助,MapReduce性能优化的评估大部分都是基于这些 Counter 的数值表现出来的。
MapReduce 任务计数器
文件系统计数器
FileInputFormat 计数器
FileOutputFormat 计数器
作业计数器
3、计数器的该如何使用?
定义计数器
1)枚举声明计数器
1 2
| // 自定义枚举变量Enum Counter counter = context.getCounter(Enum enum)
|
2)自定义计数器
1 2
| // 自己命名groupName和counterName Counter counter = context.getCounter(String groupName,String counterName)
|
3)初始化计数器
1
| counter.setValue(long value);// 设置初始值
|
4)计数器自增
1
| counter.increment(long incr);// 增加计数
|
- 获取枚举计数器的值
1 2 3 4 5 6 7
| Configuration conf = new Configuration(); Job job = new Job(conf, "MyCounter"); job.waitForCompletion(true); Counters counters=job.getCounters(); // 查找枚举计数器,假如Enum的变量为BAD_RECORDS_LONG Counter counter=counters.findCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG); long value=counter.getValue();//获取计数值
|
- 获取自定义计数器的值
1 2 3 4 5 6 7
| Configuration conf = new Configuration(); Job job = new Job(conf, "MyCounter"); job.waitForCompletion(true); Counters counters = job.getCounters(); // 假如groupName为ErrorCounter,counterName为toolong Counter counter=counters.findCounter("ErrorCounter","toolong"); long value = counter.getValue();// 获取计数值
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| package com.buaa;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/** * @ProjectName CustomCounterDemo * @PackageName com.buaa * @ClassName MyCounter * @Description 假如一个文件,规范的格式是3个字段,“\t”作为分隔符,其中有2条异常数据,一条数据是只有2个字段,一条数据是有4个字段 * @Author 刘吉超 * @Date 2016-05-23 20:10:14 */ public class MyCounter { // \t键 private static String TAB_SEPARATOR = "\t";
public static class MyCounterMap extends Mapper<LongWritable, Text, Text, Text> { // 定义枚举对象 public static enum LOG_PROCESSOR_COUNTER { BAD_RECORDS_LONG, BAD_RECORDS_SHORT }; protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String arr_value[] = value.toString().split(TAB_SEPARATOR); if (arr_value.length > 3) { /* 自定义计数器 */ context.getCounter("ErrorCounter", "toolong").increment(1); /* 枚举计数器 */ context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG).increment(1); } else if (arr_value.length < 3) { // 自定义计数器 context.getCounter("ErrorCounter", "tooshort").increment(1); // 枚举计数器 context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_SHORT).increment(1); } } }
@SuppressWarnings("deprecation") public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { String[] args0 = { "hdfs://hadoop2:9000/buaa/counter/counter.txt", "hdfs://hadoop2:9000/buaa/counter/out/" }; // 读取配置文件 Configuration conf = new Configuration(); // 如果输出目录存在,则删除 Path mypath = new Path(args0[1]); FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); }
// 新建一个任务 Job job = new Job(conf, "MyCounter"); // 主类 job.setJarByClass(MyCounter.class); // Mapper job.setMapperClass(MyCounterMap.class);
// 输入目录 FileInputFormat.addInputPath(job, new Path(args0[0])); // 输出目录 FileOutputFormat.setOutputPath(job, new Path(args0[1])); // 提交任务,并退出 System.exit(job.waitForCompletion(true) ? 0 : 1); } }
|