MRJobReduceProcessor xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;


import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;

public class MRJobReduceProcessor implements ReduceProcessor {
  static Logger log = Logger.getLogger(MRJobReduceProcessor.class);

  @Override
  public String getDataType() {
    return MRJobReduceProcessor.class.getName();
  }

  @Override
  public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
    try {
      // Merge the fields of every record for this key into one map;
      // later values overwrite earlier ones.
      HashMap<String, String> data = new HashMap<String, String>();

      ChukwaRecord record = null;
      String[] fields = null;
      while (values.hasNext()) {
        record = values.next();
        fields = record.getFields();
        for (String field : fields) {
          data.put(field, record.getValue(field));
        }
      }

      // Extract the initial time: SUBMIT_TIME
      long initTime = Long.parseLong(data.get("SUBMIT_TIME"));

      // Extract the HodId from the JOBCONF path, e.g.
      // JOBCONF=
      // "/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml"
      // TODO: maybe use a regex, loaded from configuration, instead of
      // index arithmetic (see the sketch after this listing).
      String jobConf = data.get("JOBCONF");
      int idx = jobConf.indexOf("mapredsystem/");
      idx += "mapredsystem/".length();
      int idx2 = jobConf.indexOf(".", idx);
      data.put("HodId", jobConf.substring(idx, idx2));

      ChukwaRecordKey newKey = new ChukwaRecordKey();
      newKey.setKey("" + initTime);
      newKey.setReduceType("MRJob");

      ChukwaRecord newRecord = new ChukwaRecord();
      newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
      newRecord.setTime(initTime);
      for (String field : data.keySet()) {
        newRecord.add(field, data.get(field));
      }

      output.collect(newKey, newRecord);
    } catch (IOException e) {
      log.warn("Unable to collect output in MRJobReduceProcessor ["
          + key + "]", e);
    }

  }

}
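
The TODO in process() suggests extracting the HodId with a regular expression loaded from configuration rather than index arithmetic. Below is a minimal sketch of that idea, assuming the JOBCONF path always follows the ".../mapredsystem/<hodid>.<host>/..." layout shown in the comment; the HodIdExtractor class and its pattern are illustrative, not part of the Chukwa API.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: pulls the HodId out of a JOBCONF path such as
// "/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml".
// The pattern string could be read from configuration, as the TODO suggests.
final class HodIdExtractor {
  private static final Pattern HOD_ID =
      Pattern.compile("mapredsystem/([^./]+)\\.");

  static String extract(String jobConf) {
    Matcher m = HOD_ID.matcher(jobConf);
    return m.find() ? m.group(1) : null; // null when the path does not match
  }
}

For the example path this yields "563976", the same value the substring arithmetic produces, but it returns null instead of throwing StringIndexOutOfBoundsException when the path contains no "mapredsystem/<hodid>." segment.

A quick way to exercise the processor end to end is to feed it a single synthetic record and print what it collects. The field values below are made up for illustration; Reporter.NULL is the no-op Reporter shipped with the Hadoop mapred API.

ChukwaRecord rec = new ChukwaRecord();
rec.add("SUBMIT_TIME", "1220734300000");
rec.add("JOBCONF",
    "/user/xxx/mapredsystem/563976.xxx.yyy.com/job_200809062051_0001/job.xml");
rec.add(Record.tagsField, "cluster=\"demo\"");

new MRJobReduceProcessor().process(
    new ChukwaRecordKey(),
    java.util.Collections.singletonList(rec).iterator(),
    new OutputCollector<ChukwaRecordKey, ChukwaRecord>() {
      public void collect(ChukwaRecordKey k, ChukwaRecord v) {
        System.out.println(k + " -> " + v);
      }
    },
    Reporter.NULL);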