This is the second advanced post on how Storm SQL leverages Calcite. See the previous post here:
Comparing the Explain and Submit functions from "How Storm SQL is executed", the execution plans they generate do not match:
    // explain function
    RelNode tree = planner.convert(validate);

    // submit function
    TridentRel relNode = getPlan(query);
This is the core part: how Storm SQL leverages Calcite's parser engine and plugs in its own planner.
To simplify, customized rules convert standard Calcite relational operators (RelNode) into Storm-specific ones.
ConverterRule
ConverterRule is the base class for a rule that converts from one calling convention to another without changing semantics.
Take TridentScanRule for example:
    public class TridentScanRule extends ConverterRule {
        public static final TridentScanRule INSTANCE = new TridentScanRule();
        public static final int DEFAULT_PARALLELISM_HINT = 1;

        private TridentScanRule() {
            super(EnumerableTableScan.class, EnumerableConvention.INSTANCE,
                    TridentLogicalConvention.INSTANCE, "TridentScanRule");
        }

        @Override
        public RelNode convert(RelNode rel) {
            final TableScan scan = (TableScan) rel;
            int parallelismHint = DEFAULT_PARALLELISM_HINT;

            final ParallelStreamableTable parallelTable =
                    scan.getTable().unwrap(ParallelStreamableTable.class);
            if (parallelTable != null && parallelTable.parallelismHint() != null) {
                parallelismHint = parallelTable.parallelismHint();
            }

            final Table table = scan.getTable().unwrap(Table.class);
            switch (table.getJdbcTableType()) {
                case STREAM:
                    return new TridentStreamScanRel(scan.getCluster(),
                            scan.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
                            scan.getTable(), parallelismHint);
                default:
                    throw new IllegalArgumentException(
                            String.format("Unsupported table type: %s", table.getJdbcTableType()));
            }
        }
    }
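In this class, TridentScanRule converts an EnumerableTableScan to a TridentStreamScanRel. It carries over the table's parallelism hint when one is set (defaulting to 1) and rejects any table type other than STREAM.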
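To make the idea of "same operator, different convention" concrete, here is a minimal sketch that does not use Calcite at all. Every type in it (Convention, Node, EnumerableScan, TridentScan, ScanRule) is a hypothetical stand-in invented for illustration, not Calcite's or Storm's real API. It only mirrors the shape of the rule above: match one node class in the source convention, and return an equivalent node in the target convention.

```java
import java.util.Optional;

// Toy model of a converter rule. These are NOT Calcite or Storm classes;
// all names here are hypothetical and exist only for this sketch.
public class ConverterRuleSketch {

    // Stand-ins for the source and target calling conventions.
    enum Convention { ENUMERABLE, TRIDENT_LOGICAL }

    // A node records what it reads (table) and how it would run (convention).
    static abstract class Node {
        final String table;
        final Convention convention;

        Node(String table, Convention convention) {
            this.table = table;
            this.convention = convention;
        }
    }

    static final class EnumerableScan extends Node {
        EnumerableScan(String table) {
            super(table, Convention.ENUMERABLE);
        }
    }

    static final class TridentScan extends Node {
        final int parallelismHint;

        TridentScan(String table, int parallelismHint) {
            super(table, Convention.TRIDENT_LOGICAL);
            this.parallelismHint = parallelismHint;
        }
    }

    // Fires only on EnumerableScan; the table stays the same, only the
    // convention (and the attached hint) changes.
    static final class ScanRule {
        static final int DEFAULT_PARALLELISM_HINT = 1;

        Optional<Node> apply(Node node, Integer tableHint) {
            if (!(node instanceof EnumerableScan)) {
                return Optional.empty(); // rule does not match this node
            }
            int hint = (tableHint != null) ? tableHint : DEFAULT_PARALLELISM_HINT;
            return Optional.<Node>of(new TridentScan(node.table, hint));
        }
    }

    public static void main(String[] args) {
        ScanRule rule = new ScanRule();
        Node converted = rule.apply(new EnumerableScan("ORDERS"), null).get();
        // prints "TRIDENT_LOGICAL scan of ORDERS"
        System.out.println(converted.convention + " scan of " + converted.table);
    }
}
```

The point of the sketch is that the rule never changes what the scan means. It only swaps the node for one that a different execution backend knows how to run.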
In this way, Storm defines its own RelNode implementations and uses tridentPlan to create a TridentTopology.
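The post does not show what tridentPlan looks like, so the following is only a sketch of the general pattern under my own assumptions: each Storm-specific node knows how to plan itself, and planning the root walks the tree from the inputs upward. PlanCreator, StormRel, StreamScan, and Filter are hypothetical names, and the "topology" here is just a list of strings rather than a real TridentTopology.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "each node plans itself". Not Storm's real API; every
// type and method below is a hypothetical stand-in.
public class TridentPlanSketch {

    // Collects the steps that would make up the topology.
    static final class PlanCreator {
        final List<String> steps = new ArrayList<String>();

        void add(String step) {
            steps.add(step);
        }
    }

    // A Storm-specific relational node in this toy model.
    interface StormRel {
        void tridentPlan(PlanCreator creator);
    }

    // Leaf node: opens a stream with a parallelism hint.
    static final class StreamScan implements StormRel {
        final String table;
        final int parallelismHint;

        StreamScan(String table, int parallelismHint) {
            this.table = table;
            this.parallelismHint = parallelismHint;
        }

        @Override
        public void tridentPlan(PlanCreator creator) {
            creator.add("newStream(" + table + ").parallelismHint(" + parallelismHint + ")");
        }
    }

    // Single-input node: plans its input first, then adds its own step.
    static final class Filter implements StormRel {
        final StormRel input;
        final String condition;

        Filter(StormRel input, String condition) {
            this.input = input;
            this.condition = condition;
        }

        @Override
        public void tridentPlan(PlanCreator creator) {
            input.tridentPlan(creator);
            creator.add("filter(" + condition + ")");
        }
    }

    // Plans a whole tree starting from its root.
    static List<String> plan(StormRel root) {
        PlanCreator creator = new PlanCreator();
        root.tridentPlan(creator);
        return creator.steps;
    }

    public static void main(String[] args) {
        StormRel root = new Filter(new StreamScan("ORDERS", 2), "amount > 100");
        for (String step : plan(root)) {
            System.out.println(step);
        }
    }
}
```

Because the planning logic lives on the nodes, the converter rules only have to produce the right node types; building the topology is then a plain tree walk.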