This project has retired. For details please refer to its Attic page.
FSMDataLoader xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.dataloader;
19  
20  import java.io.File;
21  import java.io.IOException;
22  import java.util.HashSet;
23  import java.util.concurrent.CompletionService;
24  import java.util.concurrent.ExecutorCompletionService;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.fs.FileStatus;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.util.ToolRunner;
35  import org.apache.hadoop.chukwa.analysis.salsa.fsm.FSMBuilder;
36  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
37  import org.apache.hadoop.chukwa.util.ExceptionUtil;
38  import org.apache.hadoop.conf.Configuration;
39  
40  public class FSMDataLoader extends DataLoaderFactory {
41    private static Log log = LogFactory.getLog(FSMDataLoader.class);
42  
43    protected MetricDataLoader threads[] = null;
44    private static String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
45    private int size = 1;
46    private CompletionService completion = null;
47    private ExecutorService executor = null;
48    private static String[] mappers = {
49      "org.apache.hadoop.chukwa.analysis.salsa.fsm.DataNodeClientTraceMapper",
50      "org.apache.hadoop.chukwa.analysis.salsa.fsm.TaskTrackerClientTraceMapper",
51      "org.apache.hadoop.chukwa.analysis.salsa.fsm.JobHistoryTaskDataMapper"
52    };
53    
54    public FSMDataLoader() {
55    }
56    
57    public void load(ChukwaConfiguration conf, FileSystem fs, FileStatus[] fileList) throws IOException {
58  
59      if(executor==null) {
60        try {
61          this.size = Integer.parseInt(conf.get(DATA_LOADER_THREAD_LIMIT));
62        } catch(Exception e) {
63          this.size = 1;
64        }
65        executor = Executors.newFixedThreadPool(size);
66      }
67      if(completion==null) {
68        completion = new ExecutorCompletionService(executor);
69      }
70      
71      try {
72        // Locate directory output directories of the current demux, and create a unique directory list.
73        HashSet<Path> inputPaths = new HashSet<Path>();
74        HashSet<Path> outputPaths = new HashSet<Path>();
75        int counter = 0;
76        for(int i=0;i<fileList.length;i++) {
77          Path temp = fileList[i].getPath().getParent();
78          if(!inputPaths.contains(temp)) {
79            inputPaths.add(temp);
80          }
81        }
82        String outputDir= conf.get("chukwa.tmp.data.dir")+File.separator+"fsm_"+System.currentTimeMillis()+"_";
83        if(inputPaths.size()>0) {
84          Configuration fsmConf = new Configuration();
85          // Run fsm map reduce job for dn, tt, and jobhist.
86          for(String mapper : mappers) {
87            String[] args = new String[inputPaths.size()+3];
88            args[0]="-in";
89            int k=2;
90            boolean hasData=false;
91            for(Path temp : inputPaths) {
92              String tempPath = temp.toUri().toString();
93              if((mapper.intern()==mappers[0].intern() && tempPath.indexOf("ClientTraceDetailed")>0) ||
94                  (mapper.intern()==mappers[1].intern() && tempPath.indexOf("ClientTraceDetailed")>0) ||
95                  (mapper.intern()==mappers[2].intern() && tempPath.indexOf("TaskData")>0) ||
96                  (mapper.intern()==mappers[2].intern() && tempPath.indexOf("JobData")>0)) {
97                args[k]=tempPath;
98                k++;
99                hasData=true;
100             }
101           }
102           args[1]=k-2+"";
103           fsmConf.set("chukwa.salsa.fsm.mapclass", mapper);
104           args[k]=outputDir+mapper;
105           Path outputPath = new Path(args[k]);
106           outputPaths.add(outputPath);
107           if(hasData) {
108             int res = ToolRunner.run(fsmConf, new FSMBuilder(), args);
109             log.debug("Job Status: "+res);
110           }
111         }
112       }
113       // Find the mapreduce output and load to MDL.
114       for(Path outputPath : outputPaths) {
115         Path searchDir = new Path(outputPath.toUri().toString()+"/*/*/*.evt");
116         log.info("Search dir:"+searchDir.toUri().toString());
117         FileStatus[] outputList = fs.globStatus(searchDir);
118         if(outputList!=null) {
119           for(int j=0;j<outputList.length;j++) {
120             String outputFile = outputList[j].getPath().toUri().toString();
121             log.info("FSM -> MDL loading: "+outputFile);
122             completion.submit(new MetricDataLoader(conf, fs, outputFile));
123             counter++;
124           }
125         } else {
126           log.warn("No output to load.");
127         }
128       }
129       for(int i=0;i<counter;i++) {
130         completion.take().get();
131       }
132       // Clean up mapreduce output of fsm.
133       for(Path dir : outputPaths) {
134         fs.delete(dir, true);
135       }
136     } catch(Exception e) {
137       log.error(ExceptionUtil.getStackTrace(e));
138       throw new IOException();
139     }
140   }
141 
142   public void shutdown() throws InterruptedException {
143     executor.shutdown();
144     executor.awaitTermination(30, TimeUnit.SECONDS);
145     executor.shutdownNow();
146   }
147 }