SBT plugin for Spark on AWS EMR.
- Add sbt-lighter in project/plugins.sbt:
addSbtPlugin("net.pishen" % "sbt-lighter" % "1.2.0")
- Set the sbt version for your project in project/build.properties (requires sbt 1.0):
sbt.version=1.1.6
- Prepare your build.sbt:
name := "sbt-lighter-demo"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.3.1" % "provided"
)
sparkAwsRegion := "ap-northeast-1"
//Since we use cluster mode, we need a bucket to store our application's jar.
sparkS3JarFolder := "s3://my-emr-bucket/my-emr-folder/"
//(optional) Set the subnet id if you want to run Spark in VPC.
sparkSubnetId := Some("subnet-********")
//(optional) Additional security groups that will be attached to Master and Core instances.
sparkSecurityGroupIds := Seq("sg-********")
//(optional) Total number of instances, including master node. The default value is 1.
sparkInstanceCount := 2
- Write your application at src/main/scala/mypackage/Main.scala:
package mypackage

import org.apache.spark._

object Main {
  def main(args: Array[String]): Unit = {
    //setup spark
    val sc = new SparkContext(new SparkConf())
    //your algorithm
    val n = 10000000
    val count = sc.parallelize(1 to n).map { i =>
      val x = scala.math.random
      val y = scala.math.random
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
  }
}
- Submit your Spark application:
> sparkSubmit arg0 arg1 ...
Note that a cluster with the same name as your project's name setting will be created by this command. This cluster will terminate itself automatically if there are no further steps (beyond the one you just submitted) waiting in the queue. (You can submit multiple steps into the queue by running sparkSubmit multiple times.) If you want a keep-alive cluster, run sparkCreateCluster before sparkSubmit, and remember to terminate it with sparkTerminateCluster when you are done:
> sparkCreateCluster
> sparkSubmit arg0 arg1 ...
== Wait for your job to finish ==
> sparkTerminateCluster
The id of the created cluster will be stored in a file named .cluster_id. This file will be used to find the cluster when running other commands.
Other settings you can configure in build.sbt:
//Your cluster's name. Default value is copied from your project's `name` setting.
sparkClusterName := "your-new-cluster-name"
//File that stores the created cluster's id, default is .cluster_id
sparkClusterIdFile := file(".cluster_id")
//EMR release label
sparkEmrRelease := "emr-5.17.0"
//IAM service role used by EMR
sparkEmrServiceRole := "EMR_DefaultRole"
//EMR applications that will be installed, default value is Seq("Spark")
sparkEmrApplications := Seq("Spark", "Zeppelin")
//Whether the cluster is visible to all IAM users of the AWS account
sparkVisibleToAllUsers := true
//EC2 instance type of EMR Master node, default is m4.large
//Note that this is *not* the master node of Spark
sparkMasterType := "m4.large"
//EC2 instance type of EMR Core nodes, default is m4.large
sparkCoreType := "m4.large"
//EBS (disk) size of EMR Master node, default is 32GB
sparkMasterEbsSize := Some(32)
//EBS (disk) size of EMR Core nodes, default is 32GB
sparkCoreEbsSize := Some(32)
//Spot instance bid price of Master node, default is None.
sparkMasterPrice := Some(0.1)
//Spot instance bid price of Core nodes, default is None.
sparkCorePrice := Some(0.1)
//IAM role (instance profile) for the EC2 instances
sparkInstanceRole := "EMR_EC2_DefaultRole"
//EC2 keypair, default is None.
sparkInstanceKeyName := Some("your-keypair")
//EMR logging folder, default is None.
sparkS3LogUri := Some("s3://my-emr-bucket/my-emr-log-folder/")
//Configs of --conf when running spark-submit, default is an empty Map.
sparkSubmitConfs := Map("spark.executor.memory" -> "10G", "spark.executor.instances" -> "2")
//List of EMR bootstrap scripts and their parameters, if any, default is Seq.empty.
sparkEmrBootstrap := Seq(BootstrapAction("my-bootstrap", "s3://my-production-bucket/bootstrap.sh", "--full"))
Other available commands:
Show the id of the cluster currently bound to this project:
> show sparkClusterId
List your EMR clusters:
> sparkListClusters
Terminate the bound cluster:
> sparkTerminateCluster
Submit a job with an explicitly specified main class:
> sparkSubmitMain mypackage.Main arg0 arg1 ...
If you accidentally deleted your .cluster_id file, you can bind it back using:
> sparkBindCluster j-*************
EMR provides a JSON syntax to configure the applications on the cluster, including Spark. Here we provide a helper class called EmrConfig, which lets you set up the configuration in an easier way.
For example, to maximize the memory allocation for each Spark job, one can use the following JSON config:
[
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]
Instead of using this JSON config, one can add the following setting in build.sbt to achieve the same effect:
import sbtlighter.EmrConfig

sparkEmrConfigs := Seq(
  EmrConfig("spark").withProperties("maximizeResourceAllocation" -> "true")
)
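The same helper works for other EMR classifications too. A minimal sketch (the spark-defaults classification and its property below are only illustrative, not taken from the original example):
sparkEmrConfigs := Seq(
  EmrConfig("spark").withProperties("maximizeResourceAllocation" -> "true"),
  EmrConfig("spark-defaults").withProperties("spark.driver.maxResultSize" -> "2g")
)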
For people who already have a JSON config, there's a parsing function EmrConfig.parseJson(jsonString: String) which can convert the JSON array into a List[EmrConfig]. And, if your JSON is located on S3, you can also parse the file on S3 directly (note that this will read the file from S3 right after you start sbt):
import sbtlighter.EmrConfig

sparkEmrConfigs := EmrConfig
  .parseJsonFromS3("s3://your-bucket/your-config.json")(sparkS3Client.value)
  .right
  .get
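If your JSON lives inside the project instead, parseJson can be used the same way. A minimal sketch, assuming it returns the same Either shape as parseJsonFromS3 (hence the .right.get); the inline JSON string is just the example config from above:
sparkEmrConfigs := EmrConfig
  .parseJson("""[{"Classification": "spark", "Properties": {"maximizeResourceAllocation": "true"}}]""")
  .right
  .get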
There are two settings called sparkJobFlowInstancesConfig and sparkRunJobFlowRequest, which correspond to JobFlowInstancesConfig and RunJobFlowRequest in the AWS Java SDK. Some default values are already configured in these settings, but you can modify them for your own purposes, for example:
To set the master and slave security groups separately (this requires leaving sparkSecurityGroupIds unset in the build.sbt above):
sparkJobFlowInstancesConfig := sparkJobFlowInstancesConfig.value
  .withAdditionalMasterSecurityGroups("sg-aaaaaaaa")
  .withAdditionalSlaveSecurityGroups("sg-bbbbbbbb")
To set the EMR autoscaling role:
sparkRunJobFlowRequest := sparkRunJobFlowRequest.value.withAutoScalingRole("EMR_AutoScaling_DefaultRole")
To add tags to the cluster:
import com.amazonaws.services.elasticmapreduce.model.Tag

sparkRunJobFlowRequest := sparkRunJobFlowRequest.value.withTags(new Tag("Name", "my-cluster-name"))
To add extra EMR steps to the cluster's RunJobFlowRequest (note the scala.collection.JavaConverters import needed for .asJava):
import scala.collection.JavaConverters._
import com.amazonaws.services.elasticmapreduce.model._

sparkRunJobFlowRequest := sparkRunJobFlowRequest.value
  .withSteps(
    new StepConfig()
      .withActionOnFailure(ActionOnFailure.CANCEL_AND_WAIT)
      .withName("Install components")
      .withHadoopJarStep(
        new HadoopJarStepConfig()
          .withJar("s3://path/to/jar")
          .withArgs(Seq("arg1", "arg2").asJava)
      )
  )
To modify the PutObjectRequest used when uploading your jar to S3 (here, to enable server-side encryption):
import com.amazonaws.services.s3.model.ObjectMetadata

sparkS3PutObjectDecorator := { req =>
  val metadata = new ObjectMetadata()
  metadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION)
  req.withMetadata(metadata)
}
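Since the decorator is just a function from PutObjectRequest to PutObjectRequest, other tweaks to the upload request can be plugged in the same way. A minimal sketch (the canned ACL below is only an illustration, not something sbt-lighter requires):
import com.amazonaws.services.s3.model.CannedAccessControlList

sparkS3PutObjectDecorator := { req =>
  req.withCannedAcl(CannedAccessControlList.BucketOwnerFullControl)
}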
If you have multiple environments (e.g. different subnets, different AWS regions, etc.) for your Spark project, you can use sbt's configurations to provide multiple setting combinations:
import sbtlighter._
//Since we don't use the global scope now, we can disable it.
LighterPlugin.disable
//And setup your configurations
lazy val Testing = config("testing")
lazy val Production = config("production")
inConfig(Testing)(LighterPlugin.baseSettings ++ Seq(
  sparkAwsRegion := "ap-northeast-1",
  sparkSubnetId := Some("subnet-aaaaaaaa"),
  sparkSecurityGroupIds := Seq("sg-aaaaaaaa"),
  sparkInstanceCount := 1,
  sparkS3JarFolder := "s3://my-testing-bucket/my-emr-folder/"
))

inConfig(Production)(LighterPlugin.baseSettings ++ Seq(
  sparkAwsRegion := "us-west-2",
  sparkSubnetId := Some("subnet-bbbbbbbb"),
  sparkSecurityGroupIds := Seq("sg-bbbbbbbb"),
  sparkInstanceCount := 20,
  sparkS3JarFolder := "s3://my-production-bucket/my-emr-folder/",
  sparkS3LogUri := Some("s3://aws-logs-************-us-west-2/elasticmapreduce/"),
  sparkCorePrice := Some(0.39),
  sparkEmrConfigs := Seq(EmrConfig("spark", Map("maximizeResourceAllocation" -> "true")))
))
Then, in sbt, activate a different config with the <config>:<task/setting> syntax:
> testing:sparkSubmit
> production:sparkSubmit
There's a special command
> sparkMonitor
which polls the cluster's status until the cluster terminates or exceeds the time limit.
The time limit can be defined by:
import scala.concurrent.duration._
sparkTimeoutDuration := 90.minutes
(The default value of sparkTimeoutDuration is 90 minutes.)
This command results in one of the following three behaviors:
- If the cluster ran for longer than sparkTimeoutDuration, it terminates the cluster and throws an exception.
- If the cluster terminated within sparkTimeoutDuration but had some failed steps, it throws an exception.
- If the cluster terminated without any failed step, it returns Unit (exit code == 0).
This command is useful if you want to trigger notifications. For example, a bash command like this
$ sbt 'sparkSubmit arg0 arg1' sparkMonitor
will exit with an error if the job fails or runs too long. (Don't enter the sbt console here; just append the task names after sbt as above.) You can then put this command into a cron job for scheduled computation and let cron notify you when something goes wrong.