/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
3031publicclassMRJobReduceProcessorimplementsReduceProcessor {
32static Logger log = Logger.getLogger(MRJobReduceProcessor.class);
3334 @Override
35public String getDataType() {
36return MRJobReduceProcessor.class.getName();
37 }
3839 @Override
40publicvoid process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
41 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
42try {
43 HashMap<String, String> data = new HashMap<String, String>();
4445ChukwaRecord record = null;
46 String[] fields = null;
47while (values.hasNext()) {
48 record = values.next();
49 fields = record.getFields();
50for (String field : fields) {
51 data.put(field, record.getValue(field));
52 }
53 }
5455// Extract initial time: SUBMIT_TIME56long initTime = Long.parseLong(data.get("SUBMIT_TIME"));
5758// Extract HodId59// maybe use a regex to extract this and load it from configuration60// JOBCONF=61// "/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml"62 String jobConf = data.get("JOBCONF");
63int idx = jobConf.indexOf("mapredsystem/");
64 idx += 13;
65int idx2 = jobConf.indexOf(".", idx);
66 data.put("HodId", jobConf.substring(idx, idx2));
6768ChukwaRecordKey newKey = newChukwaRecordKey();
69 newKey.setKey("" + initTime);
70 newKey.setReduceType("MRJob");
7172ChukwaRecord newRecord = newChukwaRecord();
73 newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
74 newRecord.setTime(initTime);
75 newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
76 Iterator<String> it = data.keySet().iterator();
77while (it.hasNext()) {
78 String field = it.next();
79 newRecord.add(field, data.get(field));
80 }
8182 output.collect(newKey, newRecord);
83 } catch (IOException e) {
84 log.warn("Unable to collect output in JobLogHistoryReduceProcessor ["85 + key + "]", e);
86 e.printStackTrace();
87 }
8889 }
9091 }