/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.writer.parquet;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

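/**
 * A {@link PipelineableWriter} that serializes Chukwa {@link Chunk}s as Avro
 * {@link GenericRecord}s and writes them to Snappy-compressed Parquet files
 * under the directory named by chukwaCollector.outputDir. The current output
 * file is rotated every chukwaCollector.rotateInterval milliseconds and
 * renamed with a ".done" suffix once it is closed.
 */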
public class ChukwaParquetWriter extends PipelineableWriter {
  private static final Logger LOG = Logger.getLogger(ChukwaParquetWriter.class);
  public static final String OUTPUT_DIR_OPT = "chukwaCollector.outputDir";
  private int blockSize = 128 * 1024 * 1024; // Parquet block (row group) size, overridden by dfs.blocksize
  private int pageSize = 1 * 1024 * 1024;    // Parquet page size
  private Schema avroSchema = null;
  private AvroParquetWriter<GenericRecord> parquetWriter = null;
  protected String outputDir = null;
  private Calendar calendar = Calendar.getInstance();
  private String localHostAddr = null;
  private long rotateInterval = 300000L; // five minutes, overridden by chukwaCollector.rotateInterval
  private long startTime = 0;
  private Path previousPath = null;
  private String previousFileName = null;
  private FileSystem fs = null;

  public ChukwaParquetWriter() throws WriterException {
    this(ChukwaAgent.getStaticConfiguration());
  }

  public ChukwaParquetWriter(Configuration c) throws WriterException {
    setup(c);
  }

  @Override
  public void init(Configuration c) throws WriterException {
    // no-op: configuration happens in setup(), called from the constructors
  }

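  /**
   * Reads the writer settings from the supplied configuration, resolves the
   * local host name used in output file names, obtains a FileSystem handle,
   * loads the Chukwa Avro schema, and opens the first output file.
   */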
  private void setup(Configuration c) throws WriterException {
    try {
      localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
    } catch (UnknownHostException e) {
      localHostAddr = "-NA-";
    }
    outputDir = c.get(OUTPUT_DIR_OPT, "/chukwa/logs");
    blockSize = c.getInt("dfs.blocksize", 128 * 1024 * 1024);
    rotateInterval = c.getLong("chukwaCollector.rotateInterval", 300000L);
    if(fs == null) {
      try {
        fs = FileSystem.get(c);
      } catch (IOException e) {
        throw new WriterException(e);
      }
    }

    // load the Chukwa Avro schema
    avroSchema = ChukwaAvroSchema.getSchema();
    // open the initial output file
    rotate();
  }

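  /**
   * Closes the current Parquet file and renames it with a ".done" suffix so
   * that downstream processing can pick it up.
   */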
  @Override
  public void close() throws WriterException {
    try {
      parquetWriter.close();
      // strip the ".chukwa" suffix (7 characters) before renaming, matching rotate()
      String doneFileName = previousFileName.substring(0, previousFileName.length() - 7);
      fs.rename(previousPath, new Path(doneFileName + ".done"));
    } catch (IOException e) {
      throw new WriterException(e);
    }
  }

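  /**
   * Converts each Chunk to an Avro GenericRecord and writes it to the current
   * Parquet file, rotating the file once the rotate interval has elapsed.
   * Chunks are passed through to the next writer in the pipeline, if any.
   */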
  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    long elapsedTime = 0;
    CommitStatus rv = ChukwaWriter.COMMIT_OK;
    for(Chunk chunk : chunks) {
      try {
        GenericRecord record = new GenericData.Record(avroSchema);
        record.put("dataType", chunk.getDataType());
        record.put("data", ByteBuffer.wrap(chunk.getData()));
        record.put("tags", chunk.getTags());
        record.put("seqId", chunk.getSeqID());
        record.put("source", chunk.getSource());
        record.put("stream", chunk.getStreamName());
        parquetWriter.write(record);
        elapsedTime = System.currentTimeMillis() - startTime;
        if(elapsedTime > rotateInterval) {
          rotate();
        }
      } catch (IOException e) {
        LOG.warn("Failed to store data to HDFS.");
        LOG.warn(ExceptionUtil.getStackTrace(e));
      }
    }
    if (next != null) {
      rv = next.add(chunks); // pass data through to the next writer
    }
    return rv;
  }

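  /**
   * Closes the current output file, renames it from ".chukwa" to ".done", and
   * opens a new uniquely named ".chukwa" file in the output directory.
   */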
  private void rotate() throws WriterException {
    if(parquetWriter != null) {
      try {
        parquetWriter.close();
        // strip the ".chukwa" suffix (7 characters) before renaming to ".done"
        String newFileName = previousFileName.substring(0, previousFileName.length() - 7);
        fs.rename(previousPath, new Path(newFileName + ".done"));
      } catch (IOException e) {
        LOG.warn("Failed to close Chukwa write-ahead log.");
        LOG.warn(ExceptionUtil.getStackTrace(e));
      }
    }
    startTime = System.currentTimeMillis();
    calendar.setTimeInMillis(startTime);

    // build a unique file name from the timestamp, host name, and a UID,
    // with separator characters removed
    String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
        .format(calendar.getTime());
    newName += localHostAddr + new java.rmi.server.UID().toString();
    newName = newName.replace("-", "");
    newName = newName.replace(":", "");
    newName = newName.replace(".", "");
    newName = outputDir + "/" + newName.trim() + ".chukwa";
    LOG.info("writing: " + newName);
    Path path = new Path(newName);
    try {
      parquetWriter = new AvroParquetWriter<GenericRecord>(path, avroSchema, CompressionCodecName.SNAPPY, blockSize, pageSize);
      previousPath = path;
      previousFileName = newName;
    } catch (IOException e) {
      throw new WriterException(e);
    }
  }

  /**
   * Calculates the delay for scheduling the next rotation when the
   * FixedTimeRotatorScheme is used. The delay is the difference between the
   * current timestamp (t1) and the next time the collector should rotate the
   * output files (t2). t2 is the time at which the current rotateInterval
   * ends, plus an offset (as set by chukwaCollector.fixedTimeIntervalOffset).
   * So, delay = t2 - t1.
   *
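   * For example, with rotateInterval=300000 (five minutes),
   * offsetInterval=30000, and currentTime=1000000: remainder=100000,
   * prevRoundedInterval=900000, nextRoundedInterval=1200000, and
   * delay = 1200000 - 1000000 + 30000 = 230000 ms.
   *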
   * @param currentTime the current timestamp
   * @param rotateInterval chukwaCollector.rotateInterval
   * @param offsetInterval chukwaCollector.fixedTimeIntervalOffset
   * @return the delay for scheduling the next rotation
   */
  public long getDelayForFixedInterval(long currentTime, long rotateInterval, long offsetInterval) {
    // time elapsed since the last rounded interval boundary
    long remainder = (currentTime % rotateInterval);
    long prevRoundedInterval = currentTime - remainder;
    long nextRoundedInterval = prevRoundedInterval + rotateInterval;
    long delay = nextRoundedInterval - currentTime + offsetInterval;

    if (LOG.isInfoEnabled()) {
      LOG.info("currentTime=" + currentTime + " prevRoundedInterval=" +
          prevRoundedInterval + " nextRoundedInterval=" + nextRoundedInterval +
          " delay=" + delay);
    }

    return delay;
  }
}