This project has retired. For details please refer to its Attic page.
JobConfProcessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
19  
import java.io.File;
import java.io.FileOutputStream;
import java.io.StringReader;
import java.util.Calendar;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import org.json.JSONObject;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.InputSource;
43  
44  @Tables(annotations={
45  @Table(name="Mapreduce",columnFamily="JobData"),
46  @Table(name="Mapreduce",columnFamily="JobConfData")
47  })
48  public class JobConfProcessor extends AbstractProcessor {
49      static Logger log = Logger.getLogger(JobConfProcessor.class);
50      private static final String jobData = "JobData";
51      private static final String jobConfData = "JobConfData";
52      
53      static  Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?");
54      static  Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)");
55      @Override
56      protected void parse(String recordEntry,
57        OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
58        Reporter reporter) 
59     throws Throwable
60    {
61      Long time = 0L;
62      Random randomNumber = new Random();
63      String tags = this.chunk.getTags();
64  
65      Matcher matcher = timePattern.matcher(tags);
66      if (matcher.matches()) {
67        time = Long.parseLong(matcher.group(2));
68      }
69      String capp = this.chunk.getStreamName();
70        String jobID = "";
71          matcher = jobPattern.matcher(capp);
72          if(matcher.matches()) {
73            jobID=matcher.group(2);
74          }
75          ChukwaRecord record = new ChukwaRecord();
76          ChukwaRecord jobConfRecord = new ChukwaRecord();
77        DocumentBuilderFactory docBuilderFactory 
78          = DocumentBuilderFactory.newInstance();
79        //ignore all comments inside the xml file
80        docBuilderFactory.setIgnoringComments(true);
81        try {
82            DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
83            Document doc = null;
84            String fileName = "test_"+randomNumber.nextInt();
85            File tmp = new File(fileName);
86            FileOutputStream out = new FileOutputStream(tmp);
87            out.write(recordEntry.getBytes());
88            out.close();
89          doc = builder.parse(fileName);
90          Element root = doc.getDocumentElement();
91          if (!"configuration".equals(root.getTagName()))
92              log.fatal("bad conf file: top-level element not <configuration>");
93          NodeList props = root.getChildNodes();
94              JSONObject json = new JSONObject();
95              String queue = "default";
96      
97          for (int i = 0; i < props.getLength(); i++) {
98              Node propNode = props.item(i);
99              if (!(propNode instanceof Element))
100                 continue;
101             Element prop = (Element)propNode;
102             if (!"property".equals(prop.getTagName()))
103                 log.warn("bad conf file: element not <property>");
104             NodeList fields = prop.getChildNodes();
105             String attr = null;
106             String value = null;
107             for (int j = 0; j < fields.getLength(); j++) {
108                 Node fieldNode = fields.item(j);
109                 if (!(fieldNode instanceof Element))
110                     continue;
111                 Element field = (Element)fieldNode;
112                 if ("name".equals(field.getTagName()) && field.hasChildNodes())
113                     attr = ((Text)field.getFirstChild()).getData().trim();
114                 if ("value".equals(field.getTagName()) && field.hasChildNodes())
115                     value = ((Text)field.getFirstChild()).getData();
116             }
117             
118             // Ignore this parameter if it has already been marked as 'final'
119             if (attr != null && value != null) {
120                 json.put(attr, value);
121                 if(attr.intern()=="mapred.job.queue.name".intern()) {
122                     queue=value;
123                 }
124                 jobConfRecord.add("job_conf." + attr, value);
125             }
126         }
127         record.add("JOBCONF-JSON", json.toString());
128         record.add("mapred.job.queue.name", queue);
129         record.add("JOBID", "job_" + jobID);
130         buildGenericRecord(record, null, time, jobData);
131         calendar.setTimeInMillis(time);
132         calendar.set(Calendar.MINUTE, 0);
133         calendar.set(Calendar.SECOND, 0);
134         calendar.set(Calendar.MILLISECOND, 0);
135         key.setKey("" + calendar.getTimeInMillis() + "/job_" + jobID + "/" + time);
136         output.collect(key, record);
137 
138         jobConfRecord.add("JOBID", "job_" + jobID);
139         buildGenericRecord(jobConfRecord, null, time, jobConfData);
140         output.collect(key, jobConfRecord);
141             
142         tmp.delete();
143       } catch(Exception e) {
144           e.printStackTrace();  
145           throw e;
146       }
147   }
148   
149   public String getDataType() {
150     return JobConfProcessor.class.getName();
151   }
152 }