This project has retired. For details please refer to its
Attic page.
PipelineConnector xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.connector;
20
21
22
23
24
25
26
27
28
29
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.Timer;
33 import java.util.TimerTask;
34
35 import org.apache.hadoop.chukwa.Chunk;
36 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
37 import org.apache.hadoop.chukwa.datacollection.DataFactory;
38 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
39 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
40 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
41 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
42 import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
43 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
44 import org.apache.hadoop.conf.Configuration;
45 import org.apache.log4j.Logger;
46
47 public class PipelineConnector implements Connector, Runnable {
48
49 static Logger log = Logger.getLogger(PipelineConnector.class);
50
51 Timer statTimer = null;
52 volatile int chunkCount = 0;
53
54 int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
55 int MIN_POST_INTERVAL = 5 * 1000;
56 public static final String MIN_POST_INTERVAL_OPT = "pipelineConnector.minPostInterval";
57 public static final String MAX_SIZE_PER_POST_OPT = "pipelineConnector.maxPostSize";
58 public static final String ASYNC_ACKS_OPT = "pipelineConnector.asyncAcks";
59
60 ChunkQueue chunkQueue;
61
62 private ChukwaAgent agent = null;
63
64 private volatile boolean stopMe = false;
65 protected ChukwaWriter writers = null;
66
67 public PipelineConnector() {
68
69 statTimer = new Timer();
70 statTimer.schedule(new TimerTask() {
71 public void run() {
72 int count = chunkCount;
73 chunkCount = 0;
74 log.info("# Data chunks sent since last report: " + count);
75 }
76 }, 100, 60 * 1000);
77 }
78
79 public void start() {
80 chunkQueue = DataFactory.getInstance().getEventQueue();
81 agent = ChukwaAgent.getAgent();
82 Configuration conf = agent.getConfiguration();
83 MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
84 MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
85 try {
86 writers = new PipelineStageWriter(conf);
87 (new Thread(this, "Pipeline connector thread")).start();
88 } catch(Exception e) {
89 log.error("Pipeline initialization error: ", e);
90 }
91 }
92
93 public void shutdown() {
94 stopMe = true;
95 try {
96 writers.close();
97 } catch (WriterException e) {
98 log.warn("Shutdown error: ",e);
99 }
100 }
101
102 public void run() {
103 log.info("PipelineConnector started at time:" + System.currentTimeMillis());
104
105 try {
106 long lastPost = System.currentTimeMillis();
107 while (!stopMe) {
108 List<Chunk> newQueue = new ArrayList<Chunk>();
109 try {
110
111 chunkQueue.collect(newQueue, MAX_SIZE_PER_POST);
112 } catch (InterruptedException e) {
113 log.warn("thread interrupted during addChunks(ChunkQueue)");
114 Thread.currentThread().interrupt();
115 break;
116 }
117 CommitStatus result = writers.add(newQueue);
118 if(result.equals(ChukwaWriter.COMMIT_OK)) {
119 chunkCount = newQueue.size();
120 for (Chunk c : newQueue) {
121 agent.reportCommit(c.getInitiator(), c.getSeqID());
122 }
123 }
124 long now = System.currentTimeMillis();
125 long delta = MIN_POST_INTERVAL - now + lastPost;
126 if(delta > 0) {
127 Thread.sleep(delta);
128 }
129 lastPost = now;
130 }
131 log.info("received stop() command so exiting run() loop to shutdown connector");
132 } catch (WriterException e) {
133 log.warn("PipelineStageWriter Exception: ", e);
134 } catch (OutOfMemoryError e) {
135 log.warn("Bailing out", e);
136 throw new RuntimeException("Shutdown pipeline connector.");
137 } catch (InterruptedException e) {
138
139 log.warn("Bailing out", e);
140 throw new RuntimeException("Shutdown pipeline connector.");
141 } catch (Throwable e) {
142 log.error("connector failed; shutting down agent: ", e);
143 throw new RuntimeException("Shutdown pipeline connector.");
144 }
145 }
146
147 @Override
148 public void reloadConfiguration() {
149 }
150
151 }