/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.dataloader;
1920import java.io.IOException;
21import java.util.concurrent.CompletionService;
22import java.util.concurrent.ExecutorCompletionService;
23import java.util.concurrent.ExecutorService;
24import java.util.concurrent.Executors;
25import java.util.concurrent.TimeUnit;
2627import org.apache.commons.logging.Log;
28import org.apache.commons.logging.LogFactory;
29import org.apache.hadoop.fs.FileStatus;
30import org.apache.hadoop.fs.FileSystem;
31import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
32import org.apache.hadoop.chukwa.util.ExceptionUtil;
3334publicclassMetricDataLoaderPoolextendsDataLoaderFactory {
35privatestatic Log log = LogFactory.getLog(MetricDataLoaderPool.class);
3637protectedMetricDataLoader threads[] = null;
38privatestatic String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
39privateint size = 1;
40privatestatic CompletionService completion = null;
41privatestatic ExecutorService executor = null;
4243publicMetricDataLoaderPool() {
44 }
4546publicvoid load(ChukwaConfiguration conf, FileSystem fs, FileStatus[] fileList) throws IOException {
4748if(executor==null) {
49try {
50this.size = Integer.parseInt(conf.get(DATA_LOADER_THREAD_LIMIT));
51 } catch(Exception e) {
52this.size = 1;
53 }
54 executor = Executors.newFixedThreadPool(size);
55 }
56if(completion==null) {
57 completion = new ExecutorCompletionService(executor);
58 }
59try {
60for(int i=0;i<fileList.length;i++) {
61 String filename = fileList[i].getPath().toUri().toString();
62 log.info("Processing: "+filename);
63 completion.submit(newMetricDataLoader(conf, fs, filename));
64 }
65for(int i=0;i<fileList.length;i++) {
66 completion.take().get();
67 }
68 } catch(Exception e) {
69 log.error(ExceptionUtil.getStackTrace(e));
70thrownew IOException();
71 }
72 }
7374publicvoid shutdown() throws InterruptedException {
75 executor.shutdown();
76 executor.awaitTermination(30, TimeUnit.SECONDS);
77 executor.shutdownNow();
78 }
79 }