SparkFlex launch script

In my last update, I wrote about what I call SparkFlex, a better way to launch Spark jobs on a Grid Engine based HPC cluster than my old method. While that works as is, I’ve also written a Python script called sparklaunch.py that makes spinning up a cluster much easier and more controlled. It is invoked as follows:

spark-deploy.py [-h, --help] [-w --workers] [-j --jobid] [-M --master] [-s --stopmaster] [-r --removeworkers] [-v --version] [-f] [-t --runtime]

The script will optionally launch a master and any number of workers (including zero). It can also add workers to or shut down workers from an existing cluster, with optional selection of which node to shut down, and it can tear down the whole cluster. It works with multiple clusters launched by a single user, tracking each cluster by either the job ID or the hostname of its master, and it takes an option that sets a hard runtime for the cluster.
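A few example invocations (the flags are the ones documented in the usage below; the job ID and hostname are made up):

# Start a master and 4 workers with an 8 hour hard runtime
spark-deploy.py -w 4 -t 08:00:00
# Add 2 workers to the cluster whose master was submitted as job 1234567
spark-deploy.py -j 1234567 -w 2
# Remove 1 worker from the cluster whose master is running on host h02u09
spark-deploy.py -r 1 -M h02u09
# Tear down the whole cluster, without asking for confirmation
spark-deploy.py -s -j 1234567 -f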
Here’s the code:

#!/usr/bin/env python
####sparklaunch.py
####requires Python 2.7.x and the xmltodict module
import os
import sys
import getopt
import subprocess
import traceback
import xmltodict
import time

def getqstat():
    alljobs = []
    process = subprocess.Popen("qstat -xml", shell=True, stdout=subprocess.PIPE)
    rawxml = process.communicate()[0]
    xmldict = xmltodict.parse(rawxml)
# qstat -xml puts pending and running jobs in separate sections. This code appends the two lists together. qstat -xml also outputs in unicode, so that needs to be parsed properly.
    for qkey in xmldict[u'job_info']:
        if qkey == u'@xmlns:xsd':
            continue
        if xmldict[u'job_info'][qkey]:
# necessary because xmltodict returns a single-entry list as just the entry rather than a list
            templist = xmldict[u'job_info'][qkey][u'job_list']
            if type(templist) is not list:
                templist = [templist]
            alljobs = alljobs + templist
    return alljobs

####Use your own JC variables 
def startmaster(sparktype, runtime):
    if sparktype == "":
         sparktype = "default"
    elif sparktype == "rc":
        sparktype = "master-rc"
    elif sparktype == "test":
        sparktype = "master-test"
    if runtime != "default":
        options = "sparkflex.{} -l spark_exclusive=1,h_rt={}".format(sparktype, runtime)
    else:
        options = "sparkflex.{}".format(sparktype)
    process = subprocess.Popen(["qsub -jc {}".format(options)], shell=True, stdout=subprocess.PIPE)
    rawout = process.communicate()
    try:
        masterjobid = rawout[0].split(" ")[2]
    except:
        print "Master did not submit correctly. qsub output: {}".format(rawout)
        sys.exit(1)
    print "Master submitted. Job ID is {}".format(masterjobid)
    return masterjobid

def startworker(sparktype,masterjobid):
    masterfound = False
    rawout = getqstat()
    for i in range(len(rawout)):
        if masterjobid in str(rawout[i][u'JB_job_number']):
            masterfound = True
            break
    if not masterfound:
        print "No master with the job id {} running or queued. Please submit master first.".format(masterjobid)
        sys.exit()
    if sparktype == "rc":
        sparktype = "-rc"
    if sparktype == "test":
        sparktype = "-test"
    command = "qsub -jc sparkflex.worker{} -N W{}".format(sparktype,masterjobid)
    os.system(command)

def qdeljob(jobid):
    command = "qdel {}".format(jobid)
    os.system(command)

def stopworker(masterjobid, terminatew, workerlist, skipcheckstop):
    workername = "W{}".format(masterjobid) 
    rawout = getqstat()
    jobtokill = ""
    statuses = {}
    for i in range(len(rawout)):
        unpack=rawout[i]
        jobid=str(unpack[u'JB_job_number'])
        if 'sparkflex.worker' in str(unpack[u'jclass_name']) and str(masterjobid) in str(unpack[u'JB_name']):
            statuses[jobid] = str(unpack[u'@state']) 
    if "pending" in statuses.values() and not skipcheckstop:
        terminatew = raw_input("Terminate waiting job(s) first? (y/n) ")

    for wjobid in statuses.keys():
        if statuses[wjobid] == 'pending' and terminatew == 'y':
            jobtokill = wjobid
            break
        elif statuses[wjobid] == 'running' and not skipcheckstop:
            jobtokill = selectionlist(workerlist, 'worker')
            break
        else: 
            jobtokill = wjobid
            break
    if jobtokill != "":
        qdeljob(jobtokill)
        time.sleep(3)
    else:
        print "No running or waiting workers found corresponding to Master job {}".format(masterjobid)
        sys.exit()
    return terminatew

def checkstop(inval, jobtype):
    if jobtype == "master":
        checkstop = raw_input("Stop master with job id {} (y/n):".format(inval))
    else: 
        checkstop = raw_input("Remove {} workers? (y/n):".format(inval))
    if checkstop != "y":
        print "No workers will be removed."
        sys.exit()
    else:
        return True

def mastername_to_jobid(mastername):
    masterfound = False
    masterlist = findmaster()
    for jobid in masterlist.keys():
        if mastername in masterlist[jobid][0]:
            masterfound = True
            break
    if masterfound:
        return jobid
    else:
        print "Node {} is not running a spark master job. If deleting workers alone, please use Master's Job ID".format(mastername)
        sys.exit()

def findmaster():
    rawout = getqstat()
    jobid = ""
    masterlist = {}
    for i in range(len(rawout)):
        unpack = rawout[i]
        jobid = str(unpack[u'JB_job_number'])
        if "sparkflex" in str(unpack[u'jclass_name']) and "master" in str(unpack[u'JB_name']):
            masterlist[jobid] = (str(unpack[u'queue_name']).replace('spark.q@',''),str(unpack[u'@state']))
    if len(masterlist.keys()) != 0:
        return masterlist
    else:
        print "No masters found. If deleting workers alone, please use Master's Job ID."
        sys.exit()

def findworker(masterjobid):
    rawout = getqstat()
    jobid = ""
    workername = "W{}".format(masterjobid)
    workerlist = {}
    for i in range(len(rawout)):
        unpack = rawout[i]
        jobid = str(unpack[u'JB_job_number'])
        if "sparkflex.worker" in str(unpack[u'jclass_name']) and workername in str(unpack[u'JB_name']):
            workerlist[jobid] = (str(unpack[u'queue_name']).replace('spark.q@',''),str(unpack[u'@state']))
    return workerlist

def checkmaster(masterjobid):
    rawout = getqstat()
    mastername = ""
    for i in range(len(rawout)):
        unpack = rawout[i]
        if masterjobid in str(unpack[u'JB_job_number']) and "running" in str(unpack[u'@state']):
            mastername = str(unpack[u'queue_name']).replace("spark.q@","")
            break
    if mastername == "":
        instruction = raw_input("Master is not running yet. (w)ait then retry in 1 minute, e(x)it this application, or (t)erminate master?" )
        if instruction == "w":
            time.sleep(60)
            return checkmaster(masterjobid)
        elif instruction == "x":
            sys.exit()
        elif instruction == "t":
            qdeljob(masterjobid)
            sys.exit()
    return mastername

def qdelmaster(masterjobid, skipcheckstop):
    jobtype = "master"
    workername = "W{}".format(masterjobid)
    if not skipcheckstop:
        skipcheckstop = checkstop(masterjobid,jobtype)
    if skipcheckstop:
        try:
            qdeljob(masterjobid)
        except:
            print "Master with job id {} not found".format(masterjobid)
            sys.exit(1)
        try:
            qdeljob(workername)
        except:
#            traceback.print_exc()
            print "Workers for master with job id {} failed to stop".format(masterjobid)
            sys.exit(1)
    else:
        print "As requested, master not stopped."
        sys.exit(0)

def getworkercount(masterjobid):
    rawout = getqstat()
    workers = 0
    for i in range(len(rawout)):
        unpack = rawout[i]
        if masterjobid in str(unpack[u'JB_name']):
            workers = workers + 1
    return workers
            

def selectionlist(joblist, jobtype):
    i = 0
    selectionlist = {}
    print "Select {} to kill from list below:".format(jobtype)
    for jobid in joblist.keys():
        i = i + 1
        host, status = joblist[jobid]
        selectionlist[i] = jobid
        print "{}) Host: {} JobID: {} Status: {}".format(i, host, jobid, status)
    while True:
        try:
            selection = int(raw_input("Selection? "))
        except ValueError:
            print "Invalid selection."
            continue
        if 1 <= selection <= i:
            jobid = selectionlist[selection]
            break
        else:
            print "Invalid selection."
    return jobid

def usage():
    print "spark-deploy.py [-h, --help] [-w --workers] [-j --jobid] [-M --master] [-s --stopmaster] [-r --removeworkers] [-v --version] [-M --master] [-f] [-t --runtime]"
    print "-h, --help: print this help."
    print "-w, --workers: number of workers to start."
    print "-j, --jobid: Job ID of running Master."
    print "-M, --master: Hostname of running Master."
    print "-s, --stopmaster: Stop running Master. Requires Master Job ID or Master hostname."
    print "-r, --removeworkers: number of workers to stop, requires Master Job ID or Master hostname."
    print "-f: Do not ask for confirmation of commands or present choice of jobs"
    print "-t, --runtime: Change the default time at which the Spark cluster will self-terminate in format hh:mm:ss"
    print "-v, --version: version of Spark to run (optional, permissible options rc, test)."
    print "If run with no options, will start a master with no workers. To start a master with workers, use spark-deploy.py -w "

def main(argv):
# Booleans to make this whole mess work right
    stopw = False
    stopm = False
    skipcheckstop = False
    masternamed = False
    needmaster = True
    novers = True
    runtime = "default"

# Get user options
    try:
        opts, args = getopt.getopt(argv,"fshw:v:j:r:M:t:",["workers=","version=","jobid=", "removeworkers=", "stopmaster", "help", "master=", "runtime="])
    except getopt.GetoptError:
        usage()
        sys.exit(2)
   
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit()
        elif opt in ("-v", "--version"):
            sparktype = arg
            novers = False
            if sparktype not in ("rc", "test"):
                usage()
                sys.exit(1)
        elif opt in ("-w", "--workers"):
            workers = int(arg)
        elif opt in ("-j", "--jobid"):
            masterjobid = arg
            needmaster = False
        elif opt in ("-r", "--removeworkers"):
            workers = int(arg)
            stopw = True
        elif opt in ("-s", "--stopmaster"):
            stopm = True
        elif opt == "-f":
            skipcheckstop = True
        elif opt in ("-M", "--master"):
            masterjobid = mastername_to_jobid(arg)
            needmaster = False
        elif opt in ("-t", "--runtime"):
            runtime = arg
    
    if novers:
        sparktype = ""

# Stop running master
    if stopm:
        try:
            masterjobid
        except NameError:
            masterlist = findmaster()
            masterjobid = selectionlist(masterlist,'master')
            skipcheckstop = True
        qdelmaster(masterjobid, skipcheckstop)
        try:
            os.remove("tmp{}.sh".format(masterjobid))
            skipcheckstop = True
        except:
            print "tmp{}.sh not found. Please delete manually in the directory from which you started the spark job."

# Stop worker(s)
    if stopw:
        jobtype = "worker"
        terminatew = ""
        try:
            for i in range(workers):
                workerlist = findworker(masterjobid)
                terminatew = stopworker(masterjobid, terminatew, workerlist, skipcheckstop)
        except NameError:
            print "JobID or name of master required to stop worker."
#            traceback.print_exc()
            sys.exit()

# Either way, we're done here
    if stopm or stopw:
        sys.exit()

# Start master and create temporary script for the user to use which contains the necessary environment variables to communicate with the Spark cluster. 
 
    if needmaster:
        masterjobid = startmaster(sparktype, runtime)

# Sleep is necessary to make sure the scheduler has time to spin up the job. You may need to adjust this based on your scheduler configuration. 

        time.sleep(2)
        mastername = str(checkmaster(masterjobid))
        filename = "tmp{}.sh".format(masterjobid)
        if sparktype == "default":
            version = "current"
        else:
            version = sparktype
        with open(filename, 'w') as f:
            f.write(
                    "#!/bin/bash\n"
                    "export MASTER=spark://{}:7077\n"
                    "export SPARK_HOME=/usr/local/spark{}\n"
                    "export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH\n".format(mastername,version)
                    )

# Start workers and exit if not starting master 
    try:
        for i in range(workers):
            startworker(sparktype, masterjobid)
    except:
        print "No workers specified or other error"
#        traceback.print_exc() #debug line
        sys.exit()
    if not needmaster:
        sys.exit()

# Print out Master name and addresses. 
    print "Master's name is {}. WebUI is at http://{}:8080. The $MASTER environment variable can be set to spark://{}:7077 manually or by running . tmp{}.sh".format(mastername, mastername, mastername, masterjobid)

if __name__ == '__main__':
    main(sys.argv[1:])

 

SparkFlex, aka running Spark jobs on a Grid Engine cluster, take II

I’ve left everyone who looks at my Spark posts hanging for months with a teaser about a new method I’ve been working on. This is the first of my posts about this.

The high-level idea is that it’s better to run Spark as a set of independent jobs rather than as one monolithic multi-node job. That way, if the HPC cluster is heavily used, at least PART of the Spark cluster can get scheduled and work can begin (assuming that not all nodes are strictly necessary for the work to succeed). Additionally, nodes can be added to or removed from the cluster as needed.

This is fairly easily achieved by running the Spark master as one job, then getting the Master’s url and submitting each worker as an independent job with the MASTER variable set (and things configured like my previous example), ie

export MASTER=spark://<mastername>:7077
export SPARK_HOME=/PATH/TO/SPARK
qsub -pe spark 1 -jc spark -N sparkworker -q spark -b y "$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker $MASTER"

But wait! There’s still a better way to do this, using job classes to a much greater extent. Note that you’ll need to change the paths to the scripts, the name of the queue, and the exclusive complex name to fit your environment.

qconf -sjc sparkflex
jcname          sparkflex
variant_list    default worker master-rc worker-rc master-test worker-test
owner           NONE
user_lists      NONE
xuser_lists     NONE
A               {+}UNSPECIFIED
a               {+}UNSPECIFIED
ar              {+}UNSPECIFIED
b               TRUE
binding         {+}UNSPECIFIED
c_interval      {+}UNSPECIFIED
c_occasion      {+}UNSPECIFIED
CMDNAME         /PATH/TO/SCRIPTS/start-flexmaster.sh, \
                [worker=/PATH/TO/SCRIPTS/start-flexworker.sh], \
                [master-rc=/PATH/TO/SCRIPTS/start-flexmaster-rc.sh], \
                [worker-rc=/PATH/TO/SCRIPTS/start-flexworker-rc.sh], \
                [master-test=/PATH/TO/SCRIPTS/start-flexmaster-test.sh], \
                [worker-test=/PATH/TO/SCRIPTS/start-flexworker-test.sh]
CMDARG          {+}UNSPECIFIED
ckpt            {+}UNSPECIFIED
ac              {+}UNSPECIFIED
cwd             .
dl              {+}UNSPECIFIED
e               {+}UNSPECIFIED
h               {+}UNSPECIFIED
hold_jid        {+}UNSPECIFIED
hold_jid_ad     {+}UNSPECIFIED
i               {+}UNSPECIFIED
j               TRUE
js              {+}UNSPECIFIED
l_hard          {-~}spark_exclusive=1,{-~}h_rt=43200
l_soft          {+}UNSPECIFIED
masterl         {+}UNSPECIFIED
m               {+}UNSPECIFIED
mbind           {+}UNSPECIFIED
M               {+}UNSPECIFIED
masterq         {+}UNSPECIFIED
N               master,[{~}worker={~}W],[{~}worker-rc={~}W], \
                [{~}worker-test={~}W]
notify          {+}UNSPECIFIED
now             {+}UNSPECIFIED
o               $HOME/.spark/ugelogs
P               {+}UNSPECIFIED
p               {+}UNSPECIFIED
pe_name         sparkflex
pe_range        1
q_hard          spark.q
q_soft          {+}UNSPECIFIED
R               {+}UNSPECIFIED
r               {+}UNSPECIFIED
rou             {+}UNSPECIFIED
S               {+}UNSPECIFIED
shell           {+}UNSPECIFIED
t               {+}UNSPECIFIED
tc              {+}UNSPECIFIED
V               TRUE
v               {+}UNSPECIFIED

As you can see, the JC specifies quite a few things about the job that had previously been in the qsub, including a hard runtime, the PE, the queue, and even the scripts to run. With the variant list, a single JC can be used for multiple kinds of Spark jobs, including the master (default), the worker, and variants for a couple of other separately installed Spark versions. These defaults can also be overridden at submission time, as shown below.
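For example, since the runtime limit is listed with the {-~} access specifier (which allows it to be modified or removed at submission time), it can be overridden on the qsub command line, and the variants are selected with the usual dot syntax. A sketch (the job ID is made up), mirroring what the launch script above does:

# Master for the "test" Spark install with a 12 hour hard runtime instead of the default
qsub -jc sparkflex.master-test -l spark_exclusive=1,h_rt=12:00:00
# Worker for that master; the W<jobid> job name is how the worker script finds its master
qsub -jc sparkflex.worker-test -N W1234567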

The master start scripts referenced in the CMDNAME line are pretty simple:


#!/bin/bash
# start-flexmaster.sh 
####CHANGE PATH TO SPARK INSTALL                                                                                            
export SPARK_HOME=/PATH/TO/SPARK   
. $SPARK_HOME/sbin/spark-config.sh    
. $SPARK_HOME/bin/load-spark-env.sh    
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master --host `hostname`

 

The worker is a bit more complicated, since it has to figure out where the master is from the job number provided:


#!/bin/bash  
# start-flexworker.sh                                                                                           
MasterID=`echo $JOB_NAME | cut -c2-`
echo $MasterID

n=0
while [ $n -lt 15 ]; do 
    n=$(($n + 1))
####USE YOUR NODES' DOMAIN IN THE gsub BELOW AND THE CORRECT QUEUE NAME FOR YOUR ENVIRONMENT
    MasterFQDN=`qstat -s r | awk "/$MasterID/ && /master/ "'{sub(/spark.q@/,""); gsub(/\.i.*/,"."); print $8}'`
    echo $MasterFQDN
    if [ ! -z "$MasterFQDN" ]; then
        break
    fi
    echo "No running master yet. Retrying..."
    sleep 120
done

if [ -z $MasterFQDN ]; then
    echo "No Master running. Please resubmit worker."
    exit 1
fi

MASTER="spark://${MasterFQDN}:7077"
####USE YOUR PATH TO SPARK
export SPARK_HOME=/PATH/TO/SPARK
. $SPARK_HOME/sbin/spark-config.sh    
. $SPARK_HOME/bin/load-spark-env.sh    
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 $MASTER

 

So now the submission would be:


#MASTER
qsub -jc sparkflex.default
#WORKER
qsub -jc sparkflex.worker -N W<jobid of master>

 

Breaking this down, the start-flexworker.sh script first checks whether a master is running, and keeps checking every 2 minutes for up to 30 minutes (that’s the while loop at the top). It does this with the large awk statement, which looks for the master’s job ID in the qstat output, pulls out the queue@host column, and returns the master’s FQDN. It then launches the worker as above, passing it the $MASTER variable.

I have written a python script that makes this even easier and includes some error checking, etc, but that’s a post for another day.

Running Spark as a job on a Grid Engine HPC Cluster (part 3)

UPDATED GUIDE: SparkFlex, aka running Spark jobs on a Grid Engine cluster, take II

And now for the secret sauce–how to actually get Spark workers to run and talk to a master.

For a long time, I had my start script launching the master. Unfortunately, this resulted in very loose control by the scheduler/execd over the processes, and I had to do a ton of cleanup at the end to make sure there were no orphan processes running on nodes after the job closed or failed.

After some research, I realized that it made more sense to launch the master as part of the job submission (I’ll get to this later in the post), and that I could tightly couple the workers by launching them in the start script via the qrsh command.

qrsh places the processes that it runs under the control of the scheduler, unlike using ssh to do the same thing.

This also means that you MUST have passwordless ssh set up for any user who wants to use Spark in this fashion.
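If passwordless ssh isn’t already in place, a minimal setup on a cluster with shared home directories looks something like this (standard OpenSSH; adjust the key type and options to your site’s policy):

# Generate a key without a passphrase and authorize it for the same account
ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys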

So here’s the start script referenced by the PE (see Part 2)


#!/bin/bash
echo "Starting SPARK PE from $HOSTNAME"
SPARK_HOME=/usr/local/spark-current

# Create the master file.
cat $PE_HOSTFILE | cut -f1 -d" " | head -1 > /scratch/spark/tmp/master

# Create all file.
cat $PE_HOSTFILE | cut -f1 -d" " | uniq > /scratch/spark/tmp/all

# Create the workers file.
grep -v `cat /scratch/spark/tmp/master` /scratch/spark/tmp/all > /scratch/spark/tmp/workers

# Logging/debugging info
mkdir -p ~/.spark/logs/$JOB_ID
cp /scratch/spark/tmp/workers ~/.spark/logs/$JOB_ID/workers.txt
cp -LR $SPARK_HOME/conf ~/.spark/logs/$JOB_ID/

# Start the workers
echo "Starting Spark workers" 
MASTER=$HOSTNAME
while read worker; do
    /sge/current/bin/lx-amd64/qrsh -inherit $worker "$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://$MASTER:7077" &
done < /scratch/spark/tmp/workers

echo "Spark Master WebUI for this job: http://$MASTER:8080"

$PE_HOSTFILE points to a list of all nodes involved in the job, which is provided by Grid Engine (an example is below). In this case, we only want the slave nodes, which will run the workers, so we need to do a little parsing to get those out. I’m sure that section could be a little cleaner, but there we are…
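For reference, the file that $PE_HOSTFILE points to has one line per host, with the hostname in the first column, which is what the cut -f1 -d" " is pulling out. Illustrative contents for a 5-slot job on this queue (hostnames made up):

h02u05.example.org 1 hadoop2@h02u05.example.org UNDEFINED
h02u07.example.org 1 hadoop2@h02u07.example.org UNDEFINED
h02u09.example.org 1 hadoop2@h02u09.example.org UNDEFINED
h02u11.example.org 1 hadoop2@h02u11.example.org UNDEFINED
h02u13.example.org 1 hadoop2@h02u13.example.org UNDEFINED

The first line is the node the start script runs on (the Spark master), and the remaining lines become the workers file.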

After I get my list of worker nodes, I do a little logging, which can be extremely helpful for troubleshooting launch failures and the like.

Then we get to the worker launches. I use the standalone cluster mode of launching the workers via scripts provided in Spark. Note the ampersand at the end of the command; this makes for a much faster startup of the workers. By backgrounding the qrsh process for each worker, we are not held up by serially launching the worker across each node. That allows us to spin up a large cluster (>20 nodes) in a fraction of the time that would be required to do it serially.

The assumption in this script is that the node that runs it is the Spark Master, which is why the master launch must be done as part of the job submission.

Here is a very simplistic submission script. It does nothing other than launch the master, so any processing on the Spark cluster would need to be included in the script or run via an interactive session (more about this later)


#!/bin/bash
# sparkmaster.sh
export PATH=/usr/local/spark-current/bin:/usr/local/spark-current/sbin:/usr/local/python-2.7.6/bin:$PATH
export SPARK_HOME=/usr/local/spark-current
/usr/local/spark-current/sbin/start-master.sh
export MASTER=spark://$HOSTNAME:7077
sleep 3600
exit 0

The only thing of note here is the sleep 3600 line, which keeps the master up for an hour. You’ll probably want to modify this to fit your environment. If you change it to sleep infinity, you will need to manually qdel any cluster started with this script (unless you add further instructions for the Spark cluster to the script before the exit 0).

The qsub for this looks something like this, assuming you’ve set up your environment exactly like mine (ha)

qsub -jc spark -pe spark 5 -q hadoop2 -j y -o ~/sparklogs -V -b y sparkmaster.sh

This will launch a Spark cluster with 1 master and 4 workers (1+4=5, dontcherknow) and log to sparklogs in your home directory.

Our users typically use Spark in an interactive fashion, so in addition to launching their clusters, they also do a qlogin and then define the $MASTER, $SPARK_HOME, and $PATH variables as above within that session. From that point, the qlogin session acts as the Spark driver. It is also possible to run the driver on the same node as the master, but you run the risk of running the master out of RAM.

The main Spark-using group here has written a much more intelligent version of this (see this github repo), but here is a very basic script that can be run after the qlogin to set up the right variables (as long as the user isn’t running more than one Spark cluster…)


#!/bin/bash
export MASTER="spark://"`qstat | grep spark | awk '{print $8}' | cut -d@ -f2`":7077"
export PATH=$PATH:/usr/local/spark-current/bin:/usr/local/spark-current/sbin:/usr/local/python-2.7.6/bin
export SPARK_HOME=/usr/local/spark-current

The spark-pestop.sh script is very simple at this point (before using qrsh, I was having to do a TON of clean up in it); now all it has to do is remove the files created by the spark-pestart.sh script, otherwise future jobs by different users will fail:


#!/bin/bash
rm -rf /scratch/spark/tmp/workers
rm -rf /scratch/spark/tmp/all
rm -rf /scratch/spark/tmp/master
exit 0

Another little note here: for multiple users to run Spark in this fashion, the /scratch/spark directory MUST be readable and writable by all of them. In our environment, the easiest way to do this was to set its permissions to 777 and call it a day.
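A one-time setup along these lines on each node would do it (path as used in the scripts above):

mkdir -p /scratch/spark/tmp
chmod -R 777 /scratch/spark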

Please let me know in the comments if you have any questions or run into trouble.

Running Spark as a job on a Grid Engine HPC Cluster (part 2)

UPDATED GUIDE: SparkFlex, aka running Spark jobs on a Grid Engine cluster, take II

So the next thing you need to do to get Spark running in Grid Engine is to set up your queue, job class, and your parallel environment.

Our environment is normally run with users requesting slots, which roughly correspond to one CPU core and ~8 GB of RAM each. So our normal Sandy Bridge nodes have 16 slots, and our Haswell nodes have 32.

Spark is much easier to run if you give it the whole machine, of course (although I suspect you can limit the workers in various ways), but in our environment, part of the reason Spark is used is to load huge datasets (>1TB) into RAM. So it doesn’t make sense to allocate Spark workers (or the master) based on slot count.

Therefore, our queue for spark (called hadoop2 for historical reasons) is set up to have only 1 slot per node. We also have a complex called hadoop_exclusive which forces jobs (or tasks) to be scheduled to whole nodes.
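If you need to create such a complex yourself, it can be added with qconf -mc. A sketch of the relevant line (the shortcut and urgency values are made up; the important parts are the BOOL type, the EXCL relop, and consumable YES):

#name               shortcut   type   relop   requestable consumable default  urgency
hadoop_exclusive    hexcl      BOOL   EXCL    YES         YES        0        1000

The complex also has to be attached to the execution hosts (complex_values hadoop_exclusive=true via qconf -me <hostname>) before the scheduler will treat those nodes as exclusive.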

>qconf -sq hadoop2
qname                 hadoop2
hostlist              @h02 @h03 @h04 @h05 @h06 @h07 @h08
seq_no                0
load_thresholds       np_load_avg=3
suspend_thresholds    NONE
nsuspend              1
suspend_interval      00:05:00
priority              0
min_cpu_interval      00:05:00
qtype                 INTERACTIVE
ckpt_list             NONE
pe_list               hadoop,spark,spark-rc,spark-test
jc_list               hadoop.default,spark.default, \
                      spark-rc.default,spark-test.default
rerun                 FALSE
slots                 1
tmpdir                /tmp
shell                 /bin/sh
prolog                NONE
epilog                NONE
shell_start_mode      unix_behavior
starter_method        NONE
suspend_method        NONE
resume_method         NONE
terminate_method      NONE
notify                00:00:60
owner_list            NONE
user_lists            NONE
xuser_lists           NONE
subordinate_list      NONE
complex_values        NONE
projects              NONE
xprojects             NONE
calendar              NONE
initial_state         default
s_rt                  INFINITY
h_rt                  INFINITY
d_rt                  INFINITY
s_cpu                 INFINITY
h_cpu                 INFINITY
s_fsize               INFINITY
h_fsize               INFINITY
s_data                INFINITY
h_data                INFINITY
s_stack               INFINITY
h_stack               INFINITY
s_core                INFINITY
h_core                INFINITY
s_rss                 INFINITY
h_rss                 INFINITY
s_vmem                INFINITY
h_vmem                INFINITY

There doesn’t appear to be much here that’s non-default, other than the JC and PE lists. Note that you need to set up a separate pe and jc for every version of spark you’re using.

Here’s the PE setup:


>qconf -sp spark
pe_name                spark
slots                  99999
user_lists             NONE
xuser_lists            NONE
start_proc_args        /usr/local/uge-hadoop/spark-pestart.sh
stop_proc_args         /usr/local/uge-hadoop/spark-pestop.sh
allocation_rule        1
control_slaves         TRUE
job_is_first_task      FALSE
urgency_slots          min
accounting_summary     TRUE
daemon_forks_slaves    FALSE
master_forks_slaves    FALSE

The start_proc_args and stop_proc_args are really the secret sauce. I’ll talk about those in part 3. The other options are fairly obvious; you want the master (i.e., the job) to control the slaves, which are running the Spark workers. The accounting_summary option allows you to track the load/memory usage on the slave nodes. job_is_first_task is set to FALSE because the head node, while it runs the Spark master process (and sometimes the driver, depending on how the job was created), does not run any of the Spark workers.

On to the JC:


>qconf -sjc spark
jcname          spark
variant_list    default
owner           NONE
user_lists      NONE
xuser_lists     NONE
A               {+}UNSPECIFIED
a               {+}UNSPECIFIED
ar              {+}UNSPECIFIED
b               {+}UNSPECIFIED
binding         {+}UNSPECIFIED
c_interval      {+}UNSPECIFIED
c_occasion      {+}UNSPECIFIED
CMDNAME         {+}UNSPECIFIED
CMDARG          {+}UNSPECIFIED
ckpt            {+}UNSPECIFIED
ac              {+}UNSPECIFIED
cwd             {+}UNSPECIFIED
dl              {+}UNSPECIFIED
e               {+}UNSPECIFIED
h               {+}UNSPECIFIED
hold_jid        {+}UNSPECIFIED
hold_jid_ad     {+}UNSPECIFIED
i               {+}UNSPECIFIED
j               {+}UNSPECIFIED
js              {+}UNSPECIFIED
l_hard          hadoop_exclusive=1
l_soft          {+}UNSPECIFIED
masterl         {+}UNSPECIFIED
m               {+}UNSPECIFIED
mbind           {+}UNSPECIFIED
M               {+}UNSPECIFIED
masterq         {+}UNSPECIFIED
N               {+}UNSPECIFIED
notify          {+}UNSPECIFIED
now             {+}UNSPECIFIED
o               {+}UNSPECIFIED
P               {+}UNSPECIFIED
p               {+}UNSPECIFIED
pe_name         spark
pe_range        {+}UNSPECIFIED
q_hard          {+}UNSPECIFIED
q_soft          {+}UNSPECIFIED
R               TRUE
r               {+}UNSPECIFIED
rou             {+}UNSPECIFIED
S               {+}UNSPECIFIED
shell           {+}UNSPECIFIED
t               {+}UNSPECIFIED
tc              {+}UNSPECIFIED
V               {+}UNSPECIFIED
v               {+}UNSPECIFIED

The only modifications here are setting the hadoop_exclusive complex, setting the PE to spark (see above), and setting Reserved (R) to TRUE. Reservation holds available nodes aside when cluster load is high so that the Spark job can eventually start. For example, if the job requests 10 slots and there is only one free node in the cluster, that node is reserved for the Spark job and nothing else is scheduled to it until all 10 nodes are available. Otherwise, Spark jobs would likely never run on a busy cluster.
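One caveat worth checking, as an assumption about your scheduler configuration rather than anything shown above: resource reservation only takes effect if the scheduler permits a non-zero number of reservations.

# Reservation is honored only if max_reservation is greater than zero
qconf -ssconf | grep max_reservation

If that reports 0, raise it with qconf -msconf; otherwise the R TRUE setting will have no effect.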