Eel is a toolkit for manipulating data in the hadoop ecosystem. By hadoop ecosystem we mean file formats common to the big-data world, such as parquet, orc, csv in locations such as HDFS or Hive tables. In contrast to distributed batch or streaming engines such as Spark or Flink, Eel is an SDK intended to be used directly in process. Eel is a lower level API than higher level engines like Spark and is aimed for those use cases when you want something like a file API.
- Importing from one source such as JDBC into another source such as Hive/HDFS
- Coalescing multiple files, such as the output from spark, into a single file
- Querying, streaming or reading into memory (relatively) small datasets directly from your process without reaching out to YARN or similar.
- Moving or altering partitions in hive
- Retrieving statistics on existing tables or datasets
- Reading or generating schemas for existing datasets
Here are some of our notes comparing eel to other tools that offer functionality similar to eel.
Sqoop is a popular Hadoop ETL tool and API used for loading foreign data (e.g. JDBC) into Hive/Hadoop
Sqoop executes N configurable Hadoop mappers jobs which are executed in parallel. Each mapper job makes a separate JDBC connection and adapts their queries to retrieve parts of the data.
To support the parallelism of mapper jobs you must specify a split by column key and Hive partitioning key columns if applicable.
- With this approach you can end up with several small part files (one for each mapper task) in HDFS which is not the most optimal way of storing data in Hadoop.
- To reduce the number of part files you must reduce the number of mappers hence reducing the parallelism
- At the time of this writing Oracle Number and Timestamp types aren't properly supported from Oracle to Hive with a Parquet dialect
- Sqoop depends on YARN to allocate resources for each mapper task
- Both the Sqoop CLI and API has a steep learning curve
Flume supports streaming data from a plethora of out-of-the-box Sources and Sinks.
Flume supports the notion of a channel which is like a persistent queue and glues together sources and sinks.
The channel is an attractive feature as it can buffer up transactions/events under heavy load conditions – channel types can be File, Kafka or JDBC.
- The Flume Hive sink is limited to streaming events containing delimited text or JSON data directly into a Hive table or partition - it’s possible to write a Custom EEL source and sink and therefore supporting all source/sink types such as Parquet, Orc, Hive, etc...
- Flume requires an additional maintenance of a Flume Agent topology - separate processes.
The Kite API and CLI are very similar in functionality to EEL but there are some subtle differences:
- Datasets in Kite require AVRO schemas
- A dataset is essentially a Hive table - the upcoming EEL 1.2 release you will be able to create Hive tables from the CLI - at the moment it’s possible generate the Hive DDL with EEL API using io.eels.component.hive.HiveDDL$#showDDL.
- For writing directly to AVRO or Parquet storage formats you must provide an AVRO schema – EEL dynamically infers a schema from the underlying source, for example a JDBC Query or CSV headers.
- Support for ingesting from storage formats (other than AVRO and Parquet) is be achieved by transforming each record/row with another module named Kite Morphlines - it uses another intermediate record format and is another API to learn.
- EEL supports transformations using regular Scala functions by invoking the map method on the Source’s underlying DataStream, e.g. source.toDataStream.map(f: (Row) => Row) – the map function returns a new row object.
- Kite has direct support for HBase but EEL doesn’t – will do with the upcoming EEL 1.2 release
- Kite currently doesn’t support Kudo – EEL does.
- Kite stores additional metadata on disk (HDFS) to be deemed a valid Kite dataset – if you externally change the Schema outside of Kite, i.e. through DDL then it can cause a dataset to be out-of-synch and potentially malfunction - EEL functions normally in this scenario as there is no additional metadata required.
- Kite handles Hive partitioning by specifying a partition strategies – there are a few out-of-the-box strategies derived from the current payload – with EEL this works auto-magically by virture of providing the same column on the source row, alternatively you can add a partition key column with addField on the fly on the source’s DataStream or use map transformation function.
The core data structure in Eel is the DataStream
. A DataStream consists of a Schema
, and zero or more Row
s which contain values for each field in the schema.
A DataStream is conceptually similar to a table in a relational database, or a dataframe in Spark, or a dataset in Flink.
DataStreams can be read from a Source
such as hive tables, jdbc databases, or even programatically from Scala or Java collections.
DataStreams can be written out to a Sink
such as a hive table or parquet file.
The current set of sources and sinks include: Apache Avro, Apache Parquet, Apache Orc, CSV, Kafka (sink only), HDFS, Kudu, JDBC, Hive, Json Files.
Once you have a reference to a DataStream, the DataStream can be manipulated in a similar way to regular Scala collections - many of the methods
share the same name, such as map
, filter
, take
, drop
, etc. All operations on a DataStream are lazy - they will only be executed
once an action takes place such as collect
, count
, or save
.
For example, you could load data from a CSV file, drop rows that don't match a predicate, and then save the data back out to a Parquet file all in a couple of lines of code.
val source = CsvSource(new Path("input.csv"))
val sink = ParquetSink(new Path("output.pq"))
source.toDataStream().filter(_.get("location") == "London").to(sink)
Eel Datatype | JVM Types |
---|---|
BigInteger | BigInt |
Binary | Array of Bytes |
Byte | Byte |
DateTime | java.sql.Date |
Decimal(precision,scale) | BigDecimal |
Double | Double |
Float | Float |
Int | Int |
Long | Long |
Short | Short |
String | String |
TimestampMillis | java.sql.Timestamp |
Array | Array, Java collection or Scala Seq |
Map | Java or Scala Map |
The following examples describe going from a JDBCSource to a specific Sink and therefore we first need to set up some test JDBC data using a H2 in-memory database with the following code snippet:
def executeBatchSql(dataSource: DataSource, sqlCmds: Seq[String]): Unit = {
val connection = dataSource.getConnection()
connection.clearWarnings()
sqlCmds.foreach { ddl =>
val statement = connection.createStatement()
statement.execute(ddl)
statement.close()
}
connection.close()
}
// Setup JDBC data in H2 in memory database
val dataSource = new BasicDataSource()
dataSource.setDriverClassName("org.h2.Driver")
dataSource.setUrl("jdbc:h2:mem:eel_test_data")
dataSource.setPoolPreparedStatements(false)
dataSource.setInitialSize(5)
val sql = Seq(
"CREATE TABLE IF NOT EXISTS PERSON(NAME VARCHAR(30), AGE INT, SALARY NUMBER(38,5), CREATION_TIME TIMESTAMP)",
"INSERT INTO PERSON VALUES ('Fred', 50, 50000.99, CURRENT_TIMESTAMP())",
"INSERT INTO PERSON VALUES ('Gary', 50, 20000.34, CURRENT_TIMESTAMP())",
"INSERT INTO PERSON VALUES ('Alice', 50, 99999.98, CURRENT_TIMESTAMP())"
)
executeBatchSql(dataSource, sql)
First let's create a Hive table named person in the database eel_test which is partitioned by Title
Note the following Hive DDL creates the table for Parquet format
CREATE EXTERNAL TABLE IF NOT EXISTS `eel_test.person` (
`NAME` string,
`AGE` int,
`SALARY` decimal(38,5),
`CREATION_TIME` timestamp)
PARTITIONED BY (`title` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/client/eel_test/persons';
Example Create Table
hive> CREATE EXTERNAL TABLE IF NOT EXISTS `eel_test.person` (
> `NAME` string,
> `AGE` int,
> `SALARY` decimal(38,5),
> `CREATION_TIME` timestamp)
> PARTITIONED BY (`title` string)
> ROW FORMAT SERDE
> 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> STORED AS INPUTFORMAT
> 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
> OUTPUTFORMAT
> 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> LOCATION '/client/eel_test/persons';
OK
Time taken: 1.474 seconds
// Write to a HiveSink from a JDBCSource
val query = "SELECT NAME, AGE, SALARY, CREATION_TIME FROM PERSON"
implicit val hadoopFileSystem = FileSystem.get(new Configuration())
implicit val hiveMetaStoreClient = new HiveMetaStoreClient(new HiveConf())
JdbcSource(() => dataSource.getConnection, query)
.withFetchSize(10)
.toDataStream
.withLowerCaseSchema
// Transformation - add title to row
.map { row =>
if (row.get("name").toString == "Alice") row.add("title", "Mrs") else row.add("title", "Mr")
}
.to(HiveSink("eel_test", "person").withIOThreads(1).withInheritPermission(true))
- The JDBCSource takes a connection function and a SQL query - it will execute the SQL and derive the EEL schema from it - also notice the withFetchSize which caches the number of rows per fetch reducing the number RPC calls to the database server.
- hadoopFileSystem is a Hadoop File System object scala implicit required by the HiveSink
- hiveMetaStoreClient is a Hive metastore client object scala implicit required by the HiveSink
- withLowerCaseSchema lowercases all the field names over the JdbcSource schema - internally Hive lowercases table objects and columns and therefore the source schema should also match
- The map function performs some transformation - it simply adds a new column called title which figures out whether the value should be Mr or Mrs - title is defined as a partition column key on the Hive table.
- HiveSink on the to method specifies the target Hive database and table respectively.
- withIOThreads on the HiveSink specifies the number of worker threads where each thread writes to its own file - the default is 4. This is set to 1 because we don't want to end up with too many files given that the source only has 3 rows.
- withInheritPermission on the HiveSink means that when the sink creates new files it should inherit the HDFS permissions from the parent folder - typically this is negated by the default UMASK policy set in the hadoop site files.
- Note the HiveSink takes care of automatically updating the HiveMetaStore when new partitions are added.
hive> select * from eel_test.person;
OK
Fred 50 50000.99000 2017-01-24 14:40:50.664 Mr
Gary 50 20000.34000 2017-01-24 14:40:50.664 Mr
Alice 50 99999.98000 2017-01-24 14:40:50.664 Mrs
Time taken: 2.59 seconds, Fetched: 3 row(s)
hive>
There should be 2 files created by the HiveSink one in the partiton for title called Mr and one in Mrs.
Here are the partitions using the hadoop fs -ls shell command:
$ hadoop fs -ls /client/eel_test/persons
Found 2 items
drwxrwxrwx - eeluser supergroup 0 2017-01-24 14:40 /client/eel_test/persons/title=Mr
drwxrwxrwx - eeluser supergroup 0 2017-01-24 14:40 /client/eel_test/persons/title=Mrs
Now let's see if a file was created for the Mr partition:
$ hadoop fs -ls /client/eel_test/persons/title=Mr
Found 1 items
-rw-r--r-- 3 eeluser supergroup 752 2017-01-24 14:40 /client/eel_test/persons/title=Mr/eel_2985827854647169_0
Now let's see if a file was created for the Mrs partition:
$ hadoop fs -ls /client/eel_test/persons/title=Mrs
Found 1 items
-rw-r--r-- 3 eeluser supergroup 723 2017-01-24 14:40 /client/eel_test/persons/title=Mrs/eel_2985828912259519_0
The 1.2 release for the HiveSource using Parquet and Orc storage formats exploits the following optimizations supported by these formats:
- column pruning or schema projection which means providing a read schema - the reader is interested only in certain fields but not all fields written by the writer. The Parquet and Orc columnar formats does this efficiently without reading the entire row, i.e. only reading the bytes required for those fields.
- predicate push-down means that filter expressions can applied to the read without reading the entire row - only reading the bytes required for the filter expressions
- In addition partition pruning is supported - if a table is organised by partitions then full table scans can be avoided by providing the partition key values
implicit val hadoopFileSystem = FileSystem.get(new Configuration())
implicit val hiveMetaStoreClient = new HiveMetaStoreClient(new HiveConf())
HiveSource("eel_test", "person")
.toDataStream()
.collect
.foreach(row => println(row))
- hadoopFileSystem is a Hadoop File System object scala implicit required by the HiveSource
- hiveMetaStoreClient is a Hive metastore client object scala implicit required by the HiveSource
- HiveSource specifies arguments for the Hive database and table respectively.
- To get the collection of rows you need to perform the action collect on the source's underlying DataStream: toDataStream().collect(), then iterate over each row and print it out using foreach(row => println(row))
Here are the results of the read:
[name = Fred,age = 50,salary = 50000.99000,creation_time = 2017-01-24 13:40:50.664,title = Mr]
[name = Gary,age = 50,salary = 20000.34000,creation_time = 2017-01-24 13:40:50.664,title = Mr]
[name = Alice,age = 50,salary = 99999.98000,creation_time = 2017-01-24 13:40:50.664,title = Mrs]
You can query data via the HiveSource using simple and/or predicates with relational operators such as equals, gt, ge, lt, le, etc...
implicit val hadoopFileSystem = FileSystem.get(new Configuration())
implicit val hiveMetaStoreClient = new HiveMetaStoreClient(new HiveConf())
HiveSource("eel_test", "person")
.withPredicate(Predicate.or(Predicate.equals("name", "Alice"), Predicate.equals("name", "Gary")))
.toDataStream()
.collect()
.foreach(row => println(row))
The above HiveSource predicate is equivalent to the SQL:
select * from eel_test.person
where name = 'Alice' or name = 'Gary'
The result is as follows:
[name = Gary,age = 50,salary = 20000.34000,creation_time = 2017-01-24 13:40:50.664,title = Mr]
[name = Alice,age = 50,salary = 99999.98000,creation_time = 2017-01-24 13:40:50.664,title = Mrs]
Specifying a partition key on the HiveSource using the method withPartitionConstraint restricts the predicate being performed on a specific partition. This significantly speeds up the query, i.e. avoids an expensive table scan.
If you have simple filtering requirements on relatively small datasets then this approach may be considerably faster than using Hive, Spark, Impala query engines. Here's an example:
implicit val hadoopFileSystem = FileSystem.get(new Configuration())
implicit val hiveMetaStoreClient = new HiveMetaStoreClient(new HiveConf())
HiveSource("eel_test", "person")
.withPredicate(Predicate.or(Predicate.equals("name", "Alice"), Predicate.equals("name", "Gary")))
.withPartitionConstraint(PartitionConstraint.equals("title", "Mr"))
.toDataStream()
.collect()
.foreach(row => println(row))
The withPartitionConstraint method homes in on the title partition whose value is Mr and peforms filtering on it using the withPredicate.
The equivalent SQL would be:
select * from eel_test.person
where title = 'Mr'
and (name = 'Alice' or name = 'Gary')
The result is as follows:
[name = Gary,age = 50,salary = 20000.34000,creation_time = 2017-01-24 13:40:50.664,title = Mr]
val query = "SELECT NAME, AGE, SALARY, CREATION_TIME FROM PERSON"
val parquetFilePath = new Path("hdfs://nameservice1/client/eel/person.parquet")
implicit val hadoopFileSystem = FileSystem.get(new Configuration()) // This is required
JdbcSource(() => dataSource.getConnection, query).withFetchSize(10)
.toDataStream.to(ParquetSink(parquetFilePath))
- The JDBCSource takes a connection function and a SQL query - it will execute the SQL and derive the EEL schema from it - also notice the withFetchSize which caches the number of rows per fetch reducing the number RPC calls to the database server.
- parquetFilePath is the ParquetSink file path pointing to a HDFS path - alternatively this could be a local file path if you qualify it with the file: scheme
- hadoopFileSystem is a scala implicit required by the ParquetSink
- If you have the parquet-tools installed on your system you can look at its native schema like so:
$ parquet-tools schema person.parquet
message row {
optional binary NAME (UTF8);
optional int32 AGE;
optional fixed_len_byte_array(16) SALARY (DECIMAL(38,5));
optional int96 CREATION_TIME;
}
- For Decimal Parquet encodes it as a fixed byte array and for Timestamp it's an int96
- Reading back the data via ParquetSource and printing to the console:
val parquetFilePath = new Path("hdfs://nameservice1/client/eel/person.parquet")
implicit val hadoopConfiguration = new Configuration()
implicit val hadoopFileSystem = FileSystem.get(hadoopConfiguration) // This is required
ParquetSource(parquetFilePath)
.toDataStream()
.collect()
.foreach(row => println(row))
- parquetFilePath is the ParquetSource file path pointing to a HDFS path - alternatively this could be a local file path if you qualify it with the file: scheme
- hadoopConfiguration and hadoopFileSystem are scala implicits required by the ParquetSource
- To get the collection of rows you need to perform the action collect on the source's underlying DataStream: toDataStream().collect(), then iterate over each row and print it out using foreach(row => println(row))
- Here are the results of the read:
[NAME = Fred,AGE = 50,SALARY = 50000.99000,CREATION_TIME = 2017-01-23 14:53:51.862]
[NAME = Gary,AGE = 50,SALARY = 20000.34000,CREATION_TIME = 2017-01-23 14:53:51.876]
[NAME = Alice,AGE = 50,SALARY = 99999.98000,CREATION_TIME = 2017-01-23 14:53:51.876]
You can query data via the ParquetSource using simple and/or predicates with relational operators such as equals, gt, ge, lt, le, etc...
predicate push-down means that filter expressions can applied to the read without reading the entire row (features of Parquet and Orc columnar formats), i.e. it only reads the bytes required for the filter expressions, e.g.:
val parquetFilePath = new Path("hdfs://nameservice1/client/eel/person.parquet")
implicit val hadoopConfiguration = new Configuration()
implicit val hadoopFileSystem = FileSystem.get(hadoopConfiguration) // This is required
ParquetSource(parquetFilePath)
.withPredicate(Predicate.or(Predicate.equals("NAME", "Alice"), Predicate.equals("NAME", "Gary")))
.toDataStream()
.collect()
.foreach(row => println(row))
The above ParquetSource predicate (withPredicate) is equivalent to the SQL predicate:
where name = 'Alice' or name = 'Gary'
The result is as follows:
[NAME = Gary,AGE = 50,SALARY = 20000.34000,CREATION_TIME = 2017-01-23 14:53:51.876]
[NAME = Alice,AGE = 50,SALARY = 99999.98000,CREATION_TIME = 2017-01-23 14:53:51.876]
column pruning or schema projection which means providing a read schema - the reader is interested only in certain fields but not all fields written by the writer. The Parquet and Orc columnar formats does this efficiently without reading the entire row, i.e. only reading the bytes required for those fields, e.g.:
val parquetFilePath = new Path("hdfs://nameservice1/client/eel/person.parquet")
implicit val hadoopConfiguration = new Configuration()
implicit val hadoopFileSystem = FileSystem.get(hadoopConfiguration) // This is required
ParquetSource(parquetFilePath)
.withProjection("NAME", "SALARY")
.withPredicate(Predicate.or(Predicate.equals("NAME", "Alice"), Predicate.equals("NAME", "Gary")))
.toDataStream()
.collect
.foreach(row => println(row))
The above ParquetSource projection (withProjection) is equivalent to the SQL select:
select NAME, SALARY
The result is as follows:
[NAME = Gary,SALARY = 20000.34000]
[NAME = Alice,SALARY = 99999.98000]
- The OrcSink is almost identical to the way the parquet sink works (see above)
// Write to a OrcSink from a JDBCSource
val query = "SELECT NAME, AGE, SALARY, CREATION_TIME FROM PERSON"
val orcFilePath = new Path("hdfs://nameservice1/client/eel/person.orc")
implicit val hadoopConfiguration = new Configuration()
JdbcSource(() => dataSource.getConnection, query).withFetchSize(10)
.toDataStream
.to(OrcSink(orcFilePath))
- Reading back the data via OrcSource and printing to the console:
val orcFilePath = new Path("hdfs://nameservice1/client/eel/person.orc")
implicit val hadoopConfiguration = new Configuration()
OrcSource(orcFilePath)
.toDataStream.collect().foreach(row => println(row))
TBD
// Write to a AvroSink from a JDBCSource
val query = "SELECT NAME, AGE, SALARY, CREATION_TIME FROM PERSON"
val avroFilePath = Paths.get(s"${sys.props("user.home")}/person.avro")
JdbcSource(() => dataSource.getConnection, query)
.withFetchSize(10)
.toDataStream
.replaceFieldType(DecimalType.Wildcard, DoubleType)
.replaceFieldType(TimestampMillisType, StringType)
.to(AvroSink(avroFilePath))
- The JDBCSource takes a connection function and a SQL query - it will execute the SQL and derive the EEL schema from it - also notice the withFetchSize which caches the number of rows per fetch reducing the number RPC calls to the database server.
- avroFilePath is the AvroSource file path pointing to a path on the local file system
- The 2 replaceFieldType method calls map DecimalType to DoubleType and TimestampMillisType to StringType as Decimals and Timestamps are not supported in Avro Schema
- If you have the avro-tools installed on your system you can look at its native schema like so - alternatively use the AvroSource to read it back in - see below.
$ avro-tools getschema person.avro
{
"type" : "record",
"name" : "row",
"namespace" : "namespace",
"fields" : [ {
"name" : "NAME",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "AGE",
"type" : [ "null", "int" ],
"default" : null
}, {
"name" : "SALARY",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "CREATION_TIME",
"type" : [ "null", "string" ],
"default" : null
} ]
}
- Reading back the data via AvroSource and printing to the console:
val avroFilePath = Paths.get(s"${sys.props("user.home")}/person.avro")
AvroSource(avroFilePath)
.toDataStream()
.collect
.foreach(row => println(row))
- avroFilePath is the AvroSource file path pointing to a path on the local file system
- To get the collection of rows you need to perform the action collect on the source's underlying DataStream: toDataStream().collect, then iterate over each row and print it out using foreach(row => println(row))
- Here are the results of the read:
[NAME = Fred,AGE = 50,SALARY = 50000.99,CREATION_TIME = 2017-01-24 16:13:07.524]
[NAME = Gary,AGE = 50,SALARY = 20000.34,CREATION_TIME = 2017-01-24 16:13:07.532]
[NAME = Alice,AGE = 50,SALARY = 99999.98,CREATION_TIME = 2017-01-24 16:13:07.532]
- The CsvSink is almost identical to the way the parquet sink works (see above)
// Write to a CsvSink from a JDBCSource
val query = "SELECT NAME, AGE, SALARY, CREATION_TIME FROM PERSON"
val csvFilePath = new Path("hdfs://nameservice1/client/eel/person.csv")
implicit val hadoopConfiguration = new Configuration()
implicit val hadoopFileSystem = FileSystem.get(new Configuration()) // This is required
JdbcSource(() => dataSource.getConnection, query).withFetchSize(10)
.toDataStream
.to(CsvSink(csvFilePath))
- Reading back the data via CsvSource and printing to the console:
val csvFilePath = new Path("hdfs://nameservice1/client/eel/person.csv")
implicit val hadoopConfiguration = new Configuration()
implicit val hadoopFileSystem = FileSystem.get(hadoopConfiguration) // This is required
CsvSource(csvFilePath).toDataStream().schema.fields.foreach(f => println(f))
CsvSource(csvFilePath)
.toDataStream()
.collect()
.foreach(row => println(row))
Note by default the CsvSource converts all types to a string - the following code prints out the fields in the schema:
CsvSource(csvFilePath).toDataStream().schema.fields.foreach(f => println(f))
You can enforce the types on the CSVSource by supplying SchemaInferrer:
val csvFilePath = new Path("hdfs://nameservice1/client/eel/person.csv")
implicit val hadoopConfiguration = new Configuration()
implicit val hadoopFileSystem = FileSystem.get(hadoopConfiguration) // This is required
val schemaInferrer = SchemaInferrer(StringType,
DataTypeRule("AGE", IntType.Signed),
DataTypeRule("SALARY", DecimalType.Wildcard),
DataTypeRule(".*\\_TIME", TimeMillisType))
CsvSource(csvFilePath).withSchemaInferrer(schemaInferrer)
.toDataStream()
.collect()
.foreach(row => println(row))
The above schemaInferrer object sets up some rules for mapping field name AGE to an int, Salary to a Decimal and a field name ending in TIME using REGEX to a Timestamp.
Note the first parameter on SchemaInferrer is StringType which means that this is the default type for all fields.
Storage formats Parquet and Orc support nested types such as struct, map and list.
The following example describes how to write rows containing a single struct column named PERSON_DETAILS:
struct PERSON_DETAILS {
NAME String,
AGE Int,
SALARY DECIMAL(38,5),
CREATION_TIME TIMESTAMP
}
val parquetFilePath = new Path("hdfs://nameservice1/client/eel_struct/person.parquet")
implicit val hadoopConfiguration = new Configuration()
implicit val hadoopFileSystem = FileSystem.get(hadoopConfiguration)
val personDetailsStruct = Field.createStructField("PERSON_DETAILS",
Seq(
Field("NAME", StringType),
Field("AGE", IntType.Signed),
Field("SALARY", DecimalType(Precision(38), Scale(5))),
Field("CREATION_TIME", TimestampMillisType)
)
)
val schema = StructType(personDetailsStruct)
- A struct is encoded as a list of Fields with their corresponding type definitions.
val rows = Vector(
Vector(Vector("Fred", 50, BigDecimal("50000.99000"), new Timestamp(System.currentTimeMillis()))),
Vector(Vector("Gary", 50, BigDecimal("20000.34000"), new Timestamp(System.currentTimeMillis()))),
Vector(Vector("Alice", 50, BigDecimal("99999.98000"), new Timestamp(System.currentTimeMillis())))
)
- The first Vector, e.g. val rows = Vector(...) is a list of rows - 3 in this case.
- Each inner Vector, e.g. Vector(...) is a single row of column values
- The column values in this case is another Vector representing the the struct, e.g. Vector("Alice", 50, BigDecimal("99999.98000"), new Timestamp(System.currentTimeMillis()))
DataStream.fromValues(schema, rows)
.to(ParquetSink(parquetFilePath))
If you have the parquet-tools installed on your system you can look at its native schema like so:
$ parquet-tools schema person.parquet
message row {
optional group PERSON_DETAILS {
optional binary NAME (UTF8);
optional int32 AGE;
optional fixed_len_byte_array(16) SALARY (DECIMAL(38,5));
optional int96 CREATION_TIME;
}
}
- Notice that parquet encodes the struct as group of columns.
ParquetSource(parquetFilePath)
.toDataStream()
.collect()
.foreach(row => println(row))
[PERSON_DETAILS = WrappedArray(Fred, 50, 50000.99000, 2017-01-25 15:56:06.212)]
[PERSON_DETAILS = WrappedArray(Gary, 50, 20000.34000, 2017-01-25 15:56:06.212)]
[PERSON_DETAILS = WrappedArray(Alice, 50, 99999.98000, 2017-01-25 15:56:06.212)]
ParquetSource(parquetFilePath)
.withPredicate(Predicate.or(Predicate.equals("PERSON_DETAILS.NAME", "Alice"), Predicate.equals("PERSON_DETAILS.NAME", "Gary")))
.toDataStream()
.collect()
.foreach(row => println(row))
The above is equivalent to the following in SQL:
select PERSON_DETAILS
where PERSON_DETAILS.NAME = 'Alice' or PERSON_DETAILS.NAME = 'Gary'
[PERSON_DETAILS = WrappedArray(Gary, 50, 20000.34000, 2017-01-25 16:03:37.678)]
[PERSON_DETAILS = WrappedArray(Alice, 50, 99999.98000, 2017-01-25 16:03:37.678)]
On the Parquet file just written we can create a Hive External table pointing at the HDFS location of the file.
CREATE EXTERNAL TABLE IF NOT EXISTS `eel_test.struct_person`(
PERSON_DETAILS STRUCT<NAME:String, AGE:Int, SALARY:decimal(38,5), CREATION_TIME:TIMESTAMP>
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/client/eel_struct';
- The location /client/eel_struct is the root directory of where all the files live - in this case its the root of folder of the Parquet write in step 4.
hive> select * from eel_test.struct_person;
OK
{"NAME":"Fred","AGE":50,"SALARY":50000.99,"CREATION_TIME":"2017-01-25 17:03:37.678"}
{"NAME":"Gary","AGE":50,"SALARY":20000.34,"CREATION_TIME":"2017-01-25 17:03:37.678"}
{"NAME":"Alice","AGE":50,"SALARY":99999.98,"CREATION_TIME":"2017-01-25 17:03:37.678"}
Time taken: 1.092 seconds, Fetched: 3 row(s)
hive>
hive> select person_details.name, person_details.age
> from eel_test.struct_person
> where person_details.name in ('Alice', 'Gary' );
OK
Gary 50
Alice 50
Time taken: 0.067 seconds, Fetched: 2 row(s)
hive>
- HiveQL has some nice features for cracking nested types - the query returns scalar values for name and age in the person_details structure.
- The same query is supported in Spark via HiveContext or SparkSession in version >= 2.x
EEL supports Parquet ARRAYS of any primitive type including structs. The following example extends the previous example by adding another column called PHONE_NUMBERS defined as an ARRAY of Strings.
val parquetFilePath = new Path("hdfs://nameservice1/client/eel_array/person.parquet")
implicit val hadoopConfiguration = new Configuration()
implicit val hadoopFileSystem = FileSystem.get(hadoopConfiguration)
// Create the schema with a STRUCT and an ARRAY
val personDetailsStruct = Field.createStructField("PERSON_DETAILS",
Seq(
Field("NAME", StringType),
Field("AGE", IntType.Signed),
Field("SALARY", DecimalType(Precision(38), Scale(5))),
Field("CREATION_TIME", TimestampMillisType)
)
)
val schema = StructType(personDetailsStruct, Field("PHONE_NUMBERS", ArrayType.Strings))
// Create 3 rows
val rows = Vector(
Vector(Vector("Fred", 50, BigDecimal("50000.99000"), new Timestamp(System.currentTimeMillis())), Vector("322", "987")),
Vector(Vector("Gary", 50, BigDecimal("20000.34000"), new Timestamp(System.currentTimeMillis())), Vector("145", "082")),
Vector(Vector("Alice", 50, BigDecimal("99999.98000"), new Timestamp(System.currentTimeMillis())), Vector("534", "129"))
)
// Write the rows
DataStream.fromValues(schema, rows)
.to(ParquetSink(parquetFilePath))
If you have the parquet-tools installed on your system you can look at its native schema like so:
$ parquet-tools schema person.parquet
message row {
optional group PERSON_DETAILS {
optional binary NAME (UTF8);
optional int32 AGE;
optional fixed_len_byte_array(16) SALARY (DECIMAL(38,5));
optional int96 CREATION_TIME;
}
repeated binary PHONE_NUMBERS (UTF8);
}
- Notice PHONE_NUMBERS is represented as a repeated UTF8 (String) in Parquet, i.e. an unbounded array.
ParquetSource(parquetFilePath)
.toDataStream()
.collect()
.foreach(row => println(row))
- The results
[PERSON_DETAILS = WrappedArray(Fred, 50, 50000.99000, 2017-01-25 20:33:48.302),PHONE_NUMBERS = Vector(322, 987)]
[PERSON_DETAILS = WrappedArray(Gary, 50, 20000.34000, 2017-01-25 20:33:48.302),PHONE_NUMBERS = Vector(145, 082)]
[PERSON_DETAILS = WrappedArray(Alice, 50, 99999.98000, 2017-01-25 20:33:48.302),PHONE_NUMBERS = Vector(534, 129)]
On the Parquet file just written we can create a Hive External table pointing at the HDFS location of the file.
CREATE EXTERNAL TABLE IF NOT EXISTS `eel_test.struct_person_phone`(
PERSON_DETAILS STRUCT<NAME:String, AGE:Int, SALARY:decimal(38,5), CREATION_TIME:TIMESTAMP>,
PHONE_NUMBERS Array<String>
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/client/eel_array';
- The location /client/eel_array is the root directory of where all the files live - in this case its the root of folder of the Parquet write
hive> select * from eel_test.struct_person_phone;
OK
{"NAME":"Fred","AGE":50,"SALARY":50000.99,"CREATION_TIME":"2017-01-26 10:50:57.192"} ["322","987"]
{"NAME":"Gary","AGE":50,"SALARY":20000.34,"CREATION_TIME":"2017-01-26 10:50:57.192"} ["145","082"]
{"NAME":"Alice","AGE":50,"SALARY":99999.98,"CREATION_TIME":"2017-01-26 10:50:57.192"} ["534","129"]
Time taken: 1.248 seconds, Fetched: 3 row(s)
hive>
hive> select person_details.name, person_details.age, phone_numbers
> from eel_test.struct_person_phone
> where person_details.name in ('Alice', 'Gary' );
OK
Gary 50 ["145","082"]
Alice 50 ["534","129"]
Time taken: 0.181 seconds, Fetched: 2 row(s)
hive>
- HiveQL has some nice features for cracking nested types - the query returns scalar values for name and age in the person_details structure and phone numbers from the phone_numbers array.
- The same query is supported in Spark via HiveContext or SparkSession in version >= 2.x
hive> select person_details.name, person_details.age, phone_numbers[0]
> from eel_test.struct_person_phone;
OK
Fred 50 322
Gary 50 145
Alice 50 534
Time taken: 0.08 seconds, Fetched: 3 row(s)
hive>
- To retrieve a specific array element, HiveQL requires the column index which is zero based, e.g. phone_numbers[0]
Query to show name, age and phone_number with repeated rows for each phone number from the phone_numbers array
hive> select person_details.name, person_details.age, phone_number
> from eel_test.struct_person_phone
> lateral view explode(phone_numbers) pns as phone_number;
OK
Fred 50 322
Fred 50 987
Gary 50 145
Gary 50 082
Alice 50 534
Alice 50 129
Time taken: 0.062 seconds, Fetched: 6 row(s)
hive>
- The above lateral view statement is used in conjunction with the explode UDTF(user-defined-table-function) to generate a row per array element
The parquet source will read from one or more parquet files. To use the source, create an instance of ParquetSource
specifying a file pattern or Path
object. The Parquet source implementation is optimized to use native parquet reading directly to an eel row object without creating intermediate formats such as Avro.
Example reading from a single file ParquetSource(new Path("hdfs:///myfile"))
Example reading from a wildcard pattern ParquetSource("hdfs:///user/warehouse/*"))
Parquet as a file format supports predicates, which are row level filter operations. Because parquet is a columnar store, row level filters can be extremely efficient. Whenever you are reading from parquet files - either directly or through hive - a row level filter will nearly always be faster than reading the data and filtering afterwards. This is because parquet is able to skip whole chunks of the file that do not match the predicate.
To use a predicate, simply add an instance of Predicate
to the Parquet source class.
val ds = ParquetSource(path).withPredicate(Predicate.equals("location", "westeros")).toDataStream()
Multiple predicates can be grouped together using Predicate.or
and Predicate.and
.
The parquet source also allows you to specify a projection which is a subset of the columns to return. Again, since parquet is columnar, if a column is not needed at all then the entire column can be skipped directly in the file making parquet extremely fast at this kind of operation.
To use a projection, simply use withProjection
on the Parquet source with the fields to keep.
val ds = ParquetSource(path).withProjection("amount", "type").toDataStream()
The Hive source will read from a hive table. To use this source, create an instance of HiveSource
specifying the database name, the table name, any partitions to limit the read. The source also requires instances of the Hadoop Filesystem object, and a HiveConf object.
Reading all rows from a table is the simplest use case: HiveSource("mydb", "mytable")
. We can also read rows from a table for a particular partition. For example, to read all rows which have the value '1975' for the partition column 'year': HiveSource("mydb", "mytable").withPartition("year", "1975")
The partition clause accepts an operator to perform more complicated querying, such as less than, greater than etc. For example to read all rows which have a year less than 1975 we can do: HiveSource("mydb", "mytable").withPartition("year", "<", "1975")
.
The Hive sink writes data to Hive tables stored in any of the following formats: ORC (Optimized Row Columnar), Parquet, Avro, or Text delimited.
To configure a Hive Sink, you specify the Hive database, the table to write to, and the format to write in. The sink also requires instances of the Hadoop Filesystem object, and a HiveConf object.
Properties
Parameter | Description |
---|---|
IO Threads | The number of concurrent writes to the sink |
Dynamic Partitioning | If set to true then any values on partitioned fields that are new, will automatically be created as partitions in the metastore. If set to false, then a new value will throw an error. |
Example
Simple example of writing to a Hive database ds.to(HiveSink("mydb", "mytable"))
We can specify the number of concurrent writes, by using the ioThreads parameter ds.to(HiveSink("mydb", "mytable").withIOThreads(4))
If the schema you need is in the form of the CSV headers, then we can easily parse those to create the schema. But obviously CSV won't encode any type information. Therefore, we can specify an instance of a SchemaInferrer
which can be customized with rules to determine the correct schema type for each header. So for example, you might say that "name" is a SchemaType.String, or that anything matching "*_id" is a SchemaType.Long. You can also specify the nullability, scale, precision and unsigned. A quick example:
val inferrer = SchemaInferrer(SchemaType.String, SchemaRule("qty", SchemaType.Int, false), SchemaRule(".*_id", SchemaType.Int))
CsvSource("myfile").withSchemaInferrer(inferrer)
Eel is released to maven central, so is very easy to include in your project. Just find the latest version on maven central and copy the includes.