PipelineConnector xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.connector;

import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

/**
 * This class is responsible for setting up connections with the configured
 * storage writers, based on the configuration in chukwa_agent.xml.
 *
 * On error, tries the list of available storage writers, pauses for a minute,
 * and then repeats.
 */
public class PipelineConnector implements Connector, Runnable {

  static Logger log = Logger.getLogger(PipelineConnector.class);

  Timer statTimer = null;
  // Chunks committed since the last stats report. AtomicInteger keeps the
  // timer's read-and-reset from racing with the updates made in run().
  final AtomicInteger chunkCount = new AtomicInteger(0);

  int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
  int MIN_POST_INTERVAL = 5 * 1000;
  public static final String MIN_POST_INTERVAL_OPT = "pipelineConnector.minPostInterval";
  public static final String MAX_SIZE_PER_POST_OPT = "pipelineConnector.maxPostSize";
  public static final String ASYNC_ACKS_OPT = "pipelineConnector.asyncAcks";

  ChunkQueue chunkQueue;

  private static volatile ChukwaAgent agent = null;

  private volatile boolean stopMe = false;
  protected ChukwaWriter writers = null;
  public PipelineConnector() {
    // Report throughput once a minute, starting 100 ms after construction.
    statTimer = new Timer();
    statTimer.schedule(new TimerTask() {
      public void run() {
        int count = chunkCount.getAndSet(0);
        log.info("# Data chunks sent since last report: " + count);
      }
    }, 100, 60 * 1000);
  }

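  // start() wires the connector into the agent: it grabs the agent's chunk
  // queue, reads the two tuning options from the agent configuration, builds
  // the writer pipeline, and spawns the worker thread that executes run().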
  public void start() {
    chunkQueue = DataFactory.getInstance().getEventQueue();
    agent = ChukwaAgent.getAgent();
    Configuration conf = agent.getConfiguration();
    MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
    MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
    try {
      writers = new PipelineStageWriter(conf);
      (new Thread(this, "Pipeline connector thread")).start();
    } catch(Exception e) {
      log.error("Pipeline initialization error: ", e);
    }
  }

  public void shutdown() {
    stopMe = true;
    // Cancel the stats timer as well: java.util.Timer threads are non-daemon
    // and would otherwise keep running after shutdown.
    statTimer.cancel();
    try {
      writers.close();
    } catch (WriterException e) {
      log.warn("Shutdown error: ", e);
    }
  }

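  // Worker loop: repeatedly drain up to MAX_SIZE_PER_POST bytes of ready
  // chunks from the queue, hand them to the writer pipeline, acknowledge
  // committed chunks back to the agent, and pace iterations so that posts
  // stay at least MIN_POST_INTERVAL ms apart.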
  public void run() {
    log.info("PipelineConnector started at time:" + System.currentTimeMillis());

    try {
      long lastPost = System.currentTimeMillis();
      while (!stopMe) {
        List<Chunk> newQueue = new ArrayList<Chunk>();
        try {
          // get all ready chunks from the chunkQueue to be sent
          chunkQueue.collect(newQueue, MAX_SIZE_PER_POST);
        } catch (InterruptedException e) {
          log.warn("thread interrupted during collect(ChunkQueue)");
          Thread.currentThread().interrupt();
          break;
        }
        CommitStatus result = writers.add(newQueue);
        if (result.equals(ChukwaWriter.COMMIT_OK)) {
          // accumulate, rather than overwrite, so the once-a-minute stats
          // report counts every committed chunk
          chunkCount.addAndGet(newQueue.size());
          for (Chunk c : newQueue) {
            agent.reportCommit(c.getInitiator(), c.getSeqID());
          }
        }
        // pace the loop: sleep off whatever remains of MIN_POST_INTERVAL
        // since the last post, so chunks can accumulate between posts
        long now = System.currentTimeMillis();
        long delta = MIN_POST_INTERVAL - now + lastPost;
        if (delta > 0) {
          Thread.sleep(delta);
        }
        lastPost = now;
      } // end of main loop
      log.info("received stop() command so exiting run() loop to shutdown connector");
    } catch (WriterException e) {
      log.warn("PipelineStageWriter Exception: ", e);
    } catch (OutOfMemoryError e) {
      log.warn("Bailing out", e);
      DaemonWatcher.bailout(-1);
    } catch (InterruptedException e) {
      // interrupted outside of collect(): treat as fatal and bail out
      log.warn("Bailing out", e);
      DaemonWatcher.bailout(-1);
    } catch (Throwable e) {
      log.error("connector failed; shutting down agent: ", e);
      throw new RuntimeException("Shutdown pipeline connector.");
    }
  }

  @Override
  public void reloadConfiguration() {
    // no-op: the tuning options are only read once, in start()
  }

}
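
Usage note: the two tuning keys above are ordinary Hadoop configuration
properties, read from the agent's configuration when start() runs. Below is a
minimal sketch of how those lookups resolve; the standalone demo class and the
resource name passed to addResource are illustrative assumptions, not part of
Chukwa.

import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
import org.apache.hadoop.conf.Configuration;

public class PipelineConnectorConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // in a real agent, chukwa_agent.xml supplies these values; getInt falls
    // back to the connector's hard-coded defaults when a key is absent
    conf.addResource("chukwa_agent.xml"); // illustrative resource name
    int maxPostSize = conf.getInt(PipelineConnector.MAX_SIZE_PER_POST_OPT,
        2 * 1024 * 1024); // 2 MB per post
    int minPostInterval = conf.getInt(PipelineConnector.MIN_POST_INTERVAL_OPT,
        5 * 1000); // at least 5 s between posts
    System.out.println("maxPostSize=" + maxPostSize
        + " bytes, minPostInterval=" + minPostInterval + " ms");
  }
}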