This project has retired. For details please refer to its
Attic page.
FSMDataLoader xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.dataloader;
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.util.HashSet;
23 import java.util.concurrent.CompletionService;
24 import java.util.concurrent.ExecutorCompletionService;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.TimeUnit;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.fs.FileStatus;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.util.ToolRunner;
35 import org.apache.hadoop.chukwa.analysis.salsa.fsm.FSMBuilder;
36 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
37 import org.apache.hadoop.chukwa.util.ExceptionUtil;
38 import org.apache.hadoop.conf.Configuration;
39
40 public class FSMDataLoader extends DataLoaderFactory {
41 private static Log log = LogFactory.getLog(FSMDataLoader.class);
42
43 protected MetricDataLoader threads[] = null;
44 private static String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
45 private int size = 1;
46 private CompletionService completion = null;
47 private ExecutorService executor = null;
48 private static String[] mappers = {
49 "org.apache.hadoop.chukwa.analysis.salsa.fsm.DataNodeClientTraceMapper",
50 "org.apache.hadoop.chukwa.analysis.salsa.fsm.TaskTrackerClientTraceMapper",
51 "org.apache.hadoop.chukwa.analysis.salsa.fsm.JobHistoryTaskDataMapper"
52 };
53
54 public FSMDataLoader() {
55 }
56
57 public void load(ChukwaConfiguration conf, FileSystem fs, FileStatus[] fileList) throws IOException {
58
59 if(executor==null) {
60 try {
61 this.size = Integer.parseInt(conf.get(DATA_LOADER_THREAD_LIMIT));
62 } catch(Exception e) {
63 this.size = 1;
64 }
65 executor = Executors.newFixedThreadPool(size);
66 }
67 if(completion==null) {
68 completion = new ExecutorCompletionService(executor);
69 }
70
71 try {
72
73 HashSet<Path> inputPaths = new HashSet<Path>();
74 HashSet<Path> outputPaths = new HashSet<Path>();
75 int counter = 0;
76 for(int i=0;i<fileList.length;i++) {
77 Path temp = fileList[i].getPath().getParent();
78 if(!inputPaths.contains(temp)) {
79 inputPaths.add(temp);
80 }
81 }
82 String outputDir= conf.get("chukwa.tmp.data.dir")+File.separator+"fsm_"+System.currentTimeMillis()+"_";
83 if(inputPaths.size()>0) {
84 Configuration fsmConf = new Configuration();
85
86 for(String mapper : mappers) {
87 String[] args = new String[inputPaths.size()+3];
88 args[0]="-in";
89 int k=2;
90 boolean hasData=false;
91 for(Path temp : inputPaths) {
92 String tempPath = temp.toUri().toString();
93 if((mapper.intern()==mappers[0].intern() && tempPath.indexOf("ClientTraceDetailed")>0) ||
94 (mapper.intern()==mappers[1].intern() && tempPath.indexOf("ClientTraceDetailed")>0) ||
95 (mapper.intern()==mappers[2].intern() && tempPath.indexOf("TaskData")>0) ||
96 (mapper.intern()==mappers[2].intern() && tempPath.indexOf("JobData")>0)) {
97 args[k]=tempPath;
98 k++;
99 hasData=true;
100 }
101 }
102 args[1]=k-2+"";
103 fsmConf.set("chukwa.salsa.fsm.mapclass", mapper);
104 args[k]=outputDir+mapper;
105 Path outputPath = new Path(args[k]);
106 outputPaths.add(outputPath);
107 if(hasData) {
108 int res = ToolRunner.run(fsmConf, new FSMBuilder(), args);
109 log.debug("Job Status: "+res);
110 }
111 }
112 }
113
114 for(Path outputPath : outputPaths) {
115 Path searchDir = new Path(outputPath.toUri().toString()+"/*/*/*.evt");
116 log.info("Search dir:"+searchDir.toUri().toString());
117 FileStatus[] outputList = fs.globStatus(searchDir);
118 if(outputList!=null) {
119 for(int j=0;j<outputList.length;j++) {
120 String outputFile = outputList[j].getPath().toUri().toString();
121 log.info("FSM -> MDL loading: "+outputFile);
122 completion.submit(new MetricDataLoader(conf, fs, outputFile));
123 counter++;
124 }
125 } else {
126 log.warn("No output to load.");
127 }
128 }
129 for(int i=0;i<counter;i++) {
130 completion.take().get();
131 }
132
133 for(Path dir : outputPaths) {
134 fs.delete(dir, true);
135 }
136 } catch(Exception e) {
137 log.error(ExceptionUtil.getStackTrace(e));
138 throw new IOException();
139 }
140 }
141
142 public void shutdown() throws InterruptedException {
143 executor.shutdown();
144 executor.awaitTermination(30, TimeUnit.SECONDS);
145 executor.shutdownNow();
146 }
147 }