1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.hadoop.chukwa.extraction.demux;
202122import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
23import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
24import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
2526import org.apache.hadoop.mapred.JobConf;
27import org.apache.hadoop.mapred.Partitioner;
28import org.apache.log4j.Logger;
2930publicclass ChukwaRecordPartitioner<K, V> implements31 Partitioner<ChukwaRecordKey, ChukwaRecord> {
32static Logger log = Logger.getLogger(ChukwaRecordPartitioner.class);
3334publicvoid configure(JobConf arg0) {
35 }
3637publicint getPartition(
38 org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey key,
39 org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord record,
40int numReduceTasks) {
41if (log.isDebugEnabled()) {
4243 log
44 .debug("Partitioner key: ["45 + key.getReduceType()
46 + "] - Reducer:"47 + ((key.getReduceType().hashCode() & Integer.MAX_VALUE) % numReduceTasks));
48 }
49 String hashkey = key.getReduceType();
50if(key.getKey().startsWith(CHUKWA_CONSTANT.INCLUDE_KEY_IN_PARTITIONER)) hashkey = key.getReduceType()+"#"+key.getKey();
51return (hashkey.hashCode() & Integer.MAX_VALUE)
52 % numReduceTasks;
53 }
5455 }