This project has retired. For details please refer to its Attic page.
TorqueInfoProcessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.inputtools.mdl;
19  
20  
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.Timer;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
38  
39  public class TorqueInfoProcessor {
40  
41    private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
42  
43    private int intervalValue = 60;
44    private String torqueServer = null;
45    private String torqueBinDir = null;
46    private String domain = null;
47  
48    private TreeMap<String, TreeMap<String, String>> currentHodJobs;
49  
50    public TorqueInfoProcessor(DataConfig mdlConfig, int interval) {
51      this.intervalValue = interval;
52  
53      torqueServer = System.getProperty("TORQUE_SERVER");
54      torqueBinDir = System.getProperty("TORQUE_HOME") + File.separator + "bin";
55      domain = System.getProperty("DOMAIN");
56      currentHodJobs = new TreeMap<String, TreeMap<String, String>>();
57    }
58  
59    public void setup(boolean recover) throws Exception {
60    }
61  
62    private void getHodJobInfo() throws IOException {
63      StringBuffer sb = new StringBuffer();
64      sb.append(torqueBinDir).append("/qstat -a");
65  
66      String[] getQueueInfoCommand = new String[3];
67      getQueueInfoCommand[0] = "ssh";
68      getQueueInfoCommand[1] = torqueServer;
69      getQueueInfoCommand[2] = sb.toString();
70  
71      String command = getQueueInfoCommand[0] + " " + getQueueInfoCommand[1]
72          + " " + getQueueInfoCommand[2];
73      ProcessBuilder pb = new ProcessBuilder(getQueueInfoCommand);
74  
75      Process p = pb.start();
76  
77      Timer timeout = new Timer();
78      TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
79      timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
80  
81      BufferedReader result = new BufferedReader(new InputStreamReader(p
82          .getInputStream()));
83      ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
84          command, true);
85      errorHandler.start();
86  
87      String line = null;
88      boolean start = false;
89      TreeSet<String> jobsInTorque = new TreeSet<String>();
90      while ((line = result.readLine()) != null) {
91        if (line.startsWith("---")) {
92          start = true;
93          continue;
94        }
95  
96        if (start) {
97          String[] items = line.split("\\s+");
98          if (items.length >= 10) {
99            String hodIdLong = items[0];
100           String hodId = hodIdLong.split("[.]")[0];
101           String userId = items[1];
102           String numOfMachine = items[5];
103           String status = items[9];
104           jobsInTorque.add(hodId);
105           if (!currentHodJobs.containsKey(hodId)) {
106             TreeMap<String, String> aJobData = new TreeMap<String, String>();
107 
108             aJobData.put("userId", userId);
109             aJobData.put("numOfMachine", numOfMachine);
110             aJobData.put("traceCheckCount", "0");
111             aJobData.put("process", "0");
112             aJobData.put("status", status);
113             currentHodJobs.put(hodId, aJobData);
114           } else {
115             TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
116             aJobData.put("status", status);
117             currentHodJobs.put(hodId, aJobData);
118           }// if..else
119         }
120       }
121     }// while
122 
123     try {
124       errorHandler.join();
125     } catch (InterruptedException ie) {
126       log.error(ie.getMessage());
127     }
128     timeout.cancel();
129 
130     Set<String> currentHodJobIds = currentHodJobs.keySet();
131     Iterator<String> currentHodJobIdsIt = currentHodJobIds.iterator();
132     TreeSet<String> finishedHodIds = new TreeSet<String>();
133     while (currentHodJobIdsIt.hasNext()) {
134       String hodId = currentHodJobIdsIt.next();
135       if (!jobsInTorque.contains(hodId)) {
136         TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
137         String process = aJobData.get("process");
138         if (process.equals("0") || process.equals("1")) {
139           aJobData.put("status", "C");
140         } else {
141           finishedHodIds.add(hodId);
142         }
143       }
144     }// while
145 
146     Iterator<String> finishedHodIdsIt = finishedHodIds.iterator();
147     while (finishedHodIdsIt.hasNext()) {
148       String hodId = finishedHodIdsIt.next();
149       currentHodJobs.remove(hodId);
150     }
151 
152   }
153 
154   private boolean loadQstatData(String hodId) throws IOException, SQLException {
155     TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
156     String userId = aJobData.get("userId");
157 
158     StringBuffer sb = new StringBuffer();
159     sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
160     String[] qstatCommand = new String[3];
161     qstatCommand[0] = "ssh";
162     qstatCommand[1] = torqueServer;
163     qstatCommand[2] = sb.toString();
164 
165     String command = qstatCommand[0] + " " + qstatCommand[1] + " "
166         + qstatCommand[2];
167     ProcessBuilder pb = new ProcessBuilder(qstatCommand);
168     Process p = pb.start();
169 
170     Timer timeout = new Timer();
171     TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
172     timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
173 
174     BufferedReader result = new BufferedReader(new InputStreamReader(p
175         .getInputStream()));
176     ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
177         command, false);
178     errorHandler.start();
179     String line = null;
180     String hosts = null;
181     long startTimeValue = -1;
182     long endTimeValue = Calendar.getInstance().getTimeInMillis();
183     long executeTimeValue = Calendar.getInstance().getTimeInMillis();
184     boolean qstatfinished;
185 
186     while ((line = result.readLine()) != null) {
187       if (line.indexOf("ctime") >= 0) {
188         String startTime = line.split("=")[1].trim();
189         // Tue Sep 9 23:44:29 2008
190         SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
191         Date startTimeDate;
192         try {
193           startTimeDate = sdf.parse(startTime);
194           startTimeValue = startTimeDate.getTime();
195         } catch (ParseException e) {
196           // TODO Auto-generated catch block
197           e.printStackTrace();
198         }
199 
200       }
201       if (line.indexOf("mtime") >= 0) {
202         String endTime = line.split("=")[1].trim();
203         SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
204         Date endTimeDate;
205         try {
206           endTimeDate = sdf.parse(endTime);
207           endTimeValue = endTimeDate.getTime();
208         } catch (ParseException e) {
209           // TODO Auto-generated catch block
210           e.printStackTrace();
211         }
212 
213       }
214       if (line.indexOf("etime") >= 0) {
215         String executeTime = line.split("=")[1].trim();
216         SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
217         Date executeTimeDate;
218         try {
219           executeTimeDate = sdf.parse(executeTime);
220           executeTimeValue = executeTimeDate.getTime();
221         } catch (ParseException e) {
222           // TODO Auto-generated catch block
223           e.printStackTrace();
224         }
225 
226       }
227       if (line.indexOf("exec_host") >= 0) {
228         hosts = line.split("=")[1].trim();
229       }
230     }
231 
232     if (hosts != null && startTimeValue >= 0) {
233       String[] items2 = hosts.split("[+]");
234       int num = 0;
235       for (int i = 0; i < items2.length; i++) {
236         String machinetmp = items2[i];
237         if (machinetmp.length() > 3) {
238           String machine = items2[i].substring(0, items2[i].length() - 2);
239           StringBuffer data = new StringBuffer();
240           data.append("HodId=").append(hodId);
241           data.append(", Machine=").append(machine);
242           if (domain != null) {
243             data.append(".").append(domain);
244           }
245           log.info(data);
246           num++;
247         }
248       }
249       Timestamp startTimedb = new Timestamp(startTimeValue);
250       Timestamp endTimedb = new Timestamp(endTimeValue);
251       StringBuffer data = new StringBuffer();
252       long timeQueued = executeTimeValue - startTimeValue;
253       data.append("HodID=").append(hodId);
254       data.append(", UserId=").append(userId);
255       data.append(", StartTime=").append(startTimedb);
256       data.append(", TimeQueued=").append(timeQueued);
257       data.append(", NumOfMachines=").append(num);
258       data.append(", EndTime=").append(endTimedb);
259       log.info(data);
260       qstatfinished = true;
261 
262     } else {
263 
264       qstatfinished = false;
265     }
266 
267     try {
268       errorHandler.join();
269     } catch (InterruptedException ie) {
270       log.error(ie.getMessage());
271     }
272     result.close();
273     timeout.cancel();
274 
275     return qstatfinished;
276   }
277 
278   private boolean loadTraceJobData(String hodId) throws IOException,
279       SQLException {
280     TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
281     String userId = aJobData.get("userId");
282 
283     StringBuffer sb = new StringBuffer();
284     sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
285     String[] traceJobCommand = new String[3];
286     traceJobCommand[0] = "ssh";
287     traceJobCommand[1] = torqueServer;
288     traceJobCommand[2] = sb.toString();
289 
290     String command = traceJobCommand[0] + " " + traceJobCommand[1] + " "
291         + traceJobCommand[2];
292     ProcessBuilder pb = new ProcessBuilder(traceJobCommand);
293 
294     Process p = pb.start();
295 
296     Timer timeout = new Timer();
297     TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
298     timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
299 
300     BufferedReader result = new BufferedReader(new InputStreamReader(p
301         .getInputStream()));
302     ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
303         command, false);
304     errorHandler.start();
305     String line = null;
306     String exit_status = null;
307     String hosts = null;
308     long timeQueued = -1;
309     long startTimeValue = -1;
310     long endTimeValue = -1;
311     boolean findResult = false;
312 
313     while ((line = result.readLine()) != null && !findResult) {
314       if (line.indexOf("end") >= 0 && line.indexOf("Exit_status") >= 0
315           && line.indexOf("qtime") >= 0) {
316         TreeMap<String, String> jobData = new TreeMap<String, String>();
317         String[] items = line.split("\\s+");
318         for (int i = 0; i < items.length; i++) {
319           String[] items2 = items[i].split("=");
320           if (items2.length >= 2) {
321             jobData.put(items2[0], items2[1]);
322           }
323 
324         }
325         String startTime = jobData.get("ctime");
326         startTimeValue = Long.valueOf(startTime);
327         startTimeValue = startTimeValue - startTimeValue % (60);
328         Timestamp startTimedb = new Timestamp(startTimeValue * 1000);
329 
330         String queueTime = jobData.get("qtime");
331         long queueTimeValue = Long.valueOf(queueTime);
332 
333         String sTime = jobData.get("start");
334         long sTimeValue = Long.valueOf(sTime);
335 
336         timeQueued = sTimeValue - queueTimeValue;
337 
338         String endTime = jobData.get("end");
339         endTimeValue = Long.valueOf(endTime);
340         endTimeValue = endTimeValue - endTimeValue % (60);
341         Timestamp endTimedb = new Timestamp(endTimeValue * 1000);
342 
343         exit_status = jobData.get("Exit_status");
344         hosts = jobData.get("exec_host");
345         String[] items2 = hosts.split("[+]");
346         int num = 0;
347         for (int i = 0; i < items2.length; i++) {
348           String machinetemp = items2[i];
349           if (machinetemp.length() >= 3) {
350             String machine = items2[i].substring(0, items2[i].length() - 2);
351             StringBuffer data = new StringBuffer();
352             data.append("HodId=").append(hodId);
353             data.append(", Machine=").append(machine);
354             if (domain != null) {
355               data.append(".").append(domain);
356             }
357             log.info(data.toString());
358             num++;
359           }
360         }
361 
362         StringBuffer data = new StringBuffer();
363         data.append("HodID=").append(hodId);
364         data.append(", UserId=").append(userId);
365         data.append(", Status=").append(exit_status);
366         data.append(", TimeQueued=").append(timeQueued);
367         data.append(", StartTime=").append(startTimedb);
368         data.append(", EndTime=").append(endTimedb);
369         data.append(", NumOfMachines=").append(num);
370         log.info(data.toString());
371         findResult = true;
372         log.debug(" hod info for job " + hodId + " has been loaded ");
373       }// if
374 
375     }// while
376 
377     try {
378       errorHandler.join();
379     } catch (InterruptedException ie) {
380       log.error(ie.getMessage());
381     }
382 
383     timeout.cancel();
384     boolean tracedone = false;
385     if (!findResult) {
386 
387       String traceCheckCount = aJobData.get("traceCheckCount");
388       int traceCheckCountValue = Integer.valueOf(traceCheckCount);
389       traceCheckCountValue = traceCheckCountValue + 1;
390       aJobData.put("traceCheckCount", String.valueOf(traceCheckCountValue));
391 
392       log.debug("did not find tracejob info for job " + hodId + ", after "
393           + traceCheckCountValue + " times checking");
394       if (traceCheckCountValue >= 2) {
395         tracedone = true;
396       }
397     }
398     boolean finished = findResult | tracedone;
399     return finished;
400   }
401 
402   private void process_data() throws SQLException {
403 
404     long currentTime = System.currentTimeMillis();
405     currentTime = currentTime - currentTime % (60 * 1000);
406 
407     Set<String> hodIds = currentHodJobs.keySet();
408 
409     Iterator<String> hodIdsIt = hodIds.iterator();
410     while (hodIdsIt.hasNext()) {
411       String hodId = hodIdsIt.next();
412       TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
413       String status = aJobData.get("status");
414       String process = aJobData.get("process");
415       if (process.equals("0") && (status.equals("R") || status.equals("E"))) {
416         try {
417           boolean result = loadQstatData(hodId);
418           if (result) {
419             aJobData.put("process", "1");
420             currentHodJobs.put(hodId, aJobData);
421           }
422         } catch (IOException ioe) {
423           log.error("load qsat data Error:" + ioe.getMessage());
424 
425         }
426       }
427       if (!process.equals("2") && status.equals("C")) {
428         try {
429           boolean result = loadTraceJobData(hodId);
430 
431           if (result) {
432             aJobData.put("process", "2");
433             currentHodJobs.put(hodId, aJobData);
434           }
435         } catch (IOException ioe) {
436           log.error("loadTraceJobData Error:" + ioe.getMessage());
437         }
438       }// if
439 
440     } // while
441 
442   }
443 
444   private void handle_jobData() throws SQLException {
445     try {
446       getHodJobInfo();
447     } catch (IOException ex) {
448       log.error("getQueueInfo Error:" + ex.getMessage());
449       return;
450     }
451     try {
452       process_data();
453     } catch (SQLException ex) {
454       log.error("process_data Error:" + ex.getMessage());
455       throw ex;
456     }
457   }
458 
459   public void run_forever() throws SQLException {
460     while (true) {
461       handle_jobData();
462       try {
463         log.debug("sleeping ...");
464         Thread.sleep(this.intervalValue * 1000);
465       } catch (InterruptedException e) {
466         log.error(e.getMessage());
467       }
468     }
469   }
470 
471   public void shutdown() {
472   }
473 }