Using Pig in Datameer

Pig is a high-level platform for creating MapReduce programs that run on Hadoop. The language for this platform is called Pig Latin. Pig Latin abstracts the Java MapReduce idiom into a higher-level notation, much as SQL does for relational database systems. Pig Latin can be extended with user-defined functions (UDFs), which you can write in Java, Python, or JavaScript and then call directly from the language.


Using Pig with Datameer

Say you are using Pig and have written some user-defined functions that work well for you. Now you want to use those functions in Datameer and are wondering how to migrate them. As a reminder, both Pig functions and Datameer functions are covered in their respective documentation.

Let’s dive in to see the differences and similarities between the two frameworks.

Function Types

Pig function types

| Function type | Implemented interface | Description |
| --- | --- | --- |
| Eval function | org.apache.pig.EvalFunc | Applied on a per-value basis |
| Aggregation function | org.apache.pig.Algebraic | Applied on a per-group basis |
| Filter function | org.apache.pig.FilterFunc | Eval-type function used to filter data |

Datameer function types

| Function type | Implemented interface | Description |
| --- | --- | --- |
| Simple function | datameer.dap.sdk.function.SimpleFunctionType | Any simple function that takes a number of arguments and returns a result value. Examples: SUM, MOD, UPPER |
| Aggregation function | datameer.dap.sdk.function.AggregationFunctionType | Aggregates the values of a group that has been created by a "group by" function. Examples: GROUPSUM, GROUPAVERAGE |
| Group by function | datameer.dap.sdk.function.GroupByFunctionType | Like a simple function, this takes a number of arguments and returns a result value. The difference is that the returned value is used as a group that appears only once in the result sheet. Group by functions can only be used together with aggregation functions. Examples: GROUPBY, GROUPBYBIN |


The function types of the two frameworks map onto each other closely. A Pig Eval function corresponds to a Datameer simple function, aggregation functions play the same role in both, and a Pig Filter function can be written as a Datameer SimpleFunctionType.
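
To illustrate the last point, here is a minimal sketch of a filter written as a simple function that returns a Boolean for each row. It is plain Java with no Pig or Datameer dependency: the interface `SimpleFunctionSketch` and the class `IsEmptySketch` are hypothetical names invented for this illustration, and only the `compute(Object...)` shape is borrowed from the Datameer example later in this section.

```java
// Hypothetical stand-in for a simple function; this is NOT the Datameer SDK interface.
interface SimpleFunctionSketch {
    Object compute(Object... arguments);
}

// A filter expressed as a simple function: it returns one Boolean per row.
class IsEmptySketch implements SimpleFunctionSketch {
    @Override
    public Boolean compute(Object... arguments) {
        Object value = arguments[0];
        // Assumption for this sketch: a missing value counts as empty.
        if (value == null) {
            return Boolean.TRUE;
        }
        return value.toString().isEmpty();
    }
}
```

Rows for which such a function returns true (or false) can then be kept or dropped, which is the role a Pig Filter function plays.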

Data Types

When migrating a Pig Eval function, you might be able to reuse the body of its exec() method in a Datameer compute() method with only a few changes. Pig uses its own data types, such as tuples (a data record consisting of an ordered sequence of "fields") and data bags (collections of tuples), whereas Datameer passes plain Java objects as arguments, so the method signatures differ slightly. Datameer's column concept lets you work directly on each column value without grouping the data.

Also of note, Datameer's data types are simpler than Pig's. There are only five: INTEGER, FLOAT, DATE, STRING, and BOOLEAN, which are represented by the Java types Long, Double, Date, String, and Boolean respectively.
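
The mapping from Datameer types to Java types can be written down as a small lookup. The sketch below is plain Java with no Datameer dependency. The enum `SketchFieldType` and its methods `javaClass()` and `accepts()` are hypothetical names made up for illustration and are not part of the Datameer SDK.

```java
import java.util.Date;

// Hypothetical stand-in for the type list described above;
// this enum is NOT part of the Datameer SDK.
enum SketchFieldType {
    INTEGER(Long.class),
    FLOAT(Double.class),
    DATE(Date.class),
    STRING(String.class),
    BOOLEAN(Boolean.class);

    private final Class<?> javaClass;

    SketchFieldType(Class<?> javaClass) {
        this.javaClass = javaClass;
    }

    // Java class that carries values of this type.
    Class<?> javaClass() {
        return javaClass;
    }

    // Returns true if the argument could be a value of this type.
    // Assumption for this sketch: null is accepted, because the
    // function examples in this section check for null arguments.
    boolean accepts(Object value) {
        return value == null || javaClass.isInstance(value);
    }
}
```

A check such as `SketchFieldType.INTEGER.accepts(argument)` before casting an argument to Long is one way to guard a compute() body against unexpected input.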

Schema

Pig functions communicate their output schema to Pig via an outputSchema() method. Similarly, in Datameer, this is done using the computeReturnType() method.

Example

Below is an example straight from a Pig function in Piggybank:

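import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class UnixToISO extends EvalFunc<String> {

    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() < 1) {
            return null;
        }

        // Make UTC the default time zone so the ISO string is rendered in UTC
        DateTimeZone.setDefault(DateTimeZone.UTC);
        DateTime result = new DateTime(DataType.toLong(input.get(0)));
        return result.toString();
    }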
 
    @Override
    public Schema outputSchema(Schema input) {
      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), DataType.CHARARRAY));
    }
 
    @Override
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
        funcList.add(new FuncSpec(this.getClass().getName(), new Schema(new Schema.FieldSchema(null, DataType.LONG))));
        return funcList;
    }
}

In Datameer this converts to:

public class UnixTimeFormatToIsoFunction extends BaseSimpleFunctionType {
 
    public UnixTimeFormatToIsoFunction() {
        super(FunctionGroup.DATE_AND_TIME, "UNIXFORMATDATE", "Converts date from Unix format to ISO format.",
                new ArgumentType[] { new IntegerArgumentType() });
    }
 
    @Override
    public String compute(Object... arguments) {
        Long argument = (Long) arguments[0];
        if (argument == null) {
            return null;
        }
 
        try {
            // Make UTC the default time zone so the ISO string is rendered in UTC
            DateTimeZone.setDefault(DateTimeZone.UTC);
            DateTime result = new DateTime(argument);
            return result.toString();
        } catch (Exception e) {
            throw ExceptionUtil.convertToRuntimeException(e);
        }
    }
 
    @Override
    public FieldType computeReturnType(FieldType... argumentTypes) {
        return FieldType.STRING;
    }
}

This conversion is straightforward: the body of the exec(Tuple) method carries over to Datameer's compute(Object...) method almost unchanged, and the role of the outputSchema(Schema) method is taken over by computeReturnType(). Pig's additional getArgToFuncMapping() method maps an input schema to the class that should handle it. The Datameer version instead declares the argument types it expects in its constructor.

Next, look at an aggregation function:

COUNT is an example of an algebraic function: you can count the number of elements in each subset of the data and then sum those counts to produce the final output. In the Hadoop world, this means that the map and the combiner can do partial computations, and the reducer can compute the final result.

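import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class COUNT extends EvalFunc<Long> implements Algebraic {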
 
    public Long exec(Tuple input) throws IOException {
        return count(input);
    }
 
    public String getInitial() {
        return Initial.class.getName();
    }
 
    public String getIntermed() {
        return Intermed.class.getName();
    }
 
    public String getFinal() {
        return Final.class.getName();
    }
 
    static public class Initial extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(count(input));
        }
    }
 
    static public class Intermed extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(sum(input));
        }
    }
 
    static public class Final extends EvalFunc<Long> {
        // The final step returns the summed count itself, so the return type is Long
        public Long exec(Tuple input) throws IOException {
            return sum(input);
        }
    }
 
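    static protected Long count(Tuple input) throws ExecException {
        Object values = input.get(0);
        if (values instanceof DataBag) {
            return ((DataBag) values).size();
        } else if (values instanceof Map) {
            return Long.valueOf(((Map) values).size());
        }
        // Neither a bag nor a map: there is nothing that can be counted
        throw new ExecException("COUNT expects a bag or a map as input");
    }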
 
    static protected Long sum(Tuple input) throws ExecException, NumberFormatException {
        DataBag values = (DataBag)input.get(0);
        long sum = 0;
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple t = it.next();
            sum += (Long)t.get(0);
        }
 
        return sum;
    }
}
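
Stripped of the Pig types, the count-then-sum pattern behind COUNT can be sketched in plain Java. This is only an illustration of the algebraic idea described above: the class `AlgebraicCountSketch` and its methods are hypothetical names, and the two lists stand in for subsets of the data that would be processed separately on a cluster.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of an algebraic count; no Pig classes are used.
class AlgebraicCountSketch {

    // "Initial" step: count the elements of one subset of the data.
    static long countPartition(List<?> partition) {
        return partition.size();
    }

    // "Intermediate" and "final" steps: add up partial counts.
    static long sumCounts(List<Long> partialCounts) {
        long total = 0;
        for (long partial : partialCounts) {
            total += partial;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> subset1 = Arrays.asList("a", "b", "c");
        List<String> subset2 = Arrays.asList("d", "e");
        long total = sumCounts(Arrays.asList(countPartition(subset1), countPartition(subset2)));
        System.out.println(total); // prints 5
    }
}
```

Because the partial counts can be summed in any grouping, the summing step can run in a combiner as well as in the reducer.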

In Datameer, this is even a bit simpler:

public class GroupCountFunction extends BaseFunctionType implements AggregationFunctionType {
 
    public GroupCountFunction() {
        super(FunctionGroup.AGGREGATION, "GROUPCOUNT", "Counts the records of a group.");
    }
 
    private static class GroupCountAggregator implements Aggregator {
        private long _count = 0;

        @Override
        public void aggregate(Object argument) {
            if (argument == null) {
                // Initial round: each call stands for one record of the group
                ++_count;
            } else {
                // Later rounds: the argument is a partial count from a previous round
                _count += (Long) argument;
            }
        }

        @Override
        public Long computeAggregationResult() {
            return _count;
        }
    }
 
    @Override
    public FieldType computeReturnType(FieldType... argumentTypes) {
        return FieldType.INTEGER;
    }
 
    public Aggregator createAggregator(FieldType... argumentTypes) {
        return new GroupCountAggregator();
    }
 
    @Override
    public boolean isAssociative() {
        return true;
    }
 
}

Comparing these two classes shows that Datameer and Pig take slightly different approaches. With its Algebraic interface, Pig stays closer to the actual Hadoop mapper/combiner/reducer implementation, whereas Datameer's aggregator interface leaves it to Datameer to implement and optimize the MapReduce job internally. Pig's Initial.exec() function is called once and is passed the original input tuple. Pig's Final.exec() function produces the final result and is the equivalent of Datameer's computeAggregationResult().

In Datameer, an aggregation function enables combiner-based optimization by implementing the isAssociative() method, as shown above. Because associativity assumes that results of the function can be passed back into the function, aggregate() must handle two kinds of input: a null argument, which in this example marks the initial round of computation, and the result of a previous count (an intermediate result). Handling the null argument is a special case of the count function that you need to take care of.
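
The two rounds described above can be simulated in plain Java. The following sketch assumes the same convention as the example (null for a raw record, a Long for a partial count). The class `CountAggregatorSketch` is a hypothetical stand-in that does not implement the Datameer Aggregator interface, and how Datameer actually schedules the rounds is not shown here.

```java
// Plain-Java illustration; CountAggregatorSketch is a hypothetical stand-in
// and does not implement the Datameer Aggregator interface.
class CountAggregatorSketch {
    private long count = 0;

    // null marks one raw record (initial round); a Long is a partial
    // count produced by an earlier round.
    void aggregate(Object argument) {
        if (argument == null) {
            count++;
        } else {
            count += (Long) argument;
        }
    }

    long result() {
        return count;
    }

    public static void main(String[] args) {
        // Initial round: two parts of the same group are counted separately.
        CountAggregatorSketch partA = new CountAggregatorSketch();
        for (int i = 0; i < 3; i++) {
            partA.aggregate(null);
        }
        CountAggregatorSketch partB = new CountAggregatorSketch();
        for (int i = 0; i < 2; i++) {
            partB.aggregate(null);
        }

        // Next round: the partial counts are fed back into the same logic.
        CountAggregatorSketch merged = new CountAggregatorSketch();
        merged.aggregate(partA.result());
        merged.aggregate(partB.result());
        System.out.println(merged.result()); // prints 5
    }
}
```

The merged result equals the count obtained by feeding all five records into a single aggregator, which is the property that makes it safe to report the function as associative.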