Create a Storage Plugin
To create a storage plugin, you must create a collection of classes that work in the planner and runtime:
- Storage plugin
- Storage plugin config
- Default and optional nested schema definitions
- Table definitions
- Scan definitions, including one passed from the planner to the execution engine
- Run-time scan operator
The example here is for the simplest possible storage plugin, implemented at runtime using the EVF framework.
- [Storage - Project|Create the Project]
- [Storage - Plugin|Create the Plugin]
- [Storage - Schema|Create the Schema and Table]
- [Storage - Scan|Create the Group and Sub Scans]
- [Storage - Execution|Create the Reader]
Choose a name for your project. Let's call ours "example". Storage plugins typically go in the contrib module, in a subdirectory named storage-example.
To create the project in Eclipse:
- Select the `drill-contrib-parent` node in the Eclipse Package Explorer.
- Choose Maven → New Maven Module Project from the context menu.
- Give your project the name `storage-example`.
- Accept the other defaults and create the project.
- Edit the resulting `pom.xml` file to add your needed dependencies. Use other storage plugins as a "cheat sheet" to see what is needed.
- Edit `drill-contrib-parent/pom.xml` to verify that your module appears in the module list. Add it if missing:
<modules>
...
<module>storage-example</module>
<module>data</module>
...
</modules>
- Get Eclipse to recognize your project: choose File → Import → Existing Maven Projects and select the Drill root. Your project should now appear in the Package Explorer as `storage-example`. (If this does not work, try selecting `drill` and choosing Maven → Update Project.)
- Eclipse named your new package `org.storage.example`. Rename this package to `org.apache.drill.exec.store.example`.
- Do the same for the test package.
- Run the Maven-provided `App` class to ensure everything works. Select the class name, then choose Debug As → Java Application from the context menu.
- Delete the example `App` and `AppTest` classes.
The storage plugin class is Drill's main entry point to your plugin.
- Create the storage plugin class in Eclipse. The initial file should look like this:
public class ExampleStoragePlugin extends AbstractStoragePlugin {

  @Override
  public StoragePluginConfig getConfig() {
    // TODO Auto-generated method stub
    return null;
  }

  @Override
  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent)
      throws IOException {
    // TODO Auto-generated method stub
  }
}
We'll fill in each of these as we go. We'll start with a boilerplate method that says our plugin can read data:
@Override
public boolean supportsRead() { return true; }
Create the plugin's config file:
- Under your `src` folder, create a `resources` subfolder.
- Select the `resources` folder in the Package Explorer, then, from the context menu, choose Build Path → Use as Source Folder.
- Under `resources`, create `drill-module.conf` (easiest to just copy it from another storage plugin such as OpenTSDB):
drill.classpath.scanning: {
packages += "org.apache.drill.exec.store.example"
}
The above tells Drill's classpath scanner where to find your storage plugin classes.
The config class provides all configuration information needed to execute your plugin, except the query-specific information.
- Create the `ExampleStoragePluginConfig` class:
@JsonTypeName(ExampleStoragePluginConfig.NAME)
public class ExampleStoragePluginConfig extends StoragePluginConfigBase {

  public static final String NAME = "example";

  public ExampleStoragePluginConfig() {
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) { return true; }
    if (o == null || getClass() != o.getClass()) { return false; }
    return true; // Compare config fields here once you add them
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(NAME); // Hash config fields here once you add them
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        //.add("foo", foo)
        .toString();
  }
}
Add the config fields needed by your plugin. You will need:
- The field itself, which must be public and should be a simple Java type such as an `int` or `String`. (More complex types are possible, but can be tricky. Look at other config classes for examples.)
- Either a public `setFoo` method, or the field added to the constructor.
- A public `getFoo` method for each field.
- The field added to the `hashCode`, `equals` and `toString` methods.
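For example, a possible version of the config class with a single hypothetical `host` field might look like the sketch below. The field name and its meaning are assumptions for illustration; use whatever connection settings your external system actually needs. (The `Objects` and `MoreObjects` classes are the Guava helpers already used above.)

// Sketch only: a config class with one hypothetical field.
// Imports assumed: com.fasterxml.jackson.annotation.JsonCreator,
// com.fasterxml.jackson.annotation.JsonProperty,
// com.fasterxml.jackson.annotation.JsonTypeName,
// com.google.common.base.Objects, com.google.common.base.MoreObjects.
@JsonTypeName(ExampleStoragePluginConfig.NAME)
public class ExampleStoragePluginConfig extends StoragePluginConfigBase {

  public static final String NAME = "example";

  // Hypothetical config field; adapt to your plugin's needs.
  public final String host;

  @JsonCreator
  public ExampleStoragePluginConfig(@JsonProperty("host") String host) {
    this.host = host;
  }

  public String getHost() { return host; }

  @Override
  public boolean equals(Object o) {
    if (this == o) { return true; }
    if (o == null || getClass() != o.getClass()) { return false; }
    return Objects.equal(host, ((ExampleStoragePluginConfig) o).host);
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(host);
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("host", host)
        .toString();
  }
}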
Add your config to the storage plugin class:
private final ExampleStoragePluginConfig config;
public ExampleStoragePlugin(ExampleStoragePluginConfig config,
DrillbitContext context, String name) throws IOException {
super(context, name);
this.config = config;
}
@Override
public StoragePluginConfig getConfig() { return config; }
The constructor has special meaning to Drill: Drill uses the type of the first argument (here `ExampleStoragePluginConfig`) to match a storage plugin class with its storage plugin config class.
Optionally, create the default config you want for a newly-installed Drill. These defaults are also available to unit tests.
- Create the file `bootstrap-storage-plugins.json` in your `resources` folder.
- Put the JSON-encoded version of your default configuration into the file.
Or, if there is no good default, just omit this file. In this case, users will have to enter the JSON by hand in the Drill Web Console.
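If you do provide the file, a minimal sketch might look like the following. The outer structure follows the bootstrap files shipped with other plugins; any fields beyond `type` and `enabled` depend on your own config class.

{
  "storage": {
    "example": {
      "type": "example",
      "enabled": false
    }
  }
}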
Create a test for your plugin:
public class TestExamplePlugin extends ClusterTest {
@BeforeClass
public static void setup() throws Exception {
ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher);
startCluster(builder);
StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
ExampleStoragePluginConfig config =
new ExampleStoragePluginConfig(/* Your fields here */);
config.setEnabled(true);
pluginRegistry.createOrUpdate(ExampleStoragePluginConfig.NAME, config, true);
}
@Test
public void test() {
fail("Not yet implemented");
}
}
The setup() method does some "magic" to start an in-process Drill "cluster" (really, one Drillbit), create an instance of your plugin config, and register that config in Drill.
Notice that the test case does nothing yet.
To test, set a breakpoint in the constructor of your storage plugin. Run your test. If Eclipse hits the breakpoint, then you've got everything wired up correctly. If not, go back and review the above steps:
- The config class is marked as Jackson serializable.
- The storage plugin constructor takes the config as its first argument, as shown above.
- The `drill-module.conf` file exists and adds the correct package to Drill's classpath scanning.
- The test code is correct.
The storage plugin operates at both plan and run times. The first step is to define one or more schemas that define the tables available to users. The details of the schema are unique to your plugin; here we'll cover the generalities. (Note that the term "schema" here is in the sense of a collection of tables, not in the sense of the set of columns within a table.)
Drill uses Apache Calcite as its planner. Calcite is a bit of a black art, but here are some basics.
Calcite defines a "default" schema for each storage plugin which is the one used if you simply use the plugin name itself followed by a table name:
SELECT * FROM examplePlugin.aTable
In this case, Calcite will look up the table aTable in our default schema.
Calcite also allows you to define nested schemas. For example, if we are connecting to MySQL, we must first specify a database, then a table within the database:
SELECT * FROM examplePlugin.aSchema.aTable
Here, Calcite will first resolve the schema name (aSchema) within the default schema, then will resolve the table name within the nested schema.
To keep this example simple, we assume our tables reside in the default schema.
At this point, it is worth knowing how all the pieces fit together.
- Drill uses the plugin config name to find the storage plugin config.
- Drill uses the plugin config class to find your plugin.
- Calcite asks the plugin for a schema factory to provide the default schema for your plugin.
- Calcite asks the default schema to resolve the nested schema name (if any).
- Calcite asks the schema to resolve the table name.
Calcite uses a schema factory class to create your schemas as needed. For our example, we'll create a default schema that contains a single table, "myTable". We pass along the plugin because a real implementation will likely obtain the list of schemas or tables from an external system.
One note of caution: Calcite recreates your schema classes over and over when planning each query. If schema and/or table creation is costly, consider implementing a cache.
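For example, one simple approach is to memoize tables in the long-lived schema factory rather than in the schema itself, since Calcite recreates the schema objects for each query. This is only a sketch under assumptions: `tableFor` and `createTable` are hypothetical names, not Drill APIs, and the schema's `getTable` (shown below) would delegate to `tableFor`.

// Minimal caching sketch for the schema factory.
// Imports assumed: java.util.concurrent.ConcurrentHashMap,
// java.util.concurrent.ConcurrentMap, org.apache.calcite.schema.Table.
private final ConcurrentMap<String, Table> tableCache = new ConcurrentHashMap<>();

public Table tableFor(String name) {
  // createTable() is a hypothetical helper that does the expensive work of
  // building a Table for the given name, or returns null if the name is unknown.
  // computeIfAbsent does not cache a null result, so unknown names stay uncached.
  return tableCache.computeIfAbsent(name, this::createTable);
}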
Create the Schema Factory class:
public class ExampleSchemaFactory extends AbstractSchemaFactory {
public static final String MY_TABLE = "myTable";
private final ExampleStoragePlugin plugin;
public ExampleSchemaFactory(ExampleStoragePlugin plugin) {
super(plugin.getName());
this.plugin = plugin;
}
@Override
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent)
throws IOException {
parent.add(getName(), new DefaultSchema(getName()));
}
class DefaultSchema extends AbstractSchema {
DefaultSchema(String name) {
super(Collections.emptyList(), name);
}
@Override
public Table getTable(String name) {
if (MY_TABLE.contentEquals(name)) {
return null; // TODO
}
return null; // Unknown table
}
@Override
public Set<String> getTableNames() {
return Sets.newHashSet(MY_TABLE);
}
@Override
public String getTypeName() {
return ExampleStoragePluginConfig.NAME;
}
}
}
Create the schema factory in the storage plugin:
private final ExampleSchemaFactory schemaFactory;
public ExampleStoragePlugin(ExampleStoragePluginConfig config,
DrillbitContext context, String name) throws IOException {
super(context, name);
this.config = config;
schemaFactory = new ExampleSchemaFactory(this);
}
@Override
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(schemaConfig, parent);
}
We can now test that the schema resolution works. Add the following test:
@Test
public void test() throws Exception {
String sql = "SELECT * FROM example.myTable";
queryBuilder().sql(sql).run();
}
Set breakpoints in the following methods:
- `ExampleStoragePlugin.registerSchemas`
- `DefaultSchema.getTable`
Run the test. You should see the debugger stop in each method (multiple times.) If so, then the name resolution works and we are ready to move on to creating tables.
If not, then double-check that the naming works correctly:
- Register schemas in the storage plugin.
- Give the default schema the same name as given to your plugin in its constructor (not necessarily the name defined in your config class).
- Give the default schema no parent path.
- If you include a nested schema, in the default schema's `getSubSchema` method, be sure to give it the sub-schema name and the path of its parent (the default schema).
- Provide the `getTable` method (which does nothing yet).
Debugging through Calcite is a bit tricky; it is best to sort out any issues using the hints above.
The next step in the name resolution process is to provide the class that represents the resolved table name in a query:
SELECT * FROM example.myTable
Drill gives you complete flexibility: you can match existing tables, you can have a fixed set of tables, or you can even create tables on the fly (as the "mock" storage plugin does.) You can also store as little or as much information as you want in your table definition.
For our example, we will support a single table, and the only information we need is the table name. As a result, we can use Drill's table class directly. However, if we needed more table information or logic, we could create our own table class.
In our default schema we will resolve our one and only table name:
class DefaultSchema extends AbstractSchema {
  @Override
  public Table getTable(String tableName) {
    if (MY_TABLE.contentEquals(tableName)) {
      return new DynamicDrillTable(plugin, plugin.getName(), new ExampleScanSpec(tableName));
    }
    return null; // Unknown table
  }
We have now helped Calcite resolve a table name. Next we begin to define what we will do with the table, which is to scan (read) it. Some systems are fancier than others: some read a single, contiguous data source, some read "splits" of a data source (such as file blocks in HDFS), some arbitrarily break up a scan into chunks, others "push" the scan to an external system, and so forth. Calcite provides a great deal of flexibility to define these cases. For our needs, we assume we scan a single data source from start to end.
We start by defining a "scan specification": a description of the overall logical scan, before we've started worrying about the details. Our class just needs the table name:
@JsonTypeName("example-scan-spec")
public class ExampleScanSpec {
private final String tableName;
@JsonCreator
public ExampleScanSpec(
@JsonProperty("tableName") String tableName) {
this.tableName = tableName;
}
public String getTableName() { return tableName; }
}
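It can also help when debugging in the IDE to give the scan spec a readable string form. A minimal sketch of a method you could add inside `ExampleScanSpec` (purely optional):

// Optional: readable form of the scan spec for debugging.
@Override
public String toString() {
  return "ExampleScanSpec [tableName=" + tableName + "]";
}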
We've defined this class as Jackson-serializable so that we can pass it along to the execution engine. (We could have chosen a different format, but this approach is simplest.)
We can now repeat our test, stepping through the code to verify that the scan spec and dynamic table are created correctly.
Drill is a member of the Hadoop ecosystem, and so supports the idea that a table may be represented by a directory of files, where each file consists of blocks distributed across storage nodes. A "group scan" is the term for the logical scan of the pieces that make up a table, and represents one additional layer of refinement from the scan specification.
As we have noted, Drill uses Calcite for query planning, and so the scan objects fit into the Calcite structure. As a result, the group (and later, "sub") scans are a bit complex.
In our case, our table consists of a single chunk of data, so the "group" consists of a single physical scan.
The group scan brings together three layers of information:
- The configuration of the storage plugin (the "storage engine"),
- The (optional) schema and table,
- The set of columns.
That is, the group scan extends the scan spec by providing a list of columns from our query:
SELECT * FROM example.myTable
SELECT a, b, c FROM example.myTable
Drill uses schema-on-read, so we will assume that we can figure out the column names and types at runtime. However, if we know the available columns and types at plan time, we can tell Calcite to use that information. See the existing storage plugins for how that is done.
public class ExampleGroupScan extends AbstractGroupScan {
private final ExampleStoragePluginConfig config;
private final ExampleScanSpec scanSpec;
private final List<SchemaPath> columns;
public ExampleGroupScan(ExampleGroupScan that) {
super(that);
this.config = that.config;
this.scanSpec = that.scanSpec;
this.columns = that.columns;
}
public ExampleGroupScan(
ExampleStoragePluginConfig config,
ExampleScanSpec scanSpec,
List<SchemaPath> columns) {
super("dummy-user"); // No real user for this service
this.config = config;
this.scanSpec = scanSpec;
this.columns = columns == null || columns.size() == 0 ? ALL_COLUMNS : columns;
}
This class is not serialized, so no Jackson serialization is needed. (Some of the existing Drill plugins do make the class serializable, but it appears this is actually unnecessary.)
We need both a copy constructor and a "normal" constructor. We accept the storage plugin config, though this implementation does not really need it.
This class needs a number of methods to help with planning. First, we have to tell Drill how to assign the scan to nodes. By default, we do no assignments and let Drill decide:
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) { }
Next we tell Drill how much it can parallelize the scan. We assume we are calling an external service, so we allow only one thread of execution per query:
@Override
public int getMaxParallelizationWidth() { return 1; }
This class implements the next step in the planning process: creating specific scans, but we'll stub this part out for now:
@Override
public SubScan getSpecificScan(int minorFragmentId) {
// TODO Auto-generated method stub
return null;
}
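Depending on your Drill version, the planner may also ask the group scan for a rough cost estimate via `getScanStats()`. A hedged sketch with made-up numbers follows; check `AbstractGroupScan` in your version to see whether the default is already sufficient for your plugin:

// Optional: a rough cost estimate for the planner. The row count and costs
// below are placeholders, not measurements; adjust them for your source.
@Override
public ScanStats getScanStats() {
  return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, 1000, 1, 1000);
}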
Finally, we need some boilerplate required by Drill:
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new ExampleGroupScan(this);
}
@Override
public String getDigest() {
return toString();
}
@Override
public String toString() {
return "Example scan of " + scanSpec.getSchemaName() + "." + scanSpec.getTableName();
}
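One piece we have not shown is how the planner gets from the scan spec to the group scan. A sketch of the usual approach, following the pattern used by existing plugins such as the Mongo plugin, is below; add it to `ExampleStoragePlugin`, and verify the exact method signature against `AbstractStoragePlugin` in your Drill version.

// Sketch: create the group scan from the serialized scan spec.
// Imports assumed: com.fasterxml.jackson.core.type.TypeReference,
// com.fasterxml.jackson.databind.ObjectMapper, org.apache.drill.common.JSONOptions.
@Override
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
  // Deserialize the ExampleScanSpec that the schema attached to the table.
  ExampleScanSpec scanSpec = selection.getListWith(new ObjectMapper(),
      new TypeReference<ExampleScanSpec>() {});
  // Passing null for columns selects ALL_COLUMNS in our constructor.
  return new ExampleGroupScan(config, scanSpec, null);
}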
We can now run our test again: set a breakpoint in getSpecificScan and verify that execution reaches it. This confirms that things work up to this point.
The group scan represents the general idea of "scan table X." The specific scan (also called a "sub scan") contains the information needed to implement the physical scan, including Hadoop splits, database shards or whatever is needed by the storage engine. In our case, we assume a single scan operator that just needs the information gathered above.
The specific scan is serialized to JSON and sent to each Drillbit, where it is deserialized and passed to the scan operator.
@JsonTypeName("example-sub-scan")
public class ExampleSubScan extends AbstractSubScan {
private final ExampleStoragePluginConfig pluginConfig;
private final ExampleScanSpec tableSpec;
private final List<SchemaPath> columns;
  @JsonCreator
  public ExampleSubScan(
      @JsonProperty("config") ExampleStoragePluginConfig config,
      @JsonProperty("tableSpec") ExampleScanSpec tableSpec,
      @JsonProperty("columns") List<SchemaPath> columns) {
    super("example-user"); // No real user for this example service
    this.pluginConfig = config;
    this.tableSpec = tableSpec;
    this.columns = columns;
  }
@Override
@JsonIgnore
public int getOperatorType() {
return CoreOperatorType.EXAMPLE_SUB_SCAN_VALUE;
}
public ExampleStoragePluginConfig getConfig() {
return pluginConfig;
}
public ExampleScanSpec getTableSpec() {
return tableSpec;
}
public List<SchemaPath> getColumns() {
return columns;
}
}
Notice that we have included the storage plugin config, not the storage plugin itself. The config is Jackson-serializable; the plugin is not. We now see why we made the scan spec serializable: it will be included in our sub scan.
Notice above that we referenced CoreOperatorType.EXAMPLE_SUB_SCAN_VALUE. This is a unique numeric identifier for each operator in Drill. Unfortunately, all operators must be defined in a single global class, which makes it hard to add a true plugin not known to Drill itself. Until this is resolved, you must modify UserBitShared.proto to add your operator type:
enum CoreOperatorType {
SINGLE_SENDER = 0;
...
SHP_SUB_SCAN = 65;
EXAMPLE_SUB_SCAN = 66;
}
Choose the next available number for your ID.
Then, rebuild the Protobuf files as described in protocol/readme.txt.
We now connect up our sub scan with the group scan:
public class ExampleGroupScan extends AbstractGroupScan {
...
@Override
public SubScan getSpecificScan(int minorFragmentId) {
return new ExampleSubScan(config, scanSpec, columns);
}
Set a breakpoint in the above method and run the test case. Execution should stop there and you can verify that your sub scan is created as you expect.
With this, we are done with the planner side of the project and are ready to move on to the execution side.