This project has retired. For details please refer to its Attic page.
QueueToWriterConnector xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.tools.backfilling;
20  
21  import java.util.LinkedList;
22  import java.util.List;
23  
24  import org.apache.hadoop.chukwa.Chunk;
25  import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
26  import org.apache.hadoop.chukwa.datacollection.DataFactory;
27  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
28  import org.apache.hadoop.chukwa.datacollection.connector.Connector;
29  import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
30  import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
31  import org.apache.hadoop.chukwa.util.DaemonWatcher;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.log4j.Logger;
34  
35  public class QueueToWriterConnector implements Connector, Runnable {
36    static Logger log = Logger.getLogger(QueueToWriterConnector.class);
37    static final int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
38  
39    protected Configuration conf = null;
40    protected volatile boolean isRunning = true;
41    protected ChunkQueue chunkQueue = DataFactory.getInstance().getEventQueue();
42    protected ChukwaWriter writer = null;
43    protected Thread runner = null;
44    protected boolean isBackfilling = false;
45    public QueueToWriterConnector(Configuration conf,boolean isBackfilling) {
46      this.conf = conf;
47      this.isBackfilling = isBackfilling;
48    }
49  
50    @Override
51    public void reloadConfiguration() {
52      // do nothing here
53    }
54  
55    @Override
56    public void shutdown() {
57      isRunning = false;
58      
59      log.info("Shutdown in progress ...");
60      while (isAlive()) {
61        try {
62          Thread.sleep(1000);
63        } catch (InterruptedException e) {}
64      }
65     
66      try {
67        if (writer != null) {
68          writer.close();
69        }
70      } catch(Exception e) {
71        log.warn("Exception while closing writer: ", e);
72      }
73      log.info("Shutdown done.");
74    }
75  
76    @Override
77    public void start() {
78      log.info("Starting QueueToWriterConnector thread");
79      runner = new Thread(this, "QueueToWriterConnectorThread");
80      runner.start();
81    }
82  
83    protected boolean isAlive() {
84      return this.runner.isAlive();
85    }
86    
87    @Override
88    public void run() {
89  
90      log.info("initializing QueueToWriterConnector");
91      try {
92        String writerClassName = conf.get("chukwaCollector.writerClass",
93            SeqFileWriter.class.getCanonicalName());
94        Class<?> writerClass = Class.forName(writerClassName);
95        if (writerClass != null
96            && ChukwaWriter.class.isAssignableFrom(writerClass)) {
97          writer = (ChukwaWriter) writerClass.newInstance();
98        } else {
99          throw new RuntimeException("Wrong class type");
100       }
101       writer.init(conf);
102 
103     } catch (Throwable e) {
104       log.warn("failed to use user-chosen writer class, Bail out!", e);
105       DaemonWatcher.bailout(-1);
106     }
107 
108     
109     List<Chunk> chunks = new LinkedList<Chunk>();
110     ChukwaAgent agent = null;// ChukwaAgent.getAgent();
111     
112     log.info("processing data for QueueToWriterConnector");
113     
114     while ( isRunning ||  chunkQueue.size() != 0 || chunks.size() != 0) {
115       try {
116         if (chunks.size() == 0) {
117           
118           if (isBackfilling && chunkQueue.size() == 0) {
119             Thread.sleep(300);
120             continue;
121           }
122           chunkQueue.collect(chunks, MAX_SIZE_PER_POST);
123           log.info("Got " + chunks.size() + " chunks back from the queue");
124         }       
125         
126         writer.add(chunks);
127         
128         if (agent != null) {
129           for(Chunk chunk: chunks) {
130             agent.reportCommit(chunk.getInitiator(), chunk.getSeqID());
131           }
132         }
133         
134         chunks.clear();
135         
136       }
137       catch (Throwable e) {
138         log.warn("Could not save some chunks");
139         e.printStackTrace();
140         try {
141           Thread.sleep(5000);
142         } catch (InterruptedException e1) {}
143       } 
144     }
145   }
146 
147 }