Query Explain with Calcite

In my previous post How Storm SQL is executed, it goes through the main steps on how Storm processes Explain and Submit SQL. To leverage Calcite in a new project, there’s lots of details un-covered.

Here I’ll show how to invoke the Explain of Calcite in your code. If you want to try Calcite CLI directly, please go to Calcite Tutorial.

package com.***.***;

import java.util.Date;

import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.Schema.TableType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.ImmutableBitSet;

import com.google.common.collect.ImmutableList;

public class BeamSqlExplainer {
  public static final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
  public static final SchemaPlus defaultSchema = Frameworks.createRootSchema(true);

  private FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(defaultSchema).build();
  private Planner planner = Frameworks.getPlanner(config);

  public BeamSqlExplainer(){
    addTableSchema();
  }

  public void addTableSchema(){
    defaultSchema.add("ORDER_DETAILS", new StreamableTable() {
      public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        RelDataTypeFactory.FieldInfoBuilder b = typeFactory.builder();
        b.add("CK_TIME", typeFactory.createJavaType(Date.class));
        b.add("ITEM_ID", typeFactory.createJavaType(Long.class));
        b.add("ITEM_PRICE", typeFactory.createJavaType(Double.class));
        b.add("BUYER_NAME", typeFactory.createJavaType(String.class));
        b.add("QUANTITY", typeFactory.createJavaType(Integer.class));

        return b.build();
      }

      public Statistic getStatistic() {
//        return Statistics.of(100, ImmutableList.<ImmutableBitSet>of());
        Direction dir = Direction.ASCENDING;
            RelFieldCollation collation = new RelFieldCollation(0, dir, NullDirection.UNSPECIFIED);
            return Statistics.of(5, ImmutableList.of(ImmutableBitSet.of(0)),
                ImmutableList.of(RelCollations.of(collation)));
      }

      public TableType getJdbcTableType() {
        return TableType.STREAM;
      }

      public Table stream() {
        return null;
      }

    });
  }

  public void explain(String sql) throws SqlParseException, ValidationException, RelConversionException{
    SqlNode parse = planner.parse(sql);
    System.out.println(parse.toString());

    SqlNode validate = planner.validate(parse);
    RelNode tree = planner.convert(validate);

    String plan = RelOptUtil.toString(tree); //explain(tree, SqlExplainLevel.ALL_ATTRIBUTES);
    System.out.println("plan>");
    System.out.println(plan);

  }

}

This is one example:

  • L34 create a typeFactory, to define data types;
  • L35-38 create a Planner, with FrameworkConfig as the settings, mostly important a SchemaPlus is provided for TableSchema and UDF if available. SchemaPlus is referenced during the validation step;
  • L44-74 create a table named as ORDER_DETAILS. It has 5 fields, with CK_TIME is monotonic, ASCENDING;
  • L76-87 takes a SQL string as input, and print out logical execution plan;

A logical execution plan represents how a SQL is executed. Physically it may be altered depends on the storage provider.

Now, let’s have some examples:

1. Single table direct operation

SELECT STREAM `ITEM_ID`, `ITEM_PRICE`, `BUYER_NAME`, CASE WHEN `QUANTITY` > 1 THEN 'MULTIPLECK' ELSE 'SINGLECK' END AS `CK_FLAG`
FROM `ORDER_DETAILS`
WHERE `ITEM_PRICE` > 0 AND `QUANTITY` > 0

Execution plan:

plan>
LogicalDelta
  LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], CK_FLAG=[CASE(>($4, 1), 'MULTIPLECK', CAST('SINGLECK'):CHAR(10) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)])
    LogicalFilter(condition=[AND(>($2, 0), >($4, 0))])
      EnumerableTableScan(table=[[ORDER_DETAILS]])

2. Single table with hourly GROUP BY

