/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.connector;

/**
 * This class is responsible for setting up connections to the storage
 * writers configured in chukwa_agent.xml.
 *
 * On error, it tries the list of available storage writers, pauses for a
 * minute, and then repeats.
 */
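// A minimal sketch of the chukwa_agent.xml entry that drives this connector:
// PipelineStageWriter reads the "chukwa.pipeline" property, a comma-separated
// list of writer classes. The writer classes below are illustrative, not a
// recommendation.
//
//   <property>
//     <name>chukwa.pipeline</name>
//     <value>org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter</value>
//   </property>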

import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

public class PipelineConnector implements Connector, Runnable {

  static Logger log = Logger.getLogger(PipelineConnector.class);

  Timer statTimer = null;
  // Chunks posted since the last once-a-minute report; the reporting
  // TimerTask reads and resets this counter.
  volatile int chunkCount = 0;

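  // Tuning knobs, read from chukwa_agent.xml in start():
  //   pipelineConnector.minPostInterval - minimum milliseconds between
  //     successive posts to the writer pipeline (default 5000)
  //   pipelineConnector.maxPostSize - maximum bytes collected per post
  //     (default 2 MB)
  // ASYNC_ACKS_OPT is declared alongside them but is not consulted in
  // this class.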
  int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
  int MIN_POST_INTERVAL = 5 * 1000;
  public static final String MIN_POST_INTERVAL_OPT = "pipelineConnector.minPostInterval";
  public static final String MAX_SIZE_PER_POST_OPT = "pipelineConnector.maxPostSize";
  public static final String ASYNC_ACKS_OPT = "pipelineConnector.asyncAcks";

  ChunkQueue chunkQueue;

  private ChukwaAgent agent = null;

  private volatile boolean stopMe = false;
  protected ChukwaWriter writers = null;

  public PipelineConnector() {
    // Report chunk throughput once a minute.
    statTimer = new Timer();
    statTimer.schedule(new TimerTask() {
      public void run() {
        int count = chunkCount;
        chunkCount = 0;
        log.info("# Data chunks sent since last report: " + count);
      }
    }, 100, 60 * 1000);
  }

  public void start() {
    chunkQueue = DataFactory.getInstance().getEventQueue();
    agent = ChukwaAgent.getAgent();
    Configuration conf = agent.getConfiguration();
    MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
    MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
    try {
      // Build the writer pipeline and spawn the posting thread.
      writers = new PipelineStageWriter(conf);
      (new Thread(this, "Pipeline connector thread")).start();
    } catch (Exception e) {
      log.error("Pipeline initialization error: ", e);
    }
  }
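
  // Typical lifecycle, sketched for orientation (in practice ChukwaAgent
  // constructs and drives the connector; this is illustrative, not agent
  // code):
  //
  //   Connector conn = new PipelineConnector();
  //   conn.start();    // builds the pipeline, spawns the posting thread
  //   ...              // adaptors add chunks to the shared ChunkQueue
  //   conn.shutdown(); // stops the run() loop and closes the writers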

  public void shutdown() {
    stopMe = true;
    // Cancel the stats timer along with the writers, so its non-daemon
    // thread does not outlive the connector.
    statTimer.cancel();
    try {
      writers.close();
    } catch (WriterException e) {
      log.warn("Shutdown error: ", e);
    }
  }

  public void run() {
    log.info("PipelineConnector started at time:" + System.currentTimeMillis());

    try {
      long lastPost = System.currentTimeMillis();
      while (!stopMe) {
        List<Chunk> newQueue = new ArrayList<Chunk>();
        try {
          // Gather all ready chunks from the chunkQueue, up to
          // MAX_SIZE_PER_POST bytes, and post them to the pipeline.
          chunkQueue.collect(newQueue, MAX_SIZE_PER_POST);
          CommitStatus result = writers.add(newQueue);
          if (result.equals(ChukwaWriter.COMMIT_OK)) {
            // Accumulate rather than overwrite, so the once-a-minute
            // reporter counts every chunk posted since its last reset.
            chunkCount += newQueue.size();
            for (Chunk c : newQueue) {
              agent.reportCommit(c.getInitiator(), c.getSeqID());
            }
          }
        } catch (WriterException e) {
          log.warn("PipelineStageWriter Exception: ", e);
        } catch (InterruptedException e) {
          log.warn("Thread interrupted while collecting or posting chunks", e);
          Thread.currentThread().interrupt();
          break;
        }
        // Throttle: ensure at least MIN_POST_INTERVAL ms pass between the
        // start of one post and the next.
        long now = System.currentTimeMillis();
        long delta = MIN_POST_INTERVAL - now + lastPost;
        if (delta > 0) {
          Thread.sleep(delta); // wait for stuff to accumulate
        }
        // Measure from the moment we resume, so successive posts do not
        // creep closer together than the configured interval.
        lastPost = System.currentTimeMillis();
      } // end of while loop
      log.info("received stop() command so exiting run() loop to shutdown connector");
    } catch (OutOfMemoryError e) {
      log.warn("Bailing out", e);
      throw new RuntimeException("Shutdown pipeline connector.");
    } catch (InterruptedException e) {
      // Interrupted during the throttling sleep; bail out and let the
      // thread die.
      log.warn("Bailing out", e);
      throw new RuntimeException("Shutdown pipeline connector.");
    } catch (Throwable e) {
      log.error("connector failed; shutting down agent: ", e);
      throw new RuntimeException("Shutdown pipeline connector.");
    }
  }

  @Override
  public void reloadConfiguration() {
    // No dynamic configuration to reload.
  }

}