Almaren Framework
The Almaren Framework provides a simplified, consistent, and minimalistic layer over Apache Spark, while still allowing you to take advantage of native Apache Spark features. You can even combine it with standard Spark code.
Dependency
To add the Almaren Framework dependency to your sbt build:
libraryDependencies += "com.github.music-of-the-ainur" %% "almaren-framework" % "0.5.0-2.4"
To run in spark-shell:
spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.11:0.5.0-2.4"
Batch Example
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.Almaren
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
val almaren = Almaren("App Name")
val spark = almaren.spark
.master("local[*]")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()
val df:DataFrame = almaren.builder
.sourceSql("select monotonically_increasing_id() as id,* from movies")
.dsl("""id$id:LongType
|title$title:StringType
|year$year:LongType
|cast[0]$actor:StringType
|cast[1]$support_actor:StringType
|genres[0]$genre:StringType
|director@director
| director.name$credit_name:StringType""".stripMargin)
.sql("""SELECT * FROM __TABLE__ WHERE actor NOT IN ("the","the life of")""")
.targetJdbc("jdbc:postgresql://localhost/almaren","org.postgresql.Driver","movies",SaveMode.Overwrite)
.batch
Streaming Example
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.Almaren
import org.apache.spark.sql.SaveMode
val almaren = Almaren("Streaming App")
val streaming = almaren.builder
.sourceSql("select CAST(value AS STRING) as json_column FROM __STREAMING__")
.deserializer("json","json_column")
.dsl("""user.id$user_id:LongType
|user.name$user_name:StringType
|user.time_zone$time_zone:StringType
|user.friends_count$friends_count:LongType
|user.followers_count$followers_count:LongType
|source$source:StringType
|place.country$country:StringType
|timestamp_ms$timestamp_ms:LongType
|text$message:StringType
|entities@entitie
| entitie.hashtags@hashtag
| hashtag.text$hashtag:StringType""".stripMargin)
.sql("SELECT DISTINCT * FROM __TABLE__")
.sql("""SELECT sha2(concat_ws("",array(*)),256) as unique_hash,*,current_timestamp from __TABLE__""")
.targetJdbc("jdbc:postgresql://localhost/almaren","org.postgresql.Driver","twitter_streaming",SaveMode.Append)
almaren.streaming(streaming,Map("kafka.bootstrap.servers" -> "localhost:9092","subscribe" -> "twitter", "startingOffsets" -> "earliest"))
Debugging
To debug the code, turn on log4j.logger.com.github.music.of.the.ainur.almaren=DEBUG so you can see the state of each component.
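For example, in a log4j.properties file (a hedged sketch assuming a standard log4j setup):
# Show the state of each Almaren component
log4j.logger.com.github.music.of.the.ainur.almaren=DEBUG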
You can also enable debug logging from Scala code:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("com.github.music.of.the.ainur.almaren").setLevel(Level.DEBUG)
Example:
val df:DataFrame = almaren.builder
.sourceSql("select monotonically_increasing_id() as id,* from movies")
.dsl("""id$id:LongType
|title$title:StringType
|year$year:LongType
|cast[0]$actor:StringType
|cast[1]$support_actor:StringType
|genres[0]$genre:StringType
|director@director
| director.name$credit_name:StringType""".stripMargin)
.sql("""SELECT *,current_timestamp as date FROM __TABLE__ WHERE actor NOT IN ("the","the life of")""")
.targetJdbc("jdbc:postgresql://localhost/almaren","org.postgresql.Driver","movies",SaveMode.Overwrite)
.batch
The output:
20/10/08 11:57:10 INFO SourceSql: sql:{select monotonically_increasing_id() as id,* from movies}
+---+----------------+--------------------+--------------------+----+
| id| cast| genres| title|year|
+---+----------------+--------------------+--------------------+----+
| 0| []| []|After Dark in Cen...|1900|
| 1| []| []|Boarding School G...|1900|
| 2| []| []|Buffalo Bill's Wi...|1900|
| 3| []| []| Caught|1900|
| 4| []| []|Clowns Spinning Hats|1900|
| 5| []|[Short, Documentary]|Capture of Boer B...|1900|
| 6| []| []|The Enchanted Dra...|1900|
| 7| [Paul Boyton]| []| Feeding Sea Lions|1900|
| 8| []| [Comedy]|How to Make a Fat...|1900|
| 9| []| []| New Life Rescue|1900|
| 10| []| []| New Morning Bath|1900|
| 11| []| []|Searching Ruins o...|1900|
| 12| []| []|The Tribulations ...|1900|
| 13| []| [Comedy]|Trouble in Hogan'...|1900|
| 14| []| [Short]| Two Old Sparks|1900|
| 15|[Ching Ling Foo]| [Short]|The Wonder, Ching...|1900|
| 16| []| [Short]| Watermelon Contest|1900|
| 17| []| []| Acrobats in Cairo|1901|
| 18| []| []| An Affair of Honor|1901|
| 19| []| []|Another Job for t...|1901|
+---+----------------+--------------------+--------------------+----+
only showing top 20 rows
20/10/08 11:57:10 INFO Dsl: dsl:{id$id:LongType
title$title:StringType
year$year:LongType
cast[0]$actor:StringType
cast[1]$support_actor:StringType
genres[0]$genre:StringType
director@director
director.name$credit_name:StringType}
+---+--------------------+----+--------------+-------------+------+-----------+
| id| title|year| actor|support_actor| genre|credit_name|
+---+--------------------+----+--------------+-------------+------+-----------+
| 0|After Dark in Cen...|1900| null| null| null| null|
| 1|Boarding School G...|1900| null| null| null| null|
| 2|Buffalo Bill's Wi...|1900| null| null| null| null|
| 3| Caught|1900| null| null| null| null|
| 4|Clowns Spinning Hats|1900| null| null| null| null|
| 5|Capture of Boer B...|1900| null| null| Short| null|
| 6|The Enchanted Dra...|1900| null| null| null| null|
| 7| Feeding Sea Lions|1900| Paul Boyton| null| null| null|
| 8|How to Make a Fat...|1900| null| null|Comedy| null|
| 9| New Life Rescue|1900| null| null| null| null|
| 10| New Morning Bath|1900| null| null| null| null|
| 11|Searching Ruins o...|1900| null| null| null| null|
| 12|The Tribulations ...|1900| null| null| null| null|
| 13|Trouble in Hogan'...|1900| null| null|Comedy| null|
| 14| Two Old Sparks|1900| null| null| Short| null|
| 15|The Wonder, Ching...|1900|Ching Ling Foo| null| Short| null|
| 16| Watermelon Contest|1900| null| null| Short| null|
| 17| Acrobats in Cairo|1901| null| null| null| null|
| 18| An Affair of Honor|1901| null| null| null| null|
| 19|Another Job for t...|1901| null| null| null| null|
+---+--------------------+----+--------------+-------------+------+-----------+
only showing top 20 rows
20/10/08 11:57:11 INFO Sql: sql:{SELECT *,current_timestamp as date FROM __TABLE__ WHERE actor NOT IN ("the","the life of")}
+---+--------------------+----+------------------+-----------------+----------+-----------+--------------------+
| id| title|year| actor| support_actor| genre|credit_name| date|
+---+--------------------+----+------------------+-----------------+----------+-----------+--------------------+
| 7| Feeding Sea Lions|1900| Paul Boyton| null| null| null|2020-10-08 11:57:...|
| 15|The Wonder, Ching...|1900| Ching Ling Foo| null| Short| null|2020-10-08 11:57:...|
|105| Alice in Wonderland|1903| May Clark| null| null| null|2020-10-08 11:57:...|
|142| Nicholas Nickleby|1903|William Carrington| null| null| null|2020-10-08 11:57:...|
|242|The Automobile Th...|1906|J. Stuart Blackton|Florence Lawrence| Short| null|2020-10-08 11:57:...|
|245|Humorous Phases o...|1906|J. Stuart Blackton| null| Short| null|2020-10-08 11:57:...|
|250| Ben-Hur|1907| William S. Hart| null|Historical| null|2020-10-08 11:57:...|
|251| Daniel Boone|1907| William Craven|Florence Lawrence| Biography| null|2020-10-08 11:57:...|
|252|How Brown Saw the...|1907| Unknown| null| Comedy| null|2020-10-08 11:57:...|
|253| Laughing Gas|1907| Bertha Regustus| Edward Boulden| Comedy| null|2020-10-08 11:57:...|
|256|The Adventures of...|1908| Arthur V. Johnson| Linda Arvidson| Drama| null|2020-10-08 11:57:...|
|257|Antony and Cleopatra|1908| Florence Lawrence|William V. Ranous| null| null|2020-10-08 11:57:...|
|258| Balked at the Altar|1908| Linda Arvidson| George Gebhardt| Comedy| null|2020-10-08 11:57:...|
|259|The Bandit's Wate...|1908| Charles Inslee| Linda Arvidson| Drama| null|2020-10-08 11:57:...|
|260| The Black Viper|1908| D. W. Griffith| null| Drama| null|2020-10-08 11:57:...|
|261|A Calamitous Elop...|1908| Harry Solter| Linda Arvidson| Comedy| null|2020-10-08 11:57:...|
|262|The Call of the Wild|1908| Charles Inslee| null| Adventure| null|2020-10-08 11:57:...|
|263| A Christmas Carol|1908| Tom Ricketts| null| Drama| null|2020-10-08 11:57:...|
|264|Deceived Slumming...|1908| Edward Dillon| D. W. Griffith| Comedy| null|2020-10-08 11:57:...|
|265|Dr. Jekyll and Mr...|1908| Hobart Bosworth| Betty Harte| Horror| null|2020-10-08 11:57:...|
+---+--------------------+----+------------------+-----------------+----------+-----------+--------------------+
only showing top 20 rows
20/10/08 11:57:11 INFO TargetJdbc: url:{jdbc:postgresql://localhost/almaren}, driver:{org.postgresql.Driver}, dbtable:{movies}, user:{None}, params:{Map()}
+---+--------------------+----+------------------+-----------------+----------+-----------+--------------------+
| id| title|year| actor| support_actor| genre|credit_name| date|
+---+--------------------+----+------------------+-----------------+----------+-----------+--------------------+
| 7| Feeding Sea Lions|1900| Paul Boyton| null| null| null|2020-10-08 11:57:...|
| 15|The Wonder, Ching...|1900| Ching Ling Foo| null| Short| null|2020-10-08 11:57:...|
|105| Alice in Wonderland|1903| May Clark| null| null| null|2020-10-08 11:57:...|
|142| Nicholas Nickleby|1903|William Carrington| null| null| null|2020-10-08 11:57:...|
|242|The Automobile Th...|1906|J. Stuart Blackton|Florence Lawrence| Short| null|2020-10-08 11:57:...|
|245|Humorous Phases o...|1906|J. Stuart Blackton| null| Short| null|2020-10-08 11:57:...|
|250| Ben-Hur|1907| William S. Hart| null|Historical| null|2020-10-08 11:57:...|
|251| Daniel Boone|1907| William Craven|Florence Lawrence| Biography| null|2020-10-08 11:57:...|
|252|How Brown Saw the...|1907| Unknown| null| Comedy| null|2020-10-08 11:57:...|
|253| Laughing Gas|1907| Bertha Regustus| Edward Boulden| Comedy| null|2020-10-08 11:57:...|
|256|The Adventures of...|1908| Arthur V. Johnson| Linda Arvidson| Drama| null|2020-10-08 11:57:...|
|257|Antony and Cleopatra|1908| Florence Lawrence|William V. Ranous| null| null|2020-10-08 11:57:...|
|258| Balked at the Altar|1908| Linda Arvidson| George Gebhardt| Comedy| null|2020-10-08 11:57:...|
|259|The Bandit's Wate...|1908| Charles Inslee| Linda Arvidson| Drama| null|2020-10-08 11:57:...|
|260| The Black Viper|1908| D. W. Griffith| null| Drama| null|2020-10-08 11:57:...|
|261|A Calamitous Elop...|1908| Harry Solter| Linda Arvidson| Comedy| null|2020-10-08 11:57:...|
|262|The Call of the Wild|1908| Charles Inslee| null| Adventure| null|2020-10-08 11:57:...|
|263| A Christmas Carol|1908| Tom Ricketts| null| Drama| null|2020-10-08 11:57:...|
|264|Deceived Slumming...|1908| Edward Dillon| D. W. Griffith| Comedy| null|2020-10-08 11:57:...|
|265|Dr. Jekyll and Mr...|1908| Hobart Bosworth| Betty Harte| Horror| null|2020-10-08 11:57:...|
+---+--------------------+----+------------------+-----------------+----------+-----------+--------------------+
only showing top 20 rows
Components
Source
sourceSql
Read native Spark/Hive tables using Spark SQL.
sourceSql("select monotonically_increasing_id() as id,* from database.tabname")
sourceFile
Read files such as CSV, Avro, JSON, and XML.
sourceFile("csv","/tmp/file.csv",Map("header" -> "true"))
sourceHbase
Read from HBase using the HBase Connector.
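A minimal sketch of the catalog JSON, modeled on Example 3 below (the namespace, table, and column mappings are illustrative):
sourceHbase("""{
|"table":{"namespace":"default", "name":"policy"},
|"rowkey":"id",
|"columns":{
|"rowkey":{"cf":"rowkey", "col":"id", "type":"long"},
|"status":{"cf":"Policy", "col":"status", "type":"string"}
|}
|}""".stripMargin)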
sourceCassandra
Read from Cassandra using the Spark Cassandra Connector.
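A hedged sketch, assuming sourceCassandra takes a keyspace and a table the same way targetCassandra does in Example 2 (keyspace_name and table_name are placeholders):
sourceCassandra("keyspace_name","table_name")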
sourceJdbc
Read from JDBC using Spark JDBC
sourceJdbc("jdbc:postgresql://localhost/almaren","org.postgresql.Driver","select * from table_name",Some("user"),Some("password"))
sourceBigQuery
Read from BigQuery using the Google BigQuery Connector.
Main
Cache
Cache or uncache both the DataFrame and the table.
cache(true)
Coalesce
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
coalesce(10)
Repartition
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartition(100)
Pipe
Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
pipe("""perl -npE 's/(?:\d+)\s+([^\w]+)/:$1/mg'""")
Alias
Creates a temporary view of the previous component's output using createOrReplaceTempView.
alias("my_table")
Deserializer
Deserializes XML, JSON, and Avro columns into a Spark DataFrame.
deserializer("JSON","column_name","`cast` ARRAY<STRING>,`genres` ARRAY<STRING>,`title` STRING,`year` BIGINT")
SQL
Uses Spark SQL syntax. You can query the previous component's output through the special table __TABLE__.
sql("SELECT * FROM __TABLE__")
DSL
The DSL (Domain Specific Language) simplifies flattening, selecting, aliasing, and setting the proper data type. It is very powerful for parsing complex data structures.
dsl("""title$title:StringType
|year$year:LongType
|cast[0]$actor:StringType
|cast[1]$support_actor:StringType
|genres[0]$genre:StringType""".stripMargin)
HTTP
Starts an HTTP keep-alive connection for each partition of the RDD and sends a request for each row, returning two columns: header and body.
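A hedged sketch based on the targetHttp usage in Example 4 (the URL and headers are placeholders); the request body is prepared in the __BODY__ column:
sql("SELECT to_json(struct(*)) as __BODY__ from __TABLE__")
.targetHttp("https://host.com:9093/api/foo","post",Map("Authorization" -> "Basic QWxhZGRpbjpPcGVuU2VzYW1l"))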
Target
targetSql
Write native Spark/Hive tables using Spark SQL.
targetSql("INSERT OVERWRITE TABLE database.table SELECT * FROM __TABLE__")
targetHbase
Write to HBase using the HBase Connector.
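A hedged sketch, assuming targetHbase takes the same catalog JSON style as sourceHbase (the namespace, table, and column mappings are illustrative):
targetHbase("""{
|"table":{"namespace":"default", "name":"movies"},
|"rowkey":"id",
|"columns":{
|"rowkey":{"cf":"rowkey", "col":"id", "type":"long"},
|"title":{"cf":"Movie", "col":"title", "type":"string"}
|}
|}""".stripMargin)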
targetCassandra
Write to Cassandra using the Spark Cassandra Connector.
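For example, writing to keyspace test1 and table kv1 as in Example 2:
targetCassandra("test1","kv1")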
targetJdbc
Write to JDBC using Spark JDBC
targetJdbc("jdbc:postgresql://localhost/almaren","org.postgresql.Driver","movies",SaveMode.Overwrite)
targetKafka
Write to Kafka. You must have a column named value; the content of this column will be sent to Kafka. You can specify the topic either with a column named topic or in the options, as in the example below. Check the documentation for the full list of parameters.
sql("SELECT to_json(struct(*)) as value FROM __TABLE__").targetKafka("localhost:9092",Map("topic" -> "testing"))
targetBigQuery
Write to BigQuery using the Google BigQuery Connector.
Executors
Executors are responsible for submitting the Almaren Tree, i.e. Option[Tree], to Apache Spark. Without invoking an executor, the code won't be executed by Apache Spark. The available executors are listed below:
Batch
Executes the Almaren Tree, returning a DataFrame.
val tree = almaren.builder
.sourceSql("select monotonically_increasing_id() as id,* from movies")
.dsl("""id$id:LongType
|title$title:StringType
|year$year:LongType
|cast[0]$actor:StringType
|cast[1]$support_actor:StringType
|genres[0]$genre:StringType
|director@director
| director.name$credit_name:StringType""".stripMargin)
.sql("""SELECT * FROM __TABLE__ WHERE actor NOT IN ("the","the life of")""")
.targetJdbc("jdbc:postgresql://localhost/almaren","org.postgresql.Driver","movies",SaveMode.Overwrite)
val df:DataFrame = tree.batch
Streaming Kafka
Reads data from Kafka and executes the Almaren Tree, providing the special table __STREAMING__ with the following columns:
| Column Name | Data Type |
|---|---|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |
The params argument of the streaming(tree, params: Map[String,String]) method takes the options available in readStream.format("kafka").options(params); you can check all the options here.
val tree = almaren.builder
.sourceSql("select CAST(value AS STRING) as json_column FROM __STREAMING__")
.deserializer("json","json_column")
.dsl("""user.id$user_id:LongType
|user.name$user_name:StringType
|user.time_zone$time_zone:StringType
|user.friends_count$friends_count:LongType
|user.followers_count$followers_count:LongType
|source$source:StringType
|place.country$country:StringType
|timestamp_ms$timestamp_ms:LongType
|text$message:StringType
|entities@entitie
| entitie.hashtags@hashtag
| hashtag.text$hashtag:StringType""".stripMargin)
.sql("SELECT DISTINCT * FROM __TABLE__")
.sql("""SELECT sha2(concat_ws("",array(*)),256) as unique_hash,*,current_timestamp from __TABLE__""")
.targetJdbc("jdbc:postgresql://localhost/almaren","org.postgresql.Driver","twitter_streaming",SaveMode.Append)
almaren.streaming(tree,Map("kafka.bootstrap.servers" -> "localhost:9092","subscribe" -> "twitter", "startingOffsets" -> "earliest"))
Examples
Example 1
val almaren = Almaren("appName")
val df:DataFrame = almaren.builder.sourceSql("SELECT * FROM db.schema.table")
.deserializer("JSON","json_str")
.dsl("uuid$id:StringType
|code$area_code:LongType
|names@name
| name.firstName$first_name:StringType
| name.secondName$second_name:StringType
| name.lastName$last_name:StringType
|source_id$source_id:LongType".stripMargin)
.sql("""SELECT *,unix_timestamp() as timestamp from __TABLE__""")
.targetSql("INSERT OVERWRITE TABLE default.target_table SELECT * FROM __TABLE__")
.batch
Example 2
val almaren = Almaren("appName")
val target1 = almaren.builder.dsl("uuid$id:StringType
|code$area_code:LongType
|names@name
| name.firstName$first_name:StringType
| name.secondName$second_name:StringType
| name.lastName$last_name:StringType
|source_id$source_id:LongType".stripMargin)
.sql("SELECT *,unix_timestamp() as timestamp from __TABLE__")
.targetCassandra("test1","kv1")
val target2 = almaren.builder.dsl("uuid$id:StringType
|code$area_code:LongType
|phones@phone
| phone.number$phone_number:StringType
|source_id$source_id:LongType".stripMargin)
.sql("SELECT *,unix_timestamp() as timestamp from __TABLE__")
.targetCassandra("test2","kv2")
almaren.builder.sourceSql("SELECT * FROM db.schema.table")
.deserializer("XML","xml_str").cache.fork(target1,target2)
.batch
Example 3
val almaren = Almaren("appName")
val sourcePolicy = almaren.builder.sourceHbase("""{
|"table":{"namespace":"default", "name":"policy"},
|"rowkey":"id",
|"columns":{
|"rowkey":{"cf":"rowkey", "col":"id", "type":"long"},
|"number":{"cf":"Policy", "col":"number", "type":"long"},
|"source":{"cf":"Policy", "col":"source", "type":"string"},
|"status":{"cf":"Policy", "col":"status", "type":"string"},
|"person_id":{"cf":"Policy", "col":"source", "type":"long"}
|}
|}""").sql(""" SELECT * FROM __TABLE__ WHERE status = "ACTIVE" """).alias("policy")
val sourcePerson = almaren.builder.sourceHbase("""{
|"table":{"namespace":"default", "name":"person"},
|"rowkey":"id",
|"columns":{
|"rowkey":{"cf":"rowkey", "col":"id", "type":"long"},
|"name":{"cf":"Policy", "col":"number", "type":"string"},
|"type":{"cf":"Policy", "col":"type", "type":"string"},
|"age":{"cf":"Policy", "col":"source", "type":"string"}
|}
|}""").sql(""" SELECT * FROM __TABLE__ WHERE type = "PREMIUM" """).alias("person")
almaren.builder.sql(""" SELECT * FROM person JOIN policy ON policy.person_id = person.id """)
.sql("SELECT *,unix_timestamp() as timestamp FROM __TABLE__")
.coalesce(100)
.targetSql("INSERT INTO TABLE area.premimum_users SELECT * FROM __TABLE__")
.batch(sourcePolicy,sourcePerson)
Example 4
val almaren = Almaren("appName")
val sourceData = almaren.builder.sourceJdbc("jdbc:oracle:thin:@localhost:1521:xe","oracle.jdbc.driver.OracleDriver","SELECT * FROM schema.table WHERE st_date >= (sysdate-1) AND st_date < sysdate")
.sql("SELECT to_json(named_struct('id', id,))) as __BODY__ from __TABLE__")
.coalesce(30)
.targetHttp("https://host.com:9093/api/foo","post",Map("Authorization" -> "Basic QWxhZGRpbjpPcGVuU2VzYW1l"))
sourceData.batch
Author
Daniel Mantovani daniel.mantovani@modak.com