Calcite RelNode (Relational Expression) modifier

This is the second post on understanding how Storm SQL leverages Calcite. See the previous post here:

Comparing the Explain and Submit functions in How Storm SQL is executed, the generated execution plans are of different types:

// explain function
RelNode tree = planner.convert(validate);

// submit function
TridentRel relNode = getPlan(query);

This is the core part: how Storm SQL leverages Calcite's parser engine and plugs in its own planner.

To put it simply, customized rules convert standard RelNodes into Storm-specific RelNodes.

ConverterRule
ConverterRule defines a rule which converts from one calling convention to another without changing semantics.

Take TridentScanRule for example:

public class TridentScanRule extends ConverterRule {
  public static final TridentScanRule INSTANCE = new TridentScanRule();
  public static final int DEFAULT_PARALLELISM_HINT = 1;

  private TridentScanRule() {
    super(EnumerableTableScan.class, EnumerableConvention.INSTANCE, TridentLogicalConvention.INSTANCE, "TridentScanRule");
  }

  @Override
  public RelNode convert(RelNode rel) {
    final TableScan scan = (TableScan) rel;
    int parallelismHint = DEFAULT_PARALLELISM_HINT;

    final ParallelStreamableTable parallelTable = scan.getTable().unwrap(ParallelStreamableTable.class);
    if (parallelTable != null && parallelTable.parallelismHint() != null) {
      parallelismHint = parallelTable.parallelismHint();
    }

    final Table table = scan.getTable().unwrap(Table.class);
    switch (table.getJdbcTableType()) {
      case STREAM:
        return new TridentStreamScanRel(scan.getCluster(),
            scan.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
            scan.getTable(), parallelismHint);
      default:
        throw new IllegalArgumentException(String.format("Unsupported table type: %s", table.getJdbcTableType()));
    }
  }
}

In this class, TridentScanRule converts an EnumerableTableScan into a TridentStreamScanRel.

In this way, Storm defines its own RelNodes, and uses tridentPlan to create a TridentTopology.
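
For completeness, here is a minimal sketch (my own illustration, not Storm's actual code) of how such converter rules are plugged into a Calcite Planner: register them as a RuleSet on the FrameworkConfig, then ask planner.transform() for a tree in the target convention, which fires the ConverterRules. MyScanRule and MY_CONVENTION below are hypothetical placeholders for your own rule and Convention (think TridentScanRule.INSTANCE and TridentLogicalConvention.INSTANCE).

import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;

public class CustomPlanSketch {
  public static RelNode plan(SchemaPlus schema, String sql) throws Exception {
    // MyScanRule / MY_CONVENTION are hypothetical placeholders for your own
    // ConverterRule and Convention.
    RuleSet rules = RuleSets.ofList(MyScanRule.INSTANCE);
    FrameworkConfig config = Frameworks.newConfigBuilder()
        .defaultSchema(schema)
        .ruleSets(rules) // registered as rule set index 0
        .build();
    Planner planner = Frameworks.getPlanner(config);
    SqlNode parsed = planner.parse(sql);
    SqlNode validated = planner.validate(parsed);
    RelNode logical = planner.convert(validated);
    // Request the custom convention; the planner applies the ConverterRules
    // to satisfy the required traits.
    RelTraitSet traits = logical.getTraitSet().replace(MY_CONVENTION);
    return planner.transform(0, traits, logical);
  }
}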

Query Explain with Calcite

In my previous post How Storm SQL is executed, I went through the main steps of how Storm processes Explain and Submit SQL. To leverage Calcite in a new project, there are still many details left uncovered.

Here I’ll show how to invoke Calcite’s Explain in your own code. If you want to try the Calcite CLI directly, please go to the Calcite Tutorial.

package com.***.***;

import java.util.Date;

import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.Schema.TableType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.ImmutableBitSet;

import com.google.common.collect.ImmutableList;

public class BeamSqlExplainer {
  public static final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
  public static final SchemaPlus defaultSchema = Frameworks.createRootSchema(true);

  private FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(defaultSchema).build();
  private Planner planner = Frameworks.getPlanner(config);

  public BeamSqlExplainer(){
    addTableSchema();
  }

  public void addTableSchema(){
    defaultSchema.add("ORDER_DETAILS", new StreamableTable() {
      public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        RelDataTypeFactory.FieldInfoBuilder b = typeFactory.builder();
        b.add("CK_TIME", typeFactory.createJavaType(Date.class));
        b.add("ITEM_ID", typeFactory.createJavaType(Long.class));
        b.add("ITEM_PRICE", typeFactory.createJavaType(Double.class));
        b.add("BUYER_NAME", typeFactory.createJavaType(String.class));
        b.add("QUANTITY", typeFactory.createJavaType(Integer.class));

        return b.build();
      }

      public Statistic getStatistic() {
        // A plain row count would also work:
        // return Statistics.of(100, ImmutableList.<ImmutableBitSet>of());
        // Here CK_TIME (field 0) is additionally declared a unique key with an
        // ascending collation, so the validator knows it is monotonic.
        Direction dir = Direction.ASCENDING;
        RelFieldCollation collation = new RelFieldCollation(0, dir, NullDirection.UNSPECIFIED);
        return Statistics.of(5, ImmutableList.of(ImmutableBitSet.of(0)),
            ImmutableList.of(RelCollations.of(collation)));
      }

      public TableType getJdbcTableType() {
        return TableType.STREAM;
      }

      public Table stream() {
        return null;
      }

    });
  }

  public void explain(String sql) throws SqlParseException, ValidationException, RelConversionException{
    SqlNode parse = planner.parse(sql);
    System.out.println(parse.toString());

    SqlNode validate = planner.validate(parse);
    RelNode tree = planner.convert(validate);

    String plan = RelOptUtil.toString(tree); //explain(tree, SqlExplainLevel.ALL_ATTRIBUTES);
    System.out.println("plan>");
    System.out.println(plan);

  }

}

Walking through this example:

  • typeFactory is created to define the data types;
  • a Planner is created, with a FrameworkConfig as its settings; most importantly, a SchemaPlus is provided for the table schemas (and UDFs, if available), and is referenced during the validation step;
  • addTableSchema() creates a table named ORDER_DETAILS; it has 5 fields, with CK_TIME being monotonic, ASCENDING;
  • explain() takes a SQL string as input, and prints out the logical execution plan.

A logical execution plan represents how a SQL statement is executed. Physically it may be altered depending on the storage provider.
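
For reference, a minimal driver for the explainer above could look like this (my own sketch, added alongside the class; the query is a trimmed version of the first example below):

public static void main(String[] args) throws Exception {
  BeamSqlExplainer explainer = new BeamSqlExplainer();
  explainer.explain(
      "SELECT STREAM ITEM_ID, ITEM_PRICE, BUYER_NAME FROM ORDER_DETAILS WHERE QUANTITY > 0");
}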

Now, let’s have some examples:

1. Single table direct operation

SELECT STREAM `ITEM_ID`, `ITEM_PRICE`, `BUYER_NAME`, CASE WHEN `QUANTITY` > 1 THEN 'MULTIPLECK' ELSE 'SINGLECK' END AS `CK_FLAG`
FROM `ORDER_DETAILS`
WHERE `ITEM_PRICE` > 0 AND `QUANTITY` > 0

Execution plan:

plan>
LogicalDelta
  LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], CK_FLAG=[CASE(>($4, 1), 'MULTIPLECK', CAST('SINGLECK'):CHAR(10) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL)])
    LogicalFilter(condition=[AND(>($2, 0), >($4, 0))])
      EnumerableTableScan(table=[[ORDER_DETAILS]])

2. Single table with hourly GROUP BY

SELECT STREAM `ITEM_ID`, FLOOR(`CK_TIME` TO HOUR) AS `CK_TIME`, SUM(`QUANTITY`), COUNT(*)
FROM `ORDER_DETAILS`
WHERE `ITEM_PRICE` > 0 AND `QUANTITY` > 0
GROUP BY `ITEM_ID`, FLOOR(`CK_TIME` TO HOUR)

Execution plan:

plan>
LogicalDelta
  LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
    LogicalProject(ITEM_ID=[$1], CK_TIME=[FLOOR($0, FLAG(HOUR))], QUANTITY=[$4])
      LogicalFilter(condition=[AND(>($2, 0), >($4, 0))])
        EnumerableTableScan(table=[[ORDER_DETAILS]])

Update for join

First I add another table to the schema:

defaultSchema.add("LISTING_DETAILS", new ScannableTable() {
  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    RelDataTypeFactory.FieldInfoBuilder b = typeFactory.builder();
    b.add("LIST_ID", typeFactory.createJavaType(Long.class));
    b.add("ITEM_PRICE", typeFactory.createJavaType(Double.class));
    b.add("SELLER_NAME", typeFactory.createJavaType(String.class));
    b.add("AVAIL_QUANTITY", typeFactory.createJavaType(Integer.class));

    return b.build();
  }

  public Statistic getStatistic() {
    return Statistics.of(100, ImmutableList.<ImmutableBitSet>of());
  }

  public TableType getJdbcTableType() {
    return TableType.TABLE;
  }

  @Override
  public Enumerable<Object[]> scan(DataContext root) {
    return null;
  }

});

Now let's see some examples:

1. Filter before Join

SELECT STREAM `A`.`ITEM_ID`, `A`.`ITEM_PRICE`, `A`.`BUYER_NAME`, `B`.`SELLER_NAME`
FROM `ORDER_DETAILS` AS `A`
INNER JOIN `LISTING_DETAILS` AS `B` ON `A`.`ITEM_ID` = `B`.`LIST_ID` AND `A`.`ITEM_PRICE` > 0 AND `B`.`AVAIL_QUANTITY` > 0

Execution plan:

plan>
LogicalDelta
  LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], SELLER_NAME=[$7])
    LogicalProject(CK_TIME=[$0], ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], QUANTITY=[$4], LIST_ID=[$6], ITEM_PRICE0=[$7], SELLER_NAME=[$8], AVAIL_QUANTITY=[$9])
      LogicalJoin(condition=[AND(=($1, $6), $5, $10)], joinType=[inner])
        LogicalProject(CK_TIME=[$0], ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], QUANTITY=[$4], $f5=[>($2, 0)])
          EnumerableTableScan(table=[[ORDER_DETAILS]])
        LogicalProject(LIST_ID=[$0], ITEM_PRICE=[$1], SELLER_NAME=[$2], AVAIL_QUANTITY=[$3], $f4=[>($3, 0)])
          LogicalTableScan(table=[[LISTING_DETAILS]])

2. Filter after Join

SELECT STREAM `A`.`ITEM_ID`, `A`.`ITEM_PRICE`, `A`.`BUYER_NAME`, `B`.`SELLER_NAME`
FROM `ORDER_DETAILS` AS `A`
INNER JOIN `LISTING_DETAILS` AS `B` ON `A`.`ITEM_ID` = `B`.`LIST_ID`
WHERE `A`.`ITEM_PRICE` > 0 AND `B`.`AVAIL_QUANTITY` > 0

Execution plan:

plan>
LogicalDelta
  LogicalProject(ITEM_ID=[$1], ITEM_PRICE=[$2], BUYER_NAME=[$3], SELLER_NAME=[$7])
    LogicalFilter(condition=[AND(>($2, 0), >($8, 0))])
      LogicalJoin(condition=[=($1, $5)], joinType=[inner])
        EnumerableTableScan(table=[[ORDER_DETAILS]])
        LogicalTableScan(table=[[LISTING_DETAILS]])

From the two examples above, you can see that the Planner doesn’t do any optimization work; that is actually handled by the next layer.
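
As a taste of that next layer, here is a minimal sketch using Calcite’s HepPlanner to push the WHERE filter of the second example down into the join. FilterJoinRule.FILTER_ON_JOIN is a stock Calcite rule, and tree is the RelNode produced by planner.convert() in the explain() method above:

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.FilterJoinRule;

// ...inside explain(), after RelNode tree = planner.convert(validate):
HepProgramBuilder hepBuilder = new HepProgramBuilder();
hepBuilder.addRuleInstance(FilterJoinRule.FILTER_ON_JOIN); // push filters below joins
HepPlanner hepPlanner = new HepPlanner(hepBuilder.build());
hepPlanner.setRoot(tree);
RelNode optimized = hepPlanner.findBestExp();
System.out.println(RelOptUtil.toString(optimized));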