ChukwaParquetWriter xref
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.writer.parquet;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ChukwaParquetWriter extends PipelineableWriter {
  private static final Logger LOG = Logger.getLogger(ChukwaParquetWriter.class);
  public static final String OUTPUT_DIR_OPT = "chukwaCollector.outputDir";
  private int blockSize = 128 * 1024 * 1024;
  private int pageSize = 1 * 1024 * 1024;
  private Schema avroSchema = null;
  private AvroParquetWriter<GenericRecord> parquetWriter = null;
  protected String outputDir = null;
  private Calendar calendar = Calendar.getInstance();
  private String localHostAddr = null;
  private long rotateInterval = 300000L;
  private long startTime = 0;
  private Path previousPath = null;
  private String previousFileName = null;
  private FileSystem fs = null;

  public ChukwaParquetWriter() throws WriterException {
    this(ChukwaAgent.getStaticConfiguration());
  }

  public ChukwaParquetWriter(Configuration c) throws WriterException {
    setup(c);
  }

  @Override
  public void init(Configuration c) throws WriterException {
    // No-op: configuration is applied in the constructor via setup(Configuration).
  }

  private void setup(Configuration c) throws WriterException {
    try {
      localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
    } catch (UnknownHostException e) {
      localHostAddr = "-NA-";
    }
    outputDir = c.get(OUTPUT_DIR_OPT, "/chukwa/logs");
    blockSize = c.getInt("dfs.blocksize", 128 * 1024 * 1024);
    rotateInterval = c.getLong("chukwaCollector.rotateInterval", 300000L);
    if (fs == null) {
      try {
        fs = FileSystem.get(c);
      } catch (IOException e) {
        throw new WriterException(e);
      }
    }

    avroSchema = ChukwaAvroSchema.getSchema();

    rotate();
  }

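  // Configuration keys read by setup(), with defaults in parentheses:
  //   chukwaCollector.outputDir      - directory for sink files ("/chukwa/logs")
  //   dfs.blocksize                  - Parquet block (row group) size in bytes (128 MB)
  //   chukwaCollector.rotateInterval - file rotation period in milliseconds (300000)
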
  @Override
  public void close() throws WriterException {
    try {
      // Flush the active file and mark it complete by renaming it to *.done.
      // Note: unlike rotate(), this keeps the ".chukwa" suffix, so the final
      // name is *.chukwa.done.
      parquetWriter.close();
      fs.rename(previousPath, new Path(previousFileName + ".done"));
    } catch (IOException e) {
      throw new WriterException(e);
    }
  }

  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    long elapsedTime = 0;
    CommitStatus rv = ChukwaWriter.COMMIT_OK;
    for (Chunk chunk : chunks) {
      try {
        // Map each chunk onto the shared Chukwa Avro schema.
        GenericRecord record = new GenericData.Record(avroSchema);
        record.put("dataType", chunk.getDataType());
        record.put("data", ByteBuffer.wrap(chunk.getData()));
        record.put("tags", chunk.getTags());
        record.put("seqId", chunk.getSeqID());
        record.put("source", chunk.getSource());
        record.put("stream", chunk.getStreamName());
        parquetWriter.write(record);
        // Roll to a new output file once the rotate interval has elapsed.
        elapsedTime = System.currentTimeMillis() - startTime;
        if (elapsedTime > rotateInterval) {
          rotate();
        }
      } catch (IOException e) {
        LOG.warn("Failed to store data to HDFS.");
        LOG.warn(ExceptionUtil.getStackTrace(e));
      }
    }
    // Forward the chunks to the next writer in the pipeline, if any.
    if (next != null) {
      rv = next.add(chunks);
    }
    return rv;
  }

  private void rotate() throws WriterException {
    if (parquetWriter != null) {
      try {
        parquetWriter.close();
        // Strip the trailing ".chukwa" (7 characters) and mark the file done.
        String newFileName = previousFileName.substring(0, previousFileName.length() - 7);
        fs.rename(previousPath, new Path(newFileName + ".done"));
      } catch (IOException e) {
        LOG.warn("Failed to close previous Parquet output file.");
      }
    }
    startTime = System.currentTimeMillis();
    calendar.setTimeInMillis(startTime);

    // Compose a unique file name from the timestamp, host name and an RMI UID.
    String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
        .format(calendar.getTime());
    newName += localHostAddr + new java.rmi.server.UID().toString();
    newName = newName.replace("-", "");
    newName = newName.replace(":", "");
    newName = newName.replace(".", "");
    newName = outputDir + "/" + newName.trim() + ".chukwa";
    LOG.info("writing: " + newName);
    Path path = new Path(newName);
    try {
      parquetWriter = new AvroParquetWriter<GenericRecord>(path, avroSchema,
          CompressionCodecName.SNAPPY, blockSize, pageSize);
      previousPath = path;
      previousFileName = newName;
    } catch (IOException e) {
      throw new WriterException(e);
    }
  }

  /**
   * Calculates the delay before the next rotation when a fixed-interval
   * rotation scheme is used. The delay is the difference between the current
   * timestamp (t1) and the next time the sink files are due to be rotated
   * (t2), where t2 is the end of the current rotate interval plus a fixed
   * offset; that is, delay = t2 - t1.
   *
   * @param currentTime the current timestamp in milliseconds
   * @param rotateInterval the rotation interval in milliseconds
   * @param offsetInterval the fixed offset added to each rotation boundary
   * @return the delay in milliseconds until the next rotation
   */
  public long getDelayForFixedInterval(long currentTime, long rotateInterval, long offsetInterval) {
    long remainder = (currentTime % rotateInterval);
    long prevRoundedInterval = currentTime - remainder;
    long nextRoundedInterval = prevRoundedInterval + rotateInterval;
    long delay = nextRoundedInterval - currentTime + offsetInterval;

    if (LOG.isInfoEnabled()) {
      LOG.info("currentTime=" + currentTime + " prevRoundedInterval=" +
          prevRoundedInterval + " nextRoundedInterval=" + nextRoundedInterval +
          " delay=" + delay);
    }

    return delay;
  }
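
  // Worked example: with currentTime = 1,000,005 ms, rotateInterval = 300,000 ms
  // and offsetInterval = 30,000 ms, we get remainder = 100,005,
  // prevRoundedInterval = 900,000 and nextRoundedInterval = 1,200,000, so
  // delay = 1,200,000 - 1,000,005 + 30,000 = 229,995 ms.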
}
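
Below is a minimal usage sketch, not part of the xref source above. It assumes ChunkImpl's (dataType, streamName, seqId, data, adaptor) constructor from org.apache.hadoop.chukwa and a filesystem reachable through the Configuration's fs.defaultFS; the driver class name is hypothetical.

import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter;
import org.apache.hadoop.conf.Configuration;

// Hypothetical driver class, for illustration only.
public class ChukwaParquetWriterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(ChukwaParquetWriter.OUTPUT_DIR_OPT, "/chukwa/logs");
    // Rotate once a minute instead of the five-minute default.
    conf.setLong("chukwaCollector.rotateInterval", 60000L);

    ChukwaParquetWriter writer = new ChukwaParquetWriter(conf);
    try {
      byte[] payload = "hello chukwa".getBytes(StandardCharsets.UTF_8);
      // ChunkImpl's constructor signature is assumed here; the trailing
      // adaptor argument may be null for a standalone test.
      Chunk chunk = new ChunkImpl("ExampleType", "example/stream",
          payload.length, payload, null);
      writer.add(Collections.singletonList(chunk));
    } finally {
      writer.close(); // closes the writer and marks the open file done
    }
  }
}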