ConsoleWriter xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.writer;

import java.nio.charset.Charset;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.conf.Configuration;

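/**
 * A {@link ChukwaWriter} for debugging and testing: instead of writing
 * chunks to HDFS, it prints their contents to stdout and reports the
 * observed data rate every ten seconds.
 *
 * <p>Sketch of typical usage (assumes a {@code chunk} built elsewhere in
 * the collection pipeline):
 * <pre>{@code
 * ConsoleWriter writer = new ConsoleWriter(true);
 * writer.init(new Configuration());
 * writer.add(Collections.singletonList(chunk)); // prints each record
 * writer.close();
 * }</pre>
 */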
public class ConsoleWriter implements ChukwaWriter {

  boolean printData;
  final AtomicLong dataSize = new AtomicLong(); // updated from add(), read by the stat task
  final Timer statTimer;

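  /**
   * Periodically prints the data rate, in payload bytes per second, seen
   * by this writer since the previous report.
   */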
  private class StatReportingTask extends TimerTask {
    private long lastTs = System.currentTimeMillis();
    private long lastDataSize = 0;

    public void run() {
      long time = System.currentTimeMillis();
      long interval = time - lastTs;
      lastTs = time;

      long ds = dataSize.get();
      // dataRate covers only the data field, not the http or chukwa headers
      long dataRate = 1000 * (ds - lastDataSize) / interval; // bytes/sec
      lastDataSize = ds;

      System.out.println("stat=datacollection.writer.ConsoleWriter|dataRate="
          + dataRate);
    }
  }

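  /**
   * Creates a writer that echoes chunk contents to stdout.
   */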
  public ConsoleWriter() {
    this(true);
  }

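  /**
   * @param printData if true, each chunk's records are printed to stdout;
   *                  if false, only the periodic rate statistic is printed
   */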
  public ConsoleWriter(boolean printData) {
    this.printData = printData;
    statTimer = new Timer();
  }

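  /**
   * Stops the periodic stat reporting.
   */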
  public void close() {
    statTimer.cancel();
  }

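  /**
   * Starts the stat-reporting timer: the first report fires after one
   * second, then every ten seconds.
   */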
  public void init(Configuration conf) throws WriterException {
    System.out.println("---- DUMMY HDFS WRITER IN USE ----");

    statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
  }

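  /**
   * Accounts for the chunk's payload size and, when printData is set,
   * prints each record as "streamName source (dataType) recordBytes".
   */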
  public void add(Chunk data) throws WriterException {
    int startOffset = 0;

    dataSize.addAndGet(data.getData().length);
    if (printData) {
      System.out.println(data.getData().length + " bytes of data in chunk");

      for (int offset : data.getRecordOffsets()) {
        System.out.print(data.getStreamName());
        System.out.print(" ");
        System.out.print(data.getSource());
        System.out.print(" (");
        System.out.print(data.getDataType());
        System.out.print(") ");
        System.out.print(new String(data.getData(), startOffset, offset
            - startOffset + 1, Charset.forName("UTF-8")));
        startOffset = offset + 1;
      }
    }
  }

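  /**
   * Writes each chunk in turn; since output goes only to stdout, the
   * commit is reported as immediately successful.
   */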
  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    for (Chunk chunk : chunks) {
      add(chunk);
    }
    return COMMIT_OK;
  }

}