This project has been retired. For details, please refer to its
Apache Attic page.
ChukwaArchiveManager xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.archive;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.text.SimpleDateFormat;
25
26 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
27 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
28 import org.apache.hadoop.fs.FileStatus;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.util.ToolRunner;
32 import org.apache.log4j.Logger;
33
34 public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
35 static Logger log = Logger.getLogger(ChukwaArchiveManager.class);
36 SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
37
38 static final int ONE_HOUR = 60 * 60 * 1000;
39 static final int ONE_DAY = 24*ONE_HOUR;
40 static final int MAX_FILES = 500;
41
42 private static final int DEFAULT_MAX_ERROR_COUNT = 4;
43
44 protected ChukwaConfiguration conf = null;
45 protected FileSystem fs = null;
46 protected boolean isRunning = true;
47
48 public ChukwaArchiveManager() throws Exception {
49 conf = new ChukwaConfiguration();
50 init();
51 }
52
53 protected void init() throws IOException, URISyntaxException {
54 String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
55 fs = FileSystem.get(new URI(fsName), conf);
56 }
57
58 public static void main(String[] args) throws Exception {
59
60 ChukwaArchiveManager manager = new ChukwaArchiveManager();
61 manager.start();
62 }
63
64 public void shutdown() {
65 this.isRunning = false;
66 }
67
68 public void start() throws Exception {
69
70 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME);
71 if ( ! chukwaRootDir.endsWith("/") ) {
72 chukwaRootDir += "/";
73 }
74 log.info("chukwaRootDir:" + chukwaRootDir);
75
76 String archiveRootDir = conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_DATASINK_DIR_NAME);
77 if ( ! archiveRootDir.endsWith("/") ) {
78 archiveRootDir += "/";
79 }
80 log.info("archiveDir:" + archiveRootDir);
81 Path pArchiveRootDir = new Path(archiveRootDir);
82 setup(pArchiveRootDir);
83
84 String archivesRootProcessingDir = chukwaRootDir + ARCHIVES_PROCESSING_DIR_NAME;
85
86 String archivesMRInputDir = archivesRootProcessingDir + ARCHIVES_MR_INPUT_DIR_NAME;
87 String archivesMROutputDir = archivesRootProcessingDir+ ARCHIVES_MR_OUTPUT_DIR_NAME;
88 String finalArchiveOutput = chukwaRootDir + DEFAULT_FINAL_ARCHIVES;
89
90 int maxPermittedErrorCount = conf.getInt(CHUKWA_ARCHIVE_MAX_ERROR_COUNT_FIELD,
91 DEFAULT_MAX_ERROR_COUNT);
92
93 Path pDailyRawArchivesInput = new Path(archiveRootDir);
94 Path pArchivesMRInputDir = new Path(archivesMRInputDir);
95 Path pArchivesRootProcessingDir = new Path(archivesRootProcessingDir);
96 Path pFinalArchiveOutput = new Path(finalArchiveOutput);
97
98
99 if (!archivesMRInputDir.endsWith("/")) {
100 archivesMRInputDir +="/";
101 }
102 setup( pArchivesRootProcessingDir );
103 setup( pDailyRawArchivesInput );
104 setup( pFinalArchiveOutput );
105
106 int errorCount = 0;
107
108 long lastRun = 0l;
109
110 while (isRunning) {
111 try {
112
113 if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
114 log.warn("==================\nToo many errors (" + errorCount +
115 "), Bail out!\n==================");
116 break;
117 }
118
119
120
121
122 if (fs.exists(pArchivesMRInputDir)) {
123 FileStatus[] days = fs.listStatus(pArchivesMRInputDir);
124 if (days.length > 0) {
125 log.info("reprocessing current Archive input" + days[0].getPath());
126
127 runArchive(archivesMRInputDir + days[0].getPath().getName() + "/",archivesMROutputDir,finalArchiveOutput);
128 errorCount = 0;
129 continue;
130 }
131 }
132
133
134 log.info("Raw Archive dir:" + pDailyRawArchivesInput);
135 long now = System.currentTimeMillis();
136 int currentDay = Integer.parseInt(day.format(System.currentTimeMillis()));
137 FileStatus[] daysInRawArchiveDir = fs.listStatus(pDailyRawArchivesInput);
138
139 if (daysInRawArchiveDir.length == 0 ) {
140 log.debug( pDailyRawArchivesInput + " is empty, going to sleep for 1 minute");
141 Thread.sleep(1 * 60 * 1000);
142 continue;
143 }
144
145
146 if (daysInRawArchiveDir.length == 1 ) {
147 int workingDay = Integer.parseInt(daysInRawArchiveDir[0].getPath().getName());
148 long nextRun = lastRun + (2*ONE_HOUR) - (1*60*1000);
149 if (workingDay == currentDay && now < nextRun) {
150 log.info("lastRun < 2 hours so skip archive for now, going to sleep for 30 minutes, currentDate is:" + new java.util.Date());
151 Thread.sleep(30 * 60 * 1000);
152 continue;
153 }
154 }
155
156 String dayArchivesMRInputDir = null;
157 for (FileStatus fsDay : daysInRawArchiveDir) {
158 dayArchivesMRInputDir = archivesMRInputDir + fsDay.getPath().getName() + "/";
159 processDay(fsDay, dayArchivesMRInputDir,archivesMROutputDir, finalArchiveOutput);
160 lastRun = now;
161 }
162
163 }catch (Throwable e) {
164 errorCount ++;
165 e.printStackTrace();
166 log.warn(e);
167 }
168
169 }
170
171 }
172
173 public void processDay(FileStatus fsDay, String archivesMRInputDir,
174 String archivesMROutputDir,String finalArchiveOutput) throws Exception {
175 FileStatus[] dataSinkDirsInRawArchiveDir = fs.listStatus(fsDay.getPath());
176 long now = System.currentTimeMillis();
177
178 int currentDay = Integer.parseInt(day.format(System.currentTimeMillis()));
179 int workingDay = Integer.parseInt(fsDay.getPath().getName());
180
181 long oneHourAgo = now - ONE_HOUR;
182 if (dataSinkDirsInRawArchiveDir.length == 0 && workingDay < currentDay) {
183 fs.delete(fsDay.getPath(),false);
184 log.info("deleting raw dataSink dir for day:" + fsDay.getPath().getName());
185 return;
186 }
187
188 int fileCount = 0;
189 for (FileStatus fsDataSinkDir : dataSinkDirsInRawArchiveDir) {
190 long modificationDate = fsDataSinkDir.getModificationTime();
191 if (modificationDate < oneHourAgo || workingDay < currentDay) {
192 log.info("processDay,modificationDate:" + modificationDate +", adding: " + fsDataSinkDir.getPath() );
193 fileCount += fs.listStatus(fsDataSinkDir.getPath()).length;
194 moveDataSinkFilesToArchiveMrInput(fsDataSinkDir,archivesMRInputDir);
195
196 if (fileCount >= MAX_FILES) {
197 log.info("processDay, reach capacity");
198 runArchive(archivesMRInputDir,archivesMROutputDir,finalArchiveOutput);
199 fileCount = 0;
200 } else {
201 log.info("processDay,modificationDate:" + modificationDate +", skipping: " + fsDataSinkDir.getPath() );
202 }
203 }
204 }
205 }
206
207 public void runArchive(String archivesMRInputDir,String archivesMROutputDir,
208 String finalArchiveOutput) throws Exception {
209 String[] args = new String[3];
210
211
212 args[0] = conf.get("archive.grouper","Stream");
213 args[1] = archivesMRInputDir + "*/*.done" ;
214 args[2] = archivesMROutputDir;
215
216 Path pArchivesMRInputDir = new Path(archivesMRInputDir);
217 Path pArchivesMROutputDir = new Path(archivesMROutputDir);
218
219
220 if (fs.exists(pArchivesMROutputDir)) {
221 log.warn("Deleteing mroutput dir for archive ...");
222 fs.delete(pArchivesMROutputDir, true);
223 }
224
225 log.info("ChukwaArchiveManager processing :" + args[1] + " going to output to " + args[2] );
226 int res = ToolRunner.run(this.conf, new ChukwaArchiveBuilder(),args);
227 log.info("Archive result: " + res);
228 if (res != 0) {
229 throw new Exception("Archive result != 0");
230 }
231
232 if (!finalArchiveOutput.endsWith("/")) {
233 finalArchiveOutput +="/";
234 }
235 String day = pArchivesMRInputDir.getName();
236 finalArchiveOutput += day;
237 Path pDay = new Path(finalArchiveOutput);
238 setup(pDay);
239
240 finalArchiveOutput += "/archive_" + System.currentTimeMillis();
241 Path pFinalArchiveOutput = new Path(finalArchiveOutput);
242
243 log.info("Final move: moving " + pArchivesMROutputDir + " to " + pFinalArchiveOutput);
244
245 if (fs.rename(pArchivesMROutputDir, pFinalArchiveOutput ) ) {
246 log.info("deleting " + pArchivesMRInputDir);
247 fs.delete(pArchivesMRInputDir, true);
248 } else {
249 log.warn("move to final archive folder failed!");
250 }
251
252
253
254 }
255
256 public void moveDataSinkFilesToArchiveMrInput(FileStatus fsDataSinkDir,
257 String archivesMRInputDir) throws IOException {
258
259 if (!archivesMRInputDir.endsWith("/")) {
260 archivesMRInputDir +="/";
261 }
262
263 Path pArchivesMRInputDir = new Path(archivesMRInputDir);
264 setup(pArchivesMRInputDir);
265 fs.rename(fsDataSinkDir.getPath(), pArchivesMRInputDir);
266 log.info("moving " + fsDataSinkDir.getPath() + " to " + pArchivesMRInputDir);
267 }
268
269
270
271
272
273
274 protected void setup(Path directory) throws IOException {
275 if ( ! fs.exists(directory)) {
276 fs.mkdirs(directory);
277 }
278 }
279
280 }