My previous post, How Storm SQL is executed, walks through the main steps of how Storm processes EXPLAIN and SUBMIT SQL statements, but it leaves many details uncovered for anyone who wants to leverage Calcite in a new project.
Here I'll show how to invoke Calcite's Explain from your own code. If you want to try the Calcite CLI directly, see the Calcite Tutorial.
package com.***.***;

import java.util.Date;

import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.Schema.TableType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.ImmutableBitSet;

import com.google.common.collect.ImmutableList;

public class BeamSqlExplainer {
  public static final JavaTypeFactory typeFactory =
      new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
  public static final SchemaPlus defaultSchema = Frameworks.createRootSchema(true);

  private FrameworkConfig config =
      Frameworks.newConfigBuilder().defaultSchema(defaultSchema).build();
  private Planner planner = Frameworks.getPlanner(config);

  public BeamSqlExplainer() {
    addTableSchema();
  }

  public void addTableSchema() {
    defaultSchema.add("ORDER_DETAILS", new StreamableTable() {
      public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        RelDataTypeFactory.FieldInfoBuilder b = typeFactory.builder();
        b.add("CK_TIME", typeFactory.createJavaType(Date.class));
        b.add("ITEM_ID", typeFactory.createJavaType(Long.class));
        b.add("ITEM_PRICE", typeFactory.createJavaType(Double.class));
        b.add("BUYER_NAME", typeFactory.createJavaType(String.class));
        b.add("QUANTITY", typeFactory.createJavaType(Integer.class));
        return b.build();
      }

      public Statistic getStatistic() {
        // return Statistics.of(100, ImmutableList.<ImmutableBitSet>of());
        // Declare field 0 (CK_TIME) as an ascending collation,
        // so the planner knows the stream is ordered by time.
        Direction dir = Direction.ASCENDING;
        RelFieldCollation collation =
            new RelFieldCollation(0, dir, NullDirection.UNSPECIFIED);
        return Statistics.of(5, ImmutableList.of(ImmutableBitSet.of(0)),
            ImmutableList.of(RelCollations.of(collation)));
      }

      public TableType getJdbcTableType() {
        return TableType.STREAM;
      }

      public Table stream() {
        return null;
      }
    });
  }

  public void explain(String sql)
      throws SqlParseException, ValidationException, RelConversionException {
    // Step 1: parse the SQL string into an AST.
    SqlNode parse = planner.parse(sql);
    System.out.println(parse.toString());

    // Step 2: validate against the schema, then convert to a relational tree.
    SqlNode validate = planner.validate(parse);
    RelNode tree = planner.convert(validate);

    String plan = RelOptUtil.toString(tree); // or explain(tree, SqlExplainLevel.ALL_ATTRIBUTES)
    System.out.println("plan>");
    System.out.println(plan);
  }
}
A few notes on the example above:
- typeFactory defines the data types used for table fields;
- the Planner is created from a FrameworkConfig; most importantly, a SchemaPlus is provided to hold the table schemas (and UDFs, if any), and it is consulted during the validation step;
- addTableSchema() registers a table named ORDER_DETAILS with 5 fields, declaring CK_TIME as monotonic, ASCENDING;
- explain() takes a SQL string as input and prints the logical execution plan.
A logical execution plan represents how a SQL statement is executed. The physical plan may differ, depending on the storage provider.
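To make the plans below easier to read: RelOptUtil.toString() prints the plan as an indented tree, where each operator's inputs appear beneath it, one indentation level deeper. Here is a minimal, Calcite-free sketch of that printing scheme (PlanNode is a hypothetical toy class, not a Calcite type):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical mini plan node, only to illustrate how the indented
// output of RelOptUtil.toString() is structured.
class PlanNode {
    final String name;
    final List<PlanNode> inputs;

    PlanNode(String name, PlanNode... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }

    // Print this node, then its inputs indented one level deeper.
    String print(String indent) {
        StringBuilder sb = new StringBuilder(indent).append(name).append('\n');
        for (PlanNode in : inputs) {
            sb.append(in.print(indent + "  "));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        PlanNode plan = new PlanNode("LogicalDelta",
            new PlanNode("LogicalProject",
                new PlanNode("LogicalFilter",
                    new PlanNode("EnumerableTableScan"))));
        System.out.print(plan.print(""));
    }
}
```

Reading bottom-up, the table scan feeds the filter, which feeds the projection, and so on, mirroring the data flow.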
Now, let’s have some examples:
1. Single table direct operation
SELECT STREAM `ITEM_ID`, `ITEM_PRICE`, `BUYER_NAME`, CASE WHEN `QUANTITY` > 1 THEN 'MULTIPLECK' ELSE 'SINGLECK' END AS `CK_FLAG`
FROM `ORDER_DETAILS`
WHERE `ITEM_PRICE` > 0 AND `QUANTITY` > 0
Execution plan:
plan>
LogicalDelta
LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], CK_FLAG=[CASE(>($4, 1), 'MULTIPLECK', CAST('SINGLECK'):CHAR(10) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)])
LogicalFilter(condition=[AND(>($2, 0), >($4, 0))])
EnumerableTableScan(table=[[ORDER_DETAILS]])
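In the plan, the $N references are zero-based positions in the input row, following the field order declared in getRowType(): $0 is CK_TIME, $1 is ITEM_ID, $4 is QUANTITY. The CASE in the projection is just a conditional on QUANTITY; as a toy plain-Java equivalent (ignoring the CHAR(10) cast in the real plan):

```java
public class CkFlag {
    // Toy equivalent of: CASE WHEN QUANTITY > 1 THEN 'MULTIPLECK' ELSE 'SINGLECK' END
    static String ckFlag(int quantity) {
        return quantity > 1 ? "MULTIPLECK" : "SINGLECK";
    }

    public static void main(String[] args) {
        System.out.println(ckFlag(3)); // MULTIPLECK
        System.out.println(ckFlag(1)); // SINGLECK
    }
}
```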
2. Single table with hourly GROUP BY
SELECT STREAM `ITEM_ID`, FLOOR(`CK_TIME` TO HOUR) AS `CK_TIME`, SUM(`QUANTITY`), COUNT(*)
FROM `ORDER_DETAILS`
WHERE `ITEM_PRICE` > 0 AND `QUANTITY` > 0
GROUP BY `ITEM_ID`, FLOOR(`CK_TIME` TO HOUR)
Execution plan:
plan>
LogicalDelta
LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
LogicalProject(ITEM_ID=[$1], CK_TIME=[FLOOR($0, FLAG(HOUR))], QUANTITY=[$4])
LogicalFilter(condition=[AND(>($2, 0), >($4, 0))])
EnumerableTableScan(table=[[ORDER_DETAILS]])
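FLOOR(CK_TIME TO HOUR) truncates each timestamp down to the start of its hour, which is what forms the hourly grouping key in the LogicalAggregate. The same truncation in plain java.time, independent of Calcite:

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class FloorToHour {
    // Equivalent of SQL FLOOR(ts TO HOUR): drop minutes, seconds, nanos.
    static LocalDateTime floorToHour(LocalDateTime ts) {
        return ts.truncatedTo(ChronoUnit.HOURS);
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2016, 5, 12, 14, 37, 59);
        System.out.println(floorToHour(ts)); // 2016-05-12T14:00
    }
}
```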
Update for join
First I add another table to the schema:
// Additional imports needed for a scannable (non-stream) table:
// import org.apache.calcite.DataContext;
// import org.apache.calcite.linq4j.Enumerable;
// import org.apache.calcite.schema.ScannableTable;
defaultSchema.add("LISTING_DETAILS", new ScannableTable() {
  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    RelDataTypeFactory.FieldInfoBuilder b = typeFactory.builder();
    b.add("LIST_ID", typeFactory.createJavaType(Long.class));
    b.add("ITEM_PRICE", typeFactory.createJavaType(Double.class));
    b.add("SELLER_NAME", typeFactory.createJavaType(String.class));
    b.add("AVAIL_QUANTITY", typeFactory.createJavaType(Integer.class));
    return b.build();
  }

  public Statistic getStatistic() {
    return Statistics.of(100, ImmutableList.of());
  }

  public TableType getJdbcTableType() {
    return TableType.TABLE;
  }

  @Override
  public Enumerable<Object[]> scan(DataContext root) {
    return null;
  }
});
Now let's see some examples:
- Filter before Join
SELECT STREAM `A`.`ITEM_ID`, `A`.`ITEM_PRICE`, `A`.`BUYER_NAME`, `B`.`SELLER_NAME`
FROM `ORDER_DETAILS` AS `A`
INNER JOIN `LISTING_DETAILS` AS `B` ON `A`.`ITEM_ID` = `B`.`LIST_ID` AND `A`.`ITEM_PRICE` > 0 AND `B`.`AVAIL_QUANTITY` > 0
Execution Plan
plan>
LogicalDelta
LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], SELLER_NAME=[$7])
LogicalProject(CK_TIME=[$0], ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], QUANTITY=[$4], LIST_ID=[$6], ITEM_PRICE0=[$7], SELLER_NAME=[$8], AVAIL_QUANTITY=[$9])
LogicalJoin(condition=[AND(=($1, $6), $5, $10)], joinType=[inner])
LogicalProject(CK_TIME=[$0], ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], QUANTITY=[$4], $f5=[>($2, 0)])
EnumerableTableScan(table=[[ORDER_DETAILS]])
LogicalProject(LIST_ID=[$0], ITEM_PRICE=[$1], SELLER_NAME=[$2], AVAIL_QUANTITY=[$3], $f4=[>($3, 0)])
LogicalTableScan(table=[[LISTING_DETAILS]])
- Filter after Join
SELECT STREAM `A`.`ITEM_ID`, `A`.`ITEM_PRICE`, `A`.`BUYER_NAME`, `B`.`SELLER_NAME`
FROM `ORDER_DETAILS` AS `A`
INNER JOIN `LISTING_DETAILS` AS `B` ON `A`.`ITEM_ID` = `B`.`LIST_ID`
WHERE `A`.`ITEM_PRICE` > 0 AND `B`.`AVAIL_QUANTITY` > 0
Execution Plan
plan>
LogicalDelta
LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], SELLER_NAME=[$7])
LogicalFilter(condition=[AND(>($2, 0), >($8, 0))])
LogicalJoin(condition=[=($1, $5)], joinType=[inner])
EnumerableTableScan(table=[[ORDER_DETAILS]])
LogicalTableScan(table=[[LISTING_DETAILS]])
From the two examples above, you can see that the Planner itself does not perform any optimization; that work is handled by the next layer.
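For an INNER join, pushing the single-table predicates below the join (as in the first plan) yields the same rows as filtering after the join (the second plan); only the amount of data flowing through the join differs. A small, Calcite-free sketch over in-memory lists (the row layouts here are hypothetical, just {itemId, price} orders and {listId, availQuantity} listings):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterJoinDemo {
    // Join first, then apply the WHERE-style filters on the joined row.
    static List<long[]> joinThenFilter(List<long[]> orders, List<long[]> listings) {
        List<long[]> out = new ArrayList<>();
        for (long[] o : orders) {
            for (long[] l : listings) {
                if (o[0] == l[0] && o[1] > 0 && l[1] > 0) {
                    out.add(new long[] {o[0], o[1], l[1]});
                }
            }
        }
        return out;
    }

    // Push each single-table predicate below the join.
    static List<long[]> filterThenJoin(List<long[]> orders, List<long[]> listings) {
        List<long[]> out = new ArrayList<>();
        for (long[] o : orders) {
            if (o[1] <= 0) continue;     // ITEM_PRICE > 0, pushed below the join
            for (long[] l : listings) {
                if (l[1] <= 0) continue; // AVAIL_QUANTITY > 0, pushed below the join
                if (o[0] == l[0]) {
                    out.add(new long[] {o[0], o[1], l[1]});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> orders = Arrays.asList(new long[]{1, 10}, new long[]{2, 0});
        List<long[]> listings = Arrays.asList(new long[]{1, 5}, new long[]{2, 7});
        // Both strategies keep only the row for item 1.
        System.out.println(joinThenFilter(orders, listings).size());  // 1
        System.out.println(filterThenJoin(orders, listings).size());  // 1
    }
}
```

This equivalence is exactly what lets the optimizer in the next layer push filters through inner joins without changing the result.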