Spark Streaming allows us to process large amounts of live data from multiple sources in a scalable and fault-tolerant manner. It is widely used to process live Twitter feeds, consume and process Kafka topic messages, or ingest data from Amazon S3, plain TCP sockets, or custom input streams/feeds.

In many cases, we are perfectly fine having our Spark Streaming application run as a long-running process, and we do not need to bother ourselves with shutting it down gracefully. This is the case, for example, if the application simply never needs to terminate, if it only needs to consume a set amount of data, or if we are okay with losing some amount of data when terminating it.

In some cases, however, we must recycle our Spark application for any number of reasons, without losing any data in the process. One example is upgrading our Spark application version – we need to take the currently running app down and then kickstart the new version.

In this post I provide sample (Java) code that gracefully terminates a running Spark application by intercepting a SIGTERM signal and shutting down the Spark streaming context. The code also takes care of interrupting and joining any worker thread we might have spawned, so that our application won't hang due to a lingering thread.

 


public static void main(String[] args) {

    /* your code here to configure the Spark app, create the streaming context
       (the JavaStreamingContext jssc used below) and input stream, set up the
       map-reduce functions, start the app … */

    // invoke some worker thread we might need
    SomeWorkerThread workerThread = new SomeWorkerThread();
    workerThread.start();

    // install a SIGTERM interceptor (so we can gracefully stop the app)
    installSigtermHandler(workerThread); // see this method below

    jssc.awaitTermination();
}
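The SomeWorkerThread referenced above is assumed to cooperate with interrupt() — otherwise the join() in the shutdown hook below could block indefinitely. A minimal sketch of such a thread (the class name and loop body are illustrative, not from any library) might look like this:

```java
// Hypothetical background worker that exits cleanly when interrupted.
public class SomeWorkerThread extends Thread {
    @Override
    public void run() {
        try {
            // keep working until someone calls interrupt() on us
            while (!Thread.currentThread().isInterrupted()) {
                // ... do one unit of background work here ...
                Thread.sleep(1000); // throws InterruptedException if interrupted
            }
        } catch (InterruptedException e) {
            // interrupted while sleeping -- fall through and terminate cleanly
        }
    }
}
```

The key design point is that the thread both checks its interrupted flag and lets InterruptedException end the run() method, so an interrupt() from the shutdown hook terminates it promptly in either state.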


private static void installSigtermHandler(SomeWorkerThread workerThread) {

    // intercept SIGTERM, i.e. "kill PID" (but not "kill -9 PID")
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            // stop the background thread
            try {
                workerThread.interrupt();
                workerThread.join();
            } catch (InterruptedException ie) {
                // nothing more we can do here -- we are shutting down anyway
            }
            finally {
                // halt the VM in 20 seconds if all else fails
                Timer timer = new Timer();
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        Runtime.getRuntime().halt(0);
                    }
                }, 20000);
                // gracefully stop the spark streaming context: the first true
                // also stops the underlying SparkContext, the second waits for
                // all received data to be processed before shutting down
                jssc.stop(true, true);
            }
        }
    });
}
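The interrupt-join-watchdog sequence inside the hook can also be factored into a reusable, Spark-free helper. Here is a minimal sketch (the class and method names are mine, not from the post); note one deliberate difference from the hook above: since this helper may run in a VM that keeps living afterwards, it disarms the watchdog once cleanup succeeds instead of letting it fire:

```java
import java.util.Timer;
import java.util.TimerTask;

public class GracefulStop {

    /**
     * Interrupt the worker and wait for it to finish, guarded by a watchdog
     * that hard-halts the VM if cleanup takes longer than timeoutMillis.
     */
    static void stopWorker(Thread worker, long timeoutMillis) {
        Timer watchdog = new Timer(true); // daemon: must not keep the VM alive
        watchdog.schedule(new TimerTask() {
            @Override
            public void run() {
                Runtime.getRuntime().halt(1); // last resort: kill the VM
            }
        }, timeoutMillis);
        try {
            worker.interrupt();
            worker.join();
        } catch (InterruptedException ignored) {
            // we are shutting down anyway
        } finally {
            watchdog.cancel(); // cleanup finished in time: disarm the watchdog
        }
    }
}
```

Arming the watchdog before the blocking join() (rather than after, as in the hook above) also protects against the join itself hanging on an uncooperative thread.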