Build a release version of Storm

Although the Storm source code on GitHub has branches for versions 1.1.* and 2.*, there are no release packages for them yet. So I decided to build Storm from source, to test the new features in Storm 2.*.

Step 1. clone the source code from GitHub

git clone https://github.com/apache/storm.git # master is the default branch

Step 2. build the project with Maven

cd storm
mvn clean install -DskipTests=true # skip tests to save time

Step 3. create the binary distribution

cd storm-dist/binary
mvn package

The .zip and .tar.gz binary archives will be created in the storm-dist/binary/final-package/target/ sub-directory.

$ ls -lth final-package/target/
total 1071952
-rw-r--r--  1 mingmxu  110050932   801B Jan 26 12:23 apache-storm-2.0.0-SNAPSHOT.zip.asc
-rw-r--r--  1 mingmxu  110050932   801B Jan 26 12:23 apache-storm-2.0.0-SNAPSHOT.tar.gz.asc
-rw-r--r--  1 mingmxu  110050932   3.3K Jan 26 12:23 apache-storm-2.0.0-SNAPSHOT.pom
-rw-r--r--  1 mingmxu  110050932   801B Jan 26 12:23 apache-storm-2.0.0-SNAPSHOT.pom.asc
-rw-r--r--  1 mingmxu  110050932   262M Jan 26 12:23 apache-storm-2.0.0-SNAPSHOT.zip
drwxr-xr-x  8 mingmxu  110050932   272B Jan 26 12:23 archive-tmp
-rw-r--r--  1 mingmxu  110050932   262M Jan 26 12:23 apache-storm-2.0.0-SNAPSHOT.tar.gz
drwxr-xr-x  3 mingmxu  110050932   102B Jan 26 12:11 maven-shared-archive-resources

Issues I met and their solutions:

1. Could not find artifact org.apache.storm:storm-maven-plugins:jar:1.1.0-SNAPSHOT

Solution: I changed the version used in storm-core/pom.xml directly to an available released version.

@@ -875,7 +875,7 @@
             <plugin>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-maven-plugins</artifactId>
-               <version>${project.version}</version>
+               <version>1.0.2</version>
                <executions>
                  <execution>
                    <id>version-info</id>

2. Failed to execute goal org.apache.maven.plugins:maven-gpg-plugin:1.6:sign (default) on project final-package: Unable to execute gpg command: Error while executing process. Cannot run program “gpg”: error=2, No such file or directory

Solution: you need to install GPG. On macOS, I installed GPG Suite and generated a new key pair in GPG Keychain.

How Storm SQL is executed

I’m working on a project to provide a SQL interface for real-time data processing on Beam. After some research, Storm SQL turned out to be the closest match to my scenario, with no new features pending.

The main idea of Storm SQL is quite straightforward, as shown in the image below:
1. The SQL is parsed with Calcite to generate an execution plan;
2. The execution plan is expressed as a Trident topology;
3. The Trident topology is submitted to the Storm cluster.

[Image: Workflow of StormSQL]

Next, let’s move to the Storm code base to see how it’s implemented.

Note that Storm SQL is a new feature in version 2.0.0-SNAPSHOT and is not officially available yet. So first of all, you need to clone the code base from GitHub (apache/storm, master branch). You can also import just the Storm SQL part, under storm/external/sql.

Step 1. the launcher

Refer to Wiki: Storm SQL Integration. The command to run Storm SQL is:

$ bin/storm sql <sql-file> <topo-name>

In storm.py, it’s actually launched as

exec_storm_class(
        "org.apache.storm.sql.StormSqlRunner",
        jvmtype="-client",
        extrajars=extrajars,
        args=args,
        daemon=False,
        jvmopts=["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])

org.apache.storm.sql.StormSqlRunner is the first place to start. Depending on the input parameters, it will either:
1. output the execution plan, via the explain function in StormSqlImpl.java; or
2. submit the job, via the submit function in StormSqlImpl.java (a rough dispatch sketch follows below).
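
A rough sketch of the runner’s dispatch, assuming StormSql.construct() as the factory and an --explain style flag (both are my reading of the code, not verified details):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import org.apache.storm.sql.StormSql;

public class StormSqlRunnerSketch {
    public static void main(String[] args) throws Exception {
        String sqlFile = args[0];     // <sql-file>
        String topoName = args[1];    // <topo-name>
        boolean explainOnly = args.length > 2 && "--explain".equals(args[2]);   // flag name is an assumption

        // split the file into individual SQL statements
        String content = new String(Files.readAllBytes(Paths.get(sqlFile)), StandardCharsets.UTF_8);
        List<String> statements = Arrays.asList(content.split(";"));

        StormSql sql = StormSql.construct();
        if (explainOnly) {
            sql.explain(statements);    // -> StormSqlImpl.explain
        } else {
            // -> StormSqlImpl.submit; the conf/options/listener arguments are simplified here
            sql.submit(topoName, statements, null, null, null, null);
        }
    }
}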

Step 2. StormSql and StormParser

Before going into the details of explain/submit, I’d like to pay more attention to the classes StormSql and StormParser.

StormSql has an implementation class StormSqlImpl that coordinates the steps from the SQL string to the EXPLAIN output, or to a Trident topology.

StormParser has an implementation class StormParserImpl that does the physical parsing work. You may find that there is no StormParserImpl.java file in the repository; it’s generated by running mvn generate-sources.

How the parser itself is generated is another BIG topic; I will put the details in another post.

Step 3. explain a SQL

The top-level code for explain is included below. It takes a list of SQL statements and prints out the execution plan for each. This part doesn’t touch much of the Calcite-Storm integration; it’s mostly standard usage of Calcite’s parser/planner.

public void explain(Iterable<String> statements) throws Exception {
    Map<String, ISqlTridentDataSource> dataSources = new HashMap<>();
    for (String sql : statements) {
        StormParser parser = new StormParser(sql);
        SqlNode node = parser.impl().parseSqlStmtEof();

        System.out.println("===========================================================");
        System.out.println("query>");
        System.out.println(sql);
        System.out.println("-----------------------------------------------------------");

        if (node instanceof SqlCreateTable) {
            handleCreateTableForTrident((SqlCreateTable) node, dataSources);
            System.out.println("No plan presented on DDL");
        } else if (node instanceof SqlCreateFunction) {
            handleCreateFunction((SqlCreateFunction) node);
            System.out.println("No plan presented on DDL");
        } else {
            FrameworkConfig config = buildFrameWorkConfig();
            Planner planner = Frameworks.getPlanner(config);
            SqlNode parse = planner.parse(sql);
            SqlNode validate = planner.validate(parse);
            RelNode tree = planner.convert(validate);

            String plan = StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES);
            System.out.println("plan>");
            System.out.println(plan);
        }

        System.out.println("===========================================================");
    }
}

a. L5, Detect statement type

SqlNode node = parser.impl().parseSqlStmtEof();

This line parses the SQL statement string and generates a SqlNode to represent its syntax structure. The image below shows all the statement types supported by Calcite.
[Image: SqlNode statement types supported by Calcite]

b. L12-14, define datasources

if (node instanceof SqlCreateTable) {
    handleCreateTableForTrident((SqlCreateTable) node, dataSources);
    System.out.println("No plan presented on DDL");
}

If it’s a CREATE TABLE statement, an ISqlTridentDataSource is created based on the table definition. ISqlTridentDataSource defines how to read data from the source and how to output data, with Trident. See Calcite Adapter to understand how Calcite handles data binding.
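
To make the read/write split concrete, here is the rough shape of ISqlTridentDataSource; getProducer() is the method used by the scan operator later in this post, while the consumer side is only summarized because I’m not certain of its exact signature:

import org.apache.storm.trident.spout.ITridentDataSource;

// Rough shape only -- see the real interface under storm/external/sql for the full definition.
public interface ISqlTridentDataSource {
    // how Trident reads from this table; e.g. KafkaDataSourcesProvider returns an OpaqueTridentKafkaSpout
    ITridentDataSource getProducer();
    // ...plus a consumer-side hook that defines how Trident writes query results back out
}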

c. L15-17, add UDF/UDAF

if (node instanceof SqlCreateFunction) {
    handleCreateFunction((SqlCreateFunction) node);
    System.out.println("No plan presented on DDL");
}

UDF and UDAF are supported; you can visit wiki: Specifying UDF and UDAF for more details. A minimal UDF sketch follows below.
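
For reference, a minimal scalar UDF is just a class with a static evaluate method; the MyPlus class below follows the style of the wiki example and is illustrative only:

// registered with something like: CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.MyPlus'
public class MyPlus {
    public static Integer evaluate(Integer x, Integer y) {
        return x + y;
    }
}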

d. L19-27, Execution planner

FrameworkConfig config = buildFrameWorkConfig();
Planner planner = Frameworks.getPlanner(config);
SqlNode parse = planner.parse(sql);
SqlNode validate = planner.validate(parse);
RelNode tree = planner.convert(validate);

String plan = StormRelUtils.explain(tree, SqlExplainLevel.ALL_ATTRIBUTES);
System.out.println("plan>");
System.out.println(plan);
  • L1, a StdFrameworkConfig instance is created as the planning config;
  • L2, an instance of Calcite’s default Planner, org.apache.calcite.prepare.PlannerImpl, is created;
  • L3-5, the Planner parses and validates the SQL statement, and converts it into a RelNode. A RelNode is a tree of relational expressions; see Calcite: Algebra for more details of how SQL is handled in the back-end;
  • L7, StormRelUtils outputs the execution plan represented by the RelNode, in an easy-to-read format.
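
Putting the pieces together, a hypothetical way to drive explain() directly is sketched below; the StormSql.construct() factory and the ORDERS statements are illustrative (the table definition follows the style of the wiki examples), not code taken from the walkthrough above:

import java.util.Arrays;
import java.util.List;
import org.apache.storm.sql.StormSql;

public class ExplainDemo {
    public static void main(String[] args) throws Exception {
        List<String> stmts = Arrays.asList(
                // DDL: registers an ISqlTridentDataSource, prints "No plan presented on DDL"
                "CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT) "
                        + "LOCATION 'kafka://localhost:2181/brokers?topic=orders'",
                // query: goes through the Calcite planner and prints its RelNode plan
                "SELECT ID, UNIT_PRICE FROM ORDERS WHERE UNIT_PRICE > 20");

        StormSql.construct().explain(stmts);
    }
}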

Step 4. submit a SQL

Handling SqlCreateFunction and SqlCreateTable is similar to EXPLAIN; the information is kept in SchemaPlus as the source schema and the UDF/UDAF definitions.

1). create QueryPlanner with schema

FrameworkConfig config = Frameworks.newConfigBuilder()
        .defaultSchema(schema)
        .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
        .traitDefs(traitDefs)
        .context(Contexts.EMPTY_CONTEXT)
        .ruleSets(TridentStormRuleSets.getRuleSets())
        .costFactory(null)
        .typeSystem(StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM)
        .build();
this.planner = Frameworks.getPlanner(config);

a. ruleSets(TridentStormRuleSets.getRuleSets())
TridentStormRuleSets.getRuleSets() returns all the rules used by the planner; it combines Calcite’s StreamRules.RULES with the Calcite-to-Storm conversion rules (more on this below).

b. StormRelDataTypeSystem.STORM_REL_DATATYPE_SYSTEM
Here Storm overrides the max numeric precision to 38, from Calcite’s default of 19.
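
For context, raising the limit in Calcite looks roughly like the sketch below (a minimal version of the idea, not the exact Storm source):

import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelDataTypeSystemImpl;

public class StormRelDataTypeSystemSketch extends RelDataTypeSystemImpl {
    public static final RelDataTypeSystem STORM_REL_DATATYPE_SYSTEM =
            new StormRelDataTypeSystemSketch();

    @Override
    public int getMaxNumericPrecision() {
        // Calcite's RelDataTypeSystemImpl default is 19; Storm raises it to 38
        return 38;
    }
}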

2). compile SQL to Trident

The actual logic is defined below; here I skip IAggregatableStream lastStream, as it’s for interactive streaming.

TridentRel relNode = getPlan(query);

TridentPlanCreator tridentPlanCreator = new TridentPlanCreator(sources, new RexBuilder(typeFactory));
relNode.tridentPlan(tridentPlanCreator);

final TridentTopology topology = tridentPlanCreator.getTopology();

a). L1, getPlan(query)

Going back to the EXPLAIN part: it ends with a standard RelNode, which is why I said it is not yet related to Trident.

The function convertToStormRel is applied to the RelNode to generate the extended TridentRel expressions, by

planner.transform(STORM_REL_CONVERSION_RULES, traitSet.plus(TridentLogicalConvention.INSTANCE), relNode);

TODO: the RelNode is now translated into a TridentRel, which adds a new interface method tridentPlan; a separate post is planned to describe the details.

Inside PlannerImpl, the line

Program program = programs.get(ruleSetIndex);

It points to the rule set defined in QueryPlanner as

addAll(StreamRules.RULES).addAll(calciteToStormConversionRules)
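
In other words, the Storm conversion rules are appended to Calcite’s streaming rules and wrapped into a Calcite RuleSet, roughly as sketched here (the actual builder code in TridentStormRuleSets may differ; the empty list stands in for calciteToStormConversionRules):

import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.stream.StreamRules;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;
import com.google.common.collect.ImmutableList;

public class RuleSetSketch {
    public static RuleSet build() {
        // stand-in for the TridentLogicalConvention converter rules
        List<RelOptRule> tridentConversionRules = new ArrayList<>();

        return RuleSets.ofList(
                ImmutableList.<RelOptRule>builder()
                        .addAll(StreamRules.RULES)
                        .addAll(tridentConversionRules)
                        .build());
    }
}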

b). L3-4, create Trident Plan

Let’s take TridentFilterRel as an example:

RelNode input = getInput();
StormRelUtils.getStormRelInput(input).tridentPlan(planCreator);
Stream inputStream = planCreator.pop().toStream();

First of all, it creates the inputStream in Trident. For example, TridentStreamScanRel calls newStream(stageName, sources.get(sourceName).getProducer()). There are several DataSourcesProvider implementations in Storm SQL, like KafkaDataSourcesProvider, which returns an OpaqueTridentKafkaSpout.

Next, a filter function is applied to the inputStream,

inputStream.filter(new EvaluationFilter(expression, planCreator.getDataContext())).name(stageName);

Inside EvaluationFilter, there is a Janino ScriptEvaluator.
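
As a standalone illustration of the Janino ScriptEvaluator idea that EvaluationFilter relies on (this is not Storm’s generated code, just the compile-once / evaluate-per-tuple pattern):

import org.codehaus.janino.ScriptEvaluator;

public class JaninoFilterDemo {
    public static void main(String[] args) throws Exception {
        // compile the boolean filter expression once...
        ScriptEvaluator se = new ScriptEvaluator();
        se.setParameters(new String[] {"unitPrice"}, new Class[] {int.class});
        se.setReturnType(boolean.class);
        se.cook("return unitPrice > 20;");

        // ...then evaluate it for each incoming tuple
        System.out.println(se.evaluate(new Object[] {42}));  // true
        System.out.println(se.evaluate(new Object[] {5}));   // false
    }
}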

c). L6, get Trident Topology

After L3-4, the Trident topology is already assembled, as the TridentTopology topology held in TridentPlanCreator.

3). package and submit to run

Back in submit of StormSqlImpl, the last step is to submit the topology to run:

jarPath = Files.createTempFile("storm-sql", ".jar");
System.setProperty("storm.jar", jarPath.toString());
packageEmptyTopology(jarPath);
StormSubmitter.submitTopologyAs(name, stormConf, topo.build(), opts, progressListener, asUser);

That’s the end of how Storm SQL is implemented. There are still several topics that are not expanded here:
1. the syntax and parser of Calcite streaming SQL;
2. extending rules to support customized operations in the planner;
3. defining a new data source adapter.