Scala wrapper around Flink's REST API endpoints. This API is listed in the Flink documentation as the "Monitoring REST API", but it includes support for uploading JARs and managing jobs, so it's more fully-featured than the name would imply.
This project is meant to be an easy way for Scala developers to interact with Flink's REST API. It contains wrapper methods for API calls and returns responses in case classes for easy access.
This wrapper does not reference Flink as a library. This makes it for ideal for applications which need to invoke Flink jobs or report on Flink job statuses, but don't want to be tied to Flink at compile time.
This library was developed and tested against Flink 1.3.0. Compatibility with older or newer versions is not guaranteed.
Add an SBT reference:
libraryDependencies += "com.github.mjreid" %% "flink-wrapper" % "0.0.2"
Create an instance of the FlinkRestClient:
val flinkClient = FlinkRestClient("http://flink-server.example.com:8081/")
Use the FlinkRestClient to make a request, being sure to have an implicit ExecutionContext in scope:
import scala.concurrent.ExecutionContext.Implicits.global
val result = flinkClient.getJobsOverview()
Do something with the result.
result.map { jobsOverview => println(jobsOverview) }
This wrapper uses "basic" Scala idioms (namely, Future
and Option
) to communicate failures. Any caught exceptions in
a failed Future
will be wrapped in a FlinkWrapperException
or one of its subclasses.
The possible exceptions are FlinkWrapperInvalidJsonException
when an unexpected JSON value is returned, and
FlinkWrapperUnexpectedStatusCodeException
when an unexpected HTTP status code is returned.
This wrapper uses play-ws under the hood to do HTTP requests in a non-blocking fashion. As a result, all calls return
Future
objects, which require an execution context in scope.
If you're making a small app, you can import the global execution context. However, as a best practice for larger apps, we recommend using a dedicated executor service. This will isolate your thread pools so that your application doesn't stall if the Flink REST endpoints take a long time to respond, for example.
The follow APIs are available in this wrapper. See the Scaladoc for more details.
runProgram
starts a Flink program with the specified parameters. Important note: ThejarId
argument is not what appears in the Flink UI -- a hidden GUID is appended. It's recommended to use theuploadJar
method (see below), which will return the full filename that can be used as part ofrunProgram
.cancelJob
cancels a job immediately.cancelJobWithSavepoint
cancels a job with a savepoint. This method returns aCancelJobAccepted
object; use thegetCancellationStatus
method to query the status. Cancellations can fail if, for example, the savepoint files cannot be written.uploadJar
uploads a JAR that can be executed viarunProgram
.
getJobsList
returns all jobs in Flink, separated by state.getJobOverview
returns all running and finished jobs in Flink with additional execution details.getJobDetails
gets the details of a specific Flink job.getJobPlan
gets the plan of a specific Flink job. (The plan data is also included ingetJobDetails
.)getJobExceptions
gets any exceptions that occurred in a job.
getConfig
gets the system-level Flink configuration.getClusterOverview
gets the Flink cluster information.getJobManagerConfig
gets all the JobManager configurations.getClusterTaskManagers
gets the information about all the TaskManagers inside the Flink cluster.
- Michael Reid (mjreid @github)
- Your name here? Pull requests welcome!