CreateRecordFile xref
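CreateRecordFile is a small command-line utility in the org.apache.hadoop.chukwa.util package. It reads a plain text input file and, by running each line through a Demux map processor (TsProcessor by default), writes a Hadoop sequence file of ChukwaRecordKey/ChukwaRecord entries, which is useful for generating test data for the Chukwa extraction pipeline.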
package org.apache.hadoop.chukwa.util;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor;
import org.apache.hadoop.chukwa.extraction.demux.Demux;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.io.File;
import java.io.BufferedReader;
import java.io.FileReader;

/**
 * Helper class used to create sequence files of Chukwa records.
 */
public class CreateRecordFile {

   public static void makeTestSequenceFile(File inputFile,
                                           Path outputFile,
                                           String clusterName,
                                           String dataType,
                                           String streamName,
                                           MapProcessor processor) throws IOException {

     // Default to TsProcessor if no processor is supplied.
     MockOutputCollector collector = new MockOutputCollector();
     if (processor == null) processor = new TsProcessor();

     // Open an uncompressed sequence file of ChukwaRecordKey/ChukwaRecord pairs.
     Configuration conf = new Configuration();
     FileSystem fs = outputFile.getFileSystem(conf);
     FSDataOutputStream out = fs.create(outputFile);

     SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
                                         ChukwaRecordKey.class, ChukwaRecord.class,
                                         SequenceFile.CompressionType.NONE, null);
     long lastSeqID = 0;
     String line;
     BufferedReader reader = new BufferedReader(new FileReader(inputFile));

     // Wrap each input line in a Chunk, run it through the processor and
     // append the resulting key/record pair to the sequence file.
     while ((line = reader.readLine()) != null) {

       ChunkImpl chunk = new ChunkImpl(dataType, streamName,
               line.length() + lastSeqID, line.getBytes(), null);
       lastSeqID += line.length();
       chunk.addTag("cluster=\"" + clusterName + "\"");

       ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
       archiveKey.setTimePartition(System.currentTimeMillis());
       archiveKey.setDataType(chunk.getDataType());
       archiveKey.setStreamName(chunk.getStreamName());
       archiveKey.setSeqId(chunk.getSeqID());

       processor.process(archiveKey, chunk, collector, Reporter.NULL);
       seqFileWriter.append(collector.getChukwaRecordKey(),
                            collector.getChukwaRecord());
     }

     // Close the writer before the underlying stream it wraps.
     out.flush();
     seqFileWriter.close();
     out.close();
     reader.close();
   }

   /**
    * OutputCollector that keeps only the most recently collected
    * key/record pair so it can be appended to the sequence file.
    */
   private static class MockOutputCollector
           implements OutputCollector<ChukwaRecordKey, ChukwaRecord> {
     ChukwaRecordKey chukwaRecordKey;
     ChukwaRecord chukwaRecord;

     public void collect(ChukwaRecordKey chukwaRecordKey,
                         ChukwaRecord chukwaRecord) throws IOException {
       this.chukwaRecordKey = chukwaRecordKey;
       this.chukwaRecord = chukwaRecord;
     }

     public ChukwaRecordKey getChukwaRecordKey() { return chukwaRecordKey; }
     public ChukwaRecord getChukwaRecord() { return chukwaRecord; }
   }

   public static void main(String[] args) throws IOException,
                                                 ClassNotFoundException,
                                                 IllegalAccessException,
                                                 InstantiationException {
     // Print usage if -h is passed or too few arguments are given.
     if ((args.length > 0 && args[0].contains("-h")) || args.length < 2) {
       usage();
     }

     File inputFile = new File(args[0]);
     Path outputFile = new Path(args[1]);
     String clusterName = "testClusterName";
     String dataType = "testDataType";
     String streamName = "testStreamName";
     MapProcessor processor = new TsProcessor();
     Path confFile = null;

     if (args.length > 2) clusterName = args[2];
     if (args.length > 3) dataType = args[3];
     if (args.length > 4) streamName = args[4];

     if (args.length > 5) {
       // Resolve the processor class, trying the fully qualified name first
       // and then the default Demux mapper package.
       Class<?> clazz = null;
       try {
         clazz = Class.forName(args[5]);
       }
       catch (ClassNotFoundException e) {
         try {
           clazz = Class.forName(
                "org.apache.hadoop.chukwa.extraction.demux.processor.mapper." + args[5]);
         }
         catch (Exception e2) {
           throw e;
         }
       }
       processor = (MapProcessor) clazz.newInstance();
     }

     if (args.length > 6) {
       confFile = new Path(args[6]);
       Demux.jobConf = new JobConf(confFile);
     }

     System.out.println("Creating sequence file using the following input:");
     System.out.println("inputFile  : " + inputFile);
     System.out.println("outputFile : " + outputFile);
     System.out.println("clusterName: " + clusterName);
     System.out.println("dataType   : " + dataType);
     System.out.println("streamName : " + streamName);
     System.out.println("processor  : " + processor.getClass().getName());
     System.out.println("confFile   : " + confFile);

     makeTestSequenceFile(inputFile, outputFile, clusterName, dataType, streamName, processor);

     System.out.println("Done");
   }

   public static void usage() {
     System.out.println("Usage: java " + CreateRecordFile.class.getName() +
             " <inputFile> <outputFile> [<clusterName> <dataType> <streamName> <processorClass> [confFile]]");
     System.out.println("Description: Takes a plain text input file and generates a Hadoop sequence file containing ChukwaRecordKey,ChukwaRecord entries");
     System.out.println("Parameters: inputFile      - Text input file to read");
     System.out.println("            outputFile     - Sequence file to create");
     System.out.println("            clusterName    - Cluster name to use in the records");
     System.out.println("            dataType       - Data type to use in the records");
     System.out.println("            streamName     - Stream name to use in the records");
     System.out.println("            processorClass - Processor class to use. Defaults to TsProcessor");
     System.out.println("            confFile       - File to use to create the JobConf");
     System.exit(0);
   }
}
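
For reference, below is a minimal sketch of how a file produced by makeTestSequenceFile could be read back and dumped with Hadoop's SequenceFile.Reader. The DumpRecordFile class name and the command-line argument are placeholders, not part of Chukwa; the sketch only assumes the uncompressed ChukwaRecordKey/ChukwaRecord format written above.

import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

public class DumpRecordFile {
  public static void main(String[] args) throws Exception {
    // Path to a sequence file produced by CreateRecordFile (placeholder argument).
    Path input = new Path(args[0]);
    Configuration conf = new Configuration();
    FileSystem fs = input.getFileSystem(conf);

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, input, conf);
    ChukwaRecordKey key = new ChukwaRecordKey();
    ChukwaRecord record = new ChukwaRecord();

    // Iterate over every key/record pair appended by makeTestSequenceFile.
    while (reader.next(key, record)) {
      System.out.println(key + " -> " + record);
    }
    reader.close();
  }
}

Note that MockOutputCollector retains only the last collected pair, so each input line is expected to yield exactly one record; a processor that emits several records per chunk would have only its final record written to the file.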