After using Apache Spark for ~2.5 years, we've come across a specific scenario where Spark's resiliency fails. After some research, we found that this scenario has been documented as an open issue (SPARK-1272) since March 2014. The issue is rather simple:
- No matter what data layer you're using (HDFS / S3 / Cassandra / anything else, really), Spark also uses the local disks on the worker nodes to store temporary data (mostly shuffle data and cache spillover)
- These local disks often have to support substantial workloads - large volumes of shuffle data, for example, could be read, written and deleted constantly on a busy Spark cluster
- If one of the local disks fails (we've seen disks entering "read only" mode, for example), the running Spark job(s) using this disk will fail, with no retries
- Moreover, future jobs might fail too, since Spark won't detect the failed disk and will keep trying to use it until the node (or disk) is taken out of the cluster
This is all very... depressing.
So, other than digging into the Apache Spark code and trying to fix the issue once and for all, what can one do to work around it? Our solution was to test each disk on each node ourselves after any job failure - and block all further processing until this check passes.
How is that implemented?
First, our most crucial Spark jobs run from within a "job server" we've written ourselves. It's basically a Dropwizard-based application that keeps a live SparkContext (and can replace it on failure / on demand) and executes jobs. One of the nice features of this application is the HealthAwareExecutor - an executor that only starts new jobs if a set of healthchecks passes. It runs these healthchecks periodically, as well as immediately after any job failure (perhaps we'll give more details and some code for this component in a follow-up post).
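To make the gating idea concrete, here's a rough sketch of what such an executor boils down to - heavily simplified, with illustrative internals rather than our actual implementation - assuming the checks live in a Dropwizard HealthCheckRegistry:

```scala
import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.JavaConverters._

import com.codahale.metrics.health.HealthCheckRegistry

// Illustrative sketch only - the real HealthAwareExecutor is more involved.
class HealthAwareExecutor(healthChecks: HealthCheckRegistry) {

  @volatile private var healthy = true

  // Re-evaluate all registered healthchecks periodically
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  scheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = refreshHealth()
  }, 0, 1, TimeUnit.MINUTES)

  private def refreshHealth(): Unit =
    healthy = healthChecks.runHealthChecks().values().asScala.forall(_.isHealthy)

  // New jobs are only started while all healthchecks pass
  def submit(job: () => Unit): Unit = {
    if (!healthy)
      throw new IllegalStateException("Healthchecks failing - job submission blocked")
    try job() catch {
      case e: Exception =>
        refreshHealth() // a job failure triggers an immediate healthcheck run
        throw e
    }
  }
}
```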
So - to stop all executions until all disks are functioning well, all we have to do is add a new healthcheck to this set - one that tests all local disks on all nodes. But how would this driver application know which disks are used by Spark, and how would it access them? We wouldn't want to introduce any extra configuration that might go out of date, nor would we want to handle the communication ourselves.
It took some head-scratching, but we eventually came up with an implementation that proved to be very effective - 100% reliable, with no false negatives and no misses. It "forces" Spark to run the equivalent of "touch" on each of Spark's configured local dirs (see spark.local.dir here) on each node, by creating a dummy RDD with enough partitions (>= number of nodes) and using mapPartitions to run the test code on each node. Here it is (the code depends on nothing but Java, Scala and Spark, so you should be able to reuse it easily):
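If the embedded snippet doesn't render for you, here is a minimal sketch of the same idea. The names (LocalDiskHealthCheck etc.) are illustrative, and the way the local dirs are resolved only approximates Spark's own lookup order (SPARK_LOCAL_DIRS, then LOCAL_DIRS on YARN, then spark.local.dir):

```scala
import java.io.File
import java.net.InetAddress
import java.util.UUID

import org.apache.spark.{SparkContext, SparkEnv}

// Illustrative sketch - names and details may differ from the real check.
object LocalDiskHealthCheck {

  // Approximates how Spark resolves its local dirs on an executor:
  // SPARK_LOCAL_DIRS, then LOCAL_DIRS (YARN), then spark.local.dir, then the JVM tmp dir.
  private def localDirs(): Seq[String] = {
    val raw = Option(System.getenv("SPARK_LOCAL_DIRS"))
      .orElse(Option(System.getenv("LOCAL_DIRS")))
      .getOrElse(SparkEnv.get.conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
    raw.split(",").map(_.trim).filter(_.nonEmpty)
  }

  // Touches (creates and deletes) a dummy file in every local dir on every node,
  // and throws an exception listing the host:dir pairs that failed.
  def run(sc: SparkContext, numNodes: Int): Unit = {
    // A few partitions per node makes it very likely every node runs at least one task
    val numPartitions = numNodes * 3

    val failures = sc.parallelize(1 to numPartitions, numPartitions)
      .mapPartitions { _ =>
        val host = InetAddress.getLocalHost.getHostName
        localDirs().flatMap { dir =>
          val probe = new File(dir, s"disk-healthcheck-${UUID.randomUUID()}")
          try {
            if (!probe.createNewFile())
              throw new IllegalStateException("could not create file")
            None
          } catch {
            case e: Exception => Some(s"$host:$dir (${e.getMessage})")
          } finally {
            probe.delete()
          }
        }.iterator
      }
      .collect()
      .distinct

    if (failures.nonEmpty)
      throw new IllegalStateException(s"Local disk check failed on: ${failures.mkString(", ")}")
  }
}
```

Calling it is just `LocalDiskHealthCheck.run(sparkContext, numberOfNodes)`; any exception propagates back to the driver, which is exactly what the healthcheck needs.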
So - now, when a disk fails:
- The currently running job(s) fail. That's a shame, but usually our applications are resilient to these "single" failures, retrying them or just waiting for the next execution
- That failure triggers execution of all healthchecks, including this one, which logs all failures and throws an exception
- That exception tells the HealthAwareExecutor to stop processing and keep running the test periodically until it passes
- The exception is also displayed on Dropwizard's healthcheck page for humans to see (a sketch of that wiring follows this list)
- Lastly, the exception triggers an alert (we sample the healthcheck page using Cabot) so the right people are notified immediately. The relevant disk is either fixed or removed from the cluster
- Removing / fixing the disk means the next healthcheck execution will pass with no errors, which in turn switches HealthAwareExecutor back to normal execution mode, and the next jobs (perhaps including a retry of the original failed job) are executed
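For reference, exposing the disk test as a Dropwizard healthcheck is just a thin wrapper - again a sketch with illustrative names, reusing the LocalDiskHealthCheck object from the sketch above:

```scala
import com.codahale.metrics.health.HealthCheck
import org.apache.spark.SparkContext

// Illustrative wrapper - a supplier is passed because the job server can replace its SparkContext.
class LocalDisksHealthCheck(currentContext: () => SparkContext, numNodes: Int) extends HealthCheck {

  override def check(): HealthCheck.Result =
    try {
      LocalDiskHealthCheck.run(currentContext(), numNodes)
      HealthCheck.Result.healthy()
    } catch {
      case e: Exception =>
        // Shows up on the healthcheck page and is picked up by the alerting (e.g. Cabot)
        HealthCheck.Result.unhealthy(e)
    }
}

// Registered like any other Dropwizard healthcheck, e.g. in the Application's run():
// environment.healthChecks().register("spark-local-disks", new LocalDisksHealthCheck(() => sparkContext, numNodes))
```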
Bottom line - we lose no data, no job executions, and minimal time.
Whether or not your Spark setup allows such healthchecks to actually control the next executions, the healthcheck itself can be a valuable monitoring trick for quickly identifying failed disks.
So until that faulty local disk is fixed, the whole cluster is halted?
Btw, would increasing RAM significantly on each node stop Spark from using its local disks?
Yes - this solution means that the application is halted until the disk is fixed, or REMOVED (excluded from YARN's local-dirs configuration), which in our case is done quickly (minutes) by the on-call engineer. Increasing RAM won't help much, I think, as it won't prevent Spark from using the local disks for cross-node shuffling. As far as I know, some operations go to disk regardless of RAM availability, but I might be wrong on that.