This project has retired. For details, please refer to its Attic page.
TorqueInfoProcessor xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.inputtools.mdl;
19
20
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.Timer;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
38
39 public class TorqueInfoProcessor {
40
41 private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
42
43 private int intervalValue = 60;
44 private String torqueServer = null;
45 private String torqueBinDir = null;
46 private String domain = null;
47
48 private TreeMap<String, TreeMap<String, String>> currentHodJobs;
49
50 public TorqueInfoProcessor(DataConfig mdlConfig, int interval) {
51 this.intervalValue = interval;
52
53 torqueServer = System.getProperty("TORQUE_SERVER");
54 torqueBinDir = System.getProperty("TORQUE_HOME") + File.separator + "bin";
55 domain = System.getProperty("DOMAIN");
56 currentHodJobs = new TreeMap<String, TreeMap<String, String>>();
57 }
58
59 public void setup(boolean recover) throws Exception {
60 }
61
62 private void getHodJobInfo() throws IOException {
63 StringBuffer sb = new StringBuffer();
64 sb.append(torqueBinDir).append("/qstat -a");
65
66 String[] getQueueInfoCommand = new String[3];
67 getQueueInfoCommand[0] = "ssh";
68 getQueueInfoCommand[1] = torqueServer;
69 getQueueInfoCommand[2] = sb.toString();
70
71 String command = getQueueInfoCommand[0] + " " + getQueueInfoCommand[1]
72 + " " + getQueueInfoCommand[2];
73 ProcessBuilder pb = new ProcessBuilder(getQueueInfoCommand);
74
75 Process p = pb.start();
76
77 Timer timeout = new Timer();
78 TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
79 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
80
81 BufferedReader result = new BufferedReader(new InputStreamReader(p
82 .getInputStream()));
83 ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
84 command, true);
85 errorHandler.start();
86
87 String line = null;
88 boolean start = false;
89 TreeSet<String> jobsInTorque = new TreeSet<String>();
90 while ((line = result.readLine()) != null) {
91 if (line.startsWith("---")) {
92 start = true;
93 continue;
94 }
95
96 if (start) {
97 String[] items = line.split("\\s+");
98 if (items.length >= 10) {
99 String hodIdLong = items[0];
100 String hodId = hodIdLong.split("[.]")[0];
101 String userId = items[1];
102 String numOfMachine = items[5];
103 String status = items[9];
104 jobsInTorque.add(hodId);
105 if (!currentHodJobs.containsKey(hodId)) {
106 TreeMap<String, String> aJobData = new TreeMap<String, String>();
107
108 aJobData.put("userId", userId);
109 aJobData.put("numOfMachine", numOfMachine);
110 aJobData.put("traceCheckCount", "0");
111 aJobData.put("process", "0");
112 aJobData.put("status", status);
113 currentHodJobs.put(hodId, aJobData);
114 } else {
115 TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
116 aJobData.put("status", status);
117 currentHodJobs.put(hodId, aJobData);
118 }
119 }
120 }
121 }
122
123 try {
124 errorHandler.join();
125 } catch (InterruptedException ie) {
126 log.error(ie.getMessage());
127 }
128 timeout.cancel();
129
130 Set<String> currentHodJobIds = currentHodJobs.keySet();
131 Iterator<String> currentHodJobIdsIt = currentHodJobIds.iterator();
132 TreeSet<String> finishedHodIds = new TreeSet<String>();
133 while (currentHodJobIdsIt.hasNext()) {
134 String hodId = currentHodJobIdsIt.next();
135 if (!jobsInTorque.contains(hodId)) {
136 TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
137 String process = aJobData.get("process");
138 if (process.equals("0") || process.equals("1")) {
139 aJobData.put("status", "C");
140 } else {
141 finishedHodIds.add(hodId);
142 }
143 }
144 }
145
146 Iterator<String> finishedHodIdsIt = finishedHodIds.iterator();
147 while (finishedHodIdsIt.hasNext()) {
148 String hodId = finishedHodIdsIt.next();
149 currentHodJobs.remove(hodId);
150 }
151
152 }
153
154 private boolean loadQstatData(String hodId) throws IOException, SQLException {
155 TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
156 String userId = aJobData.get("userId");
157
158 StringBuffer sb = new StringBuffer();
159 sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
160 String[] qstatCommand = new String[3];
161 qstatCommand[0] = "ssh";
162 qstatCommand[1] = torqueServer;
163 qstatCommand[2] = sb.toString();
164
165 String command = qstatCommand[0] + " " + qstatCommand[1] + " "
166 + qstatCommand[2];
167 ProcessBuilder pb = new ProcessBuilder(qstatCommand);
168 Process p = pb.start();
169
170 Timer timeout = new Timer();
171 TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
172 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
173
174 BufferedReader result = new BufferedReader(new InputStreamReader(p
175 .getInputStream()));
176 ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
177 command, false);
178 errorHandler.start();
179 String line = null;
180 String hosts = null;
181 long startTimeValue = -1;
182 long endTimeValue = Calendar.getInstance().getTimeInMillis();
183 long executeTimeValue = Calendar.getInstance().getTimeInMillis();
184 boolean qstatfinished;
185
186 while ((line = result.readLine()) != null) {
187 if (line.indexOf("ctime") >= 0) {
188 String startTime = line.split("=")[1].trim();
189
190 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
191 Date startTimeDate;
192 try {
193 startTimeDate = sdf.parse(startTime);
194 startTimeValue = startTimeDate.getTime();
195 } catch (ParseException e) {
196
197 e.printStackTrace();
198 }
199
200 }
201 if (line.indexOf("mtime") >= 0) {
202 String endTime = line.split("=")[1].trim();
203 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
204 Date endTimeDate;
205 try {
206 endTimeDate = sdf.parse(endTime);
207 endTimeValue = endTimeDate.getTime();
208 } catch (ParseException e) {
209
210 e.printStackTrace();
211 }
212
213 }
214 if (line.indexOf("etime") >= 0) {
215 String executeTime = line.split("=")[1].trim();
216 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
217 Date executeTimeDate;
218 try {
219 executeTimeDate = sdf.parse(executeTime);
220 executeTimeValue = executeTimeDate.getTime();
221 } catch (ParseException e) {
222
223 e.printStackTrace();
224 }
225
226 }
227 if (line.indexOf("exec_host") >= 0) {
228 hosts = line.split("=")[1].trim();
229 }
230 }
231
232 if (hosts != null && startTimeValue >= 0) {
233 String[] items2 = hosts.split("[+]");
234 int num = 0;
235 for (int i = 0; i < items2.length; i++) {
236 String machinetmp = items2[i];
237 if (machinetmp.length() > 3) {
238 String machine = items2[i].substring(0, items2[i].length() - 2);
239 StringBuffer data = new StringBuffer();
240 data.append("HodId=").append(hodId);
241 data.append(", Machine=").append(machine);
242 if (domain != null) {
243 data.append(".").append(domain);
244 }
245 log.info(data);
246 num++;
247 }
248 }
249 Timestamp startTimedb = new Timestamp(startTimeValue);
250 Timestamp endTimedb = new Timestamp(endTimeValue);
251 StringBuffer data = new StringBuffer();
252 long timeQueued = executeTimeValue - startTimeValue;
253 data.append("HodID=").append(hodId);
254 data.append(", UserId=").append(userId);
255 data.append(", StartTime=").append(startTimedb);
256 data.append(", TimeQueued=").append(timeQueued);
257 data.append(", NumOfMachines=").append(num);
258 data.append(", EndTime=").append(endTimedb);
259 log.info(data);
260 qstatfinished = true;
261
262 } else {
263
264 qstatfinished = false;
265 }
266
267 try {
268 errorHandler.join();
269 } catch (InterruptedException ie) {
270 log.error(ie.getMessage());
271 }
272 result.close();
273 timeout.cancel();
274
275 return qstatfinished;
276 }
277
278 private boolean loadTraceJobData(String hodId) throws IOException,
279 SQLException {
280 TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
281 String userId = aJobData.get("userId");
282
283 StringBuffer sb = new StringBuffer();
284 sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
285 String[] traceJobCommand = new String[3];
286 traceJobCommand[0] = "ssh";
287 traceJobCommand[1] = torqueServer;
288 traceJobCommand[2] = sb.toString();
289
290 String command = traceJobCommand[0] + " " + traceJobCommand[1] + " "
291 + traceJobCommand[2];
292 ProcessBuilder pb = new ProcessBuilder(traceJobCommand);
293
294 Process p = pb.start();
295
296 Timer timeout = new Timer();
297 TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
298 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
299
300 BufferedReader result = new BufferedReader(new InputStreamReader(p
301 .getInputStream()));
302 ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
303 command, false);
304 errorHandler.start();
305 String line = null;
306 String exit_status = null;
307 String hosts = null;
308 long timeQueued = -1;
309 long startTimeValue = -1;
310 long endTimeValue = -1;
311 boolean findResult = false;
312
313 while ((line = result.readLine()) != null && !findResult) {
314 if (line.indexOf("end") >= 0 && line.indexOf("Exit_status") >= 0
315 && line.indexOf("qtime") >= 0) {
316 TreeMap<String, String> jobData = new TreeMap<String, String>();
317 String[] items = line.split("\\s+");
318 for (int i = 0; i < items.length; i++) {
319 String[] items2 = items[i].split("=");
320 if (items2.length >= 2) {
321 jobData.put(items2[0], items2[1]);
322 }
323
324 }
325 String startTime = jobData.get("ctime");
326 startTimeValue = Long.valueOf(startTime);
327 startTimeValue = startTimeValue - startTimeValue % (60);
328 Timestamp startTimedb = new Timestamp(startTimeValue * 1000);
329
330 String queueTime = jobData.get("qtime");
331 long queueTimeValue = Long.valueOf(queueTime);
332
333 String sTime = jobData.get("start");
334 long sTimeValue = Long.valueOf(sTime);
335
336 timeQueued = sTimeValue - queueTimeValue;
337
338 String endTime = jobData.get("end");
339 endTimeValue = Long.valueOf(endTime);
340 endTimeValue = endTimeValue - endTimeValue % (60);
341 Timestamp endTimedb = new Timestamp(endTimeValue * 1000);
342
343 exit_status = jobData.get("Exit_status");
344 hosts = jobData.get("exec_host");
345 String[] items2 = hosts.split("[+]");
346 int num = 0;
347 for (int i = 0; i < items2.length; i++) {
348 String machinetemp = items2[i];
349 if (machinetemp.length() >= 3) {
350 String machine = items2[i].substring(0, items2[i].length() - 2);
351 StringBuffer data = new StringBuffer();
352 data.append("HodId=").append(hodId);
353 data.append(", Machine=").append(machine);
354 if (domain != null) {
355 data.append(".").append(domain);
356 }
357 log.info(data.toString());
358 num++;
359 }
360 }
361
362 StringBuffer data = new StringBuffer();
363 data.append("HodID=").append(hodId);
364 data.append(", UserId=").append(userId);
365 data.append(", Status=").append(exit_status);
366 data.append(", TimeQueued=").append(timeQueued);
367 data.append(", StartTime=").append(startTimedb);
368 data.append(", EndTime=").append(endTimedb);
369 data.append(", NumOfMachines=").append(num);
370 log.info(data.toString());
371 findResult = true;
372 log.debug(" hod info for job " + hodId + " has been loaded ");
373 }
374
375 }
376
377 try {
378 errorHandler.join();
379 } catch (InterruptedException ie) {
380 log.error(ie.getMessage());
381 }
382
383 timeout.cancel();
384 boolean tracedone = false;
385 if (!findResult) {
386
387 String traceCheckCount = aJobData.get("traceCheckCount");
388 int traceCheckCountValue = Integer.valueOf(traceCheckCount);
389 traceCheckCountValue = traceCheckCountValue + 1;
390 aJobData.put("traceCheckCount", String.valueOf(traceCheckCountValue));
391
392 log.debug("did not find tracejob info for job " + hodId + ", after "
393 + traceCheckCountValue + " times checking");
394 if (traceCheckCountValue >= 2) {
395 tracedone = true;
396 }
397 }
398 boolean finished = findResult | tracedone;
399 return finished;
400 }
401
402 private void process_data() throws SQLException {
403
404 long currentTime = System.currentTimeMillis();
405 currentTime = currentTime - currentTime % (60 * 1000);
406
407 Set<String> hodIds = currentHodJobs.keySet();
408
409 Iterator<String> hodIdsIt = hodIds.iterator();
410 while (hodIdsIt.hasNext()) {
411 String hodId = hodIdsIt.next();
412 TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
413 String status = aJobData.get("status");
414 String process = aJobData.get("process");
415 if (process.equals("0") && (status.equals("R") || status.equals("E"))) {
416 try {
417 boolean result = loadQstatData(hodId);
418 if (result) {
419 aJobData.put("process", "1");
420 currentHodJobs.put(hodId, aJobData);
421 }
422 } catch (IOException ioe) {
423 log.error("load qsat data Error:" + ioe.getMessage());
424
425 }
426 }
427 if (!process.equals("2") && status.equals("C")) {
428 try {
429 boolean result = loadTraceJobData(hodId);
430
431 if (result) {
432 aJobData.put("process", "2");
433 currentHodJobs.put(hodId, aJobData);
434 }
435 } catch (IOException ioe) {
436 log.error("loadTraceJobData Error:" + ioe.getMessage());
437 }
438 }
439
440 }
441
442 }
443
444 private void handle_jobData() throws SQLException {
445 try {
446 getHodJobInfo();
447 } catch (IOException ex) {
448 log.error("getQueueInfo Error:" + ex.getMessage());
449 return;
450 }
451 try {
452 process_data();
453 } catch (SQLException ex) {
454 log.error("process_data Error:" + ex.getMessage());
455 throw ex;
456 }
457 }
458
459 public void run_forever() throws SQLException {
460 while (true) {
461 handle_jobData();
462 try {
463 log.debug("sleeping ...");
464 Thread.sleep(this.intervalValue * 1000);
465 } catch (InterruptedException e) {
466 log.error(e.getMessage());
467 }
468 }
469 }
470
471 public void shutdown() {
472 }
473 }