Tutorial05 - Building an Import Adaptor for Custom Connections

Introduction

You can add support for custom connections by creating a class that extends datameer.dap.sdk.datastore.DataStoreType. Your plug-in is automatically scanned for all classes that extend this class, and the custom connections are registered with Datameer automatically.

Example Connection

As a simple example, you write a connection that generates random data instead of reading from an external data source. This keeps the example code as simple as possible. It should be straightforward to modify the example to read from an external source instead of generating random records; a sketch of such a modification appears after the ExampleDataImportFormat code below.

Explaining Connections and Import Jobs

A custom connection is configured on two levels. The connection level holds global connection properties that can be reused across multiple import jobs. The properties that define exactly which data is imported are configured on the import job level.
For example, for databases the connection string, username, and password are configured on the connection level, while table names are configured on the import job level.
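
To illustrate the database example, here is a hedged sketch of how such connection-level properties might be added. The database-style connection type, the property keys, and the labels are hypothetical and not part of the tutorial; only the SDK types and calls (WizardPageDefinition, PropertyGroupDefinition, PropertyDefinition, PropertyType) are the same ones used in the tutorial code below. The method would live inside a DataStoreType subclass like the ExampleDataStoreType shown later:

// Hypothetical sketch only: connection-level properties for a database-style
// connection type. Keys and labels are invented for illustration.
@Override
protected void populateWizardPage(WizardPageDefinition page) {
    PropertyGroupDefinition group = page.addGroup("Database Connection");

    PropertyDefinition connectionString = new PropertyDefinition("connection-string", "Connection string", PropertyType.STRING);
    connectionString.setRequired(true);
    group.addPropertyDefinition(connectionString);

    PropertyDefinition username = new PropertyDefinition("username", "Username", PropertyType.STRING);
    username.setRequired(true);
    group.addPropertyDefinition(username);

    // Kept as a plain STRING property only to keep the sketch short.
    PropertyDefinition password = new PropertyDefinition("password", "Password", PropertyType.STRING);
    group.addPropertyDefinition(password);
}

Table names and similar job-specific settings would instead be added in the import job's details wizard page, as shown in ExampleDataImportJobModel.createDetailsWizardPage() below.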

Extending Connection Types

Here is the implementation of our ExampleDataStoreType. It is very simple, because there are no properties that need to be configured at this level:

ExampleDataStoreType.java
package datameer.das.plugin.tutorial05;

import org.apache.hadoop.conf.Configuration;

import datameer.dap.sdk.common.DasContext;
import datameer.dap.sdk.datastore.DataStoreModel;
import datameer.dap.sdk.datastore.DataStoreType;
import datameer.dap.sdk.entity.DataStore;
import datameer.dap.sdk.property.WizardPageDefinition;

public class ExampleDataStoreType extends DataStoreType {

    public final static String ID = ExampleDataStoreType.class.getName();

    @SuppressWarnings("serial")
    public static class ExampleDataStoreModel extends DataStoreModel {

        public ExampleDataStoreModel(DataStore dataStore) {
            super(dataStore);
        }

        @Override
        public boolean isLocal() {
            return false;
        }

        @Override
        public void setupConf(Configuration conf) {
        }

        @Override
        public void testConnect(DasContext dasContext) throws InterruptedException {
        }
    }

    public ExampleDataStoreType() {
        super(new ExampleDataImportJobType());
    }

    @Override
    public DataStoreModel createModel(DataStore dataStore) {
        return new ExampleDataStoreModel(dataStore);
    }

    @Override
    public String getId() {
        return ID;
    }

    @Override
    public String getName() {
        return "Example Data Creator";
    }

    @Override
    protected void populateWizardPage(WizardPageDefinition page) {
    }
}

Extending ImportJobType

Here is the implementation of our ExampleDataImportJobType. It is very simple, because the main custom logic lives in the ImportJobModel created by this type.

ExampleDataImportJobType.java
package datameer.das.plugin.tutorial05;

import datameer.dap.sdk.entity.DataSourceConfiguration;
import datameer.dap.sdk.importjob.ImportJobType;

public class ExampleDataImportJobType extends ImportJobType<String> {

    @Override
    public ExampleDataImportJobModel createModel(DataSourceConfiguration configuration) {
        return new ExampleDataImportJobModel(configuration);
    }
}

All the logic to read and parse the custom data and to create Datameer records goes into the ExampleDataImportJobModel. It looks like this:

ExampleDataImportJobModel.java
package datameer.das.plugin.tutorial05;

import java.util.List;

import datameer.dap.sdk.entity.DataSourceConfiguration;
import datameer.dap.sdk.importjob.ImportFormat;
import datameer.dap.sdk.importjob.ImportJobModel;
import datameer.dap.sdk.property.NumberValidator;
import datameer.dap.sdk.property.PropertyDefinition;
import datameer.dap.sdk.property.PropertyGroupDefinition;
import datameer.dap.sdk.property.PropertyType;
import datameer.dap.sdk.property.WizardPageDefinition;
import datameer.dap.sdk.util.ManifestMetaData;

/**
 * Creates n files with m random records each. Number of files (n) and number of records (m) can be
 * configured using the import job wizard.
 */
public class ExampleDataImportJobModel extends ImportJobModel<String> {

    private static final long serialVersionUID = ManifestMetaData.SERIAL_VERSION_UID;

    private static final String NUMBER_OF_FILES = "files-number";
    private static final String RECORDS_PER_FILE = "file-records";

    private int _numberOfFiles;
    private int _recordsPerFile;

    public ExampleDataImportJobModel(DataSourceConfiguration conf) {
        super(conf);
        _numberOfFiles = conf.getIntProperty(NUMBER_OF_FILES, 1);
        _recordsPerFile = conf.getIntProperty(RECORDS_PER_FILE, 1);
    }

    public int getNumberOfFiles() {
        return _numberOfFiles;
    }

    public int getRecordsPerFile() {
        return _recordsPerFile;
    }

    @Override
    public ImportFormat<String> createImportFormat() {
        return new ExampleDataImportFormat(this);
    }

    @Override
    public void addPropertyValuesThatTriggerAFilterReset(List<Object> propertyValues) {
    }

    @Override
    public void addPropertyValuesThatTriggerAFieldReset(List<Object> propertyValues) {
    }

    @Override
    public WizardPageDefinition createDetailsWizardPage() {
        WizardPageDefinition page = new WizardPageDefinition("Details");
        PropertyGroupDefinition group = page.addGroup("Example Data");
        PropertyDefinition propertyDefinition = new PropertyDefinition(NUMBER_OF_FILES, "Number of files", PropertyType.STRING);
        propertyDefinition.setRequired(true);
        propertyDefinition.setValidators(new NumberValidator(1));
        group.addPropertyDefinition(propertyDefinition);

        propertyDefinition = new PropertyDefinition(RECORDS_PER_FILE, "Records per file", PropertyType.STRING);
        propertyDefinition.setRequired(true);
        propertyDefinition.setValidators(new NumberValidator(1));
        group.addPropertyDefinition(propertyDefinition);

        return page;
    }

    @Override
    public WizardPageDefinition createImportJobFilterPage() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isSupportImportJobFilterPage() {
        return false;
    }

    @Override
    public boolean canAutoMergeNewFields() {
        return true;
    }

    @Override
    public void resetFilters() {
    }
}

ExampleDataImportFormat.java
package datameer.das.plugin.tutorial05;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.function.FieldType;
import datameer.dap.sdk.importjob.AbstractImportFormat;
import datameer.dap.sdk.importjob.DasDelegateSplit;
import datameer.dap.sdk.importjob.MapBasedRecordParser;
import datameer.dap.sdk.importjob.MapBasedRecordSchemaDetector;
import datameer.dap.sdk.importjob.MapParser;
import datameer.dap.sdk.importjob.NoDataRecordSchemaDetector;
import datameer.dap.sdk.importjob.RecordParser;
import datameer.dap.sdk.importjob.RecordSchemaDetector;
import datameer.dap.sdk.importjob.RecordSourceReader;
import datameer.dap.sdk.importjob.Splitter;
import datameer.dap.sdk.importjob.TextFieldAnalyzer;
import datameer.dap.sdk.util.ManifestMetaData;

/**
 * Describes the file format and generates the random records. This would be the class that is used
 * to connect to external data and convert it to Datameer records.
 */
@SuppressWarnings("deprecation")
public class ExampleDataImportFormat extends AbstractImportFormat<String> {

    private static final long serialVersionUID = ManifestMetaData.SERIAL_VERSION_UID;

    /**
     * Split that just keeps the number of records that should be written.
     */
    public static class ExampleDataSplit implements InputSplit {

        private int _records;

        @SuppressWarnings("unused")
        private ExampleDataSplit() {
        }

        public ExampleDataSplit(int records) {
            _records = records;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(_records);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            _records = in.readInt();
        }

        @Override
        public String[] getLocations() throws IOException {
            return new String[0];
        }

        @Override
        public long getLength() throws IOException {
            return _records;
        }
    }

    // Unique keys to identify the column origins
    private static final String ORIGIN_NAME = "name";
    private static final String ORIGIN_AMOUNT = "amount";
    private static final String ORIGIN_DOUBLE_VALUE = "doubleValue";

    public ExampleDataImportFormat(ExampleDataImportJobModel model) {
        super(model, false);
    }

    @Override
    public ExampleDataImportJobModel getImportJobModel() {
        return (ExampleDataImportJobModel) super.getImportJobModel();
    }

    /**
     * Describes the schema of the data.
     */
    @Override
    public RecordSchemaDetector<String> createRecordSchemaDetector() throws IOException {
        return new NoDataRecordSchemaDetector<String>() {
            @Override
            public Field[] detectFields() {
                return new Field[] { new Field("name", ORIGIN_NAME, FieldType.STRING, true), new Field("amount", ORIGIN_AMOUNT, FieldType.INTEGER, true),
                        new Field("double", ORIGIN_DOUBLE_VALUE, FieldType.FLOAT, true) };
            }
        };
    }

    /**
     * Converts records read from an external data source into Datameer records. For simplicity we are
     * just producing random records.
     */
    @Override
    public RecordParser<String> createRecordParser(Field[] fields) throws IOException {
        return new MapBasedRecordParser<String>(fields, new MapParser<String>() {

            private Random _random = new Random();

            @Override
            public void configureSchemaDetection(MapBasedRecordSchemaDetector<String> schemaDetector, TextFieldAnalyzer fieldAnalyzer) {
            }

            @Override
            public Map<String, Object> parseRecordSource(String recordSource) throws Exception {
                Map<String, Object> map = new HashMap<String, Object>();
                map.put(ORIGIN_NAME, ORIGIN_NAME + _random.nextInt(1000));
                map.put(ORIGIN_AMOUNT, _random.nextInt(10000));
                map.put(ORIGIN_DOUBLE_VALUE, _random.nextDouble());
                return map;
            }
        });
    }

    /**
     * Normally reads from an external data source and returns any Java object that will later be
     * converted to Datameer records using the {@link RecordParser} created by
     * {@link #createRecordParser(Field[])}. In our case we just create fake String records.
     */
    @Override
    public RecordSourceReader<String> createRecordSourceReader(final InputSplit inputSplit) throws IOException {
        return new RecordSourceReader<String>() {

            private int _counter = 0;

            @Override
            public String readNext() throws IOException {
                return _counter++ < inputSplit.getLength() ? "record" : null;
            }

            @Override
            public long getPos() throws IOException {
                return _counter;
            }

            @Override
            public void close() throws IOException {
            }
        };
    }

    /**
     * Splitter that creates as many splits as configured files.
     */
    @Override
    public Splitter<DasDelegateSplit> getSplitter() throws IOException {
        return new Splitter<DasDelegateSplit>() {

            @Override
            public DasDelegateSplit[] createSplits(SplitHint splitHint) throws IOException {
                int numberOfFiles = getImportJobModel().getNumberOfFiles();
                int recordsPerFile = getImportJobModel().getRecordsPerFile();
                DasDelegateSplit[] splits = new DasDelegateSplit[numberOfFiles];
                for (int i = 0; i < numberOfFiles; i++) {
                    splits[i] = toDasDelegateSplit(new ExampleDataSplit(recordsPerFile));
                }
                return splits;
            }

            @Override
            public DasDelegateSplit[] createPreviewSplits(Configuration conf, int desiredNumberOfRecords) throws IOException {
                return new DasDelegateSplit[] { toDasDelegateSplit(new ExampleDataSplit(10)) };
            }

            private DasDelegateSplit toDasDelegateSplit(final ExampleDataSplit split) {
                return new DasDelegateSplit(split, "plugin-tutorial");
            }
        };
    }

    @Override
    protected void onConfigure(JobConf conf) {
    }
}
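
As mentioned in the introduction, adapting this example to read from a real data source mostly means changing the record source reader and the parser. The following is a hedged sketch only: it assumes a line-based text source where each record source String is a comma-separated line with the same three columns (for example "Alice,42,3.14"), and it would replace the createRecordParser(...) implementation above. The column keys reuse the ORIGIN_* constants from ExampleDataImportFormat:

// Hypothetical sketch: parse a comma-separated line instead of generating
// random values. Assumes the record source Strings are the lines returned
// by createRecordSourceReader(...).
@Override
public RecordParser<String> createRecordParser(Field[] fields) throws IOException {
    return new MapBasedRecordParser<String>(fields, new MapParser<String>() {

        @Override
        public void configureSchemaDetection(MapBasedRecordSchemaDetector<String> schemaDetector, TextFieldAnalyzer fieldAnalyzer) {
        }

        @Override
        public Map<String, Object> parseRecordSource(String recordSource) throws Exception {
            String[] columns = recordSource.split(",");
            Map<String, Object> map = new HashMap<String, Object>();
            map.put(ORIGIN_NAME, columns[0]);
            map.put(ORIGIN_AMOUNT, Integer.parseInt(columns[1].trim()));
            map.put(ORIGIN_DOUBLE_VALUE, Double.parseDouble(columns[2].trim()));
            return map;
        }
    });
}

The corresponding createRecordSourceReader(...) would then return the actual lines read from the source, following the same contract as the random example above, where readNext() returns null once all records have been produced.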

For more information, refer to the Java documentation. 

Source Code

This tutorial can be found in the Datameer plug-in SDK under plugin-tutorials/tutorial05.