This project has retired. For details please refer to its Attic page.
FSMIntermedEntry xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.analysis.salsa.fsm;
20  
21  import java.io.IOException;
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.util.Iterator;
25  import java.util.TreeMap;
26  import java.util.Set;
27  
28  import org.apache.hadoop.io.WritableComparable;
29  
30  /*
31   * FSM Intermediate State Entry
32   * 
33   * Each state corresponds to two of these entries:
34   * One corresponding to the start of the state, one corresponding to the end of the state
35   *
36   * Intermediate data-structure passed from Maps to Reduces
37   *
38   */
39  public class FSMIntermedEntry 
40  	implements Cloneable, WritableComparable 
41  {
42  	private final char DELIM = 1;
43  	
44  	/* Begin fields */
45  	public StateType state_type;
46  	public MapRedState state_mapred;
47  	public HDFSState state_hdfs;
48  	public FSMType fsm_type;
49  	
50  	public String state_name;
51  	public String identifier;
52  	public String unique_id; // state name + unique identifier 
53  														// (state-dependent)
54  														// this id should also correspond 
55  														// to the k2 value between
56  														// mappers and reducers
57  														
58  	public String timestamp;
59  	public String time_start;
60  	public String time_end;
61  	
62  	public String host_exec;
63  	public String host_other; // for instance, source host for shuffle, 
64  														// src/dest host for dfs read/write
65  	
66  	// These values filled in by splitting the original 
67  	// ChukwaRecordKey from Demux
68  	public String time_orig_epoch;
69  	public String time_orig;
70  	public String job_id; // we get this for free from the CRK
71  	
72  	TreeMap<String,String> add_info; // additional information 
73  																	 // e.g. locality information
74  																	
75  	/* End of fields */
76  	
77  	public FSMIntermedEntry() {
78  		this.state_mapred = new MapRedState(MapRedState.NONE);
79  		this.state_hdfs = new HDFSState(HDFSState.NONE);
80  		this.state_type = new StateType(StateType.STATE_NOOP);			
81  		this.add_info = new TreeMap<String, String>();
82  		this.host_other = new String("");
83  		this.job_id = new String("");
84  		this.time_orig_epoch = new String("");
85  		this.time_orig = new String("");
86  	}
87  	
88  	public String getUniqueID()
89  	{
90  		return new String(this.unique_id);
91  	}
92  	
93  	public String getFriendlyID()
94  	{
95  		return new String(this.identifier);
96  	}
97  	
98  	/**
99  	 * Set state_type and identifier before calling
100 	 */
101 	public void generateUniqueID()
102 	{
103 		if (this.fsm_type.val == FSMType.MAPREDUCE_FSM || 
104 			  this.fsm_type.val == FSMType.MAPREDUCE_FSM_INCOMPLETE) 
105 		{
106 			this.state_name = new String(this.state_mapred.toString());
107 		} else if (this.fsm_type.val == FSMType.MAPREDUCE_FSM || 
108 							 this.fsm_type.val == FSMType.MAPREDUCE_FSM_INCOMPLETE) 
109 		{
110 			this.state_name = new String(this.state_hdfs.toString());
111 		}
112 		this.unique_id = new String(this.state_name + "@" + this.identifier);
113 	}	
114 	
115 	public void write(DataOutput out) throws IOException {
116 		Set<String> mapKeys;
117 		
118 		out.writeInt(this.state_type.val);
119 		out.writeInt(this.state_mapred.val);
120 		out.writeInt(this.state_hdfs.val);
121 		out.writeInt(this.fsm_type.val);
122 		out.writeChar(DELIM);
123 		out.writeInt(state_name.length());
124 		if (state_name.length() > 0) out.writeUTF(state_name);
125 		out.writeInt(unique_id.length());
126 		if (unique_id.length() > 0) out.writeUTF(unique_id);
127 		out.writeInt(timestamp.length());
128 		if (timestamp.length() > 0) out.writeUTF(timestamp);
129 		out.writeInt(time_start.length());
130 		if (time_start.length() > 0) out.writeUTF(time_start);
131 		out.writeInt(time_end.length());
132 		if (time_end.length() > 0) out.writeUTF(time_end);
133 		out.writeInt(host_exec.length());
134 		if (host_exec.length() > 0) out.writeUTF(host_exec);
135 		out.writeInt(host_other.length());
136 		if (host_other.length() > 0) out.writeUTF(host_other);
137 		out.writeInt(time_orig_epoch.length());
138 		if (time_orig_epoch.length() > 0) out.writeUTF(time_orig_epoch);
139 		out.writeInt(time_orig.length());
140 		if (time_orig.length() > 0) out.writeUTF(time_orig);
141 		out.writeInt(job_id.length());
142 		if (job_id.length() > 0) out.writeUTF(job_id);
143 		out.writeInt(identifier.length());
144 		if (identifier.length() > 0) out.writeUTF(identifier);
145 					
146 		mapKeys = this.add_info.keySet();
147 		out.writeInt(mapKeys.size());
148 		
149 		Iterator<String> keyIter = mapKeys.iterator(); 
150 		
151 		for (int i = 0; i < mapKeys.size(); i++) {
152 			assert(keyIter.hasNext());
153 			String currKey = keyIter.next();
154 			if (currKey != null) {
155 				String currvalue = this.add_info.get(currKey);
156 				out.writeUTF(currKey);
157 				out.writeInt(currvalue.length());
158 				if (currvalue.length() > 0) {
159 					out.writeUTF(currvalue);
160 				} 
161 			} else {
162 				out.writeUTF(new String("NULL"));
163 				out.writeInt(0);
164 			}
165 		}
166 	}
167 
168 	public void readFields(DataInput in) throws IOException {
169 		int currlen, numkeys;
170 		
171 		this.state_type = new StateType(in.readInt());
172 		this.state_mapred = new MapRedState(in.readInt());
173 		this.state_hdfs = new HDFSState(in.readInt());
174 		this.fsm_type = new FSMType(in.readInt());
175 		in.readChar();
176 
177 		currlen = in.readInt();
178 		if (currlen > 0) this.state_name = in.readUTF();
179 		else this.state_name = new String("");
180 
181 		currlen = in.readInt();
182 		if (currlen > 0) this.unique_id = in.readUTF();
183 		else this.unique_id = new String("");
184 
185 		currlen = in.readInt();
186 		if (currlen > 0) this.timestamp = in.readUTF();
187 		else this.timestamp = new String("");
188 
189 		currlen = in.readInt();
190 		if (currlen > 0) this.time_start = in.readUTF();
191 		else this.time_start = new String("");
192 
193 		currlen = in.readInt();
194 		if (currlen > 0) this.time_end = in.readUTF();
195 		else this.time_end = new String("");
196 
197 		currlen = in.readInt();
198 		if (currlen > 0) this.host_exec = in.readUTF();
199 		else this.host_exec = new String("");
200 
201 		currlen = in.readInt();
202 		if (currlen > 0) this.host_other = in.readUTF();
203 		else this.host_other = new String("");
204 
205 		currlen = in.readInt();
206 		if (currlen > 0) this.time_orig_epoch = in.readUTF();
207 		else this.time_orig_epoch = new String("");
208 
209 		currlen = in.readInt();
210 		if (currlen > 0) this.time_orig = in.readUTF();
211 		else this.time_orig = new String("");
212 
213 		currlen = in.readInt();
214 		if (currlen > 0) this.job_id = in.readUTF();
215 		else this.job_id = new String("");
216 			
217 		currlen = in.readInt();
218 		if (currlen > 0) this.identifier = in.readUTF();
219 		else this.identifier = new String("");			
220 					
221 		numkeys = in.readInt();
222 
223 		this.add_info = new TreeMap<String, String>();
224 		
225 		if (numkeys > 0) {
226 			for (int i = 0; i < numkeys; i++) {
227 				String currkey, currval;
228 				currkey = in.readUTF();
229 				currlen = in.readInt();
230 				if (currlen > 0) {
231 					currval = in.readUTF();
232 					this.add_info.put(currkey, currval);
233 				}
234 			}
235 		}
236 	}
237 	 
238 	
239 	public boolean equals (Object o) {
240 		FSMIntermedEntry other = (FSMIntermedEntry) o;
241 		return this.unique_id.equals(other.unique_id);
242 	}
243 	
244 	public int compareTo (Object o) {
245 		FSMIntermedEntry other = (FSMIntermedEntry) o;
246 		return this.unique_id.compareTo(other.unique_id);
247 	}
248 	
249 	/*
250 	 * This method is to support convenient creating of new copies
251 	 * of states for Reduce to create sub-states ReduceShuffle, ReduceSort, and ReduceReducer
252 	 */
253 	public FSMIntermedEntry clone() throws CloneNotSupportedException {
254 		FSMIntermedEntry newObj = (FSMIntermedEntry) super.clone();
255 		Set<String> mapKeys;
256 
257 		newObj.state_type = new StateType(this.state_type.val);
258 		newObj.state_mapred = new MapRedState(this.state_mapred.val);
259 		newObj.state_hdfs = new HDFSState(this.state_hdfs.val);
260 		newObj.fsm_type = new FSMType(this.fsm_type.val);
261 
262 		/* Deep copy all strings */
263 		newObj.state_name = new String(this.state_name);
264 		newObj.unique_id = new String(this.unique_id);
265 		newObj.timestamp = new String(this.timestamp);
266 		newObj.time_start = new String(this.time_start);
267 		newObj.time_end = new String(this.time_end);
268 		
269 		newObj.time_orig_epoch = new String(this.time_orig_epoch);
270 		newObj.time_orig = new String(this.time_orig);
271 		newObj.job_id = new String(this.job_id);
272 		
273 		
274 		/* Deep copy of TreeMap */
275 		newObj.add_info = new TreeMap<String,String>();
276 		mapKeys = this.add_info.keySet();
277 		Iterator<String> keyIter = mapKeys.iterator();
278 		String currKey = null;
279 		
280 		for (int i = 0; i < mapKeys.size(); i++) {
281 			assert(keyIter.hasNext());
282 			currKey = keyIter.next();
283 			if (currKey != null) {
284 				newObj.add_info.put(currKey, this.add_info.get(currKey));
285 			}
286 		}
287 		
288 		return newObj;
289 	}
290 	
291 	public String toString() {
292 		return new String(this.state_name + "@" + this.unique_id);
293 	}
294 	
295 }