This project has retired. For details please refer to its Attic page.
BackfillingLoader xref
package org.apache.hadoop.chukwa.tools.backfilling;

import java.io.File;

import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.adaptor.*;
import org.apache.hadoop.chukwa.datacollection.agent.AdaptorFactory;
import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
import org.apache.hadoop.chukwa.datacollection.connector.Connector;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

public class BackfillingLoader {
  static Logger log = Logger.getLogger(BackfillingLoader.class);

  protected Configuration conf = null;
  protected ChunkQueue queue = null;
  protected Connector connector = null;

  private String cluster = null;
  private String machine = null;
  private String adaptorName = null;
  private String recordType = null;
  private String logFile = null;

  public BackfillingLoader(Configuration conf, String cluster, String machine,
      String adaptorName, String recordType, String logFile) {

    this.conf = conf;
    this.cluster = cluster.trim();
    this.machine = machine.trim();
    this.adaptorName = adaptorName;
    this.recordType = recordType;
    this.logFile = logFile;

    log.info("cluster >>>" + cluster);
    log.info("machine >>>" + machine);
    log.info("adaptorName >>>" + adaptorName);
    log.info("recordType >>>" + recordType);
    log.info("logFile >>>" + logFile);

    // Tag every chunk with the cluster name and report the given machine
    // as the origin host, so the backfilled data is attributed to that host.
    DataFactory.getInstance().addDefaultTag("cluster=\"" + this.cluster + "\"");
    ChunkImpl.setHostAddress(this.machine);

    queue = DataFactory.getInstance().getEventQueue();
    connector = new QueueToWriterConnector(conf, true);
  }

  public void process() throws AdaptorException {
    File file = new File(logFile);
    connector.start();
    // Replay the whole file from offset 0 and block until the adaptor
    // has finished reading it, then shut the connector down.
    Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
    adaptor.parseArgs(recordType, "0 " + file.getAbsolutePath(), AdaptorManager.NULL);
    adaptor.start("", recordType, 0L, queue);
    adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED);
    connector.shutdown();
    // Rename the input file so the same log cannot be backfilled twice.
    if (!file.renameTo(new File(logFile + ".sav"))) {
      System.err.println("Error in renaming " + logFile + " to " + logFile + ".sav");
    }
  }

  public static void usage() {
    System.out.println("java org.apache.hadoop.chukwa.tools.backfilling.BackfillingLoader <cluster> <machine> <adaptorName> <recordType> <logFile>");
    System.exit(-1);
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 5) {
      usage();
    }

    String cluster = args[0];
    String machine = args[1];
    String adaptorName = args[2];
    String recordType = args[3];
    String logFile = args[4];

    BackfillingLoader loader = new BackfillingLoader(new ChukwaConfiguration(),
        cluster, machine, adaptorName, recordType, logFile);
    loader.process();
  }
}
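
For illustration, a minimal sketch of driving the loader programmatically, equivalent to the command line printed by usage(). The cluster name, host, adaptor, record type, and file path below are hypothetical placeholders; the adaptor is assumed to be the UTF-8 file-tailing adaptor shipped with Chukwa, and the short name relies on AdaptorFactory resolving it against the default adaptor package.

import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.tools.backfilling.BackfillingLoader;

public class BackfillExample {
  public static void main(String[] args) throws Exception {
    // All argument values below are hypothetical placeholders.
    BackfillingLoader loader = new BackfillingLoader(
        new ChukwaConfiguration(),
        "demoCluster",                           // cluster tag added to every chunk
        "host01.example.com",                    // host the data is attributed to
        "filetailer.CharFileTailingAdaptorUTF8", // adaptor used to read the file
        "SysLog",                                // record type of the data
        "/var/log/app/old-app.log");             // file to backfill
    loader.process(); // renames the input file to *.sav when done
  }
}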