Running Spark clusters as jobs on LSF

While Spark is often run on Mesos or YARN separate from a traditional HPC scheduler (and sometimes alongside them via a super-scheduler), our requirement was to run Spark clusters on top of an HPC cluster. Previous posts have discussed running Spark on top of a Univa Grid Engine cluster.

This article deals with running Spark clusters on top of IBM Spectrum LSF. Spark is configured in standalone mode, and individual masters and their workers are submitted as separate jobs to the LSF scheduler. From there, jobs can be submitted to LSF that act as Spark drivers connected to those Spark clusters.

LSF Settings

  1. Define the hostgroup in LSB_CONFDIR/cluster_name/configdir/lsb.hosts:
    Begin HostGroup
    GROUP_NAME  GROUP_MEMBER                          CONDENSE    #GROUP_ADMIN    # Key words
    spark       (c0[1-8]u0[1-9] c0[1-8]u[10-32])      Y
    End HostGroup

    This is optional, but it makes things easier in later config files. In this case, the spark hostgroup covers nodes c01u01 through c08u32.

  2. Define the queue in LSB_CONFDIR/cluster_name/configdir/lsb.queues:
    Begin Queue
    QUEUE_NAME   = spark
    priority     = 40
    HOSTS        = spark
    DESCRIPTION  = spark nodes
    EXCLUSIVE    = Y
    JOB_CONTROLS = TERMINATE[SIGTERM]
    POST_EXEC    = /PATH/TO/POSTSCRIPTS/postscript_spark_lsf.sh
    RUNLIMIT     = 8:00 730:00
    USERS        = allusers
    End Queue

    Note the JOB_CONTROLS section. By default, LSF sends SIGUSR2 to a job that is being killed; the Spark daemons generate core dumps when they receive that signal, so they need a SIGTERM instead. I have also set a default run limit of 8 hours, with a user-definable maximum of 730 hours (users can request a longer limit at submission time, as in the example after this list).

  3. Add the following to $LSF_TOP/conf/lsf.conf:
    LSB_SUB_COMMANDNAME=Y

    This allows for substitution of the command name in a bsub via an esub.
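
For example, a user who needs more than the default 8 hours can request a longer run limit (up to the 730-hour cap) at submission time with -W. A minimal sketch, assuming the spark queue above; my_job.sh is just a placeholder command:

# Request a 24-hour run limit instead of the 8-hour queue default
bsub -q spark -W 24:00 ./my_job.sh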

esub configuration

Define an esub for spark in LSF_SERVERDIR/esub.spark:

#!/bin/bash
#$RCSfile$Revision$Date$

# Redirect stdout to stderr so echo can be used for error messages
exec 1>&2

if [ -z "$LSB_SUB_PARM_FILE" ]; then
    # if not set do nothing
    exit 0
fi

. $LSB_SUB_PARM_FILE

if [ "$LSB_SUB_ADDITIONAL" = "" ]; then
    exit 0
fi

case $1 in
    "master")
        TYPE="master"
        echo 'LSB_SUB_JOB_NAME = "master"' >> $LSB_SUB_MODIFY_FILE
        ;;
    "worker")
        TYPE="worker"
        if [[ $LSB_SUB_JOB_NAME != W* ]] ; then
            echo "Wrong job name for a Spark worker. Please submit with the job name W<jobid of master>"
            exit $LSB_SUB_ABORT_VALUE
        fi
        ;;
    *)
        echo 'Invalid job type. Please use bsub -a "spark(master)" or -a "spark(worker)"'
        exit $LSB_SUB_ABORT_VALUE
        ;;
esac

case $2 in
    "current"|"")
        VERSION="current"
        ;;
    "rc")
        VERSION="rc"
        ;;
    "test")
        VERSION="test"
        ;;
    "2")
        VERSION="2"
        ;;
    *)
        exit $LSB_SUB_ABORT_VALUE
        ;;
esac

if [[ -z $SPARK_LOG_DIR ]]; then
    export SPARK_LOG_DIR=$HOME/.spark/logs/$(date +%H-%F)
fi
echo "SPARK_LOG_DIR = $SPARK_LOG_DIR" >> $LSB_SUB_MODIFY_ENVFILE
mkdir -p $SPARK_LOG_DIR/lsf

COMMANDLINE="/misc/local/spark-versions/bin/start_${TYPE}_lsf.sh $VERSION"
echo "LSB_SUB_COMMAND_LINE = \"$COMMANDLINE\"" >> $LSB_SUB_MODIFY_FILE
echo 'LSB_SUB_MAX_NUM_PROCESSORS = 16' >> $LSB_SUB_MODIFY_FILE
echo "LSB_SUB_OUT_FILE = \"$SPARK_LOG_DIR/lsf/%J.out\"" >> $LSB_SUB_MODIFY_FILE
echo 'LSB_SUB_QUEUE = "spark"' >> $LSB_SUB_MODIFY_FILE

exit 0

The two case statements interpret the options passed via the -a flag: the first chooses between master and worker, the second selects the Spark version.

The worker option also tests that the worker job is named appropriately for the launch scripts to work. The format is W<jobid of the master>.

COMMANDLINE defines the actual command to be run; a bsub must include a command, but it can be an arbitrary string, which the esub then replaces with the appropriate launch script.
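
To illustrate how the two -a arguments flow through the esub, a submission like the one sketched below (12345 stands in for a real master job ID) sets TYPE=worker and VERSION=test, and the command line is rewritten to the matching launch script:

# "arbitrarystring" is replaced by the esub; the -a arguments become $1 and $2 above
bsub -a "spark(worker,test)" -J W12345 arbitrarystring
# effectively submits: /misc/local/spark-versions/bin/start_worker_lsf.sh test
# to the spark queue, on 16 slots, with output in $SPARK_LOG_DIR/lsf/%J.out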

Site definitions and assumptions for this file:

  • Multiple versions are available and their launch scripts are named in the same way such that the version can be substituted in (e.g. start_master_lsf.sh 1_6)
  • Each node has 16 cores (LSB_SUB_MAX_NUM_PROCESSORS line)
  • STDOUT/STDERR is logged to a file named <jobID>.out under $SPARK_LOG_DIR/lsf, which defaults to a directory under the user’s home (the layout these assumptions imply is sketched below)
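
A sketch of the layout these assumptions imply (the paths are illustrative, not necessarily the real ones):

# One launch script per daemon type; the version label is passed as the first argument
/misc/local/spark-versions/bin/start_master_lsf.sh
/misc/local/spark-versions/bin/start_worker_lsf.sh
# One Spark install per version label accepted by the esub
/PATH/TO/spark-current/
/PATH/TO/spark-rc/
/PATH/TO/spark-test/
/PATH/TO/spark-2/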

Launch Scripts

The launch scripts are referred to in the esub above. I have revised these significantly since my Grid Engine/sparkflex post, and they’re a lot smarter now.

start_master_lsf.sh:

#!/bin/bash                                                                                              
VERSION=$1
if [[ -n ${SPARK_LOG_DIR} ]]; then
  sparklogdir="${SPARK_LOG_DIR}"
else
  sparklogdir=~/.spark/logs/`date +%H-%F`
fi
mkdir -p $sparklogdir
export SPARK_HOME=/PATH/TO/spark-$VERSION    
. $SPARK_HOME/sbin/spark-config.sh    
. $SPARK_HOME/bin/load-spark-env.sh    
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master --host `hostname`

The VERSION is set by the esub (above). sparklogdir defines the log location for all logs for this job, and is set the same way in the esub and in the Spark conf directories. SPARK_HOME is the path to the Spark install; note that for the script to work, the installs must follow the same spark-$VERSION naming convention. The $SPARK_HOME/sbin and bin lines use scripts provided by the Spark install itself.

start_worker_lsf.sh:

#!/bin/bash                                                                                                                               
MasterID=`echo $LSB_JOBNAME | cut -c2-`
echo $MasterID

VERSION=$1

n=0
while [ $n -lt 15 ]; do
    n=$(($n + 1))
    MasterFQDN=`bjobs -r -noheader -X | awk "/$MasterID/ && /master/ "'{sub(/16\*/,""); print $6".fully.qualified.domain"}'`
    echo $MasterFQDN
    if [ ! -z $MasterFQDN ]; then
        break
    fi
    echo "No running master. Retrying..."
    sleep 120
done

if [ -z $MasterFQDN ]; then
    echo "No Master running. Please resubmit worker."
    exit 1
fi

MASTER="spark://${MasterFQDN}:7077"
export SPARK_HOME=/PATH/TO/spark-$VERSION
. $SPARK_HOME/sbin/spark-config.sh
. $SPARK_HOME/bin/load-spark-env.sh
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 $MASTER 2>&1

This script obtains the job ID of the master from the name of the worker job. It then searches the bjobs output for a job with that ID, pulls out the hostname of the node it is running on, and appends the domain. If the master has not yet started, it retries every 2 minutes up to 15 times and errors out after that. If the master is found, it sets the MASTER variable and launches the worker with the Spark scripts, much like start_master_lsf.sh above.

Launching clusters

Launching a master is trivial:

bsub -a "spark(master,<version>)" arbitrarystring

Note that arbitrarystring will be replaced by the esub above with the appropriate launch script depending on the inputs in the -a flag.

To launch a worker, it is necessary to have the master’s jobid in order to name the job correctly so that the script can associate the worker(s) with the master:

bsub -a "spark(worker,<version>)" -J W<jobid of master> arbitrarystring

Repeat as needed for more workers.
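
For example, to add four workers to a master whose job ID is 12345 (an illustrative ID), a simple loop works:

for i in 1 2 3 4; do
    bsub -a "spark(worker,current)" -J W12345 arbitrarystring
done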

To submit Spark jobs against this cluster, set the $MASTER variable to spark://<master hostname>:7077 and the $SPARK_HOME variable appropriately, and bsub the command with spark-submit or pyspark (etc.).
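
A minimal driver submission might look like the following; the hostname, Spark path, output file, and script name are placeholders for your own values:

export MASTER=spark://c01u01.fully.qualified.domain:7077
export SPARK_HOME=/PATH/TO/spark-current
bsub -o driver.%J.out "$SPARK_HOME/bin/spark-submit --master $MASTER /path/to/my_analysis.py"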

Later posts will cover a wrapper script that simplifies the process of setting up, running jobs on, and finally tearing down these Spark clusters for end users, as well as a tightly coupled version of Spark analogous to my first attempt to run Spark on Grid Engine.  I also plan to write a new post on the spark configuration options and tuning we currently use.


New Spark configuration suggestions

I know that a fair number of people still look at the configuration suggestions I made way back when here. Since then, we’ve made a number of changes to improve performance and reliability. We’ve moved a lot of the memory and core configuration out of the spark-env.sh script into spark-defaults.conf, which allows the settings to be overridden by advanced users and is generally a better place for them. We’re also working on limiting the number of cores and the amount of memory used by Spark in order to leave some room for the OS and background processes (hopefully preventing the runaway load problems we’ve been fighting for a long time).
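
Because the settings now live in spark-defaults.conf, an advanced user can override them per application without touching the site files, for example with --conf on spark-submit (the values below are only illustrative):

spark-submit --master spark://<master hostname>:7077 \
    --conf spark.executor.memory=40g \
    --conf spark.executor.cores=8 \
    my_job.py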

Upcoming changes will, I believe, provide even better performance and stability. We will be moving SPARK_LOCAL_DIRS from a single, slow HDD to 3x SSDs (consumer grade, but generally good performance per dollar). This should keep memory from filling up when the reduce phase outruns the disk. I strongly recommend making this change in your environment if at all possible.

The new spark-defaults.conf:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
# spark.driver.memory              5g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
#########################################################################
spark.akka.timeout=300s 
spark.rpc.askTimeout=300s 
spark.storage.blockManagerHeartBeatMs=30000
spark.rpc.retry.wait=30s 
spark.kryoserializer.buffer.max=1024m
spark.core.connection.ack.wait.timeout=600s
spark.driver.maxResultSize=0
spark.python.worker.memory=1536m
spark.driver.memory=70g
spark.executor.memory=25g
spark.executor.cores=5
spark.akka.frameSize=2047

The two identical timeouts are there because akka is deprecated in Spark 1.6 and newer (and removed entirely in 2.0.0, I believe); without the longer timeouts, our workers would repeatedly drop out of the cluster.

The spark.python.worker.memory is crucial to keep relatively small, because pyspark operates outside the JVM(s) that are bounded by the spark.executor.memory value. In our environment, with 128GB nodes, we need to keep total memory usage below about 120GB. The 1536MB isn’t a hard limit, but it is the point at which the Python workers are instructed to start spilling to disk. Depending on the application, it might be more advantageous to use less executor.memory and more python.worker.memory.

spark.executor.cores determines the number of cores allocated to each executor JVM and, thus, how many Spark executors will run on a worker node. In this case it works out to 3 executors per 16-core node, with one core left free for system processes.
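
To make the arithmetic concrete, here is the rough per-node budget these settings imply on our 16-core, 128GB nodes (assuming one Python worker per task slot; actual overheads will vary):

# 16 cores / 5 cores per executor            -> 3 executors, 1 core left for the OS
# 3 executors x 25GB executor.memory         -> ~75GB of JVM heap
# 15 task slots x 1.5GB python.worker.memory -> ~22.5GB before PySpark spills to disk
# total                                      -> ~97.5GB, under the ~120GB target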

The spark-env.sh file is now very trimmed down from the original (created back in the 0.7 days…):


#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

ulimit -n 65535
export SCALA_HOME=/usr/local/scala-2.10.3

export SPARK_WORKER_DIR=/scratch/spark/work
export JAVA_HOME=/usr/local/jdk1.8.0_45
export SPARK_LOG_DIR=~/.spark/logs/$(date +%H-%F)/
export SPARK_WORKER_OPTS=-Dspark.worker.cleanup.enabled=true

###################################
#set disk for shuffle and spilling
#Local hdd 
export SPARK_LOCAL_DIRS=/scratch/spark/tmp

#3 local ssds
#export SPARK_LOCAL_DIRS=/scratch-ssd1/sparklocaldir,/scratch-ssd2/sparklocaldir,/scratch-ssd3/sparklocaldir

#################################

export PYSPARK_PYTHON=/usr/local/python-2.7.6/bin/python
export SPARK_SLAVES=/scratch/spark/tmp/slaves
export SPARK_SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=30"
export SPARK_PUBLIC_DNS=$HOSTNAME

## pull in users' environment variables on the workers so that PYTHONPATH will transfer
source $HOME/.bash_profile

Basically all we’re doing here is selecting the versions of Java, Scala, and Python to be used by default, setting the location for the list of slaves (which may not even be necessary with sparkflex), and setting the various local directories. Note the two options for SPARK_LOCAL_DIRS: the single local HDD and the 3 SSDs. These paths will of course be different in your environment. You’ll also need to make sure that the Spark log directory already exists.

SparkFlex launch script

In my last update, I wrote about what I call SparkFlex, a better way than my old method of launching Spark jobs on a Grid Engine based HPC cluster. While that will work as is, I’ve also written a Python script called sparklaunch.py that will make spinning up a cluster much easier and more controlled. It is called as such:

spark-deploy.py [-h, --help] [-w --workers] [-j --jobid] [-M --master] [-s --stopmaster] [-r --removeworkers] [-v --version] [-f] [-t --runtime]

The script will optionally launch a master and any number of workers (including zero). It can also add workers to an existing cluster, shut down individual workers (with optional selection of which node to shut down), or tear down the whole cluster. It works with multiple clusters launched by a single user, tracking each cluster by either the job ID or the hostname of its master, and it takes an option that sets a hard runtime for the cluster.
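
A few illustrative invocations (the job ID 1234567 is made up):

# Start a master plus 10 workers with an 8-hour runtime
sparklaunch.py -w 10 -t 08:00:00
# Add 5 workers to the cluster whose master is job 1234567
sparklaunch.py -w 5 -j 1234567
# Remove 3 workers, then tear the whole cluster down without prompting
sparklaunch.py -r 3 -j 1234567
sparklaunch.py -s -f -j 1234567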
Here’s the code:

#!/usr/bin/env python
#### sparklaunch.py
#### requires Python 2.7.x
import os
import sys
import getopt
import subprocess
import traceback
import xmltodict
import time

def getqstat():
    alljobs = []
    templist = []
    process = subprocess.Popen("qstat -xml", shell=True, stdout=subprocess.PIPE)
    rawxml = process.communicate()[0]
    xmldict = xmltodict.parse(rawxml)
# qstat -xml puts pending and running in separate namespaces. This code appends the two together. qstat -xml also outputs in unicode, so that needs to be parsed properly. 
    for qkey in xmldict[u'job_info']:
        if qkey == u'@xmlns:xsd':
            continue
        else:
            if xmldict[u'job_info'][qkey]:
# necessary because xmltodict passes single entry lists as just the entry rather than a list
                if type(xmldict[u'job_info'][qkey][u'job_list']) is not list:
                    templist.append(xmldict[u'job_info'][qkey][u'job_list'])
                else:
                    templist = xmldict[u'job_info'][qkey][u'job_list']
                alljobs = alljobs + templist
    return alljobs

####Use your own JC variables 
def startmaster(sparktype, runtime):
    if sparktype == "":
         sparktype = "default"
    elif sparktype == "rc":
        sparktype = "master-rc"
    elif sparktype == "test":
        sparktype = "master-test"
    if runtime != "default":
        options = "sparkflex.{} -l spark_exclusive=1,h_rt={}".format(sparktype, runtime)
    else:
        options = "sparkflex.{}".format(sparktype)
    process = subprocess.Popen(["qsub -jc {}".format(options)], shell=True, stdout=subprocess.PIPE)
    rawout = process.communicate()
    try:
        masterjobid = rawout[0].split(" ")[2]
    except:
        sys.exit(1)
    print "Master submitted. Job ID is {}".format(masterjobid)
    return masterjobid

def startworker(sparktype,masterjobid):
    masterfound = False
    rawout = getqstat()
    for i in range(len(rawout)):
        if masterjobid in str(rawout[i][u'JB_job_number']):
            masterfound = True
            break
    if not masterfound:
        print "No master with the job id {} running or queued. Please submit master first.".format(masterjobid)
        sys.exit()
    if sparktype == "rc":
        sparktype = "-rc"
    if sparktype == "test":
        sparktype = "-test"
    command = "qsub -jc sparkflex.worker{} -N W{}".format(sparktype,masterjobid)
    os.system(command)

def qdeljob(jobid):
    command = "qdel {}".format(jobid)
    os.system(command)

def stopworker(masterjobid, terminatew, workerlist, skipcheckstop):
    workername = "W{}".format(masterjobid) 
    rawout = getqstat()
    jobtokill = ""
    statuses = {}
    for i in range(len(rawout)):
        unpack=rawout[i]
        jobid=str(unpack[u'JB_job_number'])
        if 'sparkflex.worker' in str(unpack[u'jclass_name']) and str(masterjobid) in str(unpack[u'JB_name']):
            statuses[jobid] = str(unpack[u'@state']) 
    if "pending" in statuses.values() and not skipcheckstop:
        terminatew = raw_input("Terminate waiting job(s) first? (y/n) ")

    for wjobid in statuses.keys():
        if statuses[wjobid] == 'pending' and terminatew == 'y':
            jobtokill = wjobid
            break
        elif statuses[wjobid] == 'running' and not skipcheckstop:
            jobtokill = selectionlist(workerlist, 'worker')
            break
        else: 
            jobtokill = wjobid
            break
    try:
        if jobtokill != "":
            qdeljob(jobtokill)
            time.sleep(3)
    except:
        print "No running or waiting workers found corresponding to Master job {}".format(masterjobid)
        sys.exit()
    return terminatew

def checkstop(inval, jobtype):
    if jobtype == "master":
        checkstop = raw_input("Stop master with job id {} (y/n):".format(inval))
    else: 
        checkstop = raw_input("Remove {} workers? (y/n):".format(inval))
    if checkstop != "y":
        print "No workers will be removed."
        sys.exit()
    else:
        return True

def mastername_to_jobid(mastername):
    masterfound = False
    masterlist = findmaster()
    for jobid in masterlist.keys():
        if mastername in masterlist[jobid][0]:
            masterfound = True
            break
    if masterfound:
        return jobid
    else:
        print "Node {} is not running a spark master job. If deleting workers alone, please use Master's Job ID".format(mastername)
        sys.exit()

def findmaster():
    rawout = getqstat()
    jobid = ""
    masterlist = {}
    for i in range(len(rawout)):
        unpack = rawout[i]
        jobid = str(unpack[u'JB_job_number'])
        if "sparkflex" in str(unpack[u'jclass_name']) and "master" in str(unpack[u'JB_name']):
            masterlist[jobid] = (str(unpack[u'queue_name']).replace('spark.q@',''),str(unpack[u'@state']))
    if len(masterlist.keys()) != 0:
        return masterlist
    else:
        print "No masters found. If deleting workers alone, please use Master's Job ID."
        sys.exit()

def findworker(masterjobid):
    rawout = getqstat()
    jobid = ""
    workername = "W{}".format(masterjobid)
    workerlist = {}
    for i in range(len(rawout)):
        unpack = rawout[i]
        jobid = str(unpack[u'JB_job_number'])
        if "sparkflex.worker" in str(unpack[u'jclass_name']) and workername in str(unpack[u'JB_name']):
            workerlist[jobid] = (str(unpack[u'queue_name']).replace('spark.q@',''),str(unpack[u'@state']))
    return workerlist

def checkmaster(masterjobid):
    rawout = getqstat()
    mastername = ""
    for i in range(len(rawout)):
        unpack = rawout[i]
        if masterjobid in str(unpack[u'JB_job_number']) and "running" in str(unpack[u'@state']):
            mastername = str(unpack[u'queue_name']).replace("spark.q@","")
            break
    if mastername == "":
        instruction = raw_input("Master is not running yet. (w)ait then retry in 1 minute, e(x)it this application, or (t)erminate master?" )
        if instruction == "w":
            time.sleep(60)
            return checkmaster(masterjobid)
        elif instruction == "x":
            sys.exit()
        elif instruction == "t":
            qdeljob(masterjobid)
            sys.exit()
    return mastername

def qdelmaster(masterjobid, skipcheckstop):
    jobtype = "master"
    workername = "W{}".format(masterjobid)
    if not skipcheckstop:
         skipcheckstop = checkstop(masterjobid,jobtype)
    if skipcheckstop:
        try:
            qdeljob(masterjobid)
        except:
            print "Master with job id {} not found".format(masterjobid)
            sys.exit(1)
        try:
            qdeljob(workername)
        except:
#            traceback.print_exc()
            print "Workers for master with job id {} failed to stop".format(masterjobid)
            sys.exit(1)
    else:
        print "As requested, master not stopped."
        sys.exit(0)

def getworkercount(masterjobid):
    rawout = getqstat()
    workers = 0
    for i in range(len(rawout)):
        unpack = rawout[i]
        if masterjobid in str(unpack[u'JB_name']):
            workers = workers + 1
    return workers
            

def selectionlist(joblist, jobtype):
    i = 0
    selectionlist = {}
    print "Select {} to kill from list below:".format(jobtype)
    for jobid in joblist.keys():
        i = i + 1
        host, status = joblist[jobid]
        selectionlist[i] = jobid
        print "{}) Host: {} JobID: {} Status: {}".format(i, host, jobid, status)
    while True:
        selection = int(raw_input("Selection? "))
        if selection <= i:
            jobid = selectionlist[selection]
            skipcheckstop = True
            break
        else:
            print "Invalid selection."
    return jobid

def usage():
    print "spark-deploy.py [-h, --help] [-w --workers] [-j --jobid] [-M --master] [-s --stopmaster] [-r --removeworkers] [-v --version] [-f] [-t --runtime]"
    print "-h, --help: print this help."
    print "-w, --workers: number of workers to start."
    print "-j, --jobid: Job ID of running Master."
    print "-M, --master: Hostname of running Master."
    print "-s, --stopmaster: Stop running Master. Requires Master Job ID or Master hostname."
    print "-r, --removeworkers: number of workers to stop, requires Master Job ID or Master hostname."
    print "-f: Do not ask for confirmation of commands or present choice of jobs"
    print "-t, --runtime: Change the default time at which the Spark cluster will self-terminate in format hh:mm:ss"
    print "-v, --version: version of Spark to run (optional, permissible options rc, test)."
    print "If run with no options, will start a master with no workers. To start a master with workers, use spark-deploy.py -w <number of workers>"

def main(argv):
# Booleans to make this whole mess work right
    stopw = False
    stopm = False
    skipcheckstop = False
    masternamed = False
    needmaster = True
    novers = True
    runtime = "default"

# Get user options
    try:
        opts, args = getopt.getopt(argv,"fshw:v:j:r:M:t:",["workers=","version=","jobid=", "removeworkers=", "stopmaster=", "help", "master=", "runtime="])
    except getopt.GetoptError:
        usage()
        sys.exit(2)
   
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit()
        elif opt in ("-v", "--version"):
            sparktype = arg
            novers = False
            if sparktype not in ("rc", "test"):
                usage()
                sys.exit(1)
        elif opt in ("-w", "--workers"):
            workers = int(arg)
        elif opt in ("-j", "--jobid"):
            masterjobid = arg
            needmaster = False
        elif opt in ("-r", "--removeworkers"):
            workers = int(arg)
            stopw = True
        elif opt in ("-s", "--stopmaster"):
            stopm = True
        elif opt == "-f":
            skipcheckstop = True
        elif opt in ("-M", "--master"):
            masterjobid = mastername_to_jobid(arg)
            needmaster = False
        elif opt in ("-t", "--runtime"):
            runtime = arg
    
    if novers:
        sparktype = ""

# Stop running master
    if stopm:
        try:
            masterjobid
        except NameError:
            masterlist = findmaster()
            masterjobid = selectionlist(masterlist,'master')
            skipcheckstop = True
        qdelmaster(masterjobid, skipcheckstop)
        try:
            os.remove("tmp{}.sh".format(masterjobid))
            skipcheckstop = True
        except:
            print "tmp{}.sh not found. Please delete it manually in the directory from which you started the spark job.".format(masterjobid)

# Stop worker(s)
    if stopw:
        jobtype = "worker"
        terminatew = ""
        try:
            for i in range(workers):
                workerlist = findworker(masterjobid)
                terminatew = stopworker(masterjobid, terminatew, workerlist, skipcheckstop)
        except NameError:
            print "JobID or name of master required to stop worker."
#            traceback.print_exc()
            sys.exit()

# Either way, we're done here
    if stopm or stopw:
        sys.exit()

# Start master and create temporary script for the user to use which contains the necessary environment variables to communicate with the Spark cluster. 
 
    if needmaster:
        masterjobid = startmaster(sparktype, runtime)

# Sleep is necessary to make sure the scheduler has time to spin up the job. You may need to adjust this based on your scheduler configuration. 

        time.sleep(2)
        mastername = str(checkmaster(masterjobid))
        filename = "tmp{}.sh".format(masterjobid)
        if sparktype == "default":
            version = "current"
        else:
            version = sparktype
        with open(filename, 'w') as f:
            f.write(
                    "#!/bin/bash\n"
                    "export MASTER=spark://{}:7077\n"
                    "export SPARK_HOME=/usr/local/spark{}\n"
                    "export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH\n".format(mastername,version)
                    )

# Start workers and exit if not starting master 
    try:
        for i in range(workers):
            startworker(sparktype, masterjobid)
    except:
        print "No workers specified or other error"
#        traceback.print_exc() #debug line
        sys.exit()
    if not needmaster:
        sys.exit()

# Print out Master name and addresses. 
    print "Master's name is {}. WebUI is at http://{}:8080. The $MASTER environment variable can be set to spark://{}:7077 manually or by running . tmp{}.sh".format(mastername, mastername, mastername, masterjobid)

if __name__ == '__main__':
    main(sys.argv[1:])

 

SparkFlex, aka running Spark jobs on a Grid Engine cluster, take II

I’ve left everyone who looks at my Spark posts hanging for months with a teaser about a new method I’ve been working on. This is the first of my posts about this.

The high level basics on this is that it’s better to run Spark as independent jobs rather than a monolithic multi-node job. That way if the HPC cluster is heavily used, at least PART of the Spark cluster can get scheduled and work can begin (assuming that not all nodes are strictly necessary for the work to be successful). Additionally, nodes can be added or removed from the cluster as needed.

This is fairly easily achieved by running the Spark master as one job, then getting the master’s URL and submitting each worker as an independent job with the MASTER variable set (and things configured like my previous example), i.e.:

export MASTER=spark://<mastername>:7077
export SPARK_HOME=/PATH/TO/SPARK
qsub -pe spark 1 -jc spark -N sparkworker -q spark -b y "$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker $MASTER"

But wait! There’s still a better way to do this, using job classes to a much greater extent. Note that you’ll need to change the paths to the scripts, the name of the queue, and the exclusive complex name to fit your environment:

qconf -sjc sparkflex
jcname          sparkflex
variant_list    default worker master-rc worker-rc master-test worker-test
owner           NONE
user_lists      NONE
xuser_lists     NONE
A               {+}UNSPECIFIED
a               {+}UNSPECIFIED
ar              {+}UNSPECIFIED
b               TRUE
binding         {+}UNSPECIFIED
c_interval      {+}UNSPECIFIED
c_occasion      {+}UNSPECIFIED
CMDNAME         /PATH/TO/SCRIPTS/start-flexmaster.sh, \
                [worker=/PATH/TO/SCRIPTS/start-flexworker.sh], \
                [master-rc=/PATH/TO/SCRIPTS/start-flexmaster-rc.sh], \
                [worker-rc=/PATH/TO/SCRIPTS/start-flexworker-rc.sh], \
                [master-test=/PATH/TO/SCRIPTS/start-flexmaster-test.sh], \
                [worker-test=/PATH/TO/SCRIPTS/start-flexworker-test.sh]
CMDARG          {+}UNSPECIFIED
ckpt            {+}UNSPECIFIED
ac              {+}UNSPECIFIED
cwd             .
dl              {+}UNSPECIFIED
e               {+}UNSPECIFIED
h               {+}UNSPECIFIED
hold_jid        {+}UNSPECIFIED
hold_jid_ad     {+}UNSPECIFIED
i               {+}UNSPECIFIED
j               TRUE
js              {+}UNSPECIFIED
l_hard          {-~}spark_exclusive=1,{-~}h_rt=43200
l_soft          {+}UNSPECIFIED
masterl         {+}UNSPECIFIED
m               {+}UNSPECIFIED
mbind           {+}UNSPECIFIED
M               {+}UNSPECIFIED
masterq         {+}UNSPECIFIED
N               master,[{~}worker={~}W],[{~}worker-rc={~}W], \
                [{~}worker-test={~}W]
notify          {+}UNSPECIFIED
now             {+}UNSPECIFIED
o               $HOME/.spark/ugelogs
P               {+}UNSPECIFIED
p               {+}UNSPECIFIED
pe_name         sparkflex
pe_range        1
q_hard          spark.q
q_soft          {+}UNSPECIFIED
R               {+}UNSPECIFIED
r               {+}UNSPECIFIED
rou             {+}UNSPECIFIED
S               {+}UNSPECIFIED
shell           {+}UNSPECIFIED
t               {+}UNSPECIFIED
tc              {+}UNSPECIFIED
V               TRUE
v               {+}UNSPECIFIED

As you can see, the JC specifies quite a few things about the job that had previously been in the qsub, including a hard runtime, the pe, the queue, and even the scripts. With the variant list, a single JC can be used for multiple kinds of spark jobs, including the master (default), the worker, and a couple of other variants of spark installed separately.

The master start scripts referenced in the CMDNAME line are pretty simple:


#!/bin/bash
# start-flexmaster.sh 
####CHANGE PATH TO SPARK INSTALL                                                                                            
export SPARK_HOME=/PATH/TO/SPARK   
. $SPARK_HOME/sbin/spark-config.sh    
. $SPARK_HOME/bin/load-spark-env.sh    
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master --host `hostname`

 

The worker is a bit more complicated, since it has to figure out where the master is from the job number provided:


#!/bin/bash  
# start-flexworker.sh                                                                                           
MasterID=`echo $JOB_NAME | cut -c2-`
echo $MasterID

n=0
while [ $n -lt 15 ]; do 
    n=$(($n + 1))
####USE YOUR NODES' DOMAIN AND THE CORRECT QUEUE NAME FOR YOUR ENVIRONMENT
    MasterFQDN=`qstat -s r | awk "/$MasterID/ && /master/ "'{sub(/spark.q@/,""); gsub(/\.i.*/,"."); print $8}'`
    echo $MasterFQDN
    if [ ! -z $MasterFQDN ]; then
        break
    fi
    echo "No running master. Retrying..."
    sleep 120
done

if [ -z $MasterFQDN ]; then
    echo "No Master running. Please resubmit worker."
    exit 1
fi

MASTER="spark://${MasterFQDN}:7077"
####USE YOUR PATH TO SPARK
export SPARK_HOME=/PATH/TO/SPARK
. $SPARK_HOME/sbin/spark-config.sh    
. $SPARK_HOME/bin/load-spark-env.sh    
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 $MASTER

 

So now the submission would be:


#MASTER
qsub -jc sparkflex.default
#WORKER
qsub -jc sparkflex.worker -N W<jobid of master>

 

Breaking this down, the start-flexworker.sh script first checks whether there is a master running, and keeps checking every 2 minutes for up to 30 minutes (that’s the while loop at the top). It does this with the big awk statement, which looks for the master’s job ID in the parsed qstat output and returns the master’s FQDN. After that, it launches the worker as above, passing it the $MASTER variable.

I have written a python script that makes this even easier and includes some error checking, etc, but that’s a post for another day.

A Comedy of Errors, or How to recover (hopefully) from overwriting an entire system (part 2)

Part 1 contains the setup for this. Basic recap:

need to recover reformatted RAID 6 containing LVM with 4 partitions (/, /opt, swap, /data) on single VG/single PV.

After I stopped panicking and got a hot beverage, I started googling for how to deal with this. I came across TestDisk, which is a very powerful piece of software for recovering data from hard drives in various states of distress.

I needed a boot disk (since I’d overwritten that…), and TestDisk has a list of various ones. I chose Alt Linux Rescue, somewhat randomly, and pulled down the ISO. I used dd to put the ISO on a USB stick from my Mac:

dd if=/path/to/downloaded/rescue.iso of=/dev/<disk number of USB stick> bs=1M

Then I went to the server, crossed my fingers, and (finally) convinced it to boot from the USB stick. (BIOS/EFI on Dell Rx20 vintage servers are a pain. Rx30 BIOS/EFI are soooo much better, not to mention a LOT faster.) Note that the server booted in BIOS mode, as we are sticks in the mud and haven’t moved to EFI. #sysadmins.

I used the first option on the Rescue boot screen, which is Rescue LiveCD. This boots up without mounting any disks it finds.

Note, I was unable to get the DHCP selection (seen above) to work on this system.

After booting, I ran TestDisk. TestDisk has excellent documentation, which I am liberally cadging from here.  I started by selecting Create a log file.

I then chose the RAID array from the list of disks and selected Proceed.


Fortunately, it automatically recognized the partitioning as GPT and I was able to run Analyze on it.

At that point, it of course came back with the list of new partitions the kickstart had created. It did give me the option to do a Deeper Search. At this point I began to despair, because the Deeper Search, while finding all sorts of (possible?) partitions, was moving at a rather glacial rate. Do not despair at this point! I let it run for about 15-30 minutes and then gave up, hitting the Enter key to stop the search.

Let me say that again (since I didn’t figure it out until the second time around):

HIT THE ENTER KEY TO END THE DEEPER SEARCH.

This will NOT quit the program and lose the list, as hitting, oh, ESC or Ctrl-C or what have you will. It will drop out to an interactive list, where you can press p to list the data on a theoretical partition. It helps at this point to know the format of each partition and about how large it was.

In my case, I had a list of possible partitions a mile long, so I started at the top and went through them, pressing p to see what was there (typically nothing) and then q to return to the list of partitions. Note: if you press q too often, it will drop out of the partition list and you’ll have to do the scan again. Nothing will be lost, at least.

When I discovered a partition, I wrote down the start and end, and then returned to the partition list and used right arrow to change that partition from Deleted to Primary.

I did NOT inspect any of the LVM partitions I found. This is key (and surprising). 

The first partition I found, naturally, was /. I then found /opt, ignored the swaps, and went looking for the big /data partition (~23.5TB). That, however, was formatted xfs, and TestDisk can’t list files on XFS, so that was a guessing game. There were two XFS partitions. However, one of them started immediately after the end of the /opt partition I had found (remember writing down the Start and End?), so I took a gamble and chose that one to change from D to P.

Taking a deep breath after I had found the 3 partitions I cared about, I pressed Enter to proceed to the next step, which looks sort of like this:


I selected Write to write the recovered partition structure to disk. TestDisk told me I would have to reboot for the change to take effect. I backed out through the menus and attempted recovery on the SSD. Unfortunately, that didn’t work. However, since I had a backup of that, I didn’t really care much.

After reboot into TestDisk, I was presented with the option to mount the disks it found either read/write (normal) or read-only (forensic mode). I chose the forensic mode, and it mounted the partitions under /mnt. It indeed had the /, /opt, and /data, all of which had the correct files!

HOWEVER, they were not LVs any more. They had been converted to regular partitions, which was rather nice, since it simplified my life a great deal, not having to try to recover the LVs.

After verifying that it was there after a second reboot (and an aborted attempt to back up the 8TB of data on the disks–24 hrs was far too long on a rebuild I told the user would take a couple of hours), I bit the bullet and imaged the server using the correct kickstart/pxe combination.

At the disk partitioning screen, I was able to confirm that the 3 recovered partitions were present on the RAID6. I changed the mount points, set up the LVMs on the new boot RAID1, and ran the installation.

Unfortunately, it still didn’t boot.

It turns out that Dell PERC raid cards must have a single array designated as boot, and they will not boot if it is not so designated. This was doubly weird, because the MBR stub on the RAID6 was still present, and it kept trying to run grub, only to have (hd2,0) identified as the only one with a grub.conf on it.

The fix was in the BIOS (OK, fine, the UEFI console) under Integrated Devices -> Dell PERC H710 -> Controller Configuration. I selected the third array, and I was in business!

After the fresh install booted up, my 3 recovered partitions were mounted where I had designated them in the installer.

 

 

A Comedy of Errors, or How to recover (hopefully) from overwriting an entire system (part 1)

Here’s hoping this’ll help someone else.

Yesterday, I was tasked with rebuilding the OS on a server that is fairly important and holds a lot of data that the developers who run it take responsibility for backing up.

Normally, we would toss rebuilds over to a less senior member of the team, but because of the abnormal requirements, my boss gave it to me to do.

In our environment, we typically nuke and pave servers when we rebuild them. Generally all data is kept on shared storage, so no big deal. In this case, the developers wanted to build their own storage. Fine, sez us, buy a Dell R720xd, and we’ll put our standard Scientific Linux 6.x build on it, and have fun. This was a few years ago, and I had specced it out with 8 4TB 7200RPM SAS drives and no separate boot drives to hit their budget number. (mistake #1)

But a few days ago, one of them had too much fun, and managed to torch the rpm database. My boss worked on it for several hours, and we came to the conclusion it needed to be rebuilt. Enter me. I backed up the logging SSD with a few hundred gigs of data on it, backed up the one directory in /opt that the dev said they needed, and the dev told us not to bother backing up the 8TB of data in the 24TB partition of the RAID6 that held both OS and data. The dev assured us he had taken a backup last night.

My plan was to put a couple of new drives into the R720xd and put the OS on those, and then later expand the /data partition over the old OS partitions (/boot, /, swap, and /opt).

We image servers using PXE and kickstart, and with a few exceptions, our kickstarts are set up to erase the MBR, create new partitions, and put LVM volumes on them before starting the install. We have a few outliers which are set up to ignore a certain drive, or to do manual partitioning.

What we didn’t have was a Scientific Linux 6.7 kickstart that did either. So I copied over a 6.6 one, did a search/replace for 6.6/6.7, and had me a 6.7 normal kickstart. Copied that, commented out all the formatting/erasing lines, and Bob’s your uncle.

When I went to change the pxeboot files, that’s where I ran into trouble. My coworker who used to maintain this stuff recently left, and I was a titch fuzzy on how to set up the pxeboot stuff. So I copied over a 6.6, did the same thing as above, and I figured I was good. Here’s where I screwed up. In my manual partitioning pxelinux.cfg for 6.7, I forgot to actually call the manual partitioning kickstart. DUR.

I fire off the deployment script and wander out to the datacenter to do the manual partitioning. To my horror, I see the script erasing /dev/sdc, /dev/sdd, /dev/sde… and creating new LVs. I hit Ctrl-Alt-Del before it started writing actual data to the drives, but not before it had finished the repartitioning and LV creation.

So to recap:

Server had 4 LVs set up for / (ext4), /opt (ext4), /data (xfs, and that’s the important one) and swap on a hardware RAID 6, set up with GPT on a single physical volume, single volume group.

The kickstart overwrote the GPT table and then created /boot and new LVs (/, a very large /opt, swap) in a single physical volume/single volume group. It also overwrote the SSD (a separate volume) and probably the new disks I had put in for the OS.

Realizing that recovery from the backup of /data was possibly a non-starter, my boss and I decided the best thing for me to do was try to recover the data in place on the RAID 6.

On to part 2…

Isilon: Remove or Add a Node from/to Multiple Network Pools

It’s a pain to change networks in bulk in an Isilon cluster, particularly if it’s a complex environment. Adding a new node that will be serving multiple network pools in the same subnet is particularly time consuming. Similarly, tracking down all the interfaces and pools a node is in in order to remove it for maintenance or other purposes can be messy.

This script takes the node’s number as the first argument and add or remove as the second. It checks whether each interface is active before performing any operations on it. An example invocation follows the script.

 
#!/bin/bash
node=$1
operation=$2

#check if node number is valid
if [ "$(isi_nodes -n$node %{name})" != "clustername-$node" ]; then
 echo "Not a valid node"
 exit
fi

#check if operation is either add or remove
if [ "$operation" != add -a "$operation" != remove ]; then
 echo "Not a valid operation: $operation"
 exit
fi


#function to check if the interface's connection is active. 
check_ifaces_active(){
 isi_for_array -n$node "ifconfig $iface" | awk '/active/ {print 1}'
}

#function to perform the operation on the interface for a set of pools
operate_interfaces() {
 echo $isi_iface
 isi networks modify pool --$operation-ifaces=$node:$isi_iface subnet2:pool4-synciq
 sleep 5
 isi networks modify pool --$operation-ifaces=$node:$isi_iface subnet2:pool0
 sleep 5
 isi networks modify pool --$operation-ifaces=$node:$isi_iface subnet2:pool2
 sleep 5
}

#check both 10GbE interfaces
for iface in bxe0 bxe1; do
 if [ "$(check_ifaces_active)" = "1" ]; then
  if [ $iface = bxe0 ]; then
   isi_iface=10gige-1 #Isilon uses different interface naming schemes for different things...
  elif [ $iface = bxe1 ]; then
   isi_iface=10gige-2
  else
   echo "Something went wrong"
   exit
  fi
  operate_interfaces
  isi_for_array -n$node "ifconfig $iface"
 fi
done
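
Invocation is just the node number followed by the operation; assuming the script is saved as isi_pool_toggle.sh (a name I made up):

# Pull node 12's 10GbE interfaces out of the pools before maintenance
./isi_pool_toggle.sh 12 remove
# Add them back afterwards
./isi_pool_toggle.sh 12 add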