This project has retired. For details please refer to its
Attic page.
ConsoleWriter xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.writer;
20
21
22 import java.nio.charset.Charset;
23 import java.util.List;
24 import java.util.Timer;
25 import java.util.TimerTask;
26
27 import org.apache.hadoop.chukwa.Chunk;
28 import org.apache.hadoop.conf.Configuration;
29
30 public class ConsoleWriter implements ChukwaWriter {
31
32 boolean printData;
33 volatile long dataSize = 0;
34 final Timer statTimer;
35
36 private class StatReportingTask extends TimerTask {
37 private long lastTs = System.currentTimeMillis();
38 private long lastDataSize = 0;
39
40 public void run() {
41 long time = System.currentTimeMillis();
42 long interval = time - lastTs;
43 lastTs = time;
44
45 long ds = dataSize;
46 long dataRate = 1000 * (ds - lastDataSize) / interval;
47
48 lastDataSize = ds;
49
50 System.out.println("stat=datacollection.writer.ConsoleWriter|dataRate="
51 + dataRate);
52 }
53 };
54
55 public ConsoleWriter() {
56 this(true);
57 }
58
59 public ConsoleWriter(boolean printData) {
60 this.printData = printData;
61 statTimer = new Timer();
62 }
63
64 public void close() {
65 statTimer.cancel();
66 }
67
68 public void init(Configuration conf) throws WriterException {
69 System.out.println("---- DUMMY HDFS WRITER IN USE ---");
70
71 statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
72 }
73
74 public void add(Chunk data) throws WriterException {
75 int startOffset = 0;
76
77 dataSize += data.getData().length;
78 if (printData) {
79 System.out.println(data.getData().length + " bytes of data in chunk");
80
81 for (int offset : data.getRecordOffsets()) {
82 System.out.print(data.getStreamName());
83 System.out.print(" ");
84 System.out.print(data.getSource());
85 System.out.print(" ");
86 System.out.print(data.getDataType());
87 System.out.print(") ");
88 System.out.print(new String(data.getData(), startOffset, offset
89 - startOffset + 1, Charset.forName("UTF-8")));
90 startOffset = offset + 1;
91 }
92 }
93 }
94
95 @Override
96 public CommitStatus add(List<Chunk> chunks) throws WriterException {
97 for (Chunk chunk : chunks) {
98 add(chunk);
99 }
100 return COMMIT_OK;
101 }
102
103 }