This project has retired. For details please refer to its
Attic page.
JobConfProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
19
import java.io.File;
import java.io.FileOutputStream;
import java.io.StringReader;
import java.util.Calendar;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import org.json.JSONObject;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.InputSource;
43
44 @Tables(annotations={
45 @Table(name="Mapreduce",columnFamily="JobData"),
46 @Table(name="Mapreduce",columnFamily="JobConfData")
47 })
48 public class JobConfProcessor extends AbstractProcessor {
49 static Logger log = Logger.getLogger(JobConfProcessor.class);
50 private static final String jobData = "JobData";
51 private static final String jobConfData = "JobConfData";
52
53 static Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?");
54 static Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)");
55 @Override
56 protected void parse(String recordEntry,
57 OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
58 Reporter reporter)
59 throws Throwable
60 {
61 Long time = 0L;
62 Random randomNumber = new Random();
63 String tags = this.chunk.getTags();
64
65 Matcher matcher = timePattern.matcher(tags);
66 if (matcher.matches()) {
67 time = Long.parseLong(matcher.group(2));
68 }
69 String capp = this.chunk.getStreamName();
70 String jobID = "";
71 matcher = jobPattern.matcher(capp);
72 if(matcher.matches()) {
73 jobID=matcher.group(2);
74 }
75 ChukwaRecord record = new ChukwaRecord();
76 ChukwaRecord jobConfRecord = new ChukwaRecord();
77 DocumentBuilderFactory docBuilderFactory
78 = DocumentBuilderFactory.newInstance();
79
80 docBuilderFactory.setIgnoringComments(true);
81 try {
82 DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
83 Document doc = null;
84 String fileName = "test_"+randomNumber.nextInt();
85 File tmp = new File(fileName);
86 FileOutputStream out = new FileOutputStream(tmp);
87 out.write(recordEntry.getBytes());
88 out.close();
89 doc = builder.parse(fileName);
90 Element root = doc.getDocumentElement();
91 if (!"configuration".equals(root.getTagName()))
92 log.fatal("bad conf file: top-level element not <configuration>");
93 NodeList props = root.getChildNodes();
94 JSONObject json = new JSONObject();
95 String queue = "default";
96
97 for (int i = 0; i < props.getLength(); i++) {
98 Node propNode = props.item(i);
99 if (!(propNode instanceof Element))
100 continue;
101 Element prop = (Element)propNode;
102 if (!"property".equals(prop.getTagName()))
103 log.warn("bad conf file: element not <property>");
104 NodeList fields = prop.getChildNodes();
105 String attr = null;
106 String value = null;
107 for (int j = 0; j < fields.getLength(); j++) {
108 Node fieldNode = fields.item(j);
109 if (!(fieldNode instanceof Element))
110 continue;
111 Element field = (Element)fieldNode;
112 if ("name".equals(field.getTagName()) && field.hasChildNodes())
113 attr = ((Text)field.getFirstChild()).getData().trim();
114 if ("value".equals(field.getTagName()) && field.hasChildNodes())
115 value = ((Text)field.getFirstChild()).getData();
116 }
117
118
119 if (attr != null && value != null) {
120 json.put(attr, value);
121 if(attr.intern()=="mapred.job.queue.name".intern()) {
122 queue=value;
123 }
124 jobConfRecord.add("job_conf." + attr, value);
125 }
126 }
127 record.add("JOBCONF-JSON", json.toString());
128 record.add("mapred.job.queue.name", queue);
129 record.add("JOBID", "job_" + jobID);
130 buildGenericRecord(record, null, time, jobData);
131 calendar.setTimeInMillis(time);
132 calendar.set(Calendar.MINUTE, 0);
133 calendar.set(Calendar.SECOND, 0);
134 calendar.set(Calendar.MILLISECOND, 0);
135 key.setKey("" + calendar.getTimeInMillis() + "/job_" + jobID + "/" + time);
136 output.collect(key, record);
137
138 jobConfRecord.add("JOBID", "job_" + jobID);
139 buildGenericRecord(jobConfRecord, null, time, jobConfData);
140 output.collect(key, jobConfRecord);
141
142 tmp.delete();
143 } catch(Exception e) {
144 e.printStackTrace();
145 throw e;
146 }
147 }
148
149 public String getDataType() {
150 return JobConfProcessor.class.getName();
151 }
152 }