velocidi / adstax-sdk-scala

Tools for programmatic interaction with an AdStax instance using Scala

AdStax Scala SDK

The AdStax Scala SDK provides tools for programmatic interaction with an AdStax instance using Scala. Please note that the SDK is still in an experimental stage and its interfaces may change in subsequent releases.

Installation

The latest release of AdStax Scala SDK is 0.7.0 and is built against Scala 2.11 and 2.12.

To use it in an existing SBT project, add the following dependency to your build.sbt:

libraryDependencies += "eu.shiftforward" %% "adstax-sdk-scala" % "0.7.0"

If you need to interact with Spark in an AdStax instance, you also need to add the following (only available for Scala 2.11):

libraryDependencies += "eu.shiftforward" %% "adstax-sdk-scala-spark" % "0.7.0"

Usage

Products

Product details can be manipulated by using an instance of ProductFeederClient. A ProductFeederClient supplies the following interface:

trait ProductFeederClient {
  def getProduct(
    clientId: String,
    siteId: String,
    productId: String): Future[Option[ProductItem]]

  def updateProduct(
    clientId: String,
    siteId: String,
    productId: String,
    productAttributes: Map[String, Any]): Future[Boolean]

  def deleteProduct(
    clientId: String,
    siteId: String,
    productId: String): Future[Boolean]
}

The getProduct method returns a Future of an optional ProductItem holding the product's information. The updateProduct method creates or updates the attributes associated with a given product, returning a Future of a Boolean that signals whether the update was successful. The deleteProduct method deletes a given product, returning a Future of a Boolean that signals whether the deletion was successful.
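For unit tests that shouldn't hit a message broker, an in-memory implementation of this interface can be handy. The sketch below reproduces the SDK types locally (using java.time.Instant in place of Joda-Time's DateTime) so it is dependency-free; it is illustrative, not part of the SDK:

```scala
import java.time.Instant
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future

// Local stand-ins for the SDK types, using java.time.Instant instead of
// Joda-Time's DateTime so the sketch has no external dependencies.
case class ProductItem(id: String, attributes: Map[String, Any], lastUpdated: Instant)

trait ProductFeederClient {
  def getProduct(clientId: String, siteId: String, productId: String): Future[Option[ProductItem]]
  def updateProduct(clientId: String, siteId: String, productId: String, productAttributes: Map[String, Any]): Future[Boolean]
  def deleteProduct(clientId: String, siteId: String, productId: String): Future[Boolean]
}

// An in-memory implementation keyed by (clientId, siteId, productId).
class InMemoryProductFeederClient extends ProductFeederClient {
  private val store = TrieMap.empty[(String, String, String), ProductItem]

  def getProduct(clientId: String, siteId: String, productId: String) =
    Future.successful(store.get((clientId, siteId, productId)))

  def updateProduct(clientId: String, siteId: String, productId: String, productAttributes: Map[String, Any]) = {
    store((clientId, siteId, productId)) = ProductItem(productId, productAttributes, Instant.now())
    Future.successful(true)
  }

  def deleteProduct(clientId: String, siteId: String, productId: String) =
    Future.successful(store.remove((clientId, siteId, productId)).isDefined)
}
```

Such a stub mirrors the round-trip behavior shown in the REPL session below without requiring an AMQP connection.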

The ProductItem class represents a product through its id, a map of attributes and a last-updated timestamp:

case class ProductItem(id: String, attributes: Map[String, Any], lastUpdated: DateTime)

The AdStax Scala SDK provides an implementation of ProductFeederClient using an RPC system built on top of the AdStax AMQP-compatible message bus. In order to create a ProductFeederAmqpRpcClient you need access to an ActorRefFactory from Akka (both ActorSystem and ActorContext implement ActorRefFactory). You must also provide configuration parameters for both AMQP and the RPC protocol. These are supplied as domain objects, loaded from Typesafe config objects using PureConfig. The config.RabbitMQ object should define the host, port, username, password and timeout (for the operations). Here's the default configuration object, loaded from reference.conf:

{
  host = "localhost"
  port = 5672
  username = "guest"
  password = "guest"
  timeout = 5m
}

The RPC protocol needs to be configured at the level of the AMQP exchange name and routing key prefix to use in the requests. Here's the default configuration object:

{
  exchange-name = "sf.data"
  routing-key-prefix = "productfeeder."
}

The ProductFeederAmqpRpcClient constructor also requires an implicit ExecutionContext to deal with futures. Here's how you create a product feeder client and interact with it. We're assuming the AMQP message broker is available at mq.sample-adstax-instance.dev.adstax.io (with credentials guest/guest) and using the default RPC configuration:

scala> import eu.shiftforward.adstax.productfeeder.api.rpc._
import eu.shiftforward.adstax.productfeeder.api.rpc._

scala> import akka.actor._
import akka.actor._

scala> implicit val system = ActorSystem("test-actor-system")
system: akka.actor.ActorSystem = akka://test-actor-system

scala> implicit val ec = system.dispatcher
ec: scala.concurrent.ExecutionContextExecutor = Dispatcher[akka.actor.default-dispatcher]

scala> val client = ProductFeederAmqpRpcClient()
client: eu.shiftforward.adstax.productfeeder.api.rpc.ProductFeederAmqpRpcClient = eu.shiftforward.adstax.productfeeder.api.rpc.ProductFeederAmqpRpcClient@227fdc4e

scala> client.updateProduct("client1", "site1", "p1", Map("price" -> 2, "somethingElse" -> "xpto")).onComplete(println)
Success(true)

scala> client.getProduct("client1", "site1", "p1").onComplete(println)
Success(Some(ProductItem(p1,Map(price -> 2, somethingElse -> xpto),2016-07-07T13:08:58.741+01:00)))

scala> client.updateProduct("client1", "site1", "p1", Map("price" -> 5)).onComplete(println)
Success(true)

scala> client.getProduct("client1", "site1", "p1").onComplete(println)
Success(Some(ProductItem(p1,Map(price -> 5),2016-07-07T13:10:06.113+01:00)))

scala> client.deleteProduct("client1", "site1", "p1").onComplete(println)
Success(true)

scala> client.getProduct("client1", "site1", "p1").onComplete(println)
Success(None)

Users

User attributes can be manipulated by using an instance of UserProfileStorageClient. A UserProfileStorageClient supplies the following interface:

trait UserProfileStorageClient {
  def get(
    userId: String,
    clientId: String): Future[Option[UserAttributes]]

  def update(
    userId: String,
    clientId: String,
    attributes: UserAttributes,
    mergeStrategy: AttributeMergingStrategy = MergeMergingStrategy): Future[Boolean]

  def delete(
    userId: String,
    clientId: String): Future[Boolean]
}

The get method returns a Future of an optional UserAttributes object that stores the user's attribute information. The update method creates or updates the attributes associated with a given user, returning a Future of a Boolean that signals whether the update was successful. The mergeStrategy parameter specifies how the newly sent attributes should be merged with the previous ones: either retaining the ones that aren't specified in the attributes parameter (with MergeMergingStrategy) or replacing all of them with the ones that are specified (with ReplaceMergingStrategy). The delete method deletes a given user, returning a Future of a Boolean that signals whether the deletion was successful.
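The difference between the two merging strategies can be illustrated with plain maps. This is a self-contained sketch of the semantics, not SDK code:

```scala
// Attributes already stored for a user, and an incoming update.
val existing = Map("nAttr" -> 2.0, "sAttr" -> "val")
val update   = Map("nAttr" -> 5.0)

// MergeMergingStrategy: keys absent from the update are retained.
val merged = existing ++ update   // Map(nAttr -> 5.0, sAttr -> val)

// ReplaceMergingStrategy: the update replaces the stored attributes wholesale.
val replaced = update             // Map(nAttr -> 5.0)
```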

The UserAttributes class represents a user through its attributes:

case class UserAttributes(attributes: Map[String, AttributeValue])

An AttributeValue can assume various forms:

case class StringAttrValue(value: String) extends AttributeValue
case class NumericAttrValue(value: Double) extends AttributeValue
case class BooleanAttrValue(value: Boolean) extends AttributeValue
case class GeoAttrValue(lat: Double, lon: Double) extends AttributeValue 
case class ArrayAttrValue(arr: Seq[AttributeValue]) extends AttributeValue
case class MapAttrValue(map: Map[String, AttributeValue]) extends AttributeValue
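With the case classes above (reproduced locally so the sketch is self-contained), plain Scala values can be lifted into the AttributeValue hierarchy recursively. The helper below is illustrative and not part of the SDK:

```scala
sealed trait AttributeValue
case class StringAttrValue(value: String) extends AttributeValue
case class NumericAttrValue(value: Double) extends AttributeValue
case class BooleanAttrValue(value: Boolean) extends AttributeValue
case class GeoAttrValue(lat: Double, lon: Double) extends AttributeValue
case class ArrayAttrValue(arr: Seq[AttributeValue]) extends AttributeValue
case class MapAttrValue(map: Map[String, AttributeValue]) extends AttributeValue

// Recursively lift plain Scala values into AttributeValues.
def toAttrValue(value: Any): AttributeValue = value match {
  case s: String    => StringAttrValue(s)
  case n: Double    => NumericAttrValue(n)
  case n: Int       => NumericAttrValue(n.toDouble)
  case b: Boolean   => BooleanAttrValue(b)
  case xs: Seq[_]   => ArrayAttrValue(xs.map(toAttrValue))
  case m: Map[_, _] => MapAttrValue(m.map { case (k, v) => k.toString -> toAttrValue(v) })
  case other        => sys.error(s"Unsupported attribute value: $other")
}
```

For example, toAttrValue(Map("a" -> 1.5)) yields MapAttrValue(Map("a" -> NumericAttrValue(1.5))).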

The AdStax Scala SDK provides an implementation of UserProfileStorageClient using an RPC system built on top of the AdStax AMQP-compatible message bus. In order to create a UserProfileStorageAmqpRpcClient you need access to an ActorRefFactory from Akka (both ActorSystem and ActorContext implement ActorRefFactory). You must also provide configuration parameters for both AMQP and the RPC protocol. These are supplied as domain objects, loaded from Typesafe config objects using PureConfig. The config.RabbitMQ object should define the host, port, username, password and timeout (for the operations). Here's the default configuration object, loaded from reference.conf:

{
  host = "localhost"
  port = 5672
  username = "guest"
  password = "guest"
  timeout = 5m
}

The RPC protocol needs to be configured at the level of the AMQP exchange name and routing key prefix to use in the requests. Here's the default configuration object:

{
  exchange-name = "sf.data"
  routing-key-prefix = "user-profile-storage."
}

The UserProfileStorageAmqpRpcClient constructor also requires an implicit ExecutionContext to deal with futures. Here's how you create a user profile storage client and interact with it. We're assuming the AMQP message broker is available at mq.sample-adstax-instance.dev.adstax.io (with credentials guest/guest) and using the default RPC configuration:

scala> import eu.shiftforward.adstax.ups.api._
import eu.shiftforward.adstax.ups.api._

scala> import eu.shiftforward.adstax.ups.api.rpc._
import eu.shiftforward.adstax.ups.api.rpc._

scala> import akka.actor._
import akka.actor._

scala> implicit val system = ActorSystem("test-actor-system")
system: akka.actor.ActorSystem = akka://test-actor-system

scala> implicit val ec = system.dispatcher
ec: scala.concurrent.ExecutionContextExecutor = Dispatcher[akka.actor.default-dispatcher]

scala> val client = UserProfileStorageAmqpRpcClient()
client: eu.shiftforward.adstax.ups.api.rpc.UserProfileStorageAmqpRpcClient = eu.shiftforward.adstax.storage.rpc.UserProfileStorageAmqpRpcClient@750147c3

scala> client.update("user1", "client1", UserAttributes(Map("nAttr" -> NumericAttrValue(2.0), "sAttr" -> StringAttrValue("val")))).onComplete(println)
Success(true)

scala> client.get("user1", "client1").onComplete(println)
Success(Some(UserAttributes(Map(nAttr -> NumericAttrValue(2.0), sAttr -> StringAttrValue(val)))))

scala> client.update("user1", "client1", UserAttributes(Map("nAttr" -> NumericAttrValue(5.0)))).onComplete(println)
Success(true)

scala> client.get("user1", "client1").onComplete(println)
Success(Some(UserAttributes(Map(nAttr -> NumericAttrValue(5.0), sAttr -> StringAttrValue(val)))))

scala> client.delete("user1", "client1").onComplete(println)
Success(true)

scala> client.get("user1", "client1").onComplete(println)
Success(None)

Spark Jobs

The AdStax Scala SDK allows one to define jobs that run on Spark and can access AdStax events. To define a Spark job, create a class that implements the SparkJob trait, which has the following interface:

trait SparkJob {
  def name: String
  def run(args: Array[String])(implicit context: AdStaxSparkContext): Unit
}

The name method should return the name of the job, whereas the run method should implement the primary job logic. The run method receives the command-line arguments of the job as its parameter, and is supplied with an implicit AdStaxSparkContext. The AdStaxSparkContext has an eventsRDD method to access AdStax events:

def eventsRDD(eventTypes: Set[String], startDateTime: DateTime, endDateTime: DateTime): RDD[String]

The returned RDD provides access to the events in JSON format. Refer to AdStax public documentation for the expected event format. Once implemented, you can submit the Spark job to run on your AdStax instance using the adstax-spark-job-manager command line utility. You can find examples of Spark jobs to run on AdStax here.
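As an illustration, a job counting the events received over the last day might look like the following. This is a sketch: the package of SparkJob and the event type names ("impression", "click") are assumptions, not guaranteed by the SDK.

```scala
import org.joda.time.DateTime
// Assumed package for SparkJob and AdStaxSparkContext; check the SDK sources.
import eu.shiftforward.adstax.spark._

// Hypothetical job: count impression and click events from the last 24 hours.
class EventCountJob extends SparkJob {
  def name: String = "event-count"

  def run(args: Array[String])(implicit context: AdStaxSparkContext): Unit = {
    val end = DateTime.now()
    val start = end.minusDays(1)
    // Events arrive as JSON strings; here we only count them.
    val events = context.eventsRDD(Set("impression", "click"), start, end)
    println(s"Events in the last 24h: ${events.count()}")
  }
}
```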

Tracking

The AdStax Scala SDK provides a way to make any Akka actor receive AdStax tracking events. A TrackingListenerActor connects to the Tracking module of an AdStax system and forwards AdStax events to an actor of your choosing. Consider a very simple actor that prints all events to stdout:

import akka.actor._
import eu.shiftforward.adstax.tracking._
import com.typesafe.config._

class ListenerActor extends Actor {
  def receive = {
    case x => println("Got " + x)
  }
}

After providing the AMQP configuration and the routing keys in the same way as for the modules above, you can have the AdStax events forwarded to your actor:

scala> import eu.shiftforward.adstax.tracking._
import eu.shiftforward.adstax.tracking._

scala> import akka.actor._
import akka.actor._

scala> val system = ActorSystem("test-actor-system")
system: akka.actor.ActorSystem = akka://test-actor-system

scala> val listener = system.actorOf(TrackingListenerActor.props(Props[ListenerActor]))
listener: akka.actor.ActorRef = Actor[akka://test-actor-system/user/$b#342189667]
Got {"meta":{"type":"userCreation","uid":"673da5b975164db8900eff5014578036","timestamp":"2016-07-12T15:05:20.733Z"}}
Got {"meta":{"type":"userCookieSticky","uid":"673da5b975164db8900eff5014578036","timestamp":"2016-07-12T15:05:21.344Z"}}
Got {"method":"GET","data":{"cookieCheck":"true","":""},"uri":{"path":"/events","full":"http://tr.sample-adstax-instance.dev.adstax.io/events?cookieCheck=true&","query":{"cookieCheck":"true","":""},"scheme":"http","authority":{"host":"tr.sample-adstax-instance.dev.adstax.io","port":0,"userinfo":""}},"meta":{"city":"Philadelphia","location":"39.953,-75.1756","timestamp":"2016-07-12T15:05:21.902Z","country":"United States","cookie":{"sticky":true},"uid":"673da5b975164db8900eff5014578036","type":"null"},"entity":null,"headers":{"X-Forwarded-For":"76.72.167.0","Connection":"keep-alive","X-Forwarded-Port":"80","X-Forwarded-Proto":"http","Cookie":{"adstax_uid":"673da5b975164db8900eff5014578036"},"Accept-Encoding":"gzip, deflate, identity","User-Agent":"Pingdom.com_bot_version_1.4_ (http://www.pingdom.com)","Host":"tr.sample-adstax-instance.dev.adstax.io"},"protocol":"HTTP/1.1"}
Got {"meta":{"type":"userCreation","uid":"92f41b2799ea4b96ad196f70e7ce6cc7","timestamp":"2016-07-12T15:06:21.146Z"}}
Got {"meta":{"type":"userCookieSticky","uid":"92f41b2799ea4b96ad196f70e7ce6cc7","timestamp":"2016-07-12T15:06:21.176Z"}}
Got {"method":"GET","data":{"cookieCheck":"true","":""},"uri":{"path":"/events","full":"http://tr.sample-adstax-instance.dev.adstax.io/events?cookieCheck=true&","query":{"cookieCheck":"true","":""},"scheme":"http","authority":{"host":"tr.sample-adstax-instance.dev.adstax.io","port":0,"userinfo":""}},"meta":{"city":"Atlanta","location":"33.7516,-84.3915","timestamp":"2016-07-12T15:06:21.180Z","country":"United States","cookie":{"sticky":true},"uid":"92f41b2799ea4b96ad196f70e7ce6cc7","type":"null"},"entity":null,"headers":{"X-Forwarded-For":"174.34.162.0","Connection":"keep-alive","X-Forwarded-Port":"80","X-Forwarded-Proto":"http","Cookie":{"adstax_uid":"92f41b2799ea4b96ad196f70e7ce6cc7"},"Accept-Encoding":"gzip, deflate, identity","User-Agent":"Pingdom.com_bot_version_1.4_ (http://www.pingdom.com)","Host":"tr.sample-adstax-instance.dev.adstax.io"},"protocol":"HTTP/1.1"}
Got {"meta":{"type":"userCreation","uid":"459d80d56d7d4535ac48eb563e189f0f","timestamp":"2016-07-12T15:07:20.325Z"}}
Got {"meta":{"type":"userCookieSticky","uid":"459d80d56d7d4535ac48eb563e189f0f","timestamp":"2016-07-12T15:07:20.443Z"}}
Got {"method":"GET","data":{"cookieCheck":"true","":""},"uri":{"path":"/events","full":"http://tr.sample-adstax-instance.dev.adstax.io/events?cookieCheck=true&","query":{"cookieCheck":"true","":""},"scheme":"http","authority":{"host":"tr.sample-adstax-instance.dev.adstax.io","port":0,"userinfo":""}},"meta":{"city":"Marco","location":"45.85,11.0","timestamp":"2016-07-12T15:07:20.448Z","country":"Italy","cookie":{"sticky":true},"uid":"459d80d56d7d4535ac48eb563e189f0f","type":"null"},"entity":null,"headers":{"X-Forwarded-For":"95.141.32.0","Connection":"keep-alive","X-Forwarded-Port":"80","X-Forwarded-Proto":"http","Cookie":{"adstax_uid":"459d80d56d7d4535ac48eb563e189f0f"},"Accept-Encoding":"gzip, deflate, identity","User-Agent":"Pingdom.com_bot_version_1.4_ (http://www.pingdom.com)","Host":"tr.sample-adstax-instance.dev.adstax.io"},"protocol":"HTTP/1.1"}
...