Wednesday, October 29, 2014

InputFormat In Hive And The Way To Customize CombineHiveInputFormat

Part.1 InputFormat In Hive

There are two places where we can specify an InputFormat in Hive: when creating a table, and before executing an HQL query.

For the first case, we can specify the InputFormat and OutputFormat when creating a Hive table, like this:
CREATE TABLE example_tbl
(
  id int,
  name string
)
STORED AS
  INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

We can check the InputFormat and OutputFormat specified for a table by:
hive> DESC FORMATTED example_tbl;
...
# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed:             No
...

In this case, the InputFormat and OutputFormat are responsible for storing data into, as well as retrieving data out of, HDFS; the on-disk format is therefore transparent to Hive itself. For instance, suppose some text content is saved in a binary encoding in HDFS and mapped to a particular Hive table. When we run a Hive task on this table, it loads the data via the table's InputFormat and thus sees the 'decoded' text content. After executing the HQL, the Hive task writes the result to whatever the destination is (HDFS, local file system, screen, etc.) via the table's OutputFormat.
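To make this concrete, below is a minimal sketch of such a 'decoding' InputFormat. The class name 'Base64LineInputFormat' and the assumption that every line in HDFS is a Base64-encoded text record are purely illustrative (commons-codec, which ships with Hadoop, provides the decoder); this is a sketch, not the format used by the example table above.

import java.io.IOException;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Hypothetical InputFormat: each line stored in HDFS is Base64-encoded text,
// and the RecordReader decodes it so Hive only ever sees plain text.
public class Base64LineInputFormat extends TextInputFormat {
  @Override
  public RecordReader<LongWritable, Text> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    final RecordReader<LongWritable, Text> inner = super.getRecordReader(split, job, reporter);
    return new RecordReader<LongWritable, Text>() {
      private final Text raw = new Text();
      public boolean next(LongWritable key, Text value) throws IOException {
        if (!inner.next(key, raw)) {
          return false;                                   // EOF reached
        }
        value.set(Base64.decodeBase64(raw.toString()));   // decode before handing the record to Hive
        return true;
      }
      public LongWritable createKey() { return inner.createKey(); }
      public Text createValue() { return inner.createValue(); }
      public long getPos() throws IOException { return inner.getPos(); }
      public float getProgress() throws IOException { return inner.getProgress(); }
      public void close() throws IOException { inner.close(); }
    };
  }
}

Once such a class is on Hive's classpath, it can be referenced in the STORED AS INPUTFORMAT clause exactly like TextInputFormat above.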

For the second case, we can set 'hive.input.format' before invoking an HQL query:
hive> set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
hive> select * from example_tbl where id > 10000;

If we set this parameter in hive-site.xml, it becomes the default Hive InputFormat whenever 'hive.input.format' is not set explicitly before the HQL.

The InputFormat in this scenario serves a different function from the former one. First, let's take a glance at 'org.apache.hadoop.mapred.FileInputFormat', which is the base class for all file-based InputFormats. There are three essential methods in this class:
boolean isSplitable(FileSystem fs, Path filename)
InputSplit[] getSplits(JobConf job, int numSplits)
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter)

'isSplitable' is self-explanatory: it returns whether the given file can be split into multiple InputSplits. It only matters when writing a MapReduce program directly; for a Hive job, we can instead set 'mapreduce.input.fileinputformat.split.minsize' in hive-site.xml to a very large value to achieve the same effect, i.e. keep each file from being split.
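For a plain MapReduce program, the equivalent is simply overriding 'isSplitable'. A minimal sketch (the class name 'WholeFileTextInputFormat' is hypothetical):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TextInputFormat;

// Every file is handed to a single mapper as a whole, never split into multiple InputSplits.
public class WholeFileTextInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    return false;
  }
}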

'getSplits' returns an array of InputSplit objects, whose length corresponds to the number of mappers for the HQL task. Every InputSplit contains one or more file chunks in the current file system; the details will be discussed later.

'getRecordReader' returns an 'org.apache.hadoop.mapred.RecordReader' object, whose job is to read data record by record from the underlying file system. Its main methods are as follows:
K createKey()
V createValue()
boolean next(K key, V value)
float getProgress()

'createKey', 'createValue' and 'getProgress' are self-explanatory. 'next' fills in the key and value parameters from the current read position and returns true; once EOF is reached, it returns false.

In the former case mentioned above, only the 'getRecordReader' method is used; in the latter case, only the 'getSplits' method is used.
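To illustrate how these methods cooperate outside of Hive, here is a minimal stand-alone sketch that drives a TextInputFormat by hand through the old mapred API; the class name 'RecordReaderDemo' and passing the input path as args[0] are just for illustration:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class RecordReaderDemo {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    FileInputFormat.setInputPaths(job, new Path(args[0]));   // e.g. /user/supertool/hiveTest/20140901
    TextInputFormat inputFormat = new TextInputFormat();
    inputFormat.configure(job);

    // getSplits: one InputSplit per prospective mapper.
    for (InputSplit split : inputFormat.getSplits(job, 1)) {
      // getRecordReader: reads the split record by record.
      RecordReader<LongWritable, Text> reader =
          inputFormat.getRecordReader(split, job, Reporter.NULL);
      LongWritable key = reader.createKey();
      Text value = reader.createValue();
      while (reader.next(key, value)) {   // next() fills key/value until EOF
        System.out.println(key + "\t" + value);
      }
      reader.close();
    }
  }
}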

Part.2 Customize CombineHiveInputFormat

In my daily work, I needed to rewrite the CombineHiveInputFormat class. Our data in HDFS is partitioned by yyyyMMdd, and within each partition all files are named in the pattern 'part-i' (i ∈ [0, 64)):
/user/supertool/hiveTest/20140901/part-0
/user/supertool/hiveTest/20140901/part-1
/user/supertool/hiveTest/20140901/part-2
/user/supertool/hiveTest/20140902/part-0
/user/supertool/hiveTest/20140902/part-1
/user/supertool/hiveTest/20140902/part-2
/user/supertool/hiveTest/20140903/part-0
/user/supertool/hiveTest/20140903/part-1
/user/supertool/hiveTest/20140903/part-2

This experimental hive table is created by:
CREATE EXTERNAL table hive_combine_test
(id string,
rdm string)
PARTITIONED BY (dateid string)
row format delimited fields terminated by '\t'
stored as textfile;

ALTER TABLE hive_combine_test
ADD PARTITION (dateid='20140901')
location '/user/supertool/zhudi/hiveTest/20140901';

ALTER TABLE hive_combine_test
ADD PARTITION (dateid='20140902')
location '/user/supertool/zhudi/hiveTest/20140902';

ALTER TABLE hive_combine_test
ADD PARTITION (dateid='20140903')
location '/user/supertool/zhudi/hiveTest/20140903';

What we intend to do is to pack all the files sharing the same i, across different partitions, into one InputSplit, so that they go to one mapper. Overall, there should be 64 mappers no matter how many days (partitions) are involved in the HQL.
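A small helper like the following is enough to recover the slice index i from a file name such as '.../20140901/part-17'; the method name 'sliceIdOf' is hypothetical:

// Hypothetical helper: parse the trailing number of a 'part-i' file name.
// e.g. new Path("/user/supertool/hiveTest/20140901/part-17") -> 17
static int sliceIdOf(org.apache.hadoop.fs.Path file) {
  String name = file.getName();                                      // "part-17"
  return Integer.parseInt(name.substring(name.lastIndexOf('-') + 1));
}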

The way to customize CombineHiveInputFormat in Eclipse is as follows:

Step-1:
In Eclipse: File --> New --> Other --> Maven Project --> Create a simple project.

Revise pom.xml according to your own Hadoop and Hive versions:
<dependencies>
 <dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>0.13.1</version>
 </dependency>

 <dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-serde</artifactId>
  <version>0.13.1</version>
 </dependency>

 <dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-common</artifactId>
  <version>0.13.1</version>
 </dependency>

 <dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.2.0</version>
 </dependency>

 <dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.2.0</version>
 </dependency>

 <dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.2.0</version>
 </dependency>

 <dependency>
  <groupId>jdk.tools</groupId>
  <artifactId>jdk.tools</artifactId>
  <version>1.7.0_25</version>
  <scope>system</scope>
  <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
 </dependency>
</dependencies>

At the same time, we should add the maven-assembly-plugin to pom.xml so that the project can be packaged as a jar with all its dependencies:
<build>
 <plugins>
  <plugin>
   <artifactId>maven-assembly-plugin</artifactId>
   <version>2.4</version>
   <configuration>
    <descriptorRefs>
     <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
   </configuration>
   <executions>
    <execution>
     <id>make-assembly</id> <!-- this is used for inheritance merges -->
     <phase>package</phase> <!-- bind to the packaging phase -->
     <goals>
      <goal>single</goal>
     </goals>
    </execution>
   </executions>
  </plugin>
 </plugins>
</build>

Step-2:
With the peripheral settings in place, we can create a new class derived from CombineHiveInputFormat. What we intend to do is to reconstruct the array of InputSplit returned from CombineHiveInputFormat.getSplits():
public class JudCombineHiveInputFormat<K extends WritableComparable, V extends Writable>
  extends CombineHiveInputFormat<WritableComparable, Writable> {

 @Override
 public InputSplit[] getSplits(JobConf job, int numSplits)
   throws IOException {
  InputSplit[] iss = super.getSplits(job, numSplits);
  //TODO: Regroup iss into the splits we want, then return the rearranged array.

  return iss;
 }

}

Consequently, it is time to get some knowledge of InputSplit. In CombineHiveInputFormat, the implementation class of InputSplit is CombineHiveInputSplit, which wraps an implementation of 'org.apache.hadoop.hive.shims.HadoopShimsSecure.InputSplitShim'. The constructor of 'org.apache.hadoop.hive.shims.HadoopShimsSecure.InputSplitShim' takes an 'org.apache.hadoop.mapred.lib.CombineFileSplit' object, whose constructor looks like:
CombineFileSplit(JobConf job, Path[] files, long[] start, long[] lengths, String[] locations)

All parameters correspond to the InputSplit notion in MapReduce, standing for the JobConf info, the file paths, the start position within each file, the length of each file chunk, and the preferred hosts (data-locality hints) on which this split should be processed, respectively.

After getting familiar with the structure of the InputSplit class, we can simply rearrange all the files among InputSplits according to the file name pattern.

Just one more thing: CombineHiveInputSplit has a field named 'inputFormatClassName', which is the name of the InputFormat configured when the Hive table was created (the former case stated above). During the execution of a Hive task, files may come from different sources with different InputFormats (some come from the table's source data, some from Hive temporary data). Thus, splits should also be grouped by inputFormatClassName when we rearrange them.
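A sketch of that grouping: splits are keyed by inputFormatClassName first and by slice id second, so files with different formats never end up in the same combined split. 'SplitInfo' is the same helper structure used in the snippet below, while 'allSplitInfos', 'getInputFormatClassName' and 'sliceIdOf' are illustrative names:

// Hypothetical grouping structure: format class name -> slice id -> file chunks.
Map<String, Map<Integer, List<SplitInfo>>> grouped =
    new HashMap<String, Map<Integer, List<SplitInfo>>>();
for (SplitInfo si : allSplitInfos) {
  Map<Integer, List<SplitInfo>> bySlice = grouped.get(si.getInputFormatClassName());
  if (bySlice == null) {
    bySlice = new HashMap<Integer, List<SplitInfo>>();
    grouped.put(si.getInputFormatClassName(), bySlice);
  }
  int slice = sliceIdOf(si.getFile());
  List<SplitInfo> chunks = bySlice.get(slice);
  if (chunks == null) {
    chunks = new ArrayList<SplitInfo>();
    bySlice.put(slice, chunks);
  }
  chunks.add(si);
}
// Each (inputFormatClassName, slice) bucket then becomes the 'curSplitInfos' of one
// CombineHiveInputSplit, constructed as in the snippet below.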

Here's a code snippet for the reconstruction part of CombineHiveInputFormat ('SplitInfo', 'curSplitInfos', 'sliceid' and 'slice2host' are helper structures built earlier in my own getSplits implementation):
// One entry per file chunk that goes into this combined split.
Path[] files = new Path[curSplitInfos.size()];
long[] starts = new long[curSplitInfos.size()];
long[] lengths = new long[curSplitInfos.size()];
for(int i = 0; i < curSplitInfos.size(); ++i) {
 SplitInfo si = curSplitInfos.get(i);
 files[i] = si.getFile();
 starts[i] = si.getStart();
 lengths[i] = si.getLength();
}
// Preferred host for this slice (slice2host maps slice id -> host).
String[] locations = new String[1];
locations[0] = slice2host.get(sliceid);
org.apache.hadoop.mapred.lib.CombineFileSplit cfs = 
  new org.apache.hadoop.mapred.lib.CombineFileSplit(
    job, 
    files, 
    starts, 
    lengths, 
    locations);
org.apache.hadoop.hive.shims.HadoopShimsSecure.InputSplitShim iqo = 
  new org.apache.hadoop.hive.shims.HadoopShimsSecure.InputSplitShim(cfs);
CombineHiveInputSplit chis = new CombineHiveInputSplit(job, iqo);
chis.setInputFormatClassName(curInputFormatClassName);

After implementing this, we can simply issue 'mvn clean package -Dmaven.test.skip=true', then copy the '*jar-with-dependencies*.jar' from the project's target folder to $HIVE_HOME/lib as well as $HADOOP_HOME/share/hadoop/common/lib on every node of the Hive cluster.

At last, we can switch to our own version with 'set hive.input.format=com.judking.hive.inputformat.JudCombineHiveInputFormat;' before invoking an HQL query.

If debugging is needed, we can use System.out in our InputFormat class, in which case the info is printed to the screen. Alternatively, we can use 'LogFactory.getLog()' (Apache Commons Logging) to retrieve a Log object; its output goes to '/tmp/(current_user)/hive.log'.
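A minimal sketch of the logging approach, assuming Apache Commons Logging (which Hive 0.13 bundles); the class and method names here are only illustrative:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class SplitLoggingExample {
  // Output ends up in /tmp/<current_user>/hive.log on the machine running the Hive client.
  private static final Log LOG = LogFactory.getLog(SplitLoggingExample.class);

  static void report(int originalCount, int combinedCount) {
    LOG.info("original split count: " + originalCount);
    LOG.info("combined split count: " + combinedCount);
  }
}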




© 2014-2017 jason4zhu.blogspot.com All Rights Reserved 
If reposting, please cite the origin: Jason4Zhu
