Running Spark clusters as jobs on LSF

While Spark is often run on Mesos or YARN separate from a traditional HPC scheduler (and sometimes alongside them via a super-scheduler), our requirement was to run Spark clusters on top of an HPC cluster. Previous posts have discussed running Spark on top of a Univa Grid Engine cluster.

This article deals with running Spark clusters on top of IBM Spectrum LSF. Spark is configured in standalone mode, and individual masters and their workers are submitted as separate jobs to the LSF scheduler. From there, jobs can be submitted to LSF that act as Spark drivers connected to those Spark clusters.

LSF Settings

  1. Define the hostgroup in LSB_CONFDIR/cluster_name/configdir/lsb.hosts:
    Begin HostGroup
    GROUP_NAME  GROUP_MEMBER                          CONDENSE    #GROUP_ADMIN    # Key words
    spark       (c0[1-8]u0[1-9] c0[1-8]u[10-32])      Y
    End HostGroup

    This is optional, but it makes things easier in later config files. In this case, the spark hostgroup is defined to cover nodes c01u01 through c08u32.

  2. Define the queue in LSB_CONFDIR/cluster_name/configdir/lsb.queues:
    Begin Queue
    QUEUE_NAME   = spark
    priority     = 40
    HOSTS        = spark
    DESCRIPTION  = spark nodes
    EXCLUSIVE    = Y
    JOB_CONTROLS = TERMINATE[SIGTERM]
    POST_EXEC    = /PATH/TO/POSTSCRIPTS/postscript_spark_lsf.sh
    RUNLIMIT     = 8:00 730:00
    USERS        = allusers
    End Queue

    Note the JOB_CONTROLS section. By default, LSF sends SIGUSR2 to a job that is being killed; the Spark daemons generate core dumps when they receive it, so they are sent SIGTERM instead. I have also set a default run limit of 8 hours, with a maximum (user-specifiable) of 730 hours.

  3. Add the following to $LSF_TOP/conf/lsf.conf:
    LSB_SUB_COMMANDNAME=Y

    This allows for substitution of the command name in a bsub via an esub.
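
After editing these files, the LSF daemons need to re-read them. Below is a minimal sketch of the reload plus a quick sanity check; the exact sequence can vary by LSF version and site policy.

# Hedged sketch: reload LSF after editing lsb.hosts, lsb.queues, and lsf.conf
badmin reconfig        # re-read the lsb.* batch configuration files
lsadmin reconfig       # re-read lsf.conf on the LIM side
badmin mbdrestart      # restart mbatchd so lsf.conf changes are picked up (site-dependent)

# Quick sanity checks on the new hostgroup and queue
bmgroup spark          # list the hosts in the spark hostgroup
bqueues -l spark       # show the spark queue, including RUNLIMIT and JOB_CONTROLS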

esub configuration

Define an esub for spark in LSF_SERVERDIR/esub.spark:

#!/bin/bash
#$RCSfile$Revision$Date$

# Redirect stderr to stdout so echo can be used for error
exec 1>&2

if [ -z "$LSB_SUB_PARM_FILE" ]; then
    # if not set do nothing
    exit 0
fi

. $LSB_SUB_PARM_FILE

if [ "$LSB_SUB_ADDITIONAL" = "" ]; then
    exit 0
fi

case $1 in
    "master")
        TYPE="master"
        echo 'LSB_SUB_JOB_NAME = "master"' >> $LSB_SUB_MODIFY_FILE
        ;;
    "worker")
        TYPE="worker"
        if [[ $LSB_SUB_JOB_NAME != W* ]] ; then
            echo "Wrong job name for a Spark worker. Please submit with the job name W"
            exit $LSB_SUB_ABORT_VALUE
        fi
        ;;
    *)
        echo '"Invalid job type. Please use bsub -a "spark(master)" or -a "spark(worker)"'
        exit $LSB_SUB_ABORT_VALUE
        ;;
esac

case $2 in
    "current"|"")
        VERSION="current"
        ;;
    "rc")
        VERSION="rc"
        ;;
    "test")
        VERSION="test"
        ;;
    "2")
        VERSION="2"
        ;;
    *)
        exit $LSB_SUB_ABORT_VALUE
        ;;
esac

if [[ -z $SPARK_LOG_DIR ]]; then
    export SPARK_LOG_DIR=$HOME/.spark/logs/$(date +%H-%F)
fi
echo "SPARK_LOG_DIR = $SPARK_LOG_DIR" >> $LSB_SUB_MODIFY_ENVFILE
mkdir -p $SPARK_LOG_DIR/lsf

COMMANDLINE="/misc/local/spark-versions/bin/start_${TYPE}_lsf.sh $VERSION"
echo "LSB_SUB_COMMAND_LINE = \"$COMMANDLINE\"" >> $LSB_SUB_MODIFY_FILE
echo 'LSB_SUB_MAX_NUM_PROCESSORS = 16' >> $LSB_SUB_MODIFY_FILE
echo "LSB_SUB_OUT_FILE = \"$SPARK_LOG_DIR/lsf/%J.out\"" >> $LSB_SUB_MODIFY_FILE
echo 'LSB_SUB_QUEUE = "spark"' >> $LSB_SUB_MODIFY_FILE

exit 0

The two case statements interpret the options passed in the -a flag: the first chooses between master and worker, the second selects the Spark version.

The worker option also checks that the job is named appropriately for the launch scripts to work; the required format is W<jobID of the master>.

COMMANDLINE defines the actual command to run. A bsub must include a command, but it can be an arbitrary string; the esub replaces it with the appropriate launch script.

Site definitions and assumptions for this file:

  • Multiple versions are available and their launch scripts are named in the same way such that the version can be substituted in (e.g. start_master_lsf.sh 1_6)
  • Each node has 16 cores (LSB_SUB_MAX_NUM_PROCESSORS line)
  • STDOUT/STDERR is logged to a file named <jobID>.out under the Spark log directory in the user’s home directory
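
As a concrete (hypothetical) illustration of how the pieces fit together, a submission like the one below reaches esub.spark with $1=worker and $2=rc, and the esub rewrites the job before it is dispatched:

# Hedged example; the job ID 12345 and the placeholder command are made up
bsub -a "spark(worker,rc)" -J W12345 placeholder

# The esub rewrites this submission so that it effectively becomes:
#   command   : /misc/local/spark-versions/bin/start_worker_lsf.sh rc
#   processors: 16
#   queue     : spark
#   output    : $SPARK_LOG_DIR/lsf/<jobID>.out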

Launch Scripts

The launch scripts are referred to in the esub above. I have revised these significantly since my Grid Engine/sparkflex post, and they’re a lot smarter now.

start_master_lsf.sh:

#!/bin/bash
# Version string passed in by the esub (e.g. current, rc, test, 2)
VERSION=$1

# Use the log directory exported by the esub if present, otherwise fall back to a default
if [[ -n ${SPARK_LOG_DIR} ]]; then
  sparklogdir="${SPARK_LOG_DIR}"
else
  sparklogdir=~/.spark/logs/$(date +%H-%F)
fi
mkdir -p "$sparklogdir"

# Point at the requested Spark install and launch the standalone master on this host
export SPARK_HOME=/PATH/TO/spark-$VERSION
. $SPARK_HOME/sbin/spark-config.sh
. $SPARK_HOME/bin/load-spark-env.sh
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master --host $(hostname)

The VERSION is set by the esub (above). sparklogdir defines the log location for all logs for this job; it is set the same way in the esub and in the Spark conf directories. SPARK_HOME is the path to the Spark install; for the script to work properly, the installs for the different versions must be named consistently (spark-$VERSION). The commands under $SPARK_HOME/sbin and bin are scripts provided by the Spark install.
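
Once the master job is dispatched, its execution host is visible in bjobs, which is exactly what the worker script parses. A quick hedged check (the job ID is a placeholder):

bjobs -r -noheader -X 12345
# The sixth field is the execution host, prefixed with the slot count (e.g. 16*nodename)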

start_worker_lsf.sh

#!/bin/bash
# The worker job is named W<master jobID>; strip the leading W to recover the master's job ID
MasterID=$(echo $LSB_JOBNAME | cut -c2-)
echo $MasterID

# Version string passed in by the esub (e.g. current, rc, test, 2)
VERSION=$1

# Poll bjobs until the master job is running, for up to 15 attempts 2 minutes apart
n=0
while [ $n -lt 15 ]; do
    n=$(($n + 1))
    # Find the running job named "master" with the expected job ID, strip the "16*"
    # slot-count prefix from the execution-host field, and append the site domain
    MasterFQDN=$(bjobs -r -noheader -X | awk "/$MasterID/ && /master/ "'{sub(/16\*/,""); print $6".fully.qualified.domain"}')
    echo $MasterFQDN
    if [ -n "$MasterFQDN" ]; then
        break
    fi
    echo "No running master. Retrying..."
    sleep 120
done

if [ -z "$MasterFQDN" ]; then
    echo "No master running. Please resubmit the worker."
    exit 1
fi

# Point at the requested Spark install and launch the worker against the master
MASTER="spark://${MasterFQDN}:7077"
export SPARK_HOME=/PATH/TO/spark-$VERSION
. $SPARK_HOME/sbin/spark-config.sh
. $SPARK_HOME/bin/load-spark-env.sh
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 $MASTER 2>&1

This script obtains the job ID of the master from the name of the worker job. It then searches the bjobs output for a running job with that ID, extracts the hostname of the node it is running on, and appends the site domain. If the master has not yet started, the script retries every 2 minutes, up to 15 times, and errors out after that. If the master is found, it sets the MASTER variable and launches the worker via the Spark-provided scripts, much like start_master_lsf.sh above.
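
To confirm that workers have registered, the standalone master's web UI can be queried; it listens on port 8080 by default unless overridden in spark-env.sh. A hedged check, with the FQDN as a placeholder:

curl -s http://<master FQDN>:8080         # HTML status page listing registered workers
curl -s http://<master FQDN>:8080/json    # same information as JSON, if the Spark version serves it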

Launching clusters

Launching a master is trivial:

bsub -a"spark(master,)" arbitrarystring

Note that arbitrarystring will be replaced by the esub above with the appropriate launch script, depending on the inputs to the -a flag; leaving the version blank selects the current install.
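
Because the workers need the master's job ID, it is convenient to capture it from the bsub output at submission time. A minimal sketch (the version and placeholder command are examples):

# bsub prints "Job <12345> is submitted to queue <spark>."; extract the ID between the angle brackets
MASTER_JOBID=$(bsub -a "spark(master,current)" placeholder | awk -F'[<>]' '{print $2}')
echo "Spark master submitted as job $MASTER_JOBID"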

To launch a worker, it is necessary to have the master’s jobid in order to name the job correctly so that the script can associate the worker(s) with the master:

bsub -a "spark(worker,)"-J W  arbitrarystring

Repeat as needed for more workers.
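
For example, several workers can be launched in a loop, reusing the job ID captured above (the worker count and version are arbitrary):

# Hedged sketch: launch 4 workers against the master captured in MASTER_JOBID
for i in $(seq 1 4); do
    bsub -a "spark(worker,current)" -J "W${MASTER_JOBID}" placeholder
done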

To submit Spark jobs against this cluster, set the $MASTER variable to spark://<master FQDN>:7077, set $SPARK_HOME appropriately, and bsub the command with either spark-submit or pyspark (etc.).
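
A minimal sketch of a driver submission, assuming the master's FQDN is known; the application path, resource request, and output location are placeholders:

# Hedged example; adjust SPARK_HOME, the master FQDN, resources, and the application path
export SPARK_HOME=/PATH/TO/spark-current
export MASTER="spark://<master FQDN>:7077"
bsub -n 4 -o "$HOME/.spark/logs/driver.%J.out" \
    "$SPARK_HOME/bin/spark-submit --master $MASTER /path/to/your_app.py"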

Later posts will cover a wrapper script that simplifies the process of setting up, running jobs on, and finally tearing down these Spark clusters for end users, as well as a tightly coupled version of Spark analogous to my first attempt to run Spark on Grid Engine.  I also plan to write a new post on the spark configuration options and tuning we currently use.