Creating Hadoop pe under SGE
Wednesday Jan 16, 2008
Sun Grid Engine (SGE) is distributed resource management (DRM) software with flexible support for parallel computing frameworks (e.g. MPI, PVM).
We can configure Grid Engine to provide a Hadoop cluster environment in which map-reduce jobs can run. (Hadoop is a framework for running applications on large clusters built of commodity hardware.)
Here we set up a hadoop parallel environment (pe) in SGE and use it to run jobs with hadoop-streaming.
Prerequisite: follow the Hadoop wiki to set up Hadoop by hand first, just to make sure the setup works.
We create a hadoop pe for Sun Grid Engine that has:
- DFS (HDFS) running on at least 2 nodes (dfs.replication)
- master and slaves chosen by Grid Engine
- the master running the NameNode and SecondaryNameNode (DFS) and the JobTracker (MapRed)
- each slave running a DataNode (DFS) and a TaskTracker (MapRed)
- each slave's TaskTracker running (N / no. of slaves) tasks simultaneously, out of N mapper tasks and N reducer tasks in total (N is the number of slots the user requests with qsub -pe)
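The relevant properties in conf/hadoop-site.xml:
<property><name>fs.default.name</name><value>hdfs://master:54310/</value></property>
<property><name>mapred.job.tracker</name><value>master:54311</value></property>
<property><name>mapred.map.tasks</name><value>40</value></property>
<property><name>mapred.tasktracker.tasks.maximum</name><value>2</value></property>
<property><name>hadoop.tmp.dir</name><value>$TMP</value></property>
<property><name>dfs.replication</name><value>2</value></property>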
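The start script shown later generates conf/hadoop-site.xml from a template with sed. A minimal sketch of that substitution for two of the properties is given here. The placeholder names are the ones used in that script; the template layout is an assumption, since the original conf/hadoop-site-template.xml is not shown, and the host name and temp path are example values.

```shell
#!/bin/bash
# Sketch: render a two-property fragment of hadoop-site.xml from a template.
# The placeholder names (HADDOP_MASTER_HOST, HDFSPORT, HTMPDIR) come from
# hadoop-pe-start.sh; the template layout below is assumed.
render_site() {
    # $1 = master host, $2 = HDFS port, $3 = temp dir
    # Using '|' as the sed delimiter avoids escaping the slashes in $3.
    sed -e "s|HADDOP_MASTER_HOST|$1|g" \
        -e "s|HDFSPORT|$2|g" \
        -e "s|HTMPDIR|$3|g" <<'EOF'
<property>
  <name>fs.default.name</name>
  <value>hdfs://HADDOP_MASTER_HOST:HDFSPORT/</value>
</property>
<property>
  <name>hadoop.tmp.dir</name>
  <value>HTMPDIR</value>
</property>
EOF
}

# Example values only.
render_site master 54310 /tmp/818.1.all.q
```

Choosing '|' as the delimiter is a simplification over the escaped-slash approach in the start script; it would break if a value ever contained a '|' character.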
Apart from this, we need to list the master and the slaves in conf/masters and conf/slaves respectively; SGE will help us fill these in.
sgetest@master $ cat conf/masters
master
sgetest@master $ cat conf/slaves
slave
Configuring Grid Engine:
Assume that we have an SGE setup:
sgetest@master $ qstat -f
queuename qtype used/tot. load_avg arch states
----------------------------------------------------------------------------
all.q@master BIPC 0/5 0.02 sol-x86
----------------------------------------------------------------------------
all.q@slave BIPC 0/5 0.10 sol-x86
I have configured 5 slots for all.q (hence each queue instance can run 5 tasks simultaneously).
Now we need to create a hadoop pe:
sgetest@master $ qconf -sp hadoop
pe_name hadoop
slots 5
user_lists NONE
xuser_lists NONE
start_proc_args /export/home/sgetest/test/hadoop/hadoop-pe-start.sh
stop_proc_args /export/home/sgetest/test/hadoop/hadoop-pe-stop.sh
allocation_rule $round_robin
control_slaves FALSE
job_is_first_task TRUE
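One way to create this definition non-interactively is to write it to a file and load it with qconf -Ap. The sketch below only writes the file. The helper name, the file location and the script directory argument are illustrative, and newer Grid Engine versions may expect additional attributes beyond the ones listed above.

```shell
#!/bin/bash
# Sketch: write the PE definition shown above to a file for `qconf -Ap`.
# write_pe_file, pe_file and script_dir are illustrative names,
# not part of the original setup.
write_pe_file() {
    pe_file=$1
    script_dir=$2
    cat > "$pe_file" <<EOF
pe_name           hadoop
slots             5
user_lists        NONE
xuser_lists       NONE
start_proc_args   $script_dir/hadoop-pe-start.sh
stop_proc_args    $script_dir/hadoop-pe-stop.sh
allocation_rule   \$round_robin
control_slaves    FALSE
job_is_first_task TRUE
EOF
}

pe_def="${TMPDIR:-/tmp}/hadoop.pe"
write_pe_file "$pe_def" /export/home/sgetest/test/hadoop
# Then, as a Grid Engine manager:  qconf -Ap "$pe_def"
```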
hadoop-pe-start.sh and hadoop-pe-stop.sh are responsible for starting and stopping the HDFS and map-reduce daemons. They would look something like:
hadoop-pe-start.sh
NOTE: We depend on jps, supplied by the JDK, to check that all the daemons have started; there could be a better way to achieve this!
This check makes sure pe-start exits only after the daemons are up and running. We don't want the job to start before the daemons do and then complain that they are unavailable!
#!/bin/bash
JPS="$JAVA_HOME/bin/jps"
## we get HADOOP_HOME from jobs env
cd $HADOOP_HOME
### Create master and slave files
master=`head -1 $PE_HOSTFILE | cut -f1 -d' '`
echo $master > conf/masters
##TODO: Do we need master to run the tasks???
tail +2 $PE_HOSTFILE | cut -f1 -d' ' > conf/slaves
slave_cnt=`cat conf/slaves|wc -l`
### Create hadoop-site.xml
### The following placeholders are in conf/hadoop-site-template.xml
### so that it can be customized!
#HADDOP_MASTER_HOST:HDFSPORT : HDFS host:port
#HADDOP_MASTER_HOST:HMPRPORT : MapRed host:port
#HMTASKS : No. of Map tasks
#HRTASKS : No. of Reduce tasks
#HTPN : Simultaneous tasks per slave
#HTMPDIR : Hadoop temporary dir, we let hadoop use the SGE tmp dir
##In ideal cases mtasks=10xslaves, rtasks=2xslaves, tpn=2perslave
tpn=`expr $NSLOTS \/ $slave_cnt`
tmp=`echo $TMPDIR | sed 's/\//\\\\\//g'`
sed -e "s/HADDOP_MASTER_HOST/$master/g" -e "s/HDFSPORT/54310/g" \
-e "s/HMPRPORT/54311/g" -e "s/HMTASKS/$NSLOTS/g" \
-e "s/HRTASKS/$NSLOTS/g" -e "s/HTPN/$tpn/g" -e "s/HTMPDIR/$tmp/g" \
conf/hadoop-site-template.xml > conf/hadoop-site.xml
echo 'Y' > /tmp/yes
### Format namenode
bin/hadoop namenode -format < /tmp/yes
### Start hdfs daemons
bin/start-dfs.sh
### Is there a better way to do this? Maybe run jstatd and use jps <slave>
### Wait for the dfs daemons to start
### 3 = NameNode, DataNode, SecondaryNameNode
dcnt=0
while [ $dcnt -lt 3 ]
do
    sleep 1
    dcnt=`cat conf/slaves conf/masters | xargs -i rsh \{\} $JPS | grep -v Jps | wc -l`
done
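The loop above waits for a fixed count of 3 DFS daemons and never gives up. The PE output log further down also shows the JobTracker and a TaskTracker starting and ends with a count of 5, so the full script presumably also ran bin/start-mapred.sh and waited for more daemons. A sketch of how the expected count could be derived from the number of slaves, with a timeout added, follows. Both helper names are hypothetical, and the formula assumes the daemon layout described at the top of this post.

```shell
#!/bin/bash
# Sketch: compute how many Hadoop daemons to wait for, and poll with a timeout.
# expected_daemons and wait_for_daemons are hypothetical helper names.

# NameNode, SecondaryNameNode and JobTracker on the master,
# plus a DataNode and a TaskTracker on every slave.
expected_daemons() {
    echo $(( 3 + 2 * $1 ))
}

# $1 = expected count, $2 = timeout in seconds,
# remaining args = a command that prints the current daemon count.
wait_for_daemons() {
    want=$1; limit=$2; shift 2
    waited=0
    while [ "$("$@")" -lt "$want" ]; do
        [ "$waited" -ge "$limit" ] && return 1
        sleep 1
        waited=$(( waited + 1 ))
    done
    return 0
}

# In hadoop-pe-start.sh this could be used roughly as follows, where
# count_daemons would wrap the rsh/jps pipeline from the loop above:
#   wait_for_daemons "$(expected_daemons "$slave_cnt")" 120 count_daemons || exit 1
expected_daemons 1
```

For one slave the formula gives 5, which matches the count printed at the end of the "done PE START" line in the PE output log.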
hadoop-pe-stop.sh
#!/bin/bash
echo "start PE STOP `date`"
## We get HADOOP_HOME from the user
cd $HADOOP_HOME
### Stop hdfs and mapred daemons
bin/stop-all.sh
### We could reset master,slave and hadoop-site.xml!
wait
These scripts need HADOOP_HOME to be set on the qsub command line; the job additionally needs the mapper, the reducer and the input files for the hadoop-streaming job.
Now we assign this to the queue, all.q
sgetest@master $ qconf -aattr queue pe_list hadoop all.q
sgetest@master $ qconf -sq all.q | grep pe_list
pe_list hadoop
NOTE: I took the mapper and reducer from the Python hadoop-streaming example (Writing_An_Hadoop_MapReduce_Program_In_Python) and rewrote them as bash scripts.
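The bash mapper and reducer themselves are not listed in this post. As a rough idea of what such a pair could look like, here is a word-count sketch written as two shell functions; it is not the author's code, and the function names are illustrative. For hadoop-streaming each function body would live in its own executable script that reads stdin and writes tab-separated key/value lines to stdout.

```shell
#!/bin/bash
# Sketch of a word-count mapper and reducer for hadoop-streaming.
# These are not the author's scripts; function names are illustrative.

# Mapper: emit one "word<TAB>1" line per word read from stdin.
mapper() {
    tr -s '[:space:]' '\n' | awk 'NF { print $0 "\t1" }'
}

# Reducer: sum the counts per word. It keeps totals in an awk array,
# so it does not rely on the input being sorted by key.
reducer() {
    awk -F'\t' '{ count[$1] += $2 } END { for (w in count) print w "\t" count[w] }'
}

# Local dry run without Hadoop, on the input line used later in this post.
echo "Hi Hello Ravi Hi Rav Hell Ravi" | mapper | sort | reducer | sort
```

Piping through sort between the two stages imitates the shuffle step, which makes it easy to try a mapper/reducer pair before submitting it to the cluster.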
A typical job would just need to add the input files to the dfs
sgetest@master $ bin/hadoop dfs -put INPUT DFSIP
and run the mapper/reducer using the hadoop-streaming.
sgetest@master $ bin/hadoop jar contrib/hadoop-streaming.jar -mapper $HADOOP_HOME/$HADOOP_MAPPER -reducer $HADOOP_HOME/$HADOOP_REDUCER -input $INPUT/\* -output $OUTPUT
The hadoop-job.sh would look something like:
hadoop-job.sh
sgetest@master $ cat hadoop-job.sh
#!/bin/bash
### This env var needs to be exported at qsub time using -v
## $HADOOP_HOME : Hadoop dir with conf, i/p, mapper and reducer
### These 3 vars are passed as args
## $HADOOP_INPUT : dir containing the i/p files reqd for the job,
## this had better be in $HADOOP_HOME
### The next two are assumed to be in $HADOOP_HOME
## $HADOOP_MAPPER : the mapper program, can be java/sh/perl/python etc...
## $HADOOP_REDUCER: the reducer program, can be java/sh/perl/python etc...
## output will be stored to $OUTPUT in $HADOOP_HOME
cd $HADOOP_HOME
## making sure the i/p and o/p dir names are unique
INPUT=$JOB_NAME"_"$JOB_ID"_IP"
OUTPUT=$JOB_NAME"_"$JOB_ID"_OP"
# usage: qsub -pe hadoop <n> -v HADOOP_HOME=/tmp hadoop-job.sh <mapper> <reducer> <input dir>
HADOOP_MAPPER=$1
HADOOP_REDUCER=$2
HADOOP_INPUT=$3
## We can actually add a step to move the i/p and o/p files to a tmp dir,
## $TMPDIR supplied by SGE
### Upload the input files for HDFS
bin/hadoop dfs -copyFromLocal "${HADOOP_HOME}/${HADOOP_INPUT}" $INPUT
bin/hadoop dfs -ls
### Run the streamer job now!
bin/hadoop jar contrib/hadoop-streaming.jar -mapper $HADOOP_HOME/$HADOOP_MAPPER -reducer $HADOOP_HOME/$HADOOP_REDUCER -input $INPUT/\* -output $OUTPUT
## We could have moved to the default o/p file location for SGE jobs, the $SGE_OUTPUT_PATH
## Download the output files back
bin/hadoop dfs -copyToLocal $OUTPUT $HADOOP_HOME/$OUTPUT
wait
NOTE: The $OUTPUT and the logs can be configured to use Sun Grid Engine's TMP directory (refer to the Sun Grid Engine docs). Now the job can be submitted like this:
sgetest@master $ qsub -pe hadoop 4 -v JAVA_HOME=/usr/jdk/latest,HADOOP_HOME=/export/home/sgetest/test/hadoop hadoop-job.sh hadoop-mapper.sh hadoop-reducer.sh input_dir
where
- input_dir contains the file(s) for use by the map-reduce task
- hadoop-mapper.sh is the mapper program
- hadoop-reducer.sh is the reducer program
- the -v option exports JAVA_HOME and HADOOP_HOME to the job env for use by the scripts
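Since hadoop-job.sh expects the mapper, the reducer and the input directory to live under HADOOP_HOME, a small check before calling qsub can catch a mistyped name early. This is only a sketch; check_job_args is a hypothetical helper and not part of the scripts above.

```shell
#!/bin/bash
# Sketch: sanity-check the job arguments before calling qsub.
# check_job_args is a hypothetical helper, not part of the original scripts.
check_job_args() {
    home=$1; mapper=$2; reducer=$3; input=$4
    [ -d "$home" ]          || { echo "HADOOP_HOME is not a directory: $home" >&2; return 1; }
    [ -x "$home/$mapper" ]  || { echo "mapper not executable: $home/$mapper" >&2; return 1; }
    [ -x "$home/$reducer" ] || { echo "reducer not executable: $home/$reducer" >&2; return 1; }
    [ -d "$home/$input" ]   || { echo "input dir not found: $home/$input" >&2; return 1; }
    return 0
}

# Possible use before submitting (paths as in the qsub example above):
#   check_job_args /export/home/sgetest/test/hadoop \
#       hadoop-mapper.sh hadoop-reducer.sh input_dir && qsub -pe hadoop 4 ...
```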
Output from grid engine
sgetest@master $ qstat -f
queuename qtype used/tot. load_avg arch states
----------------------------------------------------------------------------
sus.q@master BI 0/5 0.00 sol-x86 s
----------------------------------------------------------------------------
all.q@slave BIPC 2/5 0.00 sol-x86
818 0.55500 hadoop-job sgetest r 01/15/2008 19:45:42 2
----------------------------------------------------------------------------
all.q@master BIPC 2/5 0.00 sol-x86
818 0.55500 hadoop-job sgetest r 01/15/2008 19:45:42 2
sgetest@master $ qstat -g t
job-ID prior name user state submit/start at queue master ja-task-ID
------------------------------------------------------------------------------------------------------------------
818 0.55500 hadoop-job sgetest r 01/15/2008 19:45:42 all.q@slave SLAVE
all.q@slave SLAVE
818 0.55500 hadoop-job sgetest r 01/15/2008 19:45:42 all.q@master MASTER
all.q@master SLAVE
The error/output stream from the job can be seen in the job's error and output files (*.[oe]JID), and the output/error stream from the pe (hadoop daemons) can be seen in the job's pe error and output files (*.p[oe]JID), typically in the user's home directory.
(It would be better if we set the logs to the job's tmp dir and view it later!)
sgetest@master $ ls -l ~/*818
-rw-r--r-- 1 sgetest sgegrp 1361 Jan 16 05:00 /export/home/sgetest/hadoop-job.sh.e818
-rw-r--r-- 1 sgetest sgegrp 6287 Jan 16 05:00 /export/home/sgetest/hadoop-job.sh.o818
-rw-r--r-- 1 sgetest sgegrp 2172 Jan 16 05:00 /export/home/sgetest/hadoop-job.sh.pe818
-rw-r--r-- 1 sgetest sgegrp 1056 Jan 16 05:00 /export/home/sgetest/hadoop-job.sh.po818
sgetest@master $ cat /export/home/sgetest/hadoop-job.sh.po818
start PE START Wed Jan 16 04:59:45 IST 2008
starting namenode, logging to
/export/home/sgetest/test/hadoop/bin/../logs/hadoop-sgetest-namenode-master.out
slave: starting datanode, logging to
/export/home/sgetest/test/hadoop/bin/../logs/hadoop-sgetest-datanode-slave.out
master: starting secondarynamenode, logging to
/export/home/sgetest/test/hadoop/bin/../logs/hadoop-sgetest-secondarynamenode-master.out
starting jobtracker, logging to
/export/home/sgetest/test/hadoop/bin/../logs/hadoop-sgetest-jobtracker-master.out
slave: starting tasktracker, logging to
/export/home/sgetest/test/hadoop/bin/../logs/hadoop-sgetest-tasktracker-slave.out
done PE START Wed Jan 16 04:59:57 IST 2008 5
start PE STOP Wed Jan 16 05:00:27 IST 2008
stopping jobtracker
slave: stopping tasktracker
stopping namenode
slave: stopping datanode
master: stopping secondarynamenode
done PE STOP Wed Jan 16 05:00:28 IST 2008
sgetest@master $ cat /export/home/sgetest/hadoop-job.sh.o818
start JOB Wed Jan 16 04:59:57 IST 2008
Found 1 items
/user/sgetest/hadoop-job.sh_818_IP <dir> 2008-01-16 04:59
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/tmp/testsuite_6444/818.1.all.q/hadoop-unjar32275/] []
/var/tmp/streamjob32276.jar tmpDir=null
done JOB Wed Jan 16 05:00:27 IST 2008
sgetest@master $ cat /export/home/sgetest/hadoop-job.sh.e818
08/01/16 05:00:02 INFO mapred.FileInputFormat: Total input paths to process : 1
08/01/16 05:00:02 INFO streaming.StreamJob: getLocalDirs():
[/tmp/testsuite_6444/818.1.all.q/mapred/local]
08/01/16 05:00:02 INFO streaming.StreamJob: Running job: job_200801160459_0001
08/01/16 05:00:02 INFO streaming.StreamJob: To kill this job, run:
08/01/16 05:00:02 INFO streaming.StreamJob:
/export/home/sgetest/test/hadoop/bin/../bin/hadoop job
-Dmapred.job.tracker=master:54311 -kill job_200801160459_0001
08/01/16 05:00:02 INFO streaming.StreamJob: Tracking URL:
http://master:50030/jobdetails.jsp?jobid=job_200801160459_0001
08/01/16 05:00:03 INFO streaming.StreamJob: map 0% reduce 0%
08/01/16 05:00:09 INFO streaming.StreamJob: map 40% reduce 0%
08/01/16 05:00:11 INFO streaming.StreamJob: map 80% reduce 0%
08/01/16 05:00:13 INFO streaming.StreamJob: map 100% reduce 0%
08/01/16 05:00:18 INFO streaming.StreamJob: map 100% reduce 25%
08/01/16 05:00:20 INFO streaming.StreamJob: map 100% reduce 50%
08/01/16 05:00:25 INFO streaming.StreamJob: map 100% reduce 75%
08/01/16 05:00:26 INFO streaming.StreamJob: map 100% reduce 100%
08/01/16 05:00:26 INFO streaming.StreamJob: Job complete: job_200801160459_0001
08/01/16 05:00:26 INFO streaming.StreamJob: Output: hadoop-job.sh_818_OP
sgetest@master $ cat /export/home/sgetest/hadoop-job.sh.pe818
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = master/10.12.162.155
STARTUP_MSG: args = [-format]
************************************************************/
08/01/16 04:59:46 INFO dfs.Storage: Storage directory
/tmp/testsuite_6444/818.1.all.q/dfs/name has been successfully formatted.
08/01/16 04:59:46 INFO dfs.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at master/10.12.162.155
************************************************************/
The output is created in hadoop-job.sh_818_OP in our case; its contents:
sgetest@master $ ls -l ~/test/hadoop/hadoop-job.sh_818_OP/
total 6
-rw-r--r-- 1 sgetest sgegrp 0 Jan 16 05:00 part-00000
-rw-r--r-- 1 sgetest sgegrp 8 Jan 16 05:00 part-00001
-rw-r--r-- 1 sgetest sgegrp 18 Jan 16 05:00 part-00002
-rw-r--r-- 1 sgetest sgegrp 7 Jan 16 05:00 part-00003
sgetest@master $ cat ~/test/hadoop/hadoop-job.sh_818_OP/*
Hello 1
Hell 1
Hi 2
Rav 1
Ravi 2
sgetest@master $ cat ~/test/hadoop/input/test
Hi Hello Ravi Hi Rav Hell Ravi
The map-reduce job has run, using Sun Grid Engine to set up the cluster. Further Hadoop options can be tuned by having Grid Engine pass the required arguments through.
It is better to use HDFS to run the jobs, as we needn't share all of the Hadoop-related files (the job input) across the nodes; this is handled at the master.
NOTES:
Here we rely on the master node to start the other daemons (it [rs]sh-es to each machine and starts them) and to distribute jobs, and we do not have control over the TaskTracker threads. This way of setting up a pe in Grid Engine is called loose integration.
With some more effort one could also achieve a tight integration, in which SGE itself starts the daemons and tasks on the slaves. But this would require further understanding of Hadoop internals.
As far as I understand, Hadoop's TaskTracker spawns N threads, where N is mapred.tasktracker.tasks.maximum (set in hadoop-site.xml), though more than N tasks might be assigned to that slave node.
Hence I am not sure how one could map a 'slot', as Grid Engine sees it, to a task in the Hadoop environment.
If the user has requested n slots on a hadoop pe, the slots are granted as they are available on the exec hosts. The pe slots allotted per exec host need not be equal, and the allotment indicates how many slots the current job is entitled to use on each host.
In the above example I used N slots (total pe slots for the job) / no. of slaves, which might not be the same as the slots allotted per exec host by SGE.
I assume we would have some way of getting around this, and probably many other tunables for hadoop which can be provided by SGE.
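One possible starting point is the $PE_HOSTFILE itself, which carries the per-host allocation. The sketch below assumes each line has the form "host slots queue processor-range" (the start script already relies on the host being the first space-separated field) and treats the first line as the master, as that script does. The helper names are hypothetical.

```shell
#!/bin/bash
# Sketch: read the slots Grid Engine granted on each host from $PE_HOSTFILE.
# Each line is assumed to look like: <host> <slots> <queue> <processor range>
# slots_on_host and min_slave_slots are hypothetical helper names.

# $1 = hostfile, $2 = host name; prints the slots granted on that host.
slots_on_host() {
    awk -v h="$2" '$1 == h { print $2 }' "$1"
}

# $1 = hostfile; prints the smallest allocation among the slaves
# (every line after the first), a conservative per-TaskTracker task limit
# when all slaves share one hadoop-site.xml.
min_slave_slots() {
    awk 'NR > 1 { s = $2 + 0; if (!seen || s < min) { min = s; seen = 1 } }
         END { if (seen) print min }' "$1"
}

# In hadoop-pe-start.sh, tpn could then be set with:
#   tpn=`min_slave_slots $PE_HOSTFILE`
```

Taking the minimum under-uses hosts that were granted more slots; a per-host value would need a separate hadoop-site.xml on each slave.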
I look forward to seeing a Hadoop pe on Sun Grid in which map-reduce jobs can be scheduled directly onto an available Hadoop setup.










