SageMaker FeatureStore Spark is an open source Spark library for Amazon SageMaker FeatureStore.
With this Spark connector, you can easily ingest data from a Spark DataFrame into a FeatureGroup's online and offline stores.
The library is compatible with Scala >= 2.12 and Spark >= 3.0.0. If your application runs on EMR, please use emr-6.x.
TODO: Add instructions here on how to install the library from Maven.
After the library is imported, you can build your application into a jar and submit it using spark-shell or spark-submit.
Once the Spark library is added as a dependency of your application, you should be able to submit the Spark job by following this EMR documentation.
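For example, a spark-submit invocation for such an application might look like this (the main class and jar names are placeholders for illustration):
spark-submit --master yarn --class <your-main-class> <your-application>.jar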
The library is compatible with Python >= 3.6 and PySpark >= 3.0.0. If your application runs on EMR, please use emr-6.x.
Please also make sure that the environment has PySpark and NumPy installed.
The Spark library is available on PyPI.
Before installation, it is recommended to set the SPARK_HOME environment variable to the path where Spark is installed, because the installation automatically copies some dependent jars into SPARK_HOME.
On EMR, the installation handles this path automatically, so there is no need to set SPARK_HOME.
To install the library:
sudo -E pip3 install sagemaker-feature-store-pyspark --no-binary :all:
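For example, assuming Spark is installed under a local path (the path below is a placeholder), you could set SPARK_HOME and then install:
export SPARK_HOME=<path-to-your-spark-installation>
sudo -E pip3 install sagemaker-feature-store-pyspark --no-binary :all: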
Create a custom JAR step on EMR to install the library.
If your EMR cluster has a single node:
Jar Location: command-runner.jar
Arguments: sudo -E pip3 install sagemaker-feature-store-pyspark --no-binary :all:
This will only install the library on the driver node.
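As a sketch, the same step could also be added with the AWS CLI; the cluster id below is a placeholder, and the step configuration is only an illustration of the command described above:
aws emr add-steps --cluster-id <your-cluster-id> --steps Type=CUSTOM_JAR,Name=InstallFeatureStorePySpark,ActionOnFailure=CONTINUE,Jar=command-runner.jar,Args=["sudo","-E","pip3","install","sagemaker-feature-store-pyspark","--no-binary",":all:"]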
To distribute the library to all executor nodes, you can create an installation script and add a custom bootstrap action when creating the EMR cluster.
For more information, please take a look at EMR bootstrap actions.
Since bootstrap actions are executed before the EMR applications are installed, the dependent jars cannot be automatically copied to SPARK_HOME.
So when submitting your application, please specify the dependent jars using:
--jars `feature-store-pyspark-dependency-jars`
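As an illustration, the installation script used for the bootstrap action could contain the same pip command as above (the script name is a placeholder):
#!/bin/bash
# Hypothetical bootstrap script, e.g. install-feature-store-pyspark.sh uploaded to your S3 bucket
sudo -E pip3 install sagemaker-feature-store-pyspark --no-binary :all:
A submission that passes the dependent jars might then look like this (the application file name is a placeholder):
spark-submit --jars `feature-store-pyspark-dependency-jars` <your-application>.py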
SageMaker notebook instances use an older version of Spark that is not compatible with the Spark version required by the FeatureStore Spark library, so Spark on the notebook instance has to be uninstalled and reinstalled with a newer version first.
To do so, add a cell like this to your notebook:
# Install a newer version of Spark which is compatible with the Spark library
!pip3 install pyspark==3.1.1
After you finish executing the notebook, you can restore the original version, which is Spark 2.4.0.
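For example, a cell like the following would restore it (this assumes 2.4.0 is indeed the original version on your notebook instance):
# Restore the original Spark version of the notebook instance
!pip3 install pyspark==2.4.0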
FeatureStoreManager is the interface for all Spark library operations, such as data ingestion and loading feature definitions.
To ingest a DataFrame into FeatureStore:
import com.amazonaws.services.sagemaker.featurestore.sparksdk.FeatureStoreManager
val featureGroupArn = "<your-feature-group-arn>"
val featureStoreManager = new FeatureStoreManager()
featureStoreManager.ingestData(dataFrame, featureGroupArn, directOfflineStore = true)
If directOfflineStore is set to true, the Spark library ingests data directly into the OfflineStore without using the FeatureStoreRuntime API, which reduces the FeatureStore WCU cost. The default value of this flag is false.
To load feature definitions:
val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(inputDataFrame)
After the feature definitions are returned, you can create feature groups using the CreateFeatureGroup API.
To ingest a DataFrame into FeatureStore:
from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager
feature_group_arn = "<your-feature-group-arn>"
feature_store_manager = FeatureStoreManager()
feature_store_manager.ingest_data(input_data_frame=user_data_frame, feature_group_arn=feature_group_arn, direct_offline_store=True)
If direct_offline_store is set to True, the Spark library ingests data directly into the OfflineStore without using the FeatureStoreRuntime API, which reduces the FeatureStore WCU cost. The default value of this flag is False.
To load feature definitions:
feature_definitions = feature_store_manager.load_feature_definitions_from_schema(user_data_frame)
After the feature definitions are returned, you can create feature groups using the CreateFeatureGroup API.
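As a sketch, the loaded definitions can then be passed to the CreateFeatureGroup API through boto3. The names, S3 URI, and role ARN below are placeholders, and this assumes the returned definitions are already in the FeatureName/FeatureType form the API expects:
import boto3

sagemaker_client = boto3.client("sagemaker")
sagemaker_client.create_feature_group(
    FeatureGroupName="<your-feature-group-name>",
    RecordIdentifierFeatureName="<your-record-identifier-feature>",
    EventTimeFeatureName="<your-event-time-feature>",
    FeatureDefinitions=feature_definitions,
    OnlineStoreConfig={"EnableOnlineStore": True},
    OfflineStoreConfig={"S3StorageConfig": {"S3Uri": "<your-offline-store-s3-uri>"}},
    RoleArn="<your-iam-role-arn>",
)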
This library is built in Scala, and the Python methods call into Scala via the JVM through wrapper.py. To add more features, please finish the implementation in Scala first; you may also need to update the wrapper so that the Scala and Python functionality stays in sync.
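For contributors, a simplified sketch of that wrapper pattern is shown below; this is only an illustration of the idea, not the actual contents of wrapper.py:
from pyspark import SparkContext

class FeatureStoreManagerWrapperSketch:
    """Illustrative wrapper that forwards Python calls to the Scala implementation via the JVM gateway."""

    def __init__(self):
        sc = SparkContext._active_spark_context
        # Instantiate the Scala class through the py4j gateway exposed by PySpark
        # (the class path mirrors the Scala import shown earlier)
        self._java_obj = sc._jvm.com.amazonaws.services.sagemaker.featurestore.sparksdk.FeatureStoreManager()

    def ingest_data(self, input_data_frame, feature_group_arn, direct_offline_store=False):
        # Forward the call to Scala; the DataFrame is passed as its underlying Java object
        self._java_obj.ingestData(input_data_frame._jdf, feature_group_arn, direct_offline_store)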
Both the Scala and Python versions are covered by unit tests. In addition, the Python version has integration tests that verify there are no functional regressions.
It is good practice to keep the code consistently formatted. We use scalafmt to auto-format the code, so please run sbt scalafmtAll to format your code.
To get the test coverage report and the format check result, run sbt jacoco scalafmtCheckAll.
We use tox for testing; you can check the build by running tox. To configure or inspect the commands tox runs, please check tox.ini.
The test execution script and the tests themselves are included in pyspark-sdk/integration_test. To run the tests:
- Fetch the credentials from our Spark test account first.
- Run the test execution script run-spark-integration-test.
The GitHub repository is connected to the CodeBuild project in our Spark test account. Modify the command steps in ci/buildspec.yml according to your needs.
This project is licensed under the Apache-2.0 License.