An Adapter for Apache Calcite that allows a Java service to query Apache Kudu. Apache Calcite provides a SQL parser and customizable optimizer that is used in several large products within the Apache Foundation, calcite-Kudu leverages it to execute queries against Kudu -- a fast analytics database.
Calcite Kudu ships a shaded jar file that can be used as a SQL Client for an existing Kudu Cluster. Maven can be used to download the jar and then executed through Java Virtual Machine
$ mvn org.apache.maven.plugins:maven-dependency-plugin:get \
-Dartifact=com.twilio:kudu-sql-cli:1.0.17
$ java -jar ~/.m2/repository/com/twilio/kudu-sql-cli/1.0.17/kudu-sql-cli-1.0.17.jar -c kudu-leader1,kudu-leader2,kudu-leader3
This will work from a machine able to communicate with both Kudu Leaders and Kudu TServers. To test this out locally follow the README in the cli directory.
Add the dependency to your project:
<dependency>
<groupId>com.twilio</groupId>
<artifactId>kudu-sql-adapter</artifactId>
<version>{VERSION}</version>
</dependency>
The library provides a JDBC connection template that can be used with any JDBC library. To create the JDBC connection string:
import com.twilio.kudu.sql.JDBCUtil;
import com.twilio.kudu.sql.schema.DefaultKuduSchemaFactory;
// Use JDBCUtil#CALCITE_MODEL_TEMPLATE_DML_DDL_ENABLED for INSERT and DDL support
final String jdbcURL = String.format(JDBCUtil.CALCITE_MODEL_TEMPLATE,
DefaultKuduSchemaFactory.class.getName(),
kuduConnectionString);
This JDBC url can be handed off to any of the database pooling libraries like
to execute queries.
Creates a new kudu table and specifies its properties. The table must define a column of TIMESTAMP data type with the ROW_TIMESTAMP attribute that will be used for range partitioning. The statement does support specifying range partitions, these have to be created using kudu tools.
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
(col_name data_type
[kudu_column_attribute ...]
[, ...]
[PRIMARY KEY (col_name[, ...])]
)
[PARTITION BY HASH [ (pk_col [, ...]) ] PARTITIONS n]
[TBLPROPERTIES ('key1'='value1', 'key2'='value2', ...)]
kudu column attributes:
[PRIMARY KEY]
| [ASC | DESC]
| [NOT NULL]
| [COLUMN_ENCODING codec]
| [COMPRESSION algorithm]
| [DEFAULT constant]
| [BLOCK_SIZE number]
| [ROW_TIMESTAMP]
| [COMMENT 'col_comment']
data_type:
TINYINT
| SMALLINT
| INT
| BIGINT
| BOOLEAN
| FLOAT
| DOUBLE
| DECIMAL
| STRING
| CHAR
| VARCHAR
| TIMESTAMP
Can be used to add or drop columns from a table.
ALTER TABLE name
ADD [IF NOT EXISTS]
(col_name data_type
[kudu_column_attribute ...]
[, ...]
)
ALTER TABLE name
DROP [IF EXISTS]
(col_name [, ...])
Used to register a rollup aggregate view that can be used for queries. The select query should contain a group by with a timestamp column that is being truncated. An ORDER BY can be optionally specified to store aggregated data in a specified order.
CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db_name.]materialized_view_name
AS query
query:
SELECT { aggFunc [, aggFunc ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ ORDER BY { orderItem [, orderItem ]* } ]
aggFunc:
COUNT | SUM
groupItem:
col_name | FLOOR(col_name TO interval)
orderItem:
col_name [DESC] | FLOOR(col_name TO interval) [DESC]
Apache Kudu doesn't support DESCENDING
sort keys. To provide this functionally, we decided to write the data in a particular way:
public static final Instant EPOCH_DAY_FOR_REVERSE_SORT = Instant.parse("9999-12-31T00:00:00.000000Z");
public static final Long EPOCH_FOR_REVERSE_SORT_IN_MILLISECONDS = EPOCH_DAY_FOR_REVERSE_SORT.toEpochMilli();
// EPOCH_FOR_REVERSE_SORT_IN_MICROSECONDS - ACTUAL-DATE-OF-EVENT = DATE-TO-STORE
public static final Long EPOCH_FOR_REVERSE_SORT_IN_MICROSECONDS = TimestampUtil
.timestampToMicros(new Timestamp(EPOCH_FOR_REVERSE_SORT_IN_MILLISECONDS));
Using these constants at write time, our system writes to Kudu a translated date field using this formula EPOCH_FOR_REVERSE_SORT_IN_MICROSECONDS - ACTUAL-DATE-OF-EVENT
. And at read time, our system reverses this logic keeping it transparent to the end user. Calcite-Kudu
translates timestamps at read time back into their actual values.
This implementation allows Calcite-Kudu
to leverage Kudu's existing sort logic eliminating the need to sort on the client.
When creating a table using DDL, specify the column as DESC
. In the following example, the data is stored in descending order of
event_time
.
CREATE TABLE "host_metrics" (
"host" VARCHAR NOT NULL,
"metric_name" VARCHAR NOT NULL,
"event_time" TIMESTAMP NOT NULL ROW_TIMESTAMP,
"metric_value" DOUBLE NOT NULL,
PRIMARY KEY ("host", "metric_name", "event_time" DESC))
PARTITION BY HASH ("host") PARTITIONS 2;
Data can be written to kudu by using an INSERT statement, but this should only be used for testing. Materialized views that are defined will be automatically maintained correctly as long as the data for a day is written from a single client in a single session.
INSERT INTO "host_metrics" VALUES ('host1', 'metric1', TIMESTAMP'2020-01-02', 10);
The adapter allows the querying of rollup aggregate materialized views which can be registered using the
CREATE MATERIALZIED VIEW
statement. These views have to be maintained by an external system.
While executing a query against a table, a view will be automatically
used if possible. For example the following statement creates a view that stores the sum of the metric_value
grouped by metric_name
for each day.
CREATE MATERIALIZED VIEW "host_metrics_view" AS
SELECT SUM("metric_value") as "sum_metric_value", COUNT(*) as "count_records"
FROM "host_metrics0" GROUP BY "metric_name", FLOOR("event_time" TO DAY);
If we execute the following sql that queries for the sum of metric_value`` grouped by
metric_name`` for each day
we see that the materialized view is used. Also note that a filter is pushed down to restrict data to the correct time range.
SELECT SUM("metric_value")
FROM host_metrics0
WHERE "event_time" >= timestamp '2020-01-01 00:00:00'
AND "event_time"<'2020-01-02 00:00:00'
GROUP BY "metric_name",
floor("event_time" TO day);
KuduToEnumerableRel
KuduProjectRel(EXPR$0=[$2])
KuduFilterRel(ScanToken 1=[event_time GREATER_EQUAL 1577836800000000, event_time LESS 1577923200000000])
KuduQuery(table=[[kudu, host_metrics0-host_metrics_view0-Day-Aggregation]])
If we execute the following query that returns the total number of metrics that span a time range of one day and 12 hours we see that a union query is used to query both the table and view with the appropriate time ranges.
SELECT count(*)
FROM host_metrics0
WHERE "event_time" >= timestamp '2020-01-01 00:00:00'
AND "event_time" < '2020-01-02 12:00:00';
EnumerableAggregate(group=[{}], EXPR$0=[$SUM0($0)])
EnumerableUnion(all=[true])
EnumerableAggregate(group=[{}], EXPR$0=[COUNT()])
KuduToEnumerableRel
KuduProjectRel(EVENT_TIME=[$2])
KuduFilterRel(ScanToken 1=[event_time GREATER_EQUAL 1577923200000000, event_time LESS 1577966400000000])
KuduQuery(table=[[kudu, host_metrics0]])
EnumerableAggregate(group=[{}], EXPR$0=[$SUM0($3)])
KuduToEnumerableRel
KuduFilterRel(ScanToken 1=[event_time GREATER_EQUAL 1577836800000000, event_time LESS 1577923200000000])
KuduQuery(table=[[kudu, host_metrics0-host_metrics_view0-Day-Aggregation]])
The calcite documentation describes how materialized views work in detail.