This project has retired. For details please refer to its Attic page.
ExecAdaptor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.adaptor;
20  
21  
22  import org.apache.hadoop.chukwa.ChunkImpl;
23  import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
24  import org.apache.log4j.Logger;
25  import org.apache.log4j.helpers.ISO8601DateFormat;
26  import org.json.simple.JSONObject;
27  import org.json.simple.parser.ParseException;
28  
29  import java.util.*;
30  
31  /**
32   * Runs a command inside chukwa. Takes as params the interval in seconds at
33   * which to run the command, and the path and args to execute.
34   * 
35   * Interval is optional, and defaults to 5 seconds.
36   * 
37   * Example usage: add
38   * org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor Ps 2 /bin/ps aux
39   * 0
40   * 
41   */
42  public class ExecAdaptor extends AbstractAdaptor {
43    
44    public static final boolean FULL_PATHS = false;
45    
46    static class EmbeddedExec extends ExecPlugin {
47  
48      String cmd;
49      
50      public EmbeddedExec(String c) {
51        cmd = c;
52      }
53      
54      @Override
55      public String getCmde() {
56        return cmd;
57      }
58    }
59    
60    EmbeddedExec exec;
61    static final boolean FAKE_LOG4J_HEADER = true;
62    static final boolean SPLIT_LINES = false;
63    static Logger log = Logger.getLogger(ExecAdaptor.class);
64    
65    class RunToolTask extends TimerTask {
66      public void run() {
67        log.info("calling exec");
68        JSONObject o = exec.execute();
69        try {
70  
71          if (((Integer) o.get("status")).intValue() == exec.statusKO) {
72            deregisterAndStop();
73            return;
74          }
75  
76          // FIXME: downstream customers would like timestamps here.
77          // Doing that efficiently probably means cutting out all the
78          // excess buffer copies here, and appending into an OutputBuffer.
79          byte[] data;
80          if (FAKE_LOG4J_HEADER) {
81            StringBuilder result = new StringBuilder();
82            ISO8601DateFormat dateFormat = new org.apache.log4j.helpers.ISO8601DateFormat();
83            result.append(dateFormat.format(new java.util.Date()));
84            result.append(" INFO org.apache.hadoop.chukwa.");
85            result.append(type);
86            result.append("= ");
87            result.append(o.get("exitValue"));
88            result.append(": ");
89            result.append((String) o.get("stdout"));
90            data = result.toString().getBytes();
91          } else {
92            String stdout = (String) o.get("stdout");
93            data = stdout.getBytes();
94          }
95  
96          sendOffset += data.length;
97          ChunkImpl c = new ChunkImpl(ExecAdaptor.this.type, "results from "
98              + cmd, sendOffset, data, ExecAdaptor.this);
99  
100         if (SPLIT_LINES) {
101           ArrayList<Integer> carriageReturns = new ArrayList<Integer>();
102           for (int i = 0; i < data.length; ++i)
103             if (data[i] == '\n')
104               carriageReturns.add(i);
105 
106           c.setRecordOffsets(carriageReturns);
107         } // else we get default one record
108 
109 
110         //We can't replay exec data, so we might as well commit to it now.
111         control.reportCommit(ExecAdaptor.this, sendOffset);
112         dest.add(c);
113       } catch (InterruptedException e) {
114         log.debug(e);
115       } 
116     }
117   };
118 
119   String cmd;
120   final java.util.Timer timer;
121   long period = 5;
122   volatile long sendOffset = 0;
123 
124   public ExecAdaptor() {
125     timer = new java.util.Timer();
126   }
127 
128   @Override
129   public String getCurrentStatus() {
130     return type + " " + period + " " + cmd;
131   }
132  
133    @Override
134    public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
135        throws AdaptorException {
136      log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
137      switch(shutdownPolicy) {
138      case GRACEFULLY :
139      case WAIT_TILL_FINISHED :
140        try {
141          timer.cancel();
142          exec.waitFor();
143        } catch (InterruptedException e) {
144       }
145       break;   
146       default:
147         timer.cancel();
148         exec.stop();
149         break;
150     }
151     log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
152     return sendOffset;
153   }
154 
155   @Override
156   public void start(long offset) throws AdaptorException {
157     if(FULL_PATHS && !(new java.io.File(cmd)).exists())
158       throw new AdaptorException("Can't start ExecAdaptor. No command " + cmd);
159     this.sendOffset = offset;
160     this.exec = new EmbeddedExec(cmd);
161     TimerTask execTimer = new RunToolTask();
162     timer.schedule(execTimer, 0L, period*1000L);
163   }
164 
165 
166   @Override
167   public String parseArgs(String status) { 
168     int spOffset = status.indexOf(' ');
169     if (spOffset > 0) {
170       try {
171         period = Integer.parseInt(status.substring(0, spOffset));
172         cmd = status.substring(spOffset + 1);
173       } catch (NumberFormatException e) {
174         log.warn("ExecAdaptor: sample interval "
175             + status.substring(0, spOffset) + " can't be parsed");
176         cmd = status;
177       }
178     } else
179       cmd = status;
180     
181     return cmd;
182   }
183 
184 
185 }