This project has retired. For details please refer to its Attic page.
BackfillingLoader xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.tools.backfilling;
19  
20  import java.io.File;
21  
22  import org.apache.hadoop.chukwa.ChunkImpl;
23  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
24  import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
25  import org.apache.hadoop.chukwa.datacollection.DataFactory;
26  import org.apache.hadoop.chukwa.datacollection.adaptor.*;
27  import org.apache.hadoop.chukwa.datacollection.agent.AdaptorFactory;
28  import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
29  import org.apache.hadoop.chukwa.datacollection.connector.Connector;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.log4j.Logger;
32  
33  public class BackfillingLoader {
34    static Logger log = Logger.getLogger(BackfillingLoader.class);
35    
36    protected Configuration conf = null;
37    protected ChunkQueue queue = null;
38    protected Connector connector = null;
39    
40    private String cluster =  null;
41    private String machine =  null;
42    private String adaptorName =  null;
43    private String recordType =  null;
44    private String logFile =  null;
45    
46    public BackfillingLoader(Configuration conf, String cluster, String machine, 
47        String adaptorName, String recordType, String logFile) {
48      
49      this.conf = conf;
50      this.cluster = cluster.trim();
51      this.machine = machine.trim();
52      this.adaptorName = adaptorName;
53      this.recordType = recordType;
54      this.logFile = logFile;
55      
56      log.info("cluster >>>" + cluster) ;
57      log.info("machine >>>" + machine) ;
58      log.info("adaptorName >>>" + adaptorName) ;
59      log.info("recordType >>>" + recordType) ;
60      log.info("logFile >>>" + logFile) ;
61      
62      // Set the right cluster and machine information
63      DataFactory.getInstance().addDefaultTag("cluster=\"" + this.cluster + "\"");
64      ChunkImpl.setHostAddress(this.machine);
65      
66      queue = DataFactory.getInstance().getEventQueue();
67      connector = new QueueToWriterConnector(conf,true);
68    }
69    
70    public void process() throws AdaptorException {
71      File file = new File(logFile);
72      connector.start();
73      Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
74      adaptor.parseArgs(recordType, "0 " +file.getAbsolutePath(),AdaptorManager.NULL);
75      adaptor.start("", recordType,  0l,queue);
76      adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED);
77      connector.shutdown();
78      if(!file.renameTo(new File(logFile + ".sav"))) {
79        System.err.println("Error in renaming "+logFile+" to "+logFile+".sav");
80      }
81    }
82    
83    public static void usage() {
84      System.out.println("java org.apache.hadoop.chukwa.tools.backfilling.BackfillingLoader <cluster> <machine> <adaptorName> <recordType> <logFile>");
85      System.exit(-1);
86    }
87    
88    /**
89     * @param args
90     * @throws Exception 
91     */
92    public static void main(String[] args) throws Exception {
93  
94      if (args.length != 5) {
95        usage();
96      }
97      
98  
99      String cluster = args[0];
100     String machine = args[1];
101     String adaptorName = args[2];
102     String recordType = args[3];
103     String logFile = args[4];
104 
105     BackfillingLoader loader = new BackfillingLoader(new ChukwaConfiguration(),cluster,machine,adaptorName,recordType,logFile);
106     loader.process();
107   }
108 
109 }