SparkFlex, aka running Spark jobs on a Grid Engine cluster, take II

I’ve left everyone who reads my Spark posts hanging for months with a teaser about a new method I’ve been working on. This is the first post describing it.

The basic idea is that it’s better to run Spark as a set of independent jobs rather than as a single monolithic multi-node job. That way, if the HPC cluster is heavily used, at least PART of the Spark cluster can be scheduled and work can begin (assuming the work doesn’t strictly require every node). It also means nodes can be added to or removed from the cluster as needed.

This is fairly easily achieved by running the Spark master as one job, then taking the master’s URL and submitting each worker as an independent job with the MASTER variable set (with everything else configured as in my previous example), e.g.:

export MASTER=spark://<mastername>:7077
qsub -pe spark 1 -jc spark -N sparkworker -q spark -b y "$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker $MASTER"
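A sketch of that flow end to end (the job names, PE, and queue here are illustrative and from my environment; the qstat parsing assumes the queue@host column is field 8 — adjust both to your site):

```shell
# 1. Start the master as its own one-slot job:
qsub -pe spark 1 -jc spark -N sparkmaster -q spark -b y \
    "$SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master"

# 2. Once it's running, pull the host it landed on out of qstat
#    (field 8 is queue@host; strip everything up to and including the @):
masterhost=$(qstat -s r | awk '/sparkmaster/ {sub(/.*@/, "", $8); print $8; exit}')
export MASTER=spark://$masterhost:7077

# 3. Submit each worker as its own job, pointed at the master:
qsub -pe spark 1 -jc spark -N sparkworker -q spark -b y \
    "$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker $MASTER"
```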

But wait! There’s still a better way to do this, using job classes to a much greater extent. Note that you’ll need to change the script paths, the queue name, and the exclusive complex name to fit your environment:

qconf -sjc sparkflex
jcname          sparkflex
variant_list    default worker master-rc worker-rc master-test worker-test
owner           NONE
user_lists      NONE
xuser_lists     NONE
A               {+}UNSPECIFIED
a               {+}UNSPECIFIED
ar              {+}UNSPECIFIED
b               TRUE
binding         {+}UNSPECIFIED
c_interval      {+}UNSPECIFIED
c_occasion      {+}UNSPECIFIED
CMDNAME         /PATH/TO/SCRIPTS/, \
                [worker=/PATH/TO/SCRIPTS/], \
                [master-rc=/PATH/TO/SCRIPTS/], \
                [worker-rc=/PATH/TO/SCRIPTS/], \
                [master-test=/PATH/TO/SCRIPTS/], \
                [worker-test=/PATH/TO/SCRIPTS/]
ckpt            {+}UNSPECIFIED
ac              {+}UNSPECIFIED
cwd             .
dl              {+}UNSPECIFIED
e               {+}UNSPECIFIED
h               {+}UNSPECIFIED
hold_jid        {+}UNSPECIFIED
hold_jid_ad     {+}UNSPECIFIED
i               {+}UNSPECIFIED
j               TRUE
js              {+}UNSPECIFIED
l_hard          {-~}spark_exclusive=1,{-~}h_rt=43200
l_soft          {+}UNSPECIFIED
masterl         {+}UNSPECIFIED
m               {+}UNSPECIFIED
mbind           {+}UNSPECIFIED
M               {+}UNSPECIFIED
masterq         {+}UNSPECIFIED
N               master,[{~}worker={~}W],[{~}worker-rc={~}W], \
                [{~}worker-test={~}W]
notify          {+}UNSPECIFIED
now             {+}UNSPECIFIED
o               $HOME/.spark/ugelogs
P               {+}UNSPECIFIED
p               {+}UNSPECIFIED
pe_name         sparkflex
pe_range        1
q_hard          spark.q
q_soft          {+}UNSPECIFIED
R               {+}UNSPECIFIED
r               {+}UNSPECIFIED
rou             {+}UNSPECIFIED
S               {+}UNSPECIFIED
shell           {+}UNSPECIFIED
t               {+}UNSPECIFIED
tc              {+}UNSPECIFIED
V               TRUE
v               {+}UNSPECIFIED

As you can see, the JC specifies quite a few things that previously had to go on the qsub command line, including a hard runtime, the PE, the queue, and even the scripts themselves. With the variant list, a single JC can cover multiple kinds of Spark jobs, including the master (the default variant), the worker, and a couple of variants for other Spark installs maintained separately.
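For completeness: a job class like this lives in the scheduler’s configuration, so it gets created by writing the attribute list above to a file and loading it with qconf (this assumes UGE’s standard qconf job-class interface; `sparkflex.jc` is just a hypothetical filename):

```shell
qconf -Ajc sparkflex.jc      # add a new job class from a definition file
qconf -mjc sparkflex         # edit an existing job class interactively
qconf -sjc sparkflex         # show the result
```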

The master start scripts referenced in the CMDNAME line are pretty simple:

#!/bin/bash
####CHANGE PATH TO SPARK INSTALL
# The two sourced filenames were truncated in the original; these are the
# standard Spark environment scripts that Spark's own start scripts source,
# so adjust if your install differs.
. $SPARK_HOME/sbin/spark-config.sh
. $SPARK_HOME/bin/load-spark-env.sh
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master --host `hostname`


The worker is a bit more complicated, since it has to figure out where the master is from the master’s job ID encoded in the worker’s job name:

#!/bin/bash
# The worker job is named W<master job ID>; strip the leading W to get the ID.
MasterID=`echo $JOB_NAME | cut -c2-`
echo $MasterID

# Poll qstat every 2 minutes, for up to 30 minutes, until the master is running.
n=0
while [ $n -lt 15 ]; do
    n=$(($n + 1))
    MasterFQDN=`qstat -s r | awk "/$MasterID/ && /master/ "'{sub(/spark.q@/,""); gsub(/\.i.*/,"."); print $8}'`
    echo $MasterFQDN
    if [ ! -z "$MasterFQDN" ]; then
        break
    fi
    echo "No running master. Retrying..."
    sleep 120
done

if [ -z "$MasterFQDN" ]; then
    echo "No Master running. Please resubmit worker."
    exit 1
fi

# Build the master URL from the hostname found above (this line was lost in
# the original; reconstructed to match the qsub example earlier in the post).
MASTER=spark://${MasterFQDN}:7077

####CHANGE PATH TO SPARK INSTALL
# As in the master script, the sourced filenames were truncated; these are
# Spark's standard environment scripts.
. $SPARK_HOME/sbin/spark-config.sh
. $SPARK_HOME/bin/load-spark-env.sh
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 $MASTER


So now the submission would be:

qsub -jc sparkflex.default
qsub -jc sparkflex.worker -N W<jobid of master>
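To bring up a whole cluster in one go, the master’s job ID can be captured at submission time with qsub’s -terse flag (which prints just the job ID) and fed to the worker names; this loop is an illustrative sketch, not part of the tooling above:

```shell
# Submit the master; -terse makes qsub print only the job ID.
mid=$(qsub -terse -jc sparkflex.default)

# Point eight workers at it by naming them W<master job ID>.
for i in $(seq 1 8); do
    qsub -jc sparkflex.worker -N "W${mid}"
done
```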


Breaking this down: the script first checks whether a master is running, and keeps checking every 2 minutes for up to 30 minutes (that’s the while loop at the top). The check itself is the big awk statement, which looks for the master’s job ID in the parsed qstat output and returns the master’s FQDN. Once it has that, it launches the worker as above, passing it the $MASTER variable.
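To see what that awk actually does, here it is run against a single hypothetical `qstat -s r` line (job ID, user, and host are made up; field 8 is the queue@host column). Note that sub() and gsub() operate on the whole record, so stripping `spark.q@` and truncating at the site domain both cause awk to re-split the fields before `$8` is printed:

```shell
MasterID=1234567
sample='1234567 0.55500 master kcarlile r 01/01/2024 12:00:00 spark.q@h07u01.int.example.org 1'

MasterFQDN=$(echo "$sample" | awk "/$MasterID/ && /master/ "'{sub(/spark.q@/,""); gsub(/\.i.*/,"."); print $8}')
echo "$MasterFQDN"    # → h07u01.
```

The trailing dot comes from the `gsub(/\.i.*/,".")`, which chops the hostname at the site’s domain; DNS treats a trailing dot as fully qualified, so the resulting URL still resolves.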

I have written a python script that makes this even easier and includes some error checking, etc, but that’s a post for another day.


About kcarlile
Twitter: @overclockdlemon
