CreateRecordFile xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.util;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor;
import org.apache.hadoop.chukwa.extraction.demux.Demux;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.io.File;
import java.io.BufferedReader;
import java.io.FileReader;

/**
 * Helper class used to create sequence files of Chukwa records.
 */
public class CreateRecordFile {

   public static void makeTestSequenceFile(File inputFile,
                                           Path outputFile,
                                           String clusterName,
                                           String dataType,
                                           String streamName,
                                           MapProcessor processor) throws IOException {

     //initialize the output collector and the default processor
     MockOutputCollector collector = new MockOutputCollector();
     if (processor == null) processor = new TsProcessor();

     //initialize the sequence file writer
     Configuration conf = new Configuration();
     FileSystem fs = outputFile.getFileSystem(conf);
     FSDataOutputStream out = fs.create(outputFile);

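     // write ChukwaRecordKey/ChukwaRecord pairs, uncompressed, over the
     // stream created above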
     SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
                                    ChukwaRecordKey.class, ChukwaRecord.class,
                                    SequenceFile.CompressionType.NONE, null);
     long lastSeqID = 0;
     String line;
     BufferedReader reader = new BufferedReader(new FileReader(inputFile));

     // for each line, create a chunk and an archive key, pass them to the
     // processor, then write the result to the sequence file
     while ((line = reader.readLine()) != null) {

       ChunkImpl chunk = new ChunkImpl(dataType, streamName,
         line.length() + lastSeqID, line.getBytes(), null);
       lastSeqID += line.length();
       chunk.addTag("cluster=\"" + clusterName + "\"");

       ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
       archiveKey.setTimePartition(System.currentTimeMillis());
       archiveKey.setDataType(chunk.getDataType());
       archiveKey.setStreamName(chunk.getStreamName());
       archiveKey.setSeqId(chunk.getSeqID());

       processor.process(archiveKey, chunk, collector, Reporter.NULL);
       seqFileWriter.append(collector.getChukwaRecordKey(),
                            collector.getChukwaRecord());
     }

     // close the writer first so it can flush any buffered records,
     // then close the underlying stream and the reader
     seqFileWriter.close();
     out.close();
     reader.close();
   }

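   /**
    * Minimal OutputCollector that remembers the most recent key/value pair
    * passed to collect(), so the caller can append it to the sequence file.
    * Note that only the last pair emitted per process() call is retained.
    */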
   private static class MockOutputCollector
           implements OutputCollector<ChukwaRecordKey, ChukwaRecord> {
     ChukwaRecordKey chukwaRecordKey;
     ChukwaRecord chukwaRecord;

     public void collect(ChukwaRecordKey chukwaRecordKey,
                         ChukwaRecord chukwaRecord) throws IOException {
       this.chukwaRecordKey = chukwaRecordKey;
       this.chukwaRecord = chukwaRecord;
     }

     public ChukwaRecordKey getChukwaRecordKey() { return chukwaRecordKey; }
     public ChukwaRecord getChukwaRecord() { return chukwaRecord; }
   }
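
   // Programmatic use, for reference (a minimal sketch; the paths and names
   // below are illustrative). Passing null for the processor falls back to
   // the default TsProcessor:
   //
   //   makeTestSequenceFile(new File("/tmp/sample.log"),
   //                        new Path("/tmp/sample.seq"),
   //                        "demoCluster", "demoType", "demoStream", null);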

   public static void main(String[] args) throws IOException,
                                                 ClassNotFoundException,
                                                 IllegalAccessException,
                                                 InstantiationException {
     if ((args.length > 0 && args[0].contains("-h")) || args.length < 2) {
       usage();
     }

     File inputFile = new File(args[0]);
     Path outputFile = new Path(args[1]);
     String clusterName = "testClusterName";
     String dataType = "testDataType";
     String streamName = "testStreamName";
     MapProcessor processor = new TsProcessor();
     Path confFile = null;

     if (args.length > 2) clusterName = args[2];
     if (args.length > 3) dataType = args[3];
     if (args.length > 4) streamName = args[4];

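     // The processor class (args[5]) may be given either fully qualified or
     // as a bare name, which is resolved against the default mapper
     // processor package, as shown below.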
     if (args.length > 5) {
       Class<?> clazz = null;
       try {
         clazz = Class.forName(args[5]);
       }
       catch (ClassNotFoundException e) {
         try {
           clazz = Class.forName(
                 "org.apache.hadoop.chukwa.extraction.demux.processor.mapper." + args[5]);
         }
         catch (Exception e2) {
           throw e;
         }
       }
       processor = (MapProcessor)clazz.newInstance();
     }

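     // Some processors read their settings from the static Demux.jobConf,
     // so populate it when a configuration file is supplied.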
     if (args.length > 6) {
       confFile = new Path(args[6]);
       Demux.jobConf = new JobConf(confFile);
     }

     System.out.println("Creating sequence file using the following input:");
     System.out.println("inputFile  : " + inputFile);
     System.out.println("outputFile : " + outputFile);
     System.out.println("clusterName: " + clusterName);
     System.out.println("dataType   : " + dataType);
     System.out.println("streamName : " + streamName);
     System.out.println("processor  : " + processor.getClass().getName());
     System.out.println("confFile   : " + confFile);

     makeTestSequenceFile(inputFile, outputFile, clusterName, dataType, streamName, processor);

     System.out.println("Done");
   }

   public static void usage() {
     System.out.println("Usage: java " + CreateRecordFile.class.getName() +
         " <inputFile> <outputFile> [<clusterName> <dataType> <streamName> <processorClass> [confFile]]");
     System.out.println("Description: Takes a plain text input file and generates a Hadoop sequence file containing ChukwaRecordKey,ChukwaRecord entries");
     System.out.println("Parameters: inputFile      - Text input file to read");
     System.out.println("            outputFile     - Sequence file to create");
     System.out.println("            clusterName    - Cluster name to use in the records");
     System.out.println("            dataType       - Data type to use in the records");
     System.out.println("            streamName     - Stream name to use in the records");
     System.out.println("            processorClass - Processor class to use. Defaults to TsProcessor");
     System.out.println("            confFile       - File to use to create the JobConf");
     System.exit(0);
   }
}
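
// Example invocation (a sketch; the jar name and paths are illustrative, and
// the Hadoop and Chukwa jars must be on the classpath):
//
//   java -cp chukwa-core.jar:$HADOOP_CLASSPATH \
//       org.apache.hadoop.chukwa.util.CreateRecordFile \
//       /tmp/sample.log /tmp/sample.seq demoCluster demoType demoStream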