This project has retired. For details please refer to its Attic page.
MetricDataLoaderPool xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.dataloader;
19  
20  import java.io.IOException;
21  import java.util.concurrent.CompletionService;
22  import java.util.concurrent.ExecutorCompletionService;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.Executors;
25  import java.util.concurrent.TimeUnit;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.fs.FileStatus;
30  import org.apache.hadoop.fs.FileSystem;
31  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
32  import org.apache.hadoop.chukwa.util.ExceptionUtil;
33  
34  public class MetricDataLoaderPool extends DataLoaderFactory {
35    private static Log log = LogFactory.getLog(MetricDataLoaderPool.class);
36  
37    protected MetricDataLoader threads[] = null;
38    private static String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
39    private int size = 1;
40    private static CompletionService completion = null;
41    private static ExecutorService executor = null;
42    
43    public MetricDataLoaderPool() {
44    }
45    
46    public void load(ChukwaConfiguration conf, FileSystem fs, FileStatus[] fileList) throws IOException {
47  
48      if(executor==null) {
49        try {
50          this.size = Integer.parseInt(conf.get(DATA_LOADER_THREAD_LIMIT));
51        } catch(Exception e) {
52          this.size = 1;
53        }
54        executor = Executors.newFixedThreadPool(size);
55      }
56      if(completion==null) {
57        completion = new ExecutorCompletionService(executor);
58      }
59      try {
60        for(int i=0;i<fileList.length;i++) {
61          String filename = fileList[i].getPath().toUri().toString();
62          log.info("Processing: "+filename);
63          completion.submit(new MetricDataLoader(conf, fs, filename));      
64        }
65        for(int i=0;i<fileList.length;i++) {
66          completion.take().get();
67        }
68      } catch(Exception e) {
69        log.error(ExceptionUtil.getStackTrace(e));
70        throw new IOException();
71      }
72    }
73  
74    public void shutdown() throws InterruptedException {
75      executor.shutdown();
76      executor.awaitTermination(30, TimeUnit.SECONDS);
77      executor.shutdownNow();
78    }
79  }