Create a Storage Plugin

Overview

To create a storage plugin, you must create a collection of classes that work in the planner and runtime:

  • Storage plugin
  • Storage plugin config
  • Default and optional nested schema definitions
  • Table definitions
  • Scan definitions, including one passed from the planner to the execution engine
  • Run-time record reader

Create the Project

Choose a name for your project. Let's call ours "example". Storage plugins typically go in the contrib module, in a subdirectory named storage-example.

To create the project in Eclipse:

  • Select the drill-contrib-parent node in the Eclipse Package Explorer.
  • Choose Maven → New Maven Module Project from the context menu.
  • Give your project the name storage-example.
  • Accept the other defaults and create the project.
  • Edit the resulting pom.xml file to add the dependencies you need (see the sketch after this list). Use other storage plugins as a "cheat sheet" to see what is needed.
  • Edit drill-contrib-parent/pom.xml to verify your module appears in the module list. Add it if missing:
  <modules>
    ...
    <module>storage-example</module>
    <module>data</module>
    ...
  </modules>
  • Get Eclipse to recognize your project. Use File → Import → Existing Maven Projects. Select the Drill root. Your project should now appear in the Package Explorer as storage-example. (If this does not work, try selecting drill and Maven → Update Project.)
  • Eclipse named your new package org.storage.example. Rename this package to org.apache.drill.exec.store.example.
  • Do the same for the test package.
  • Run the Maven-provided 'App' class to ensure everything works. Select the class name, then Debug as → Java Application from the context menu.
  • Delete the example App and AppTest classes.
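
As a rough sketch of the dependency bullet above: the exact set of dependencies varies by plugin, so treat this as illustrative and crib from a real plugin's pom.xml. The one dependency nearly every plugin needs is Drill's execution engine:

<dependencies>
  <dependency>
    <groupId>org.apache.drill.exec</groupId>
    <artifactId>drill-java-exec</artifactId>
    <version>${project.version}</version>
  </dependency>
</dependencies>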

Storage Plugin Class

The storage plugin class is Drill's main entry point to your plugin.

  • Create the storage plugin class in Eclipse. The initial file should look like this:
public class ExampleStoragePlugin extends AbstractStoragePlugin {
  @Override
  public StoragePluginConfig getConfig() {
    // TODO Auto-generated method stub
    return null;
  }

  @Override
  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent)
      throws IOException {
    // TODO Auto-generated method stub
  }
}

We'll fill in each of these as we go. We'll start with a boilerplate method that says our plugin can read data:

  @Override
  public boolean supportsRead() { return true; }

Drill Config File

Create the plugin's config file:

  • Under your src folder, create a resources subfolder.
  • Select the resources folder in the Package Explorer, then, from the context menu: Build Path → Use as Source Folder.
  • Under resources, create drill-module.conf (easiest to just copy from another storage plugin such as OpenTSDB):
drill.classpath.scanning: {
  packages += "org.apache.drill.exec.store.example"
}

The above tells Drill about your storage plugin.

Storage Plugin Config Class

The config class provides all configuration information needed to execute your plugin, except the query-specific information.

  • Create the ExampleStoragePluginConfig class:
@JsonTypeName(ExampleStoragePluginConfig.NAME)
public class ExampleStoragePluginConfig extends StoragePluginConfigBase {

  public static final String NAME = "example";

  public ExampleStoragePluginConfig() {
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) { return true; }
    if (o == null || getClass() != o.getClass()) { return false; }
    // Compare your config fields here, e.g.:
    // ExampleStoragePluginConfig other = (ExampleStoragePluginConfig) o;
    // return Objects.equal(foo, other.foo);
    return true;
  }

  @Override
  public int hashCode() {
    // Hash your config fields here, e.g.:
    // return Objects.hashCode(foo);
    return 0;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        //.add("foo", foo)
        .toString();
  }

}

Add the config fields needed by your plugin. For each field, you will need:

  • The field itself, which should be a simple Java type such as an int or String. (More complex types are possible, but can be tricky. Look at other config classes for examples.)
  • Either a public setFoo method or a constructor argument that sets the field.
  • A public getFoo method.
  • Entries for the field in the hashCode, equals and toString methods (see the sketch below).
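
As a minimal sketch, a hypothetical host field (the name is illustrative only, not part of any Drill API) would add the following to ExampleStoragePluginConfig. A constructor argument of this kind replaces the no-arg constructor shown earlier:

  private final String host;

  @JsonCreator
  public ExampleStoragePluginConfig(@JsonProperty("host") String host) {
    this.host = host;
  }

  public String getHost() { return host; }

The @JsonProperty annotation ties the constructor argument to the field name used in the JSON form of the config.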

Update the Storage Plugin Class

Add your config to the storage plugin class:

  private final ExampleStoragePluginConfig config;

  public ExampleStoragePlugin(ExampleStoragePluginConfig config,
      DrillbitContext context, String name) throws IOException {
    super(context, name);
    this.config = config;
  }

  @Override
  public StoragePluginConfig getConfig() { return config; }

The constructor has special meaning to Drill: Drill uses the type of the first argument (here ExampleStoragePluginConfig) to match a storage plugin class with a storage plugin config class.

Bootstrap Config

Optionally create the default config, if any, you want for a newly-installed Drill. These defaults are also available to unit tests.

  • Create the file bootstrap-storage-plugins.json in your resources folder.
  • Put into the file the JSON-encoded version of your default configuration.
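
A minimal sketch of such a file, assuming the hypothetical host field from the config sketch above (the top-level "storage" key is the standard bootstrap format; the host field and its value are illustrative):

{
  "storage": {
    "example": {
      "type": "example",
      "enabled": true,
      "host": "localhost"
    }
  }
}

The "type" value must match the name declared in @JsonTypeName on your config class.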

Or, if there is no good default, just omit this file. In this case, users will have to enter the JSON by hand in the Drill web console.

First Test

Create a test for your plugin:

public class TestExamplePlugin extends ClusterTest {

  @BeforeClass
  public static void setup() throws Exception {
    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher);
    startCluster(builder);

    StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
    ExampleStoragePluginConfig config =
        new ExampleStoragePluginConfig(/* Your fields here */);
    config.setEnabled(true);
    pluginRegistry.createOrUpdate(ExampleStoragePluginConfig.NAME, config, true);
  }

  @Test
  public void test() {
    fail("Not yet implemented");
  }
}

The setup() method does some "magic" to start an in-process Drill "cluster" (really, one Drillbit), create an instance of your plugin config, and register that config in Drill.

Notice that the test case does nothing yet.

To test, set a breakpoint in the constructor of your storage plugin. Run your test. If Eclipse hits the breakpoint, then you've got everything wired up correctly. If not, go back and review the above steps:

  • The config class is marked as Jackson serializable.
  • The storage plugin takes the config as its first constructor argument, as shown above.
  • The drill-module.conf file exists and adds the correct package to the class path.
  • The test code is correct.

Define a Schema

The storage plugin operates at both plan and run times. The first step is to define one or more schemas that define the tables available to users. The details of the schema are unique to your plugin; here we'll cover the generalities. (Note that the term "schema" here is in the sense of a collection of tables, not in the sense of the set of columns within a table.)

Drill uses Apache Calcite as its planner. Calcite is a bit of a black art, but here are some basics.

Calcite defines a "default" schema for each storage plugin which is the one used if you simply use the plugin name itself followed by a table name:

SELECT * FROM examplePlugin.aTable

In this case, Calcite will look up the table aTable in our default schema.

Calcite also allows you to define nested schemas. For example, if we are connecting to MySQL, we must first specify a database, then a table within the database:

SELECT * FROM examplePlugin.aSchema.aTable

Here, Calcite will first resolve the schema name (aSchema) within the default schema, then will resolve the table name within the nested schema.

To keep this example simple, we assume our tables reside in the default schema.

How Calcite Finds your Schema

At this point, it is worth knowing how all the pieces fit together.

  • Drill uses the plugin config name to find the storage plugin config.
  • Drill uses the plugin config class to find your plugin.
  • Calcite asks the plugin for a schema factory to provide the default schema for your plugin.
  • Calcite asks the default schema to resolve the nested schema name (if any).
  • Calcite asks the schema to resolve the table name.

Schema Factory

Calcite uses a schema factory class to create your schemas as needed. For our example, we'll create a default schema that contains a single table, "myTable". We pass along the plugin because a real implementation will likely obtain the list of schemas or tables from an external system.

One note of caution: Calcite recreates your schema classes over and over when planning each query. If schema and/or table creation is costly, consider implementing a cache.
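
As a sketch of one such cache (the createTable helper is hypothetical, standing in for whatever builds a table from your external system's metadata):

  // Cache of resolved tables, keyed by table name, so that Calcite's
  // repeated planning passes do not rebuild the same table over and over.
  private final Map<String, Table> tableCache = new ConcurrentHashMap<>();

  Table lookupTable(String name) {
    return tableCache.computeIfAbsent(name, this::createTable);
  }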

Create the Schema Factory class:

public class ExampleSchemaFactory extends AbstractSchemaFactory {

  public static final String MY_TABLE = "myTable";

  private final ExampleStoragePlugin plugin;

  public ExampleSchemaFactory(ExampleStoragePlugin plugin) {
    super(plugin.getName());
    this.plugin = plugin;
  }

  @Override
  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent)
      throws IOException {
    parent.add(getName(), new DefaultSchema(getName()));
  }

  class DefaultSchema extends AbstractSchema {

    DefaultSchema(String name) {
      super(Collections.emptyList(), name);
    }

    @Override
    public Table getTable(String name) {
      if (MY_TABLE.contentEquals(name)) {
        return null; // TODO
      }
      return null; // Unknown table
    }

    @Override
    public Set<String> getTableNames() {
      return Sets.newHashSet(MY_TABLE);
    }

    @Override
    public String getTypeName() {
      return ExampleStoragePluginConfig.NAME;
    }
  }
}

Create the Schema Factory Instance

Create the schema factory in the storage plugin:

  private final ExampleSchemaFactory schemaFactory;

  public ExampleStoragePlugin(ExampleStoragePluginConfig config,
      DrillbitContext context, String name) throws IOException {
    super(context, name);
    this.config = config;
    schemaFactory = new ExampleSchemaFactory(this);
  }

  @Override
  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
    schemaFactory.registerSchemas(schemaConfig, parent);
  }

Test Schema Resolution

We can now test that the schema resolution works. Add the following test:

  @Test
  public void test() throws Exception {
    String sql = "SELECT * FROM sumo.logs.all";
    queryBuilder().sql(sql).run();
  }

Set breakpoints in the following methods:

  • ExampleStoragePlugin.registerSchemas
  • DefaultSchema.getTable

Run the test. You should see the debugger stop in each method (multiple times.) If so, then the name resolution works and we are ready to move on to creating tables.

If not, then double-check that the naming works correctly:

  • Register schemas in the storage plugin.
  • Give the default schema the same name as given to your plugin in its constructor (not necessarily the name defined in your config class.)
  • Give the default schema no parent path.
  • If you include a nested schema, in the default schema getSubSchema method, be sure to give it the sub-schema name and the path of its parent (the default schema.)
  • Provide the getTable method (which does nothing yet.)

Debugging through Calcite is a bit tricky; it is best to sort out any issues using the hints above.

Table Definitions

The next step in the name resolution process is to resolve the table name in a query:

SELECT * FROM example.mySchema.myTable

Drill allows you complete flexibility: you can match existing tables, you can have a fixed set of tables, or you can even create tables on the fly (as the "mock" storage plugin does.) You can also store as little or as much information as you want in your table definition.

For our example, we will support any table name (we assume the name will be resolved in an external system), and the only information we need is the schema and table names. As a result, we can use Drill's table class directly. However, if we needed more table information or logic, we could create our own table class.

In our default schema we resolve our one and only table name (filling in the getTable stub from earlier):

  class DefaultSchema extends AbstractSchema {
    @Override
    public Table getTable(String name) {
      if (MY_TABLE.contentEquals(name)) {
        return new DynamicDrillTable(plugin, plugin.getName(),
            new ExampleScanSpec(getName(), name));
      }
      return null; // Unknown table
    }

Scan Specification

We have now helped Calcite resolve a table name. Next we begin to define what we will do with the table, which is to scan (read) it. Some systems are fancier than others: some read a single, contiguous data source, some read "splits" of a data source (such as file blocks in HDFS), some arbitrarily break up a scan into chunks, others "push" the scan to an external system, and so forth. Calcite provides a great deal of flexibility to define these cases. For our needs, we assume we scan a single data source from start to end.

We start by defining a "scan specification": a description of the overall logical scan, before we've started worrying about the details. Our class just needs the schema and table name:

public class ExampleScanSpec {

  private final String schemaName;
  private final String tableName;

  @JsonCreator
  public ExampleScanSpec(
      @JsonProperty("schemaName") String schemaName,
      @JsonProperty("tableName") String tableName) {
    this.schemaName = schemaName;
    this.tableName = tableName;
  }

  public String getSchemaName() { return schemaName; }
  public String getTableName() { return tableName; }
}

For reasons that are not entirely clear, Drill (or Calcite) wants this class in Jackson-serializable form, even though this part of the work never leaves the Foreman node and so should never actually need to be serialized.

Test the Scan Spec

We can now repeat our test, stepping through the code to verify that the scan spec and dynamic table are created correctly.

Group Scan Definition

Drill is a member of the Hadoop ecosystem, and so supports the idea that a table may be represented by a directory of files, where each file consists of blocks distributed across storage nodes. A "group scan" is the term for the logical scan of the pieces that make up a table, and represents one additional layer of refinement from the scan specification.

In our case, the table consists of a single chunk of data, so the "group" consists of a single physical scan. The group scan therefore just passes along the schema and table names, by passing along our scan spec.

public class ExampleGroupScan extends AbstractGroupScan {

  private final ExampleScanSpec scanSpec;

  public ExampleGroupScan(ExampleGroupScan that) {
    super(that);
    this.scanSpec = that.scanSpec;
  }

  public ExampleGroupScan(ExampleStoragePlugin storagePlugin,
      ExampleScanSpec scanSpec) {
    super("example"); // No real user for this service
    this.scanSpec = scanSpec;
  }

We need both a copy constructor and a "normal" constructor. We accept the storage plugin, though this implementation does not really need it.

This class needs a number of obscure methods. First, we have to tell Drill how to assign the scan to nodes. By default, we do no assignments and let Drill decide:

  @Override
  public void applyAssignments(List<DrillbitEndpoint> endpoints)
      throws PhysicalOperatorSetupException { }

Next we tell Drill how much it can parallelize the scan. We assume we are calling an external service, so we allow only one thread of execution per query:

  @Override
  public int getMaxParallelizationWidth() { return 1; }

We will also need to implement the next step in the planning process, creating specific scans, but we'll stub this part out for now:

  @Override
  public SubScan getSpecificScan(int minorFragmentId)
      throws ExecutionSetupException {
    // TODO Auto-generated method stub
    return null;
  }

Finally, we need some boilerplate required by Drill:

  @Override
  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
      throws ExecutionSetupException {
    Preconditions.checkArgument(children.isEmpty());
    return new ExampleGroupScan(this);
  }

  @Override
  public String getDigest() {
    return toString();
  }

  @Override
  public String toString() {
    return "Example scan of " + scanSpec.getSchemaName() + "." + scanSpec.getTableName();
  }

Test the Group Scan

We can now run our test again. Set a breakpoint in getSpecificScan, then run the test. This verifies that things work up to this point.