SELECT STREAM `ITEM_ID`, FLOOR(`CK_TIME` TO HOUR) AS `CK_TIME`, SUM(`QUANTITY`), COUNT(*)
FROM `ORDER_DETAILS`
WHERE `ITEM_PRICE` > 0 AND `QUANTITY` > 0
GROUP BY `ITEM_ID`, FLOOR(`CK_TIME` TO HOUR)

Execution plan:

plan>
LogicalDelta
  LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
    LogicalProject(ITEM_ID=[$1], CK_TIME=[FLOOR($0, FLAG(HOUR))], QUANTITY=[$4])
      LogicalFilter(condition=[AND(>($2, 0), >($4, 0))])
        EnumerableTableScan(table=[[ORDER_DETAILS]])

Update for join

First I add another table to the schema:

defaultSchema.add("LISTING_DETAILS", new ScannableTable() {
  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    RelDataTypeFactory.FieldInfoBuilder b = typeFactory.builder();
    b.add("LIST_ID", typeFactory.createJavaType(Long.class));
    b.add("ITEM_PRICE", typeFactory.createJavaType(Double.class));
    b.add("SELLER_NAME", typeFactory.createJavaType(String.class));
    b.add("AVAIL_QUANTITY", typeFactory.createJavaType(Integer.class));

    return b.build();
  }

  public Statistic getStatistic() {
    return Statistics.of(100, ImmutableList.of());
  }

  public TableType getJdbcTableType() {
    return TableType.TABLE;
  }

  @Override
  public Enumerable scan(DataContext root) {
    return null;
  }

});

Now let's see some examples:

  1. Filter before Join
SELECT STREAM `A`.`ITEM_ID`, `A`.`ITEM_PRICE`, `A`.`BUYER_NAME`, `B`.`SELLER_NAME`
FROM `ORDER_DETAILS` AS `A`
INNER JOIN `LISTING_DETAILS` AS `B` ON `A`.`ITEM_ID` = `B`.`LIST_ID` AND `A`.`ITEM_PRICE` > 0 AND `B`.`AVAIL_QUANTITY` > 0

Execution Plan

plan>
LogicalDelta
  LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], SELLER_NAME=[$7])
    LogicalProject(CK_TIME=[$0], ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], QUANTITY=[$4], LIST_ID=[$6], ITEM_PRICE0=[$7], SELLER_NAME=[$8], AVAIL_QUANTITY=[$9])
      LogicalJoin(condition=[AND(=($1, $6), $5, $10)], joinType=[inner])
        LogicalProject(CK_TIME=[$0], ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], QUANTITY=[$4], $f5=[>($2, 0)])
          EnumerableTableScan(table=[[ORDER_DETAILS]])
        LogicalProject(LIST_ID=[$0], ITEM_PRICE=[$1], SELLER_NAME=[$2], AVAIL_QUANTITY=[$3], $f4=[>($3, 0)])
          LogicalTableScan(table=[[LISTING_DETAILS]])
  1. Filter after Join
SELECT STREAM `A`.`ITEM_ID`, `A`.`ITEM_PRICE`, `A`.`BUYER_NAME`, `B`.`SELLER_NAME`
FROM `ORDER_DETAILS` AS `A`
INNER JOIN `LISTING_DETAILS` AS `B` ON `A`.`ITEM_ID` = `B`.`LIST_ID`
WHERE `A`.`ITEM_PRICE` > 0 AND `B`.`AVAIL_QUANTITY` > 0

Execution Plan

plan>
LogicalDelta
  LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], SELLER_NAME=[$7])
    LogicalFilter(condition=[AND(>($2, 0), >($8, 0))])
      LogicalJoin(condition=[=($1, $5)], joinType=[inner])
        EnumerableTableScan(table=[[ORDER_DETAILS]])
        LogicalTableScan(table=[[LISTING_DETAILS]])

From the two examples above, you could find that, the Planner doesn’t do any optimal work, which is actually handle by the next layer.

One thought on “Query Explain with Calcite

Comments are closed.