CreateRecordFile xref
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.util;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor;
import org.apache.hadoop.chukwa.extraction.demux.Demux;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.File;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

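/**
 * Command-line utility that converts a plain-text input file into a Hadoop
 * sequence file of ChukwaRecordKey/ChukwaRecord pairs by running each line
 * through a demux MapProcessor (TsProcessor by default). Intended for
 * generating test input for demux processors.
 */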
public class CreateRecordFile {

  public static void makeTestSequenceFile(File inputFile,
                                          Path outputFile,
                                          String clusterName,
                                          String dataType,
                                          String streamName,
                                          MapProcessor processor) throws IOException {

    // Default to TsProcessor if no processor was supplied.
    MockOutputCollector collector = new MockOutputCollector();
    if (processor == null) processor = new TsProcessor();

    Configuration conf = new Configuration();
    FileSystem fs = outputFile.getFileSystem(conf);
    FSDataOutputStream out = fs.create(outputFile);

    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
                                        ChukwaRecordKey.class, ChukwaRecord.class,
                                        SequenceFile.CompressionType.NONE, null);
    long lastSeqID = 0;
    String line;
    FileInputStream fis = new FileInputStream(inputFile);
    BufferedReader reader = new BufferedReader(
        new InputStreamReader(fis, Charset.forName("UTF-8")));

    // Wrap each input line in a chunk, run it through the processor, and
    // append the key/record pair the processor emitted to the sequence file.
    while ((line = reader.readLine()) != null) {

      ChunkImpl chunk = new ChunkImpl(dataType, streamName,
              line.length() + lastSeqID, line.getBytes(Charset.forName("UTF-8")), null);
      lastSeqID += line.length();
      chunk.addTag("cluster=\"" + clusterName + "\"");

      ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
      archiveKey.setTimePartition(System.currentTimeMillis());
      archiveKey.setDataType(chunk.getDataType());
      archiveKey.setStreamName(chunk.getStreamName());
      archiveKey.setSeqId(chunk.getSeqID());

      processor.process(archiveKey, chunk, collector, Reporter.NULL);
      seqFileWriter.append(collector.getChukwaRecordKey(),
                           collector.getChukwaRecord());
    }

    // Close the writer before the stream it wraps so its trailing
    // metadata and buffered data are flushed, then release the reader.
    seqFileWriter.close();
    out.close();
    reader.close();
  }

  /**
   * OutputCollector that simply captures the most recently collected
   * key/record pair so it can be appended to the sequence file.
   */
  private static class MockOutputCollector
          implements OutputCollector<ChukwaRecordKey, ChukwaRecord> {
    ChukwaRecordKey chukwaRecordKey;
    ChukwaRecord chukwaRecord;

    public void collect(ChukwaRecordKey chukwaRecordKey,
                        ChukwaRecord chukwaRecord) throws IOException {
      this.chukwaRecordKey = chukwaRecordKey;
      this.chukwaRecord = chukwaRecord;
    }

    public ChukwaRecordKey getChukwaRecordKey() { return chukwaRecordKey; }
    public ChukwaRecord getChukwaRecord() { return chukwaRecord; }
  }

  public static void main(String[] args) throws IOException,
                                                ClassNotFoundException,
                                                IllegalAccessException,
                                                InstantiationException {
    if (args.length == 0 || (args.length == 1 && args[0].contains("-h"))) {
      usage();
      return;
    }

    File inputFile = new File(args[0]);
    Path outputFile = new Path(args[1]);
    String clusterName = "testClusterName";
    String dataType = "testDataType";
    String streamName = "testStreamName";
    MapProcessor processor = new TsProcessor();
    Path confFile = null;

    if (args.length > 2) clusterName = args[2];
    if (args.length > 3) dataType = args[3];
    if (args.length > 4) streamName = args[4];

    if (args.length > 5) {
      // Try the argument as a fully qualified class name first, then fall
      // back to the default demux processor package.
      Class<?> clazz = null;
      try {
        clazz = Class.forName(args[5]);
      }
      catch (ClassNotFoundException e) {
        try {
          clazz = Class.forName(
               "org.apache.hadoop.chukwa.extraction.demux.processor.mapper." + args[5]);
        }
        catch (Exception e2) {
          throw e;
        }
      }
      processor = (MapProcessor)clazz.newInstance();
    }

    if (args.length > 6) {
      confFile = new Path(args[6]);
      Demux.jobConf = new JobConf(confFile);
    }

    System.out.println("Creating sequence file using the following input:");
    System.out.println("inputFile  : " + inputFile);
    System.out.println("outputFile : " + outputFile);
    System.out.println("clusterName: " + clusterName);
    System.out.println("dataType   : " + dataType);
    System.out.println("streamName : " + streamName);
    System.out.println("processor  : " + processor.getClass().getName());
    System.out.println("confFile   : " + confFile);

    makeTestSequenceFile(inputFile, outputFile, clusterName, dataType, streamName, processor);

    System.out.println("Done");
  }

  public static void usage() {
    System.out.println("Usage: java " + CreateRecordFile.class.getName() +
            " <inputFile> <outputFile> [<clusterName> <dataType> <streamName> <processorClass> [confFile]]");
    System.out.println("Description: Takes a plain-text input file and generates a Hadoop sequence file containing ChukwaRecordKey,ChukwaRecord entries");
    System.out.println("Parameters: inputFile      - Text input file to read");
    System.out.println("            outputFile     - Sequence file to create");
    System.out.println("            clusterName    - Cluster name to use in the records");
    System.out.println("            dataType       - Data type to use in the records");
    System.out.println("            streamName     - Stream name to use in the records");
    System.out.println("            processorClass - Processor class to use. Defaults to TsProcessor");
    System.out.println("            confFile       - File to use to create the JobConf");
  }
}
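
For reference, a minimal sketch of driving makeTestSequenceFile directly from Java rather than through main. The class name, file paths, and record labels below are illustrative assumptions, not part of Chukwa; passing null for the processor picks up the TsProcessor default, mirroring the command-line defaults above.

import org.apache.hadoop.chukwa.util.CreateRecordFile;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.IOException;

public class CreateRecordFileExample {
  public static void main(String[] args) throws IOException {
    File input = new File("/tmp/sample.log");   // hypothetical plain-text input, one record per line
    Path output = new Path("/tmp/sample.seq");  // hypothetical sequence file to be created

    // null processor => makeTestSequenceFile falls back to TsProcessor.
    CreateRecordFile.makeTestSequenceFile(input, output,
        "demoCluster", "demoType", "demoStream", null);
  }
}

The equivalent shell invocation, per usage(), would be java org.apache.hadoop.chukwa.util.CreateRecordFile /tmp/sample.log /tmp/sample.seq with the Chukwa and Hadoop jars on the classpath.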