Running Spark as a job on a Grid Engine HPC Cluster (part 3)

UPDATED GUIDE: SparkFlex, aka running Spark jobs on a Grid Engine cluster, take II

And now for the secret sauce: how to actually get Spark workers to run and talk to a master.

For a long time, I had my start script launching the master. Unfortunately, this gave the scheduler/execd very loose control over the processes, and I had to do a ton of cleanup at the end to make sure there were no orphan processes running on nodes after the job finished or failed.

After some research, I realized that it made more sense to launch the master as part of the job submission (I’ll get to this later in the post), and that I could tightly couple the workers by launching them in the start script via the qrsh command.

qrsh places the processes that it runs under the control of the scheduler, unlike using ssh to do the same thing.

This also means that you MUST have passwordless ssh set up for any user who wants to use Spark in this fashion.
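
If a user doesn’t already have that set up, a minimal version (assuming home directories are shared across the cluster) is just:

# One-time, per user (assumes a shared home directory across the cluster):
ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa     # skip if a key already exists
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys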

So here’s the start script referenced by the PE (see Part 2):


#!/bin/bash
echo "Starting SPARK PE from $HOSTNAME"
SPARK_HOME=/usr/local/spark-current

# Create the master file.
cat $PE_HOSTFILE | cut -f1 -d" " | head -1 > /scratch/spark/tmp/master

# Create all file.
cat $PE_HOSTFILE | cut -f1 -d" " | uniq > /scratch/spark/tmp/all

# Create the workers file.
grep -v `cat /scratch/spark/tmp/master` /scratch/spark/tmp/all > /scratch/spark/tmp/workers

# Logging/debugging info
mkdir -p ~/.spark/logs/$JOB_ID
cp /scratch/spark/tmp/workers ~/.spark/logs/$JOB_ID/workers.txt
cp -LR $SPARK_HOME/conf ~/.spark/logs/$JOB_ID/

# Start the workers
echo "Starting Spark workers" 
MASTER=$HOSTNAME
while read worker; do
    /sge/current/bin/lx-amd64/qrsh -inherit $worker "$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://$MASTER:7077" &
done < /scratch/spark/tmp/workers

echo "Spark Master WebUI for this job: http://$MASTER:8080"

$PE_HOSTFILE is a list of all nodes involved in the job, which is provided by Grid Engine. In this case, we only want the slave nodes, which will run the workers, so we need to do a little parsing to get those out. I’m sure that section could be a little cleaner, but there we are…
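
To make the parsing concrete, here’s what it does with a hypothetical three-node allocation (the hostnames are made up; the real file also carries slot counts, queue names, and processor ranges):

# Hypothetical $PE_HOSTFILE for a 3-node job:
cat > /tmp/pe_hostfile_example <<EOF
node101 1 hadoop2@node101 UNDEFINED
node102 1 hadoop2@node102 UNDEFINED
node103 1 hadoop2@node103 UNDEFINED
EOF

cut -f1 -d" " /tmp/pe_hostfile_example | head -1            # master  -> node101
cut -f1 -d" " /tmp/pe_hostfile_example | uniq               # all     -> node101 node102 node103
grep -v node101 /tmp/pe_hostfile_example | cut -f1 -d" "    # workers -> node102 node103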

After I get my list of worker nodes, I do a little logging, which can be extremely helpful for troubleshooting launch failures and the like.

Then we get to the worker launches. I use Spark’s standalone cluster mode, launching each worker directly via spark-class. Note the ampersand at the end of the qrsh command: by backgrounding the qrsh process for each worker, we aren’t held up launching the workers serially, node by node, which lets us spin up a large cluster (>20 nodes) in a fraction of the time a serial launch would take.
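
If you want to confirm that the workers actually registered before handing the cluster over, one option is to poll the master’s status page from the start script. This is just a sketch; it assumes the standalone master serves its status as JSON at http://$MASTER:8080/json (true of the Spark versions I’m familiar with, but worth verifying against yours):

# Optional sanity check (sketch): poll the master's /json status page until the
# expected number of workers have registered, or give up after ~5 minutes.
EXPECTED=$(wc -l < /scratch/spark/tmp/workers)
for i in $(seq 1 60); do
    ALIVE=$(curl -s "http://$MASTER:8080/json" | grep -o '"ALIVE"' | wc -l)   # crude count of ALIVE workers
    [ "$ALIVE" -ge "$EXPECTED" ] && break
    sleep 5
done
echo "$ALIVE of $EXPECTED workers registered"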

The assumption in this script is that the node that runs it is the Spark Master, which is why the master launch must be done as part of the job submission.

Here is a very simplistic submission script. It does nothing other than launch the master, so any processing on the Spark cluster would need to be included in the script or run via an interactive session (more about this later):


#!/bin/bash
# sparkmaster.sh
export PATH=/usr/local/spark-current/bin:/usr/local/spark-current/sbin:/usr/local/python-2.7.6/bin:$PATH
export SPARK_HOME=/usr/local/spark-current
/usr/local/spark-current/sbin/start-master.sh
export MASTER=spark://$HOSTNAME:7077
sleep 3600
exit 0

The only thing of note here is the sleep 3600 line, which keeps the master up for an hour. You’ll probably want to modify this to fit your environment. If you change it to sleep infinity, you will need to manually qdel any cluster started with this script (unless you add further work for the Spark cluster to the script before the exit 0).
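
If you’d rather submit a self-contained batch job than keep an interactive cluster alive, the sleep can be replaced by the actual work. A rough sketch (the application path is a placeholder, not one of ours):

#!/bin/bash
# Hypothetical batch variant of sparkmaster.sh: start the master, run one
# application against it, then exit so the PE tears the workers down.
export SPARK_HOME=/usr/local/spark-current
export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:/usr/local/python-2.7.6/bin:$PATH
$SPARK_HOME/sbin/start-master.sh
export MASTER=spark://$HOSTNAME:7077
sleep 30                                            # give the workers time to register
spark-submit --master $MASTER /path/to/my_app.py    # placeholder application
exit 0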

The qsub for this looks something like this, assuming you’ve set up your environment exactly like mine (ha):

qsub -jc spark -pe spark 5 -q hadoop2 -j y -o ~/sparklogs -V -b y sparkmaster.sh

This will launch a Spark cluster with 1 master and 4 workers (1+4=5, dontcherknow) and log to sparklogs in your home directory.

Our users typically use Spark in an interactive fashion, so in addition to launching their clusters, they also do a qlogin and then define the $MASTER, $SPARK_HOME, and $PATH variables as above within that session. From that point, the qlogin session acts as the Spark driver. It is also possible to run the driver on the same node as the master, but you run the risk of running the master out of RAM.

The main Spark-using group here has written a much more intelligent version of this (see this github repo), but here is a very basic script that can be run after the qlogin to set up the right variables (as long as the user isn’t running more than one Spark cluster…):


#!/bin/bash
# Grab the master's host out of qstat (the queue@host column for the running
# spark job) and build the spark:// URL from it.
export MASTER="spark://"`qstat | grep spark | awk '{print $8}' | cut -d@ -f2`":7077"
export PATH=$PATH:/usr/local/spark-current/bin:/usr/local/spark-current/sbin:/usr/local/python-2.7.6/bin
export SPARK_HOME=/usr/local/spark-current
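
Usage is then just a matter of sourcing that script inside the qlogin session and pointing whatever you run at $MASTER, for example (the script name and location are hypothetical):

source ~/bin/spark-qlogin-env.sh      # hypothetical name for the script above
spark-shell --master $MASTER          # or pyspark / spark-submit --master $MASTER ...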

The spark-pestop.sh script is very simple at this point (before switching to qrsh, I had to do a TON of cleanup in it); now all it has to do is remove the files created by the spark-pestart.sh script, since otherwise future jobs by different users will fail:


#!/bin/bash
rm -rf /scratch/spark/tmp/workers
rm -rf /scratch/spark/tmp/all
rm -rf /scratch/spark/tmp/master
exit 0

Another little note here: for multiple users to run Spark in this fashion, that /scratch/spark directory MUST be readable and writable by all of them. In our environment, the easiest way to do this was to set the permissions on it to 777 and call it a day.
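
Concretely, something like this has to be run (or baked into the node image) on every host in the queue before the first job lands on it:

# On each execution host (paths match the configs shown in parts 1 and 3):
mkdir -p /scratch/spark/tmp /scratch/spark/work
chmod -R 777 /scratch/spark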

Please let me know in the comments if you have any questions or run into trouble.


Running Spark as a job on a Grid Engine HPC Cluster (part 2)

UPDATED GUIDE: SparkFlex, aka running Spark jobs on a Grid Engine cluster, take II

So the next thing you need to do to get Spark running in Grid Engine is to set up your queue, job class, and your parallel environment.

Our environment is normally run with users requesting slots, each of which roughly corresponds to one CPU core and ~8GB of RAM. So our normal Sandy Bridge nodes have 16 slots, and our Haswell nodes have 32.

Spark is much easier to run if you give it the whole machine, of course (although I suspect you can limit the workers in various ways), but in our environment, part of the reason Spark is used is to load huge datasets (>1TB) into RAM. So it doesn’t make sense to allocate Spark workers (or the master) based on slot count.

Therefore, our queue for spark (called hadoop2 for historical reasons) is set up to have only 1 slot per node. We also have a complex called hadoop_exclusive which forces jobs (or tasks) to be scheduled to whole nodes.
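
I won’t dump our exact complex definition here, but an exclusive-host complex like hadoop_exclusive is typically a BOOL complex with the EXCL relation, attached to each execution host. Something along these lines (the shortcut and urgency values are just examples):

# qconf -sc excerpt (example values):
# name              shortcut  type  relop  requestable  consumable  default  urgency
hadoop_exclusive    hexcl     BOOL  EXCL   YES          YES         0        1000

# ...and on each execution host (qconf -me <hostname>):
# complex_values    hadoop_exclusive=true

Here is the queue configuration: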

>qconf -sq hadoop2
qname                 hadoop2
hostlist              @h02 @h03 @h04 @h05 @h06 @h07 @h08
seq_no                0
load_thresholds       np_load_avg=3
suspend_thresholds    NONE
nsuspend              1
suspend_interval      00:05:00
priority              0
min_cpu_interval      00:05:00
qtype                 INTERACTIVE
ckpt_list             NONE
pe_list               hadoop,spark,spark-rc,spark-test
jc_list               hadoop.default,spark.default, \
                      spark-rc.default,spark-test.default
rerun                 FALSE
slots                 1
tmpdir                /tmp
shell                 /bin/sh
prolog                NONE
epilog                NONE
shell_start_mode      unix_behavior
starter_method        NONE
suspend_method        NONE
resume_method         NONE
terminate_method      NONE
notify                00:00:60
owner_list            NONE
user_lists            NONE
xuser_lists           NONE
subordinate_list      NONE
complex_values        NONE
projects              NONE
xprojects             NONE
calendar              NONE
initial_state         default
s_rt                  INFINITY
h_rt                  INFINITY
d_rt                  INFINITY
s_cpu                 INFINITY
h_cpu                 INFINITY
s_fsize               INFINITY
h_fsize               INFINITY
s_data                INFINITY
h_data                INFINITY
s_stack               INFINITY
h_stack               INFINITY
s_core                INFINITY
h_core                INFINITY
s_rss                 INFINITY
h_rss                 INFINITY
s_vmem                INFINITY
h_vmem                INFINITY

There isn’t much here that’s non-default other than the single slot per node and the JC and PE lists. Note that you need to set up a separate PE and JC for every version of Spark you’re using.

Here’s the PE setup:


>qconf -sp spark
pe_name                spark
slots                  99999
user_lists             NONE
xuser_lists            NONE
start_proc_args        /usr/local/uge-hadoop/spark-pestart.sh
stop_proc_args         /usr/local/uge-hadoop/spark-pestop.sh
allocation_rule        1
control_slaves         TRUE
job_is_first_task      FALSE
urgency_slots          min
accounting_summary     TRUE
daemon_forks_slaves    FALSE
master_forks_slaves    FALSE

The start_proc_args and stop_proc_args are really the secret sauce; I’ll talk about those in part 3. The other options are fairly obvious: you want the master (i.e., the job) to control the slaves, which run the Spark workers. accounting_summary allows you to track the load/memory usage on the slave nodes. job_is_first_task is set to FALSE because the head node, while it hosts the Spark master process (and sometimes the driver, depending on how the job is created), does not run any of the Spark workers.

On to the JC:


>qconf -sjc spark
jcname          spark
variant_list    default
owner           NONE
user_lists      NONE
xuser_lists     NONE
A               {+}UNSPECIFIED
a               {+}UNSPECIFIED
ar              {+}UNSPECIFIED
b               {+}UNSPECIFIED
binding         {+}UNSPECIFIED
c_interval      {+}UNSPECIFIED
c_occasion      {+}UNSPECIFIED
CMDNAME         {+}UNSPECIFIED
CMDARG          {+}UNSPECIFIED
ckpt            {+}UNSPECIFIED
ac              {+}UNSPECIFIED
cwd             {+}UNSPECIFIED
dl              {+}UNSPECIFIED
e               {+}UNSPECIFIED
h               {+}UNSPECIFIED
hold_jid        {+}UNSPECIFIED
hold_jid_ad     {+}UNSPECIFIED
i               {+}UNSPECIFIED
j               {+}UNSPECIFIED
js              {+}UNSPECIFIED
l_hard          hadoop_exclusive=1
l_soft          {+}UNSPECIFIED
masterl         {+}UNSPECIFIED
m               {+}UNSPECIFIED
mbind           {+}UNSPECIFIED
M               {+}UNSPECIFIED
masterq         {+}UNSPECIFIED
N               {+}UNSPECIFIED
notify          {+}UNSPECIFIED
now             {+}UNSPECIFIED
o               {+}UNSPECIFIED
P               {+}UNSPECIFIED
p               {+}UNSPECIFIED
pe_name         spark
pe_range        {+}UNSPECIFIED
q_hard          {+}UNSPECIFIED
q_soft          {+}UNSPECIFIED
R               TRUE
r               {+}UNSPECIFIED
rou             {+}UNSPECIFIED
S               {+}UNSPECIFIED
shell           {+}UNSPECIFIED
t               {+}UNSPECIFIED
tc              {+}UNSPECIFIED
V               {+}UNSPECIFIED
v               {+}UNSPECIFIED

The only modifications here are setting the complex hadoop_exclusive, setting the PE to spark (see above), and setting Reservation (R) to TRUE. Reservation holds free nodes aside when cluster load is high so the Spark job can eventually start. For example, if the job requests 10 slots and there is only one free node in the cluster, that node is reserved for the Spark job and nothing else is scheduled to it until all 10 nodes are available. Without reservation, Spark jobs would likely never run on a busy cluster.
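
One related caveat that comes from the stock Grid Engine scheduler configuration rather than anything Spark-specific: resource reservation only actually happens if the scheduler’s max_reservation parameter is greater than zero, so it is worth checking:

qconf -ssconf | grep max_reservation   # 0 means reservation is effectively disabled
qconf -msconf                          # raise max_reservation (e.g. to 32) if needed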

Running Spark as a job on a Grid Engine HPC Cluster (part 1)

SEE THE UPDATED GUIDE TO RUNNING ON GRID ENGINE HERE: SparkFlex, aka running Spark jobs on a Grid Engine cluster, take II

NOTE: SOME SETTINGS BELOW ARE OBSOLETE AND NEED TO BE UPDATED

Apache Spark has become a pretty big thing where I work. We were originally approached about running it on our HPC cluster about 3 years ago, and, knowing little to nothing about Big Data clusters, I agreed to set it up and get it rolling. Over the last 3 years, we have been gradually improving the process and getting it more and more under the control of execd, which makes it a bit more resilient to crashes and memory overruns.

The first thing that needs to be done is to make sure that all the prerequisites are present for Spark. We use Java 1.7.x, Scala 2.10.x, and Python 2.7.6. We do not use HDFS for the storage; we just use our usual Isilon NFS shares (mounted with nolock, of course) and occasionally GPFS. We use the precompiled version of Spark for Hadoop 1.x, since we happen to have CDH 3.x MapReduce installed on the cluster, although we never use it.

There are two main configuration files for Spark: spark-env.sh and spark-defaults.conf. Both need to be modified from the defaults to fit your environment. As an example, here are ours:

spark-env.sh:

ulimit -n 65535
export SCALA_HOME=/usr/local/scala-2.10.3
export SPARK_WORKER_DIR=/scratch/spark/work 
export JAVA_HOME=/usr/local/jdk1.7.0_67 
export SPARK_LOG_DIR=~/.spark/logs/$JOB_ID/ 
export SPARK_EXECUTOR_MEMORY=90g 
export SPARK_DRIVER_MEMORY=50g 
export SPARK_WORKER_MEMORY=90g 
export SPARK_LOCAL_DIRS=/scratch/spark/tmp 
export PYSPARK_PYTHON=/usr/local/python-2.7.6/bin/python 
export SPARK_SLAVES=/scratch/spark/tmp/slaves 
export SPARK_SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=30"

Our nodes are 128GB/16 cores, so your mileage and settings may vary. /scratch is a local directory on the nodes. Note that you will want some kind of cleanup script for your work and tmp directories; Spark will normally clean up after itself, but if it exits abnormally, all bets are off.
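
As a sketch of what that cleanup might look like (the two-day threshold and running it from cron are assumptions, not our actual script), something like this on each node keeps the local scratch from filling up:

#!/bin/bash
# Hypothetical nightly cron job on each node: remove leftover worker and shuffle
# directories older than two days from local scratch.
find /scratch/spark/work -mindepth 1 -maxdepth 1 -mtime +2 -exec rm -rf {} + 2>/dev/null
find /scratch/spark/tmp  -mindepth 1 -maxdepth 1 -mtime +2 -exec rm -rf {} + 2>/dev/null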


spark-defaults.conf:

spark.akka.timeout=300 
spark.storage.blockManagerHeartBeatMs=30000 
spark.akka.retry.wait=30 
spark.akka.frameSize=2047 
spark.local.dir=/scratch/spark/tmp
spark.kryoserializer.buffer.max.mb=1024
spark.core.connection.ack.wait.timeout=600
spark.driver.maxResultSize=0

These are, for the most part, settings I made many, many versions of Spark ago. Other than spark.local.dir, I’m afraid I don’t have much information about them. But that one is important: NEVER place spark.local.dir on locking storage, because the workers will collide with each other.

In the next article, I’ll talk about the setup of Grid Engine to accommodate running the Spark cluster.

Addendum to mongodb on rocksdb-Haswell

So I finally got around to doing the install on the physical box today, and on the initial make static_lib, I got this:

>/opt/src/rocksdb>make static_lib
 GEN util/build_version.cc
 CC db/c.o
 /tmp/ccActpEz.s: Assembler messages:
 /tmp/ccActpEz.s:14025: Error: no such instruction: `shlx %rsi,%r9,%rax'
 /tmp/ccActpEz.s:40750: Error: no such instruction: `shlx %rcx,%r11,%rsi'
 /tmp/ccActpEz.s:40764: Error: no such instruction: `shlx %rax,%r11,%rdx'
 /tmp/ccActpEz.s:41219: Error: no such instruction: `shlx %r8,%rsi,%r9'
 /tmp/ccActpEz.s:41231: Error: no such instruction: `shlx %rdx,%rsi,%rax'
 /tmp/ccActpEz.s:41497: Error: no such instruction: `shlx %rbx,%rsi,%rdi'
 /tmp/ccActpEz.s:41511: Error: no such instruction: `shlx %r14,%rsi,%rdx'

The machine I was installing on has Haswell CPUs, which is what was causing this error: the stock assembler was too old to know the BMI2 instructions (such as shlx) that gcc was emitting for that architecture. So we had to download, compile, and install binutils-2.25, and then add that to the path. The link is here.
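
For anyone hitting the same thing, the binutils build itself is the usual autotools routine; roughly like this (the version and install prefix are examples):

# Example build of a newer binutils so gcc picks up an assembler that knows BMI2:
tar xf binutils-2.25.tar.gz
cd binutils-2.25
./configure --prefix=/usr/local/binutils-2.25
make
make install
export PATH=/usr/local/binutils-2.25/bin:$PATH   # put the new 'as' ahead of the system one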