This project has retired. For details please refer to its Attic page.
ConsoleWriter xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.writer;
20  
21  
22  import java.util.List;
23  import java.util.Timer;
24  import java.util.TimerTask;
25  import org.apache.hadoop.chukwa.Chunk;
26  import org.apache.hadoop.conf.Configuration;
27  
28  public class ConsoleWriter implements ChukwaWriter {
29  
30    boolean printData;
31    volatile long dataSize = 0;
32    final Timer statTimer;
33  
34    private class StatReportingTask extends TimerTask {
35      private long lastTs = System.currentTimeMillis();
36      private long lastDataSize = 0;
37  
38      public void run() {
39        long time = System.currentTimeMillis();
40        long interval = time - lastTs;
41        lastTs = time;
42  
43        long ds = dataSize;
44        long dataRate = 1000 * (ds - lastDataSize) / interval; // bytes/sec
45        // refers only to data field, not including http or chukwa headers
46        lastDataSize = ds;
47  
48        System.out.println("stat=datacollection.writer.ConsoleWriter|dataRate="
49            + dataRate);
50      }
51    };
52  
53    public ConsoleWriter() {
54      this(true);
55    }
56  
57    public ConsoleWriter(boolean printData) {
58      this.printData = printData;
59      statTimer = new Timer();
60    }
61  
62    public void close() {
63      statTimer.cancel();
64    }
65  
66    public void init(Configuration conf) throws WriterException {
67      System.out.println("----  DUMMY HDFS WRITER IN USE ---");
68  
69      statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
70    }
71  
72    public void add(Chunk data) throws WriterException {
73      int startOffset = 0;
74  
75      dataSize += data.getData().length;
76      if (printData) {
77        System.out.println(data.getData().length + " bytes of data in chunk");
78  
79        for (int offset : data.getRecordOffsets()) {
80          System.out.print(data.getStreamName());
81          System.out.print(" ");
82          System.out.print(data.getSource());
83          System.out.print(" ");
84          System.out.print(data.getDataType());
85          System.out.print(") ");
86          System.out.print(new String(data.getData(), startOffset, offset
87              - startOffset + 1));
88          startOffset = offset + 1;
89        }
90      }
91    }
92  
93    @Override
94    public CommitStatus add(List<Chunk> chunks) throws WriterException {
95      for (Chunk chunk : chunks) {
96        add(chunk);
97      }
98      return COMMIT_OK;
99    }
100 
101 }