The AdStax Scala SDK provides tools for programmatic interaction with an AdStax instance using Scala. Note that the SDK is still experimental and its interfaces may change in future releases.
Installation
The latest release of the AdStax Scala SDK is 0.7.0, built against Scala 2.11 and 2.12. To use it in an existing SBT project, add the following dependency to your `build.sbt`:

```scala
libraryDependencies += "eu.shiftforward" %% "adstax-sdk-scala" % "0.7.0"
```
If you need to interact with Spark in an AdStax instance, you also need to add the following dependency (only available for Scala 2.11):

```scala
libraryDependencies += "eu.shiftforward" %% "adstax-sdk-scala-spark" % "0.7.0"
```
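Putting the two together, a minimal `build.sbt` for a project using both modules might look as follows (the project name and the exact `scalaVersion` patch release are illustrative, not prescribed by the SDK):

```scala
name := "adstax-sdk-example"

// The SDK is published for Scala 2.11 and 2.12; the Spark module only for 2.11.
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "eu.shiftforward" %% "adstax-sdk-scala" % "0.7.0",
  "eu.shiftforward" %% "adstax-sdk-scala-spark" % "0.7.0"
)
```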
Usage
Products
Product details can be manipulated using an instance of `ProductFeederClient`. A `ProductFeederClient` supplies the following interface:
```scala
trait ProductFeederClient {
  def getProduct(
    clientId: String,
    siteId: String,
    productId: String): Future[Option[ProductItem]]

  def updateProduct(
    clientId: String,
    siteId: String,
    productId: String,
    productAttributes: Map[String, Any]): Future[Boolean]

  def deleteProduct(
    clientId: String,
    siteId: String,
    productId: String): Future[Boolean]
}
```
The `getProduct` method returns a `Future` of an optional `ProductItem` that stores product information. The `updateProduct` method allows one to create or update the attributes associated with a given product, returning a `Future` of a `Boolean` that signals whether the update was successful. The `deleteProduct` method allows one to delete a given product, returning a `Future` of a `Boolean` that signals whether the deletion was successful.

The `ProductItem` class represents a product through its id, a map of attributes and a last-updated timestamp:

```scala
case class ProductItem(id: String, attributes: Map[String, Any], lastUpdated: DateTime)
```
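To illustrate the contract of this interface, here is a minimal in-memory stub. This is not part of the SDK: the trait and `ProductItem` are re-declared locally, and `java.time.Instant` stands in for the SDK's `DateTime`, so the sketch runs without any dependencies:

```scala
import java.time.Instant
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future

case class ProductItem(id: String, attributes: Map[String, Any], lastUpdated: Instant)

trait ProductFeederClient {
  def getProduct(clientId: String, siteId: String, productId: String): Future[Option[ProductItem]]
  def updateProduct(clientId: String, siteId: String, productId: String,
                    productAttributes: Map[String, Any]): Future[Boolean]
  def deleteProduct(clientId: String, siteId: String, productId: String): Future[Boolean]
}

// In-memory stub keyed by (clientId, siteId, productId).
class InMemoryProductFeederClient extends ProductFeederClient {
  private val store = TrieMap.empty[(String, String, String), ProductItem]

  def getProduct(clientId: String, siteId: String, productId: String) =
    Future.successful(store.get((clientId, siteId, productId)))

  def updateProduct(clientId: String, siteId: String, productId: String,
                    productAttributes: Map[String, Any]) = {
    store((clientId, siteId, productId)) = ProductItem(productId, productAttributes, Instant.now())
    Future.successful(true)
  }

  def deleteProduct(clientId: String, siteId: String, productId: String) =
    Future.successful(store.remove((clientId, siteId, productId)).isDefined)
}
```

Such a stub is also handy for unit-testing code written against `ProductFeederClient` without a running AdStax instance.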
The AdStax Scala SDK provides an implementation of `ProductFeederClient` using an RPC system built around the AdStax AMQP-compatible message bus. In order to create a `ProductFeederAmqpRpcClient` it is necessary to have access to an `ActorRefFactory` from Akka; both `ActorSystem` and `ActorContext` implement `ActorRefFactory`. You must also provide the configuration parameters for both AMQP and the RPC protocol. These are supplied as domain objects, which are loaded from Typesafe config objects using PureConfig. The `config.RabbitMQ` object should provide definitions for the `host`, `port`, `username`, `password` and `timeout` (for the operations). Here's the default configuration object that is loaded from `reference.conf`:
```
{
  host = "localhost"
  port = 5672
  username = "guest"
  password = "guest"
  timeout = 5m
}
```
The RPC protocol needs to be configured with the AMQP exchange name and the routing key prefix to use in the requests. Here's the default configuration object:

```
{
  exchange-name = "sf.data"
  routing-key-prefix = "productfeeder."
}
```
The `ProductFeederAmqpRpcClient` constructor also requires an implicit `ExecutionContext` to deal with futures. Here's how you create a product feeder client and interact with it. We're assuming the AMQP message broker is available at `mq.sample-adstax-instance.dev.adstax.io` (with credentials `guest`/`guest`) and using the default RPC configuration:
```scala
scala> import eu.shiftforward.adstax.productfeeder.api.rpc._
import eu.shiftforward.adstax.productfeeder.api.rpc._

scala> import akka.actor._
import akka.actor._

scala> implicit val system = ActorSystem("test-actor-system")
system: akka.actor.ActorSystem = akka://test-actor-system

scala> implicit val ec = system.dispatcher
ec: scala.concurrent.ExecutionContextExecutor = Dispatcher[akka.actor.default-dispatcher]

scala> val client = ProductFeederAmqpRpcClient()
client: eu.shiftforward.adstax.productfeeder.api.rpc.ProductFeederAmqpRpcClient = eu.shiftforward.adstax.productfeeder.api.rpc.ProductFeederAmqpRpcClient@227fdc4e

scala> client.updateProduct("client1", "site1", "p1", Map("price" -> 2, "somethingElse" -> "xpto")).onComplete(println)
Success(true)

scala> client.getProduct("client1", "site1", "p1").onComplete(println)
Success(Some(ProductItem(p1,Map(price -> 2, somethingElse -> xpto),2016-07-07T13:08:58.741+01:00)))

scala> client.updateProduct("client1", "site1", "p1", Map("price" -> 5)).onComplete(println)
Success(true)

scala> client.getProduct("client1", "site1", "p1").onComplete(println)
Success(Some(ProductItem(p1,Map(price -> 5),2016-07-07T13:10:06.113+01:00)))

scala> client.deleteProduct("client1", "site1", "p1").onComplete(println)
Success(true)

scala> client.getProduct("client1", "site1", "p1").onComplete(println)
Success(None)
```
Users
User attributes can be manipulated using an instance of `UserProfileStorageClient`. A `UserProfileStorageClient` supplies the following interface:
```scala
trait UserProfileStorageClient {
  def get(
    userId: String,
    clientId: String): Future[Option[UserAttributes]]

  def update(
    userId: String,
    clientId: String,
    attributes: UserAttributes,
    mergeStrategy: AttributeMergingStrategy = MergeMergingStrategy): Future[Boolean]

  def delete(
    userId: String,
    clientId: String): Future[Boolean]
}
```
The `get` method returns a `Future` of an optional `UserAttributes` object that stores user attribute information. The `update` method allows one to create or update the attributes associated with a given user, returning a `Future` of a `Boolean` that signals whether the update was successful. The `mergeStrategy` parameter allows one to specify how the newly sent attributes should be merged with the previous ones: either retaining the ones that aren't specified in the `attributes` parameter (with `MergeMergingStrategy`) or replacing all of them with the ones that are specified (with `ReplaceMergingStrategy`). The `delete` method allows one to delete a given user, returning a `Future` of a `Boolean` that signals whether the deletion was successful.
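The difference between the two merging strategies can be illustrated with plain `Map` operations. This is a dependency-free sketch of the semantics described above, not code from the SDK:

```scala
// Attributes already stored for the user.
val existing: Map[String, Any] = Map("nAttr" -> 2.0, "sAttr" -> "val")
// Attributes sent in the update call.
val incoming: Map[String, Any] = Map("nAttr" -> 5.0)

// MergeMergingStrategy: keys absent from the update are retained.
val merged = existing ++ incoming    // Map(nAttr -> 5.0, sAttr -> val)

// ReplaceMergingStrategy: the stored attributes are replaced wholesale.
val replaced = incoming              // Map(nAttr -> 5.0)
```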
The `UserAttributes` class represents a user through its attributes:

```scala
case class UserAttributes(attributes: Map[String, AttributeValue])
```
An `AttributeValue` can assume various forms:

```scala
case class StringAttrValue(value: String) extends AttributeValue
case class NumericAttrValue(value: Double) extends AttributeValue
case class BooleanAttrValue(value: Boolean) extends AttributeValue
case class GeoAttrValue(lat: Double, lon: Double) extends AttributeValue
case class ArrayAttrValue(arr: Seq[AttributeValue]) extends AttributeValue
case class MapAttrValue(map: Map[String, AttributeValue]) extends AttributeValue
```
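As an illustration of working with this ADT, here is a small recursive pretty-printer. This is a sketch only: the ADT is re-declared locally (with the `AttributeValue` parent as a sealed trait, an assumption about the SDK) so the snippet runs standalone, and the `render` function is not provided by the SDK:

```scala
sealed trait AttributeValue
case class StringAttrValue(value: String) extends AttributeValue
case class NumericAttrValue(value: Double) extends AttributeValue
case class BooleanAttrValue(value: Boolean) extends AttributeValue
case class GeoAttrValue(lat: Double, lon: Double) extends AttributeValue
case class ArrayAttrValue(arr: Seq[AttributeValue]) extends AttributeValue
case class MapAttrValue(map: Map[String, AttributeValue]) extends AttributeValue

// Render an AttributeValue as a JSON-like string by structural recursion.
def render(v: AttributeValue): String = v match {
  case StringAttrValue(s)     => "\"" + s + "\""
  case NumericAttrValue(n)    => n.toString
  case BooleanAttrValue(b)    => b.toString
  case GeoAttrValue(lat, lon) => s"""{"lat": $lat, "lon": $lon}"""
  case ArrayAttrValue(arr)    => arr.map(render).mkString("[", ", ", "]")
  case MapAttrValue(m) =>
    m.map { case (k, x) => "\"" + k + "\": " + render(x) }.mkString("{", ", ", "}")
}
```

Because the trait is sealed, the compiler warns if a new attribute form is added and a match like this one does not handle it.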
The AdStax Scala SDK provides an implementation of `UserProfileStorageClient` using an RPC system built around the AdStax AMQP-compatible message bus. In order to create a `UserProfileStorageAmqpRpcClient` it is necessary to have access to an `ActorRefFactory` from Akka; both `ActorSystem` and `ActorContext` implement `ActorRefFactory`. You must also provide the configuration parameters for both AMQP and the RPC protocol. These are supplied as domain objects, which are loaded from Typesafe config objects using PureConfig. The `config.RabbitMQ` object should provide definitions for the `host`, `port`, `username`, `password` and `timeout` (for the operations). Here's the default configuration object that is loaded from `reference.conf`:
```
{
  host = "localhost"
  port = 5672
  username = "guest"
  password = "guest"
  timeout = 5m
}
```
The RPC protocol needs to be configured with the AMQP exchange name and the routing key prefix to use in the requests. Here's the default configuration object:

```
{
  exchange-name = "sf.data"
  routing-key-prefix = "user-profile-storage."
}
```
The `UserProfileStorageAmqpRpcClient` constructor also requires an implicit `ExecutionContext` to deal with futures. Here's how you create a user profile storage client and interact with it. We're assuming the AMQP message broker is available at `mq.sample-adstax-instance.dev.adstax.io` (with credentials `guest`/`guest`) and using the default RPC configuration:
```scala
scala> import eu.shiftforward.adstax.ups.api._
import eu.shiftforward.adstax.ups.api._

scala> import eu.shiftforward.adstax.ups.api.rpc._
import eu.shiftforward.adstax.ups.api.rpc._

scala> import akka.actor._
import akka.actor._

scala> implicit val system = ActorSystem("test-actor-system")
system: akka.actor.ActorSystem = akka://test-actor-system

scala> implicit val ec = system.dispatcher
ec: scala.concurrent.ExecutionContextExecutor = Dispatcher[akka.actor.default-dispatcher]

scala> val client = UserProfileStorageAmqpRpcClient()
client: eu.shiftforward.adstax.ups.api.rpc.UserProfileStorageAmqpRpcClient = eu.shiftforward.adstax.storage.rpc.UserProfileStorageAmqpRpcClient@750147c3

scala> client.update("user1", "client1", UserAttributes(Map("nAttr" -> NumericAttrValue(2.0), "sAttr" -> StringAttrValue("val")))).onComplete(println)
Success(true)

scala> client.get("user1", "client1").onComplete(println)
Success(Some(UserAttributes(Map(nAttr -> NumericAttrValue(2.0), sAttr -> StringAttrValue(val)))))

scala> client.update("user1", "client1", UserAttributes(Map("nAttr" -> NumericAttrValue(5.0)))).onComplete(println)
Success(true)

scala> client.get("user1", "client1").onComplete(println)
Success(Some(UserAttributes(Map(nAttr -> NumericAttrValue(5.0), sAttr -> StringAttrValue(val)))))

scala> client.delete("user1", "client1").onComplete(println)
Success(true)

scala> client.get("user1", "client1").onComplete(println)
Success(None)
```
Spark Jobs
The AdStax Scala SDK allows one to define jobs that run on Spark and are able to access AdStax events. In order to define a Spark job, one should create a class implementing the `SparkJob` trait, which has the following interface:

```scala
trait SparkJob {
  def name: String
  def run(args: Array[String])(implicit context: AdStaxSparkContext): Unit
}
```
The `name` method should return the name of the job, whereas the `run` method should implement the primary job logic. The `run` method receives the command-line arguments of the job and is supplied with an implicit `AdStaxSparkContext`. The `AdStaxSparkContext` has an `eventsRDD` method to access AdStax events:

```scala
def eventsRDD(eventTypes: Set[String], startDateTime: DateTime, endDateTime: DateTime): RDD[String]
```
The returned `RDD` provides access to the events in JSON format; refer to the AdStax public documentation for the expected event format. Once implemented, you can submit the Spark job to run on your AdStax instance using the `adstax-spark-job-manager` command-line utility. You can find examples of Spark jobs to run on AdStax here.
Tracking
The AdStax Scala SDK provides a way to make any Akka actor receive AdStax tracking events. A `TrackingListenerActor` is able to connect to the Tracking module of an AdStax system and have AdStax events directed to an actor. Consider a very simple actor that prints all events to stdout:
```scala
import akka.actor._
import eu.shiftforward.adstax.tracking._
import com.typesafe.config._

class ListenerActor extends Actor {
  def receive = {
    case x => println("Got " + x)
  }
}
```
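Listeners commonly act on only some event types. Since the events arrive as JSON strings, a `receive` block might dispatch on the `meta.type` field; the matching logic can be sketched without Akka as a plain function (the substring check is a naive stand-in for real JSON parsing, and `isHandled` is a hypothetical helper, not part of the SDK):

```scala
// Decide whether an incoming tracking event should be handled,
// based on a naive check of the "type" field in the JSON payload.
def isHandled(event: String, wantedTypes: Set[String]): Boolean =
  wantedTypes.exists(t => event.contains("\"type\":\"" + t + "\""))

val ev = """{"meta":{"type":"userCreation","uid":"abc","timestamp":"2016-07-12T15:05:20.733Z"}}"""
```

Inside an actor, the same predicate could guard the `case` that processes the event, letting all other messages fall through.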
After providing the AMQP configuration and the routing keys in the same way as for the modules above, you can start having the AdStax events forwarded to your actor:
```scala
scala> import eu.shiftforward.adstax.tracking._
import eu.shiftforward.adstax.tracking._

scala> import akka.actor._
import akka.actor._

scala> val system = ActorSystem("test-actor-system")
system: akka.actor.ActorSystem = akka://test-actor-system

scala> val listener = system.actorOf(TrackingListenerActor.props(Props[ListenerActor]))
listener: akka.actor.ActorRef = Actor[akka://test-actor-system/user/$b#342189667]

Got {"meta":{"type":"userCreation","uid":"673da5b975164db8900eff5014578036","timestamp":"2016-07-12T15:05:20.733Z"}}
Got {"meta":{"type":"userCookieSticky","uid":"673da5b975164db8900eff5014578036","timestamp":"2016-07-12T15:05:21.344Z"}}
Got {"method":"GET","data":{"cookieCheck":"true","":""},"uri":{"path":"/events","full":"http://tr.sample-adstax-instance.dev.adstax.io/events?cookieCheck=true&","query":{"cookieCheck":"true","":""},"scheme":"http","authority":{"host":"tr.sample-adstax-instance.dev.adstax.io","port":0,"userinfo":""}},"meta":{"city":"Philadelphia","location":"39.953,-75.1756","timestamp":"2016-07-12T15:05:21.902Z","country":"United States","cookie":{"sticky":true},"uid":"673da5b975164db8900eff5014578036","type":"null"},"entity":null,"headers":{"X-Forwarded-For":"76.72.167.0","Connection":"keep-alive","X-Forwarded-Port":"80","X-Forwarded-Proto":"http","Cookie":{"adstax_uid":"673da5b975164db8900eff5014578036"},"Accept-Encoding":"gzip, deflate, identity","User-Agent":"Pingdom.com_bot_version_1.4_ (http://www.pingdom.com)","Host":"tr.sample-adstax-instance.dev.adstax.io"},"protocol":"HTTP/1.1"}
Got {"meta":{"type":"userCreation","uid":"92f41b2799ea4b96ad196f70e7ce6cc7","timestamp":"2016-07-12T15:06:21.146Z"}}
Got {"meta":{"type":"userCookieSticky","uid":"92f41b2799ea4b96ad196f70e7ce6cc7","timestamp":"2016-07-12T15:06:21.176Z"}}
Got {"method":"GET","data":{"cookieCheck":"true","":""},"uri":{"path":"/events","full":"http://tr.sample-adstax-instance.dev.adstax.io/events?cookieCheck=true&","query":{"cookieCheck":"true","":""},"scheme":"http","authority":{"host":"tr.sample-adstax-instance.dev.adstax.io","port":0,"userinfo":""}},"meta":{"city":"Atlanta","location":"33.7516,-84.3915","timestamp":"2016-07-12T15:06:21.180Z","country":"United States","cookie":{"sticky":true},"uid":"92f41b2799ea4b96ad196f70e7ce6cc7","type":"null"},"entity":null,"headers":{"X-Forwarded-For":"174.34.162.0","Connection":"keep-alive","X-Forwarded-Port":"80","X-Forwarded-Proto":"http","Cookie":{"adstax_uid":"92f41b2799ea4b96ad196f70e7ce6cc7"},"Accept-Encoding":"gzip, deflate, identity","User-Agent":"Pingdom.com_bot_version_1.4_ (http://www.pingdom.com)","Host":"tr.sample-adstax-instance.dev.adstax.io"},"protocol":"HTTP/1.1"}
Got {"meta":{"type":"userCreation","uid":"459d80d56d7d4535ac48eb563e189f0f","timestamp":"2016-07-12T15:07:20.325Z"}}
Got {"meta":{"type":"userCookieSticky","uid":"459d80d56d7d4535ac48eb563e189f0f","timestamp":"2016-07-12T15:07:20.443Z"}}
Got {"method":"GET","data":{"cookieCheck":"true","":""},"uri":{"path":"/events","full":"http://tr.sample-adstax-instance.dev.adstax.io/events?cookieCheck=true&","query":{"cookieCheck":"true","":""},"scheme":"http","authority":{"host":"tr.sample-adstax-instance.dev.adstax.io","port":0,"userinfo":""}},"meta":{"city":"Marco","location":"45.85,11.0","timestamp":"2016-07-12T15:07:20.448Z","country":"Italy","cookie":{"sticky":true},"uid":"459d80d56d7d4535ac48eb563e189f0f","type":"null"},"entity":null,"headers":{"X-Forwarded-For":"95.141.32.0","Connection":"keep-alive","X-Forwarded-Port":"80","X-Forwarded-Proto":"http","Cookie":{"adstax_uid":"459d80d56d7d4535ac48eb563e189f0f"},"Accept-Encoding":"gzip, deflate, identity","User-Agent":"Pingdom.com_bot_version_1.4_ (http://www.pingdom.com)","Host":"tr.sample-adstax-instance.dev.adstax.io"},"protocol":"HTTP/1.1"}
...
```