This project has retired. For details please refer to its
Attic page.
ConsoleWriter xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.writer;
20
21
22 import java.util.List;
23 import java.util.Timer;
24 import java.util.TimerTask;
25 import org.apache.hadoop.chukwa.Chunk;
26 import org.apache.hadoop.conf.Configuration;
27
28 public class ConsoleWriter implements ChukwaWriter {
29
30 boolean printData;
31 volatile long dataSize = 0;
32 final Timer statTimer;
33
34 private class StatReportingTask extends TimerTask {
35 private long lastTs = System.currentTimeMillis();
36 private long lastDataSize = 0;
37
38 public void run() {
39 long time = System.currentTimeMillis();
40 long interval = time - lastTs;
41 lastTs = time;
42
43 long ds = dataSize;
44 long dataRate = 1000 * (ds - lastDataSize) / interval;
45
46 lastDataSize = ds;
47
48 System.out.println("stat=datacollection.writer.ConsoleWriter|dataRate="
49 + dataRate);
50 }
51 };
52
53 public ConsoleWriter() {
54 this(true);
55 }
56
57 public ConsoleWriter(boolean printData) {
58 this.printData = printData;
59 statTimer = new Timer();
60 }
61
62 public void close() {
63 statTimer.cancel();
64 }
65
66 public void init(Configuration conf) throws WriterException {
67 System.out.println("---- DUMMY HDFS WRITER IN USE ---");
68
69 statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
70 }
71
72 public void add(Chunk data) throws WriterException {
73 int startOffset = 0;
74
75 dataSize += data.getData().length;
76 if (printData) {
77 System.out.println(data.getData().length + " bytes of data in chunk");
78
79 for (int offset : data.getRecordOffsets()) {
80 System.out.print(data.getStreamName());
81 System.out.print(" ");
82 System.out.print(data.getSource());
83 System.out.print(" ");
84 System.out.print(data.getDataType());
85 System.out.print(") ");
86 System.out.print(new String(data.getData(), startOffset, offset
87 - startOffset + 1));
88 startOffset = offset + 1;
89 }
90 }
91 }
92
93 @Override
94 public CommitStatus add(List<Chunk> chunks) throws WriterException {
95 for (Chunk chunk : chunks) {
96 add(chunk);
97 }
98 return COMMIT_OK;
99 }
100
101 }