FilePerPostWriter xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.test;

import java.io.IOException;
import java.net.URI;

import java.util.List;
import java.util.Timer;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.*;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter.StatReportingTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

/**
 * A writer that writes a file for each post. Intended ONLY for architectural
 * performance comparisons. Do not use this in production.
 */
public class FilePerPostWriter extends SeqFileWriter {

  String baseName;
  AtomicLong counter = new AtomicLong(0);

  protected FileSystem fs = null;
  protected Configuration conf = null;

  protected String outputDir = null;
//  private Calendar calendar = Calendar.getInstance();

  protected Path currentPath = null;
  protected String currentFileName = null;

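  /**
   * Writes the whole batch into a brand-new, uncompressed sequence file named
   * baseName + "_" + counter + ".done", which is closed again before this call returns.
   */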
  @Override
  public synchronized CommitStatus add(List<Chunk> chunks) throws WriterException {

    try {
      String newName = baseName + "_" + counter.incrementAndGet();
      Path newOutputPath = new Path(newName + ".done");
      FSDataOutputStream currentOutputStr = fs.create(newOutputPath);
      currentPath = newOutputPath;
      currentFileName = newName;
      // Uncompressed for now
      SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, currentOutputStr,
          ChukwaArchiveKey.class, ChunkImpl.class,
          SequenceFile.CompressionType.NONE, null);

      ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();

      if (System.currentTimeMillis() >= nextTimePeriodComputation) {
        computeTimePeriod();
      }

      for (Chunk chunk : chunks) {
        // Skip null entries before dereferencing the chunk.
        if (chunk == null) {
          continue;
        }

        archiveKey.setTimePartition(timePeriod);
        archiveKey.setDataType(chunk.getDataType());
        archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
            + "/" + chunk.getStreamName());
        archiveKey.setSeqId(chunk.getSeqID());

        // compute size for stats
        dataSize += chunk.getData().length;
        bytesThisRotate += chunk.getData().length;
        seqFileWriter.append(archiveKey, chunk);
      }

      seqFileWriter.close();
      currentOutputStr.close();
    } catch (IOException e) {
      throw new WriterException(e);
    }
    return COMMIT_OK;
  }

  @Override
  public void close() {
    // Intentionally empty: each output file is already closed at the end of add().
  }

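  /**
   * Resolves the target filesystem, builds the per-writer base file name, and
   * starts the periodic stats reporting task inherited from SeqFileWriter.
   */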
  @Override
  public void init(Configuration conf) throws WriterException {
    try {
      this.conf = conf;
      outputDir = conf.get(SeqFileWriter.OUTPUT_DIR_OPT, "/chukwa");
      baseName = outputDir + "/" + System.currentTimeMillis() + "_" + localHostAddr.hashCode();

      String fsname = conf.get("writer.hdfs.filesystem");
      if (fsname == null || fsname.equals("")) {
        // fall back to the default filesystem from the Hadoop configuration
        fsname = conf.get("fs.default.name");
      }

      fs = FileSystem.get(new URI(fsname), conf);
      isRunning = true;

      statTimer = new Timer();
      statTimer.schedule(new StatReportingTask(), 1000,
          STAT_INTERVAL_SECONDS * 1000);

      nextTimePeriodComputation = 0;
    } catch (Exception e) {
      throw new WriterException(e);
    }
  }

}
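
A minimal usage sketch (not part of the Chukwa source) showing how this test writer might be driven from a stand-alone benchmark. It only uses the configuration keys read in init() above; the output directory value, the local-filesystem URI, and the ChunkImpl constructor arguments are illustrative assumptions.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.test.FilePerPostWriter;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.hadoop.conf.Configuration;

public class FilePerPostWriterDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same keys that init() reads; the values are assumptions for a local run.
    conf.set(SeqFileWriter.OUTPUT_DIR_OPT, "/tmp/chukwa-fpp-demo");
    conf.set("writer.hdfs.filesystem", "file:///");

    FilePerPostWriter writer = new FilePerPostWriter();
    writer.init(conf);

    // Build one small chunk; this ChunkImpl constructor signature is assumed,
    // check ChunkImpl in this source tree for the exact way to create chunks.
    byte[] payload = "sample log line\n".getBytes();
    List<Chunk> chunks = new ArrayList<Chunk>();
    chunks.add(new ChunkImpl("DemoType", "demo-stream", payload.length, payload, null));

    // Each add() call produces one new "<baseName>_<n>.done" sequence file.
    writer.add(chunks);

    // close() is a no-op and does not cancel the stats timer started in init(),
    // so exit explicitly in this throwaway demo.
    writer.close();
    System.exit(0);
  }
}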