My previous post, How Storm SQL is executed, walked through the main steps of how Storm processes Explain and Submit SQL statements. To leverage Calcite in a new project, however, there are many details that post left uncovered.
Here I'll show how to invoke Calcite's Explain from your own code. If you want to try the Calcite CLI directly, see the Calcite Tutorial.
```java
package com.***.***;

import java.util.Date;

import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.Schema.TableType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.ImmutableBitSet;

import com.google.common.collect.ImmutableList;

public class BeamSqlExplainer {
  public static final JavaTypeFactory typeFactory =
      new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
  public static final SchemaPlus defaultSchema = Frameworks.createRootSchema(true);

  private FrameworkConfig config =
      Frameworks.newConfigBuilder().defaultSchema(defaultSchema).build();
  private Planner planner = Frameworks.getPlanner(config);

  public BeamSqlExplainer() {
    addTableSchema();
  }

  public void addTableSchema() {
    defaultSchema.add("ORDER_DETAILS", new StreamableTable() {
      public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        RelDataTypeFactory.FieldInfoBuilder b = typeFactory.builder();
        b.add("CK_TIME", typeFactory.createJavaType(Date.class));
        b.add("ITEM_ID", typeFactory.createJavaType(Long.class));
        b.add("ITEM_PRICE", typeFactory.createJavaType(Double.class));
        b.add("BUYER_NAME", typeFactory.createJavaType(String.class));
        b.add("QUANTITY", typeFactory.createJavaType(Integer.class));
        return b.build();
      }

      public Statistic getStatistic() {
        // return Statistics.of(100, ImmutableList.<ImmutableBitSet>of());
        Direction dir = Direction.ASCENDING;
        RelFieldCollation collation =
            new RelFieldCollation(0, dir, NullDirection.UNSPECIFIED);
        return Statistics.of(5, ImmutableList.of(ImmutableBitSet.of(0)),
            ImmutableList.of(RelCollations.of(collation)));
      }

      public TableType getJdbcTableType() {
        return TableType.STREAM;
      }

      public Table stream() {
        return null;
      }
    });
  }

  public void explain(String sql)
      throws SqlParseException, ValidationException, RelConversionException {
    SqlNode parse = planner.parse(sql);
    System.out.println(parse.toString());

    SqlNode validate = planner.validate(parse);
    RelNode tree = planner.convert(validate);
    String plan = RelOptUtil.toString(tree);
    //explain(tree, SqlExplainLevel.ALL_ATTRIBUTES);
    System.out.println("plan>");
    System.out.println(plan);
  }
}
```
Walking through the example:
- `typeFactory` defines the data types used by the schema;
- the `Planner` is created from a `FrameworkConfig`; most importantly, the config provides a `SchemaPlus` holding the table schemas and any UDFs, which is consulted during the validation step;
- `addTableSchema()` registers a stream table named `ORDER_DETAILS` with 5 fields, whose statistics declare `CK_TIME` as monotonic, ASCENDING;
- `explain()` takes a SQL string as input, parses, validates, and converts it, then prints the logical execution plan.
A logical execution plan represents how a SQL query is executed. The physical plan may differ, depending on the storage provider.
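As a usage sketch, the class above can be driven like this (the `ExplainerDemo` class name is mine, and it assumes `calcite-core` and Guava are on the classpath):

```java
// Hypothetical driver for the BeamSqlExplainer class above.
public class ExplainerDemo {
  public static void main(String[] args) throws Exception {
    // The constructor registers the ORDER_DETAILS stream table.
    BeamSqlExplainer explainer = new BeamSqlExplainer();

    // Prints the parsed SQL first, then the logical plan tree.
    explainer.explain(
        "SELECT STREAM `ITEM_ID`, `ITEM_PRICE` "
        + "FROM `ORDER_DETAILS` WHERE `QUANTITY` > 0");
  }
}
```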
Now, let's look at some examples:
1. Single table direct operation
```sql
SELECT STREAM `ITEM_ID`, `ITEM_PRICE`, `BUYER_NAME`,
  CASE WHEN `QUANTITY` > 1 THEN 'MULTIPLECK' ELSE 'SINGLECK' END AS `CK_FLAG`
FROM `ORDER_DETAILS`
WHERE `ITEM_PRICE` > 0 AND `QUANTITY` > 0
```
Execution plan:
```
plan>
LogicalDelta
  LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], CK_FLAG=[CASE(>($4, 1), 'MULTIPLECK', CAST('SINGLECK'):CHAR(10) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)])
    LogicalFilter(condition=[AND(>($2, 0), >($4, 0))])
      EnumerableTableScan(table=[[ORDER_DETAILS]])
```
2. Single table with hourly GROUP BY
```sql
SELECT STREAM `ITEM_ID`, FLOOR(`CK_TIME` TO HOUR) AS `CK_TIME`,
  SUM(`QUANTITY`), COUNT(*)
FROM `ORDER_DETAILS`
WHERE `ITEM_PRICE` > 0 AND `QUANTITY` > 0
GROUP BY `ITEM_ID`, FLOOR(`CK_TIME` TO HOUR)
```
Execution plan:
```
plan>
LogicalDelta
  LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
    LogicalProject(ITEM_ID=[$1], CK_TIME=[FLOOR($0, FLAG(HOUR))], QUANTITY=[$4])
      LogicalFilter(condition=[AND(>($2, 0), >($4, 0))])
        EnumerableTableScan(table=[[ORDER_DETAILS]])
```
Update for join
First I add another table to the schema:
```java
defaultSchema.add("LISTING_DETAILS", new ScannableTable() {
  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    RelDataTypeFactory.FieldInfoBuilder b = typeFactory.builder();
    b.add("LIST_ID", typeFactory.createJavaType(Long.class));
    b.add("ITEM_PRICE", typeFactory.createJavaType(Double.class));
    b.add("SELLER_NAME", typeFactory.createJavaType(String.class));
    b.add("AVAIL_QUANTITY", typeFactory.createJavaType(Integer.class));
    return b.build();
  }

  public Statistic getStatistic() {
    return Statistics.of(100, ImmutableList.of());
  }

  public TableType getJdbcTableType() {
    return TableType.TABLE;
  }

  @Override
  public Enumerable<Object[]> scan(DataContext root) {
    return null;
  }
});
```
Now let's see some examples:
- Filter before Join
```sql
SELECT STREAM `A`.`ITEM_ID`, `A`.`ITEM_PRICE`, `A`.`BUYER_NAME`, `B`.`SELLER_NAME`
FROM `ORDER_DETAILS` AS `A`
INNER JOIN `LISTING_DETAILS` AS `B`
  ON `A`.`ITEM_ID` = `B`.`LIST_ID`
  AND `A`.`ITEM_PRICE` > 0 AND `B`.`AVAIL_QUANTITY` > 0
```
Execution Plan
```
plan>
LogicalDelta
  LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], SELLER_NAME=[$7])
    LogicalProject(CK_TIME=[$0], ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], QUANTITY=[$4], LIST_ID=[$6], ITEM_PRICE0=[$7], SELLER_NAME=[$8], AVAIL_QUANTITY=[$9])
      LogicalJoin(condition=[AND(=($1, $6), $5, $10)], joinType=[inner])
        LogicalProject(CK_TIME=[$0], ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], QUANTITY=[$4], $f5=[>($2, 0)])
          EnumerableTableScan(table=[[ORDER_DETAILS]])
        LogicalProject(LIST_ID=[$0], ITEM_PRICE=[$1], SELLER_NAME=[$2], AVAIL_QUANTITY=[$3], $f4=[>($3, 0)])
          LogicalTableScan(table=[[LISTING_DETAILS]])
```
- Filter after Join
```sql
SELECT STREAM `A`.`ITEM_ID`, `A`.`ITEM_PRICE`, `A`.`BUYER_NAME`, `B`.`SELLER_NAME`
FROM `ORDER_DETAILS` AS `A`
INNER JOIN `LISTING_DETAILS` AS `B`
  ON `A`.`ITEM_ID` = `B`.`LIST_ID`
WHERE `A`.`ITEM_PRICE` > 0 AND `B`.`AVAIL_QUANTITY` > 0
```
Execution Plan
```
plan>
LogicalDelta
  LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], SELLER_NAME=[$7])
    LogicalFilter(condition=[AND(>($2, 0), >($8, 0))])
      LogicalJoin(condition=[=($1, $5)], joinType=[inner])
        EnumerableTableScan(table=[[ORDER_DETAILS]])
        LogicalTableScan(table=[[LISTING_DETAILS]])
```
From the two examples above, you can see that the Planner does not do any optimization work here; that is handled by the next layer.
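For completeness, Calcite can run that next layer from the same `tools` package: you register rule sets as `Program`s on the `FrameworkConfig` and apply them with `Planner.transform()`. Below is a hedged sketch, not the exact setup used in this post; the class name `OptimizingExplainer` is mine, and the choice of `Programs.standard()` and the Enumerable convention is illustrative:

```java
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.Programs;

public class OptimizingExplainer {
  private final Planner planner;

  public OptimizingExplainer(SchemaPlus schema) {
    FrameworkConfig config = Frameworks.newConfigBuilder()
        .defaultSchema(schema)
        .programs(Programs.standard()) // registered as program index 0
        .build();
    this.planner = Frameworks.getPlanner(config);
  }

  public String explainPhysical(String sql) throws Exception {
    SqlNode parsed = planner.parse(sql);
    SqlNode validated = planner.validate(parsed);
    RelNode logical = planner.convert(validated);

    // Ask program 0 for a plan in the Enumerable calling convention;
    // this is where the optimization rules actually fire.
    RelTraitSet traits = logical.getTraitSet()
        .replace(EnumerableConvention.INSTANCE);
    RelNode physical = planner.transform(0, traits, logical);
    return RelOptUtil.toString(physical);
  }
}
```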