This is the second advanced post on how Storm SQL leverages Calcite. See the previous post here:
Compare the explain and submit functions from How Storm SQL is executed and you will see that the execution plans they generate don’t match:
// explain function
RelNode tree = planner.convert(validate);
// submit function
TridentRel relNode = getPlan(query);
This is the core part: how Storm SQL leverages Calcite’s parser engine and plugs in its own planner.
Put simply, customized rules convert standard relational operators into Storm-specific ones.
ConverterRule
ConverterRule is the base class for a rule that converts a relational expression from one calling convention to another without changing its semantics.
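To make the idea concrete before looking at the real rule, here is a minimal, dependency-free sketch. It does not use Calcite at all: ToyConvention, ToyScan and ToyConverterRule are hypothetical stand-ins invented for illustration, and the real ConverterRule API is richer than this. The sketch only shows the shape of the idea: the rule matches a node in one convention and returns an equivalent node in another, leaving what the node computes untouched.

```java
import java.util.Optional;

// Hypothetical stand-in for a calling convention; not Calcite's Convention type.
enum ToyConvention { ENUMERABLE, TRIDENT }

// Hypothetical stand-in for a table scan node:
// what it reads (table) plus how it is executed (convention).
final class ToyScan {
    final String table;
    final ToyConvention convention;

    ToyScan(String table, ToyConvention convention) {
        this.table = table;
        this.convention = convention;
    }
}

// Hypothetical converter rule: changes only the convention of a matching node.
final class ToyConverterRule {
    private final ToyConvention in;
    private final ToyConvention out;

    ToyConverterRule(ToyConvention in, ToyConvention out) {
        this.in = in;
        this.out = out;
    }

    // Returns the converted node, or empty when the node is not in the input convention.
    Optional<ToyScan> apply(ToyScan node) {
        if (node.convention != in) {
            return Optional.empty();
        }
        // Same table, new convention: the semantics are unchanged.
        return Optional.of(new ToyScan(node.table, out));
    }
}

final class ConverterRuleSketch {
    public static void main(String[] args) {
        ToyConverterRule rule =
                new ToyConverterRule(ToyConvention.ENUMERABLE, ToyConvention.TRIDENT);
        ToyScan scan = new ToyScan("ORDERS", ToyConvention.ENUMERABLE);
        ToyScan converted = rule.apply(scan).orElseThrow(IllegalStateException::new);
        System.out.println(converted.table + " -> " + converted.convention);
    }
}
```

Applying the rule to a node that is already in the target convention returns an empty Optional, which mirrors the fact that a converter rule only fires on nodes in its input convention.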
Take TridentScanRule for example:
public class TridentScanRule extends ConverterRule {
    public static final TridentScanRule INSTANCE = new TridentScanRule();
    public static final int DEFAULT_PARALLELISM_HINT = 1;

    private TridentScanRule() {
        // Match EnumerableTableScan nodes in the Enumerable convention
        // and produce nodes in the Trident logical convention.
        super(EnumerableTableScan.class, EnumerableConvention.INSTANCE,
                TridentLogicalConvention.INSTANCE, "TridentScanRule");
    }

    @Override
    public RelNode convert(RelNode rel) {
        final TableScan scan = (TableScan) rel;

        // Use the table's own parallelism hint when it provides one.
        int parallelismHint = DEFAULT_PARALLELISM_HINT;
        final ParallelStreamableTable parallelTable =
                scan.getTable().unwrap(ParallelStreamableTable.class);
        if (parallelTable != null && parallelTable.parallelismHint() != null) {
            parallelismHint = parallelTable.parallelismHint();
        }

        // Only stream tables are supported; anything else is rejected.
        final Table table = scan.getTable().unwrap(Table.class);
        switch (table.getJdbcTableType()) {
            case STREAM:
                return new TridentStreamScanRel(scan.getCluster(),
                        scan.getTraitSet().replace(TridentLogicalConvention.INSTANCE),
                        scan.getTable(), parallelismHint);
            default:
                throw new IllegalArgumentException(
                        String.format("Unsupported table type: %s", table.getJdbcTableType()));
        }
    }
}
In this class, TridentScanRule converts an EnumerableTableScan into a TridentStreamScanRel.
In this way, Storm defines its own RelNode implementations and uses tridentPlan to create a TridentTopology.
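The last step can be pictured with a small, dependency-free sketch. Everything in it is hypothetical: SketchPlanCreator, SketchTridentRel, SketchStreamScan and SketchFilter are invented for illustration, and the tridentPlan signature shown here is an assumption rather than Storm's actual one. It only shows the general pattern: each Storm-specific node plans its input first and then adds its own step, so walking the converted tree from the root yields the steps in execution order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plan collector; stands in for whatever builds the real TridentTopology.
final class SketchPlanCreator {
    private final List<String> steps = new ArrayList<>();

    void addStep(String step) {
        steps.add(step);
    }

    List<String> steps() {
        return steps;
    }
}

// Hypothetical Storm-specific node: each node knows how to add itself to the plan.
interface SketchTridentRel {
    void tridentPlan(SketchPlanCreator creator);
}

// Leaf node: reads from a stream table with a given parallelism hint.
final class SketchStreamScan implements SketchTridentRel {
    private final String table;
    private final int parallelismHint;

    SketchStreamScan(String table, int parallelismHint) {
        this.table = table;
        this.parallelismHint = parallelismHint;
    }

    @Override
    public void tridentPlan(SketchPlanCreator creator) {
        creator.addStep("scan " + table + " parallelism=" + parallelismHint);
    }
}

// Inner node: plans its input first, then appends its own step.
final class SketchFilter implements SketchTridentRel {
    private final SketchTridentRel input;
    private final String condition;

    SketchFilter(SketchTridentRel input, String condition) {
        this.input = input;
        this.condition = condition;
    }

    @Override
    public void tridentPlan(SketchPlanCreator creator) {
        input.tridentPlan(creator);
        creator.addStep("filter " + condition);
    }
}

final class TridentPlanSketch {
    // Walks the tree from the root and returns the collected steps in execution order.
    static List<String> plan(SketchTridentRel root) {
        SketchPlanCreator creator = new SketchPlanCreator();
        root.tridentPlan(creator);
        return creator.steps();
    }

    public static void main(String[] args) {
        SketchTridentRel root =
                new SketchFilter(new SketchStreamScan("ORDERS", 1), "UNIT_PRICE > 10");
        plan(root).forEach(System.out::println);
    }
}
```

Keeping the planning logic inside each node means a new operator only has to implement its own tridentPlan step; the tree walk itself does not change.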