This project has retired. For details please refer to its
Attic page.
QueueToWriterConnector xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.tools.backfilling;
20
21 import java.util.LinkedList;
22 import java.util.List;
23
24 import org.apache.hadoop.chukwa.Chunk;
25 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
26 import org.apache.hadoop.chukwa.datacollection.DataFactory;
27 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
28 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
29 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
30 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
31 import org.apache.hadoop.chukwa.util.DaemonWatcher;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.log4j.Logger;
34
35 public class QueueToWriterConnector implements Connector, Runnable {
36 static Logger log = Logger.getLogger(QueueToWriterConnector.class);
37 static final int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
38
39 protected Configuration conf = null;
40 protected volatile boolean isRunning = true;
41 protected ChunkQueue chunkQueue = DataFactory.getInstance().getEventQueue();
42 protected ChukwaWriter writer = null;
43 protected Thread runner = null;
44 protected boolean isBackfilling = false;
45 public QueueToWriterConnector(Configuration conf,boolean isBackfilling) {
46 this.conf = conf;
47 this.isBackfilling = isBackfilling;
48 }
49
50 @Override
51 public void reloadConfiguration() {
52
53 }
54
55 @Override
56 public void shutdown() {
57 isRunning = false;
58
59 log.info("Shutdown in progress ...");
60 while (isAlive()) {
61 try {
62 Thread.sleep(1000);
63 } catch (InterruptedException e) {}
64 }
65
66 try {
67 if (writer != null) {
68 writer.close();
69 }
70 } catch(Exception e) {
71 log.warn("Exception while closing writer: ", e);
72 }
73 log.info("Shutdown done.");
74 }
75
76 @Override
77 public void start() {
78 log.info("Starting QueueToWriterConnector thread");
79 runner = new Thread(this, "QueueToWriterConnectorThread");
80 runner.start();
81 }
82
83 protected boolean isAlive() {
84 return this.runner.isAlive();
85 }
86
87 @Override
88 public void run() {
89
90 log.info("initializing QueueToWriterConnector");
91 try {
92 String writerClassName = conf.get("chukwaCollector.writerClass",
93 SeqFileWriter.class.getCanonicalName());
94 Class<?> writerClass = Class.forName(writerClassName);
95 if (writerClass != null
96 && ChukwaWriter.class.isAssignableFrom(writerClass)) {
97 writer = (ChukwaWriter) writerClass.newInstance();
98 } else {
99 throw new RuntimeException("Wrong class type");
100 }
101 writer.init(conf);
102
103 } catch (Throwable e) {
104 log.warn("failed to use user-chosen writer class, Bail out!", e);
105 DaemonWatcher.bailout(-1);
106 }
107
108
109 List<Chunk> chunks = new LinkedList<Chunk>();
110 ChukwaAgent agent = null;
111
112 log.info("processing data for QueueToWriterConnector");
113
114 while ( isRunning || chunkQueue.size() != 0 || chunks.size() != 0) {
115 try {
116 if (chunks.size() == 0) {
117
118 if (isBackfilling && chunkQueue.size() == 0) {
119 Thread.sleep(300);
120 continue;
121 }
122 chunkQueue.collect(chunks, MAX_SIZE_PER_POST);
123 log.info("Got " + chunks.size() + " chunks back from the queue");
124 }
125
126 writer.add(chunks);
127
128 if (agent != null) {
129 for(Chunk chunk: chunks) {
130 agent.reportCommit(chunk.getInitiator(), chunk.getSeqID());
131 }
132 }
133
134 chunks.clear();
135
136 }
137 catch (Throwable e) {
138 log.warn("Could not save some chunks");
139 e.printStackTrace();
140 try {
141 Thread.sleep(5000);
142 } catch (InterruptedException e1) {}
143 }
144 }
145 }
146
147 }