Tutorial01 - Building an Input Adaptor for Custom File Types

Introduction

You can add support for custom file types by creating a class that extends datameer.dap.sdk.importjob.ImportFileType. Datameer automatically scans the plug-in for such classes and registers your custom file types.

File Format

Let's assume you have a file that looks like this:

Transaction:1
Customer:SomeCustomer
Product-Id:product1

Transaction:2
Customer:OtherCustomer
Product-Id:product2

Records span multiple lines in this example.

Extending ImportFileType

You can have a look at some of the subclasses of ImportFileType that come with the Datameer SDK. In this case, it makes sense to extend TextBasedFileType, because you are writing an import format that operates on text input.

Here is the implementation:

CustomTransactionFileType.java
package datameer.das.plugin.tutorial01;

import org.apache.hadoop.conf.Configuration;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.importjob.RecordParser;
import datameer.dap.sdk.importjob.RecordSchemaDetector;
import datameer.dap.sdk.importjob.TextBasedFileType;

public class CustomTransactionFileType extends TextBasedFileType {

    private static final String ID = CustomTransactionFileType.class.getName();

    @Override
    public RecordSchemaDetector<String> createRecordSchemaDetector(FileTypeModel<String> model) {
        return new CustomTransactionRecordSchemaDetector();
    }

    @Override
    public RecordParser<String> createRecordParser(Field[] fields, Configuration conf, FileTypeModel<String> model) {
        return new CustomTransactionRecordParser(fields);
    }

    @Override
    public String getName() {
        return "Custom Transaction File Type (Tutorial01)";
    }

    @Override
    public String getId() {
        return ID;
    }

    @Override
    public boolean canAutoMergeNewFields() {
        return true;
    }
}

The main things you have to provide in this implementation are:

  • A record parser that does the actual parsing of the file type
  • A name for this file type, which is used for displaying it in the Datameer web UI
  • A unique ID, which is used to identify this file type

Providing a Record Schema Detector

A schema detector determines the fields that can be extracted from the file. This is fairly simple in your case, since you have a fixed schema. In other cases (such as CSV files), you would have to look at the file to see how many columns it contains and what types those columns might have. Have a look at the available subclasses and interfaces of datameer.dap.sdk.importjob.RecordSchemaDetector<RS> that might help you when writing your own schema detector.
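
To make the CSV case concrete, here is a minimal sketch of how one might guess the column count and a type for each column by inspecting the first data line. It is plain Java and deliberately not tied to the SDK's RecordSchemaDetector interfaces; the class name and the simple integer/string heuristic are illustrative assumptions only. The fixed-schema detector actually used in this tutorial follows below.

Schema Guessing Sketch (illustrative)
import datameer.dap.sdk.schema.ValueType;

public class CsvSchemaGuess {

    /** Illustrative only: guess a ValueType for every column of a comma-separated line. */
    public static ValueType[] guessColumnTypes(String firstDataLine) {
        String[] columns = firstDataLine.split(",");
        ValueType[] types = new ValueType[columns.length];
        for (int i = 0; i < columns.length; i++) {
            types[i] = looksLikeInteger(columns[i].trim()) ? ValueType.INTEGER : ValueType.STRING;
        }
        return types;
    }

    private static boolean looksLikeInteger(String value) {
        try {
            Long.parseLong(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}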

CustomTransactionRecordSchemaDetector.java
package datameer.das.plugin.tutorial01;

import java.util.ArrayList;
import java.util.List;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.importjob.NoDataRecordSchemaDetector;
import datameer.dap.sdk.schema.ValueType;

public class CustomTransactionRecordSchemaDetector extends NoDataRecordSchemaDetector<String> {

    static final String TRANSACTION_PREFIX = "Transaction";
    static final String CUSTOMER = "Customer";
    static final String PRODUCT_ID = "Product-Id";

    /**
     * Makes a good guess for the field definition. This includes:
     * <ul>
     * <li>The number of fields</li>
     * <li>Suggestion for header names</li>
     * <li>Type for each field.</li>
     * </ul>
     * 
     * @return a good first guess of what the fields for this data source might look like.
     */
    @Override
    public Field[] detectFields() {
        List<Field> fields = new ArrayList<Field>();
        fields.add(new Field(TRANSACTION_PREFIX, "0", ValueType.INTEGER));
        fields.add(new Field(CUSTOMER, "1", ValueType.STRING));
        fields.add(new Field(PRODUCT_ID, "2", ValueType.STRING));
        return fields.toArray(new Field[fields.size()]);
    }
}

Providing a Record Parser

The true logic in mapping your custom records to fields lies in the implementation of the record parser. Again, it makes sense to look at the existing subclasses that come with the SDK. In this example, you extend MultiLineFixedStructureTextRecordParser, because you have records of a fixed structure that span multiple lines.

CustomTransactionRecordParser.java
package datameer.das.plugin.tutorial01;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.common.RawRecordCollector;
import datameer.dap.sdk.importjob.FixedStructureTextParser;
import datameer.dap.sdk.importjob.MultiLineFixedStructureTextRecordParser;
import datameer.dap.sdk.importjob.TextFieldAnalyzer;
import datameer.dap.sdk.util.ManifestMetaData;

public class CustomTransactionRecordParser extends MultiLineFixedStructureTextRecordParser {

    public static class CustomTransactionTextParser implements FixedStructureTextParser {

        private static final long serialVersionUID = ManifestMetaData.SERIAL_VERSION_UID;

        @Override
        public String[] parseStringFields(String line) throws Exception {
            String[] linesOfRecord = line.split("\r?\n");
            for (int i = 0; i < linesOfRecord.length; i++) {
                linesOfRecord[i] = linesOfRecord[i].split(":", 2)[1];
            }
            return linesOfRecord;
        }

        @Override
        public void configureSchemaDetection(TextFieldAnalyzer fieldAnalyzer) {
            // nothing to do
        }

        @Override
        public void configureRecordCollector(RawRecordCollector recordCollector) {
            // nothing to do
        }
    }

    public CustomTransactionRecordParser(Field[] allFields) {
        super(allFields, new CustomTransactionTextParser());
    }

    /**
     * This method helps to ensure the validity of splits over record sources. If the containing
     * record source element is a file and the record source is a line of that file, then this
     * method should return false only if the given line can't be parsed without previous lines.
     * This will not be called for the first split of a file.
     * 
     * Only implement if {@link #recordCanSpanMultipleRecordSources()} is true!
     * 
     * @return true if the given record source is valid as a first read record source
     */
    @Override
    public boolean isValidFirstReadRecordSource(String line) {
        return line.startsWith(CustomTransactionRecordSchemaDetector.TRANSACTION_PREFIX);
    }

    @Override
    protected boolean isLastRow(String line) {
        return line.startsWith(CustomTransactionRecordSchemaDetector.PRODUCT_ID);
    }

    @Override
    protected boolean isRecordSeparatorLine(String line) {
        return line.trim().isEmpty();
    }
}
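
To see the parser in action, here is a small, self-contained sketch that feeds one record block from the sample file into CustomTransactionTextParser.parseStringFields and prints the extracted field values. The class name CustomTransactionParserDemo is not part of the tutorial sources and only serves as an illustration.

Parser Usage Sketch (illustrative)
package datameer.das.plugin.tutorial01;

import java.util.Arrays;

public class CustomTransactionParserDemo {

    public static void main(String[] args) throws Exception {
        // One logical record from the sample file, spanning three lines.
        String recordBlock = "Transaction:1\nCustomer:SomeCustomer\nProduct-Id:product1";

        String[] values = new CustomTransactionRecordParser.CustomTransactionTextParser().parseStringFields(recordBlock);

        // Prints: [1, SomeCustomer, product1]
        System.out.println(Arrays.toString(values));
    }
}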

Other File Types

You can also parse binary files or use other parsers to convert a file into Datameer records. To do this, you have to do the following:

  • Extend ImportFileType<Path> instead of ImportFileType<Record>; the createImportFormat() method should look like this:

        @Override
        public FileBasedInputFormat<Path> createImportFormat(FileTypeModel<Path> model) {
            return new CompleteFileImportFormat(model.getFileImportJobModel());
        }
    
  • Extend AbstractMapBasedRecordParser<Path> to write the parser (see the sketch below).
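
The exact abstract methods of AbstractMapBasedRecordParser are not shown in this tutorial, so the following is only a rough sketch of the parsing side. It uses the MapParser interface that appears later in this document and assumes it can be parameterized with Path; the class name WholeFileParser and the two example columns are hypothetical.

Whole File Parser Sketch (illustrative)
package datameer.das.plugin.tutorial01;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.common.collect.Maps;

import datameer.dap.sdk.importjob.MapBasedRecordSchemaDetector;
import datameer.dap.sdk.importjob.MapParser;
import datameer.dap.sdk.importjob.TextFieldAnalyzer;

public class WholeFileParser implements MapParser<Path> {

    @Override
    public void configureSchemaDetection(MapBasedRecordSchemaDetector<Path> schemaDetector, TextFieldAnalyzer fieldAnalyzer) {
        // no special configuration needed for this sketch
    }

    @Override
    public Map<String, Object> parseRecordSource(Path recordSource) throws Exception {
        // Hypothetical example: emit one record per file consisting of file metadata.
        // A real implementation would open the file and parse its (binary) content.
        FileSystem fs = FileSystem.get(new Configuration());
        Map<String, Object> map = Maps.newLinkedHashMap();
        map.put("fileName", recordSource.getName());
        map.put("sizeInBytes", fs.getFileStatus(recordSource).getLen());
        return map;
    }
}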

Single Output Record Optimization

If you know that your RecordParser implementation outputs only one RawRecord per logical input record, it should be annotated with the @SingleRecordPerSource annotation.

Annotation Example
package datameer.das.plugin.tutorial01;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.common.RawRecordCollector;
import datameer.dap.sdk.importjob.FixedStructureTextParser;
import datameer.dap.sdk.importjob.MultiLineFixedStructureTextRecordParser;
import datameer.dap.sdk.importjob.SingleRecordPerSource;
import datameer.dap.sdk.importjob.TextFieldAnalyzer;
import datameer.dap.sdk.util.ManifestMetaData;

@SingleRecordPerSource
public class CustomTransactionRecordParser extends MultiLineFixedStructureTextRecordParser {
  // ...
}

The annotation can be added to any RecordParser implementation. It tells the Datameer SDK that the RecordParser outputs only one RawRecord per "logical" input record, which lets Datameer optimize the import logic. If the RecordParser does output more than one record via the RawRecordCollector, then the last one wins (records collected earlier during the calls to RecordParser#parse(ParseConfiguration, RawRecordCollector, Object) are ignored). This annotation works with MultipleSourceRecordParser implementations as well (as shown above), since in that case multiple incoming record sources are converted to a single "logical" input record.

Source Code

This tutorial can be found in the Datameer plug-in SDK under plugin-tutorials/tutorial01.

Sequence File With Custom Key Value Types

In this example, you want to add a custom adapter to import a custom sequence file made up of custom key and value Writable types. The right extension point for this in the SDK would be FileBasedImportFormat. Here is an example of an import format that would import sequence files created by Datameer:

KeyValuePair.java
package datameer.dap.common.importjob.filetype;

import datameer.dap.sdk.importjob.RecordSourceReader;

/**
 * Hold key value pairs from a sequence file which can then be translated to fields by a
 * {@link RecordSourceReader} implementation.
 */
public class KeyValuePair {

    private Object _key;
    private Object _value;

    public KeyValuePair(Object key, Object value) {
        _key = key;
        _value = value;
    }

    public Object getKey() {
        return _key;
    }

    public Object getValue() {
        return _value;
    }
}

DasSequenceFileImportFormat.java
package datameer.dap.common.importjob.filetype;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

import datameer.dap.common.job.mr.input.DapFileInputFormat;
import datameer.dap.sdk.common.Record;
import datameer.dap.sdk.importjob.FileBasedImportFormat;
import datameer.dap.sdk.importjob.FileImportJobModel;
import datameer.dap.sdk.importjob.FileSplitter.SplitInstruction;
import datameer.dap.sdk.importjob.FileSplitter.SplitInstruction.CanSplit;
import datameer.dap.sdk.importjob.RecordReaderBackedSourceReader;
import datameer.dap.sdk.importjob.RecordSourceReader;
import datameer.dap.sdk.util.ManifestMetaData;

@SuppressWarnings("deprecation")
public class DasSequenceFileImportFormat extends FileBasedImportFormat<KeyValuePair> {

    private static final long serialVersionUID = ManifestMetaData.SERIAL_VERSION_UID;

    public DasSequenceFileImportFormat(FileImportJobModel<KeyValuePair> importJobModel, boolean reparsableRecordSource) {
        super(importJobModel, reparsableRecordSource);
    }

    @Override
    public RecordSourceReader<KeyValuePair> createRecordSourceReader(InputSplit inputSplit) throws IOException {
        RecordReader<Object, Record> reader = DapFileInputFormat.createRecordReader(getConf(), (FileSplit) inputSplit);
        return new RecordReaderBackedSourceReader<Object, Record, KeyValuePair>(reader) {

            @Override
            protected KeyValuePair convertToRecordSource(Object key, Record value) {
                return new KeyValuePair(key, value);
            }
        };
    }

    @Override
    public SplitInstruction getSplitInstruction(Path[] inputPaths, JobConf conf) {
        return new SplitInstruction(CanSplit.ALL);
    }

    @Override
    protected void onConfigure(JobConf conf) {
    }
}

DasSequenceFileImportFormat.createRecordSourceReader() has to use a custom implementation to read records from the sequence files and convert them into KeyValuePair objects.
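
For a sequence file made up of your own Writable key and value classes, the reader could be based on Hadoop's SequenceFileRecordReader instead of DapFileInputFormat. The following is a rough sketch; MyKeyWritable and MyValueWritable are hypothetical placeholders for your custom Writable types.

Custom Sequence File Reader Sketch (illustrative)
    @Override
    public RecordSourceReader<KeyValuePair> createRecordSourceReader(InputSplit inputSplit) throws IOException {
        // Read the custom sequence file with Hadoop's standard sequence file record reader.
        RecordReader<MyKeyWritable, MyValueWritable> reader =
                new SequenceFileRecordReader<MyKeyWritable, MyValueWritable>(getConf(), (FileSplit) inputSplit);
        return new RecordReaderBackedSourceReader<MyKeyWritable, MyValueWritable, KeyValuePair>(reader) {

            @Override
            protected KeyValuePair convertToRecordSource(MyKeyWritable key, MyValueWritable value) {
                // Wrap the raw key/value pair; the parser extracts columns from it later.
                return new KeyValuePair(key, value);
            }
        };
    }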

DasSequenceImportFileModel.java
package datameer.dap.common.importjob.filetype;

import datameer.dap.sdk.importjob.FileBasedImportFormat;
import datameer.dap.sdk.importjob.FileImportJobModel;
import datameer.dap.sdk.importjob.ImportFileType.FileTypeModel;

public class DasSequenceImportFileModel extends FileTypeModel<KeyValuePair> {

    public DasSequenceImportFileModel(FileImportJobModel<KeyValuePair> fileImportJobModel) {
        super(fileImportJobModel);
    }

    @Override
    public FileBasedImportFormat<KeyValuePair> createImportFormat() {
        return new DasSequenceFileImportFormat(getFileImportJobModel(), false);
    }
}

DasSequenceImportFileType.java
package datameer.dap.common.importjob.filetype;

import org.apache.hadoop.conf.Configuration;

import datameer.dap.sdk.common.Field;
import datameer.dap.sdk.importjob.FileImportJobModel;
import datameer.dap.sdk.importjob.ImportFileType;
import datameer.dap.sdk.importjob.MapBasedRecordParser;
import datameer.dap.sdk.importjob.MapBasedRecordSchemaDetector;
import datameer.dap.sdk.importjob.RecordParser;
import datameer.dap.sdk.importjob.RecordSchemaDetector;
import datameer.dap.sdk.plugin.annotation.IgnoreExtension;

@IgnoreExtension
public class DasSequenceImportFileType extends ImportFileType<KeyValuePair> {

    @Override
    public String getId() {
        return "DasSequenceImportFileType";
    }

    @Override
    public String getName() {
        return "Datameer sequence file type";
    }

    @Override
    public RecordSchemaDetector<KeyValuePair> createRecordSchemaDetector(FileTypeModel<KeyValuePair> model) {
        return new MapBasedRecordSchemaDetector<KeyValuePair>(new DasSequenceFileParser(), 1000);
    }

    @Override
    public RecordParser<KeyValuePair> createRecordParser(Field[] fields, Configuration conf, FileTypeModel<KeyValuePair> model) {
        return new MapBasedRecordParser<KeyValuePair>(fields, new DasSequenceFileParser());
    }

    @Override
    public DasSequenceImportFileModel createModel(FileImportJobModel<KeyValuePair> model) {
        return new DasSequenceImportFileModel(model);
    }

    @Override
    public boolean canAutoMergeNewFields() {
        return true;
    }
}

DasSequenceFileParser.parseRecordSource() has to be changed to extract columns out of the key and value that were read from the sequence file.

DasSequenceFileParser.java
package datameer.dap.common.importjob.filetype;

import java.util.Map;

import com.google.common.collect.Maps;

import datameer.dap.sdk.common.Record;
import datameer.dap.sdk.importjob.MapBasedRecordSchemaDetector;
import datameer.dap.sdk.importjob.MapParser;
import datameer.dap.sdk.importjob.TextFieldAnalyzer;

public class DasSequenceFileParser implements MapParser<KeyValuePair> {

    @Override
    public void configureSchemaDetection(MapBasedRecordSchemaDetector<KeyValuePair> schemaDetector, TextFieldAnalyzer fieldAnalyzer) {
    }

    @Override
    public Map<String, Object> parseRecordSource(KeyValuePair recordSource) throws Exception {
        Map<String, Object> map = Maps.newLinkedHashMap();
        Record record = (Record) recordSource.getValue();
        for (int i = 0; i < record.getFieldCount(); i++) {
            map.put("column" + i, record.getValue(i));
        }
        return map;
    }
}