New Spark configuration suggestions

I know that a fair number of people still look at the configuration suggestions I made way back when here. Since then, we’ve made a number of changes to improve performance and reliability. We’ve moved a lot of the memory and core settings out of the spark-env.sh script and into spark-defaults.conf, which allows advanced users to override them and is generally a better place for them. We’re also working on limiting the number of cores and the amount of memory used by Spark in order to leave some room for the OS and background processes (hopefully preventing the runaway-load problems we’ve been fighting for a long time).

Upcoming changes will, I believe, provide even better performance and stability. We will be moving the Spark local directories (spark.local.dir, or SPARK_LOCAL_DIRS in spark-env.sh) from a single, slow HDD to 3x SSDs (consumer grade, but generally good performance per dollar). This should keep memory from filling up when the reduce phase outruns the disk. I strongly recommend making this change in your environment if at all possible.

The new spark-defaults.conf:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
# spark.driver.memory              5g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
#########################################################################
spark.akka.timeout=300s 
spark.rpc.askTimeout=300s 
spark.storage.blockManagerHeartBeatMs=30000
spark.rpc.retry.wait=30s 
spark.kryoserializer.buffer.max=1024m
spark.core.connection.ack.wait.timeout=600s
spark.driver.maxResultSize=0
spark.python.worker.memory=1536m
spark.driver.memory=70g
spark.executor.memory=25g
spark.executor.cores=5
spark.akka.frameSize=2047

The two identical timeouts are there because, under Spark 1.6 and newer, Akka is deprecated (and removed entirely in 2.0.0, I believe), so the same value needs to be set for both the old Akka timeout and its RPC replacement; without the longer timeout, our workers would repeatedly drop out of the cluster.

It is crucial to set spark.python.worker.memory to a relatively small value, because the pyspark worker processes run outside the JVM(s) sized by spark.executor.memory. So in our environment, with 128GB nodes, we need to keep the total memory usage below 120GB. The 1536MB isn’t a hard limit; it is the point at which the Python processes are instructed to start spilling their memory to disk. Depending on the application, it might be more advantageous to use less executor.memory and more python.worker.memory.

spark.executor.cores determines the number of cores allocated to each executor JVM and, thus, how many Spark executors will run on a worker node. In this case it works out to 3 executors per node, with one core left free for system processes.
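
To make the arithmetic concrete, here is a rough per-node budget using the settings above. It assumes 16-core workers (which is what 3 executors of 5 cores each plus one spare core implies) and one Python worker per executor core:

3 executors x 25GB (spark.executor.memory)             = 75GB of executor JVM heap
15 Python workers x 1.5GB (spark.python.worker.memory) = ~22.5GB before Python starts spilling
Total                                                  = ~97.5GB, safely under the 120GB target on a 128GB node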

The spark-env.sh file is now much more trimmed down than the original (created back in the 0.7 days…):


#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

ulimit -n 65535
export SCALA_HOME=/usr/local/scala-2.10.3

export SPARK_WORKER_DIR=/scratch/spark/work
export JAVA_HOME=/usr/local/jdk1.8.0_45
export SPARK_LOG_DIR=~/.spark/logs/$(date +%H-%F)/
export SPARK_WORKER_OPTS=-Dspark.worker.cleanup.enabled=true

###################################
#set disk for shuffle and spilling
#Local hdd 
export SPARK_LOCAL_DIRS=/scratch/spark/tmp

#3 local ssds
#export SPARK_LOCAL_DIRS=/scratch-ssd1/sparklocaldir,/scratch-ssd2/sparklocaldir,/scratch-ssd3/sparklocaldir

#################################

export PYSPARK_PYTHON=/usr/local/python-2.7.6/bin/python
export SPARK_SLAVES=/scratch/spark/tmp/slaves
export SPARK_SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=30"
export SPARK_PUBLIC_DNS=$HOSTNAME

## pull in the user's environment variables on the workers so that PYTHONPATH will transfer
source $HOME/.bash_profile

Basically, all we’re doing is selecting the versions of Java, Scala, and Python to use by default, setting the location of the slaves file (which may not even be necessary with SparkFlex), and setting the various local directories. Note the two options, local HDD versus the 3 SSDs; these paths will of course be different in your environment. You’ll also need to make sure that the Spark log directory already exists.
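
Since SPARK_LOG_DIR includes a timestamp, one simple way to handle that last point (a suggestion, not something in the file above) is to have spark-env.sh create the directory whenever it is sourced:

mkdir -p "$SPARK_LOG_DIR"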

SparkFlex launch script

In my last update, I wrote about what I call SparkFlex, a better way than my old method of launching Spark jobs on a Grid Engine based HPC cluster. While that works as is, I’ve also written a Python script called sparklaunch.py that makes spinning up a cluster much easier and more controlled. It is invoked like this:

spark-deploy.py [-h, --help] [-w --workers] [-j --jobid] [-M --master] [-s --stopmaster] [-r --removeworkers] [-v --version] [-f] [-t --runtime]

The script will optionally launch a master and any number of workers (including zero). It can also add workers to a running cluster or shut down existing workers, including an optional prompt to choose which worker to kill, and it can tear down the whole cluster. It works with multiple clusters launched by a single user, tracking each cluster by either the job ID or the hostname of its master, and it takes an option that sets a hard runtime for the cluster.
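
A few example invocations (the job IDs and hostnames here are made up; the options are documented in usage() below):

spark-deploy.py -w 4 -t 08:00:00     # start a master and 4 workers with an 8 hour hard runtime
spark-deploy.py -j 1234567 -w 2      # add 2 workers to the cluster whose master is job 1234567
spark-deploy.py -j 1234567 -r 1      # remove 1 worker from that cluster
spark-deploy.py -M somehost -s       # tear down the cluster whose master is running on somehost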
Here’s the code:

#!/usr/bin/env python
#### sparklaunch.py
#### requires Python 2.7.x and the xmltodict module
import os
import sys
import getopt
import subprocess
import traceback
import xmltodict
import time

def getqstat():
    alljobs = []
    templist = []
    process = subprocess.Popen("qstat -xml", shell=True, stdout=subprocess.PIPE)
    rawxml = process.communicate()[0]
    xmldict = xmltodict.parse(rawxml)
# qstat -xml puts pending and running jobs in separate namespaces. This code appends the two together. qstat -xml also outputs in unicode, so that needs to be parsed properly.
    for qkey in xmldict[u'job_info']:
        if qkey == u'@xmlns:xsd':
            continue
        if xmldict[u'job_info'][qkey]:
            templist = xmldict[u'job_info'][qkey][u'job_list']
# necessary because xmltodict returns a single job as a dict rather than a one-entry list
            if not isinstance(templist, list):
                templist = [templist]
# building a fresh list each pass avoids appending into (and duplicating) the previous namespace's jobs
            alljobs = alljobs + templist
    return alljobs

####Use your own JC variables 
def startmaster(sparktype, runtime):
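# Submit the master as a grid job using the job class that matches the requested Spark version, and return its job ID.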
    if sparktype == "":
         sparktype = "default"
    elif sparktype == "rc":
        sparktype = "master-rc"
    elif sparktype == "test":
        sparktype = "master-test"
    if runtime != "default":
        options = "sparkflex.{} -l spark_exclusive=1,h_rt={}".format(sparktype, runtime)
    else:
        options = "sparkflex.{}".format(sparktype)
    process = subprocess.Popen(["qsub -jc {}".format(options)], shell=True, stdout=subprocess.PIPE)
    rawout = process.communicate()
    try:
        masterjobid = rawout[0].split(" ")[2]
    except:
        print "Unable to parse the job ID from qsub output: {}".format(rawout)
        sys.exit(1)
    print "Master submitted. Job ID is {}".format(masterjobid)
    return masterjobid

def startworker(sparktype,masterjobid):
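# Confirm that the master job exists, then submit a worker job named W<masterjobid> so it can be tied back to its master.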
    masterfound = False
    rawout = getqstat()
    for i in range(len(rawout)):
        if masterjobid in str(rawout[i][u'JB_job_number']):
            masterfound = True
            break
    if not masterfound:
        print "No master with the job id {} running or queued. Please submit master first.".format(masterjobid)
        sys.exit()
    if sparktype == "rc":
        sparktype = "-rc"
    if sparktype == "test":
        sparktype = "-test"
    command = "qsub -jc sparkflex.worker{} -N W{}".format(sparktype,masterjobid)
    os.system(command)

def qdeljob(jobid):
    command = "qdel {}".format(jobid)
    os.system(command)

def stopworker(masterjobid, terminatew, workerlist, skipcheckstop):
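# Remove one worker belonging to the given master: pending workers are offered first, then the user is prompted to pick a running one unless -f was given.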
    workername = "W{}".format(masterjobid) 
    rawout = getqstat()
    jobtokill = ""
    statuses = {}
    for i in range(len(rawout)):
        unpack=rawout[i]
        jobid=str(unpack[u'JB_job_number'])
        if 'sparkflex.worker' in str(unpack[u'jclass_name']) and str(masterjobid) in str(unpack[u'JB_name']):
            statuses[jobid] = str(unpack[u'@state']) 
    if "pending" in statuses.values() and not skipcheckstop:
        terminatew = raw_input("Terminate waiting job(s) first? (y/n) ")

    for wjobid in statuses.keys():
        if statuses[wjobid] == 'pending' and terminatew == 'y':
            jobtokill = wjobid
            break
        elif statuses[wjobid] == 'running' and not skipcheckstop:
            jobtokill = selectionlist(workerlist, 'worker')
            break
        else: 
            jobtokill = wjobid
            break
    try:
        if jobtokill != "":
            qdeljob(jobtokill)
            time.sleep(3)
    except:
        print "No running or waiting workers found corresponding to Master job {}".format(masterjobid)
        sys.exit()
    return terminatew

def checkstop(inval, jobtype):
    if jobtype == "master":
        checkstop = raw_input("Stop master with job id {} (y/n):".format(inval))
    else: 
        checkstop = raw_input("Remove {} workers? (y/n):".format(inval))
    if checkstop != "y":
        print "No workers will be removed."
        sys.exit()
    else:
        return True

def mastername_to_jobid(mastername):
    masterfound = False
    masterlist = findmaster()
    for jobid in masterlist.keys():
        if mastername in masterlist[jobid][0]:
            masterfound = True
            break
    if masterfound:
        return jobid
    else:
        print "Node {} is not running a spark master job. If deleting workers alone, please use Master's Job ID".format(mastername)
        sys.exit()

def findmaster():
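# Return a dict of {job id: (hostname, state)} for every sparkflex master job currently queued or running.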
    rawout = getqstat()
    jobid = ""
    masterlist = {}
    for i in range(len(rawout)):
        unpack = rawout[i]
        jobid = str(unpack[u'JB_job_number'])
        if "sparkflex" in str(unpack[u'jclass_name']) and "master" in str(unpack[u'JB_name']):
            masterlist[jobid] = (str(unpack[u'queue_name']).replace('spark.q@',''),str(unpack[u'@state']))
    if len(masterlist.keys()) != 0:
        return masterlist
    else:
        print "No masters found. If deleting workers alone, please use Master's Job ID."
        sys.exit()

def findworker(masterjobid):
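# Return a dict of {job id: (hostname, state)} for the workers attached to the given master.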
    rawout = getqstat()
    jobid = ""
    workername = "W{}".format(masterjobid)
    workerlist = {}
    for i in range(len(rawout)):
        unpack = rawout[i]
        jobid = str(unpack[u'JB_job_number'])
        if "sparkflex.worker" in str(unpack[u'jclass_name']) and workername in str(unpack[u'JB_name']):
            workerlist[jobid] = (str(unpack[u'queue_name']).replace('spark.q@',''),str(unpack[u'@state']))
    return workerlist

def checkmaster(masterjobid):
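# Wait for the master job to reach the running state and return the hostname of the node it landed on.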
    rawout = getqstat()
    mastername = ""
    for i in range(len(rawout)):
        unpack = rawout[i]
        if masterjobid in str(unpack[u'JB_job_number']) and "running" in str(unpack[u'@state']):
            mastername = str(unpack[u'queue_name']).replace("spark.q@","")
            break
    if mastername == "":
        instruction = raw_input("Master is not running yet. (w)ait then retry in 1 minute, e(x)it this application, or (t)erminate master?" )
        if instruction == "w":
            time.sleep(60)
            mastername = checkmaster(masterjobid)
        elif instruction == "x":
            sys.exit()
        elif instruction == "t":
            qdeljob(masterjobid)
            sys.exit()
    return mastername

def qdelmaster(masterjobid, skipcheckstop):
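# Delete the master job and any worker jobs named after it, asking for confirmation unless -f was given.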
    jobtype = "master"
    workername = "W{}".format(masterjobid)
    if not skipcheckstop:
         skipcheckstop = checkstop(masterjobid,jobtype)
    if skipcheckstop:
        try:
            qdeljob(masterjobid)
        except:
            print "Master with job id {} not found".format(masterjobid)
            sys.exit(1)
        try:
            qdeljob(workername)
        except:
#            traceback.print_exc()
            print "Workers for master with job id {} failed to stop".format(masterjobid)
            sys.exit(1)
    else:
        print "As requested, master not stopped."
        sys.exit(0)

def getworkercount(masterjobid):
    rawout = getqstat()
    workers = 0
    for i in range(len(rawout)):
        unpack = rawout[i]
        if masterjobid in str(unpack[u'JB_name']):
            workers = workers + 1
    return workers
            

def selectionlist(joblist, jobtype):
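# Print a numbered menu of jobs (host, job id, state) and return the job ID the user selects.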
    i = 0
    selectionlist = {}
    print "Select {} to kill from list below:".format(jobtype)
    for jobid in joblist.keys():
        i = i + 1
        host, status = joblist[jobid]
        selectionlist[i] = jobid
        print "{}) Host: {} JobID: {} Status: {}".format(i, host, jobid, status)
    while True:
        selection = int(raw_input("Selection? "))
        if 1 <= selection <= i:
            jobid = selectionlist[selection]
            skipcheckstop = True
            break
        else:
            print "Invalid selection."
    return jobid

def usage():
    print "spark-deploy.py [-h, --help] [-w --workers] [-j --jobid] [-M --master] [-s --stopmaster] [-r --removeworkers] [-v --version] [-M --master] [-f] [-t --runtime]"
    print "-h, --help: print this help."
    print "-w, --workers: number of workers to start."
    print "-j, --jobid: Job ID of running Master."
    print "-M, --master: Hostname of running Master."
    print "-s, --stopmaster: Stop running Master. Requires Master Job ID or Master hostname."
    print "-r, --removeworkers: number of workers to stop, requires Master Job ID or Master hostname."
    print "-f: Do not ask for confirmation of commands or present choice of jobs"
    print "-t, --runtime: Change the default time at which the Spark cluster will self-terminate in format hh:mm:ss"
    print "-v, --version: version of Spark to run (optional, permissible options rc, test)."
    print "If run with no options, will start a master with no workers. To start a master with workers, use spark-deploy.py -w "

def main(argv):
# Booleans to make this whole mess work right
    stopw = False
    stopm = False
    skipcheckstop = False
    masternamed = False
    needmaster = True
    novers = True
    runtime = "default"

# Get user options
    try:
        opts, args = getopt.getopt(argv,"fshw:v:j:r:M:t:",["workers=","version=","jobid=", "removeworkers=", "stopmaster=", "help", "master=", "runtime="])
    except getopt.GetoptError:
        usage()
        sys.exit(2)
   
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit()
        elif opt in ("-v", "--version"):
            sparktype = arg
            novers = False
            if sparktype not in ("rc", "test"):
                usage()
                sys.exit(1)
        elif opt in ("-w", "--workers"):
            workers = int(arg)
        elif opt in ("-j", "--jobid"):
            masterjobid = arg
            needmaster = False
        elif opt in ("-r", "--removeworkers"):
            workers = int(arg)
            stopw = True
        elif opt in ("-s", "--stopmaster"):
            stopm = True
        elif opt == "-f":
            skipcheckstop = True
        elif opt in ("-M", "--master"):
            masterjobid = mastername_to_jobid(arg)
            needmaster = False
        elif opt in ("-t", "--runtime"):
            runtime = arg
    
    if novers:
        sparktype = ""

# Stop running master
    if stopm:
        try:
            masterjobid
        except NameError:
            masterlist = findmaster()
            masterjobid = selectionlist(masterlist,'master')
            skipcheckstop = True
        qdelmaster(masterjobid, skipcheckstop)
        try:
            os.remove("tmp{}.sh".format(masterjobid))
            skipcheckstop = True
        except:
            print "tmp{}.sh not found. Please delete manually in the directory from which you started the spark job."

# Stop worker(s)
    if stopw:
        jobtype = "worker"
        terminatew = ""
        try:
            for i in range(workers):
                workerlist = findworker(masterjobid)
                terminatew = stopworker(masterjobid, terminatew, workerlist, skipcheckstop)
        except NameError:
            print "JobID or name of master required to stop worker."
#            traceback.print_exc()
            sys.exit()

# Either way, we're done here
    if stopm or stopw:
        sys.exit()

# Start master and create temporary script for the user to use which contains the necessary environment variables to communicate with the Spark cluster. 
 
    if needmaster:
        masterjobid = startmaster(sparktype, runtime)

# Sleep is necessary to make sure the scheduler has time to spin up the job. You may need to adjust this based on your scheduler configuration. 

        time.sleep(2)
        mastername = str(checkmaster(masterjobid))
        filename = "tmp{}.sh".format(masterjobid)
        if sparktype == "default":
            version = "current"
        else:
            version = sparktype
        with open(filename, 'w') as f:
            f.write(
                    "#!/bin/bash\n"
                    "export MASTER=spark://{}:7077\n"
                    "export SPARK_HOME=/usr/local/spark{}\n"
                    "export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH\n".format(mastername,version)
                    )

# Start workers and exit if not starting master 
    try:
        for i in range(workers):
            startworker(sparktype, masterjobid)
    except:
        print "No workers specified or other error"
#        traceback.print_exc() #debug line
        sys.exit()
    if not needmaster:
        sys.exit()

# Print out Master name and addresses. 
    print "Master's name is {}. WebUI is at http://{}:8080. The $MASTER environment variable can be set to spark://{}:7077 manually or by running . tmp{}.sh".format(mastername, mastername, mastername, masterjobid)

if __name__ == '__main__':
    main(sys.argv[1:])