Writing a Generic UDF in Hive



There are a few types of UDFs that we can write in Hive.

  • Functions that act on each column value passed to it, e.g. Select Length(name) From Customer
    • Specific functions written for a specific data type (simple UDFs)
    • Generic functions written to work with more than one data type
  • Functions that act on a group of values and emit a single result, e.g. Select AVG(salary) From Employees
  • Functions that act on combined values and explode them to generate multiple rows

In an earlier article, we discussed simple UDFs. Here we will focus on the generic functions that act on each column value.

Here we will write a generic UDF that doubles the value of numeric columns and doubles the length of string columns by concatenating the string value with itself:
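Stripped of all the Hive plumbing, the core transformation is trivial. Here it is as plain Java, just to fix the intended behavior in our minds (the class and method names are ours, for illustration only; the real UDF below works with Hive's Writable and Lazy types instead of plain Java types):

```java
// Plain-Java sketch of the transformation GenericDouble performs.
public class GenericDoubleLogic {

    public static int twice(int value) {
        return value * 2;                 // 6000 -> 12000
    }

    public static double twice(double value) {
        return value * 2;
    }

    public static String twice(String value) {
        return value + value;             // "Tim May" -> "Tim MayTim May"
    }
}
```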

GenericDouble

package com.hive.in.action.assignments;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde2.lazy.LazyDouble;
import org.apache.hadoop.hive.serde2.lazy.LazyInteger;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * This class is a generic user-defined function, meaning that we can call this
 * function with more than one type of argument, i.e. int, long, float, double
 * and String. The function returns the same type of output as its input.
 *
 * @author vpathak
 */
@Description(
    name     = "GenericDouble",
    value    = "_FUNC_( value) : Double the value of numeric arguments, " +
               "concatenate the value to itself for string arguments.",
    extended = "Example:\n" +
               "    SELECT _FUNC_(salary) FROM customers;\n" +
               "    (returns 12,000 if the salary was 6,000)\n\n" +
               "    SELECT _FUNC_(name) FROM customers;\n" +
               "    (returns \"Tim MayTim May\" if the name was \"Tim May\")\n"
)
public final class GenericDouble extends GenericUDF {

    private ObjectInspector[] _inputObjectInspector = null;
    private GenericUDFUtils.ReturnObjectInspectorResolver
                           _returnObjectInspectorResolver = null;


    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {

        Object returnable = null;
        Object preparedOutput = null;

        // NULL in, NULL out: guard against NULL column values before
        // calling getClass() on the argument ...
        if (arguments[0].get() == null) {
            return null;
        }

        String argumentType = arguments[0].get().getClass().getSimpleName();

        // System.out.println("Arguments[0]: Type: " + argumentType);

        // Check the type of argument ...
        if (argumentType.equalsIgnoreCase("LazyInteger"))    // select UDF(emp_no) from EMP
        {
            // The input parameter is a LazyInteger ...
            LazyInteger lazyOut = new LazyInteger((LazyInteger) arguments[0].get());

            IntWritable underlyingInt = lazyOut.getWritableObject();
            underlyingInt.set( underlyingInt.get() * 2 );

            preparedOutput = lazyOut;
        }
        else if (argumentType.equalsIgnoreCase("IntWritable"))  // select UDF(9) from DUAL
        {
            // The input parameter is an IntWritable ...
            IntWritable inputParameter = (IntWritable) arguments[0].get();

            // Hive runs MR jobs in the background and Mappers/Reducers keep using
            // the same object as parameter, only the value is set() for iteration,
            // the Writable object remains the same. Therefore, we should be returning
            // a new object, instead of making changes in input Object's value.
            preparedOutput = new IntWritable( inputParameter.get() * 2 );

        }
        else if (argumentType.equalsIgnoreCase("LazyDouble"))    // select UDF(bonus) from EMP
        {
            // The input parameter is a LazyDouble ...
            LazyDouble lazyOut = new LazyDouble((LazyDouble) arguments[0].get());

            DoubleWritable underlyingDouble = lazyOut.getWritableObject();
            underlyingDouble.set( underlyingDouble.get() * 2 );

            preparedOutput = lazyOut;
        }
        else if (argumentType.equalsIgnoreCase("DoubleWritable")) // select UDF(2.23) from dual;
        {
            // The input parameter is a DoubleWritable ...
            final DoubleWritable inputParameter = (DoubleWritable) arguments[0].get();

            // We should be returning a new object, instead of mutating the input.
            preparedOutput = new DoubleWritable( inputParameter.get() * 2 );
        }
        else if (argumentType.equalsIgnoreCase("LazyString"))    // select UDF(Job) from EMP
        {
            // The input parameter is a Wrapped Text ...
            LazyString lazyOut = new LazyString((LazyString) arguments[0].get());

            Text underlyingText = lazyOut.getWritableObject();
            underlyingText.set( underlyingText.toString() + underlyingText.toString() );

            preparedOutput = lazyOut;
        }
        else if (argumentType.equalsIgnoreCase("Text"))  // select UDF('Clerk') from dual
        {
            // The input parameter is a Text ...
            final Text inputParameter = (Text) arguments[0].get();

            // We should be returning a new object, instead of mutating the input.
            preparedOutput = new Text( inputParameter.toString() + inputParameter.toString() );
        }


        // Check input type (inputObjectInspector) and set the appropriate
        // output data type (outputValue) ...
        returnable = _returnObjectInspectorResolver.convertIfNecessary(preparedOutput, _inputObjectInspector[0]);

        return returnable;
    }


    /**
     * This method is called within a Hive session (e.g.  hive> _ ) when the UDF is
     * called and an error occurs. The method gets a chance to provide as much
     * information as possible to show to the user. A standard string
     * "HiveException: Error evaluating" is prepended to whatever is returned by
     * this method, e.g.
     *
     *     "HiveException: Error evaluating value in column emp_no (type: int)"
     */
    @Override
    public String getDisplayString(String[] errorInfo)
    {
        return "value in column " + errorInfo[0] + " (type: " + _inputObjectInspector[0].getTypeName() + ").";
    }


    /**
     * Called once, before any call to evaluate(), to validate the arguments
     * and determine the return type.
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

        // Save the input Object Inspectors ...
        _inputObjectInspector = arguments;

        // Validate: Argument Count ...
        if (arguments.length <= 0 || arguments[0] == null) {
            throw new UDFArgumentException("No argument was detected.");
        }

        // Create the instance of the most important object within this class ...
        _returnObjectInspectorResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);

        // Validate: Argument type checking ...
        if (!_returnObjectInspectorResolver.update(arguments[0])) {
            throw new UDFArgumentTypeException(0, "Datatype problem with the passed argument.");
        }

        return _returnObjectInspectorResolver.get();
    }
}

You can pass multiple arguments to the UDF. Whatever arguments you pass, they are not presented to your code as-is. Rather, initialize() receives an array of ObjectInspector objects, one ObjectInspector per argument: arguments[0] represents an inspector for the first argument you passed to the UDF, arguments[1] the inspector for the second argument, and so on. ObjectInspectors are helpful for looking into the internal structure of an object.
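For instance, a hypothetical two-argument UDF could examine both inspectors in its initialize() method like this (a sketch only; it assumes the hive-exec jar is on the classpath and lives inside a GenericUDF subclass):

```java
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;

// Inside a GenericUDF subclass ...
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    if (arguments.length != 2) {
        throw new UDFArgumentLengthException("Expected exactly two arguments.");
    }
    // arguments[0] describes the first parameter, arguments[1] the second ...
    if (arguments[0].getCategory() != Category.PRIMITIVE) {
        throw new UDFArgumentTypeException(0, "The first argument must be a primitive type.");
    }
    // ... validate arguments[1], save the inspectors, return the output inspector ...
    return arguments[0];
}
```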

getDisplayString()

The getDisplayString() method is really helpful to the developer, since it can return meaningful troubleshooting information. Instead of returning a general error message, Hive calls this method whenever there is an error executing the UDF. The UDF developer can compile useful information here that can be instrumental in troubleshooting the runtime error or exception. When a problem is detected while executing the UDF, Hive throws a HiveException but appends the information returned by getDisplayString() to the exception it throws. In the above example, this method returns the name and type of the column that caused the problem.

Why Hive Provides The ObjectInspectors

As the name suggests, an ObjectInspector helps us inspect the argument we are going to receive in the UDF. Since Hive has a variety of data types and allows very complex custom type definitions, Hive UDFs can be passed very basic data types (primitives like long, double, boolean) as well as very complex data types (like an array of maps from a string key to a struct value, where the struct contains Name, Age, Salary and Location, i.e. ARRAY<MAP<STRING, STRUCT>>). Since UDFs can be called on tables within a query, columns with really complex data types can be passed to UDFs.

It is because of this possible complexity of the data types that can be passed to a generic UDF (which stays flexible about type until runtime) that Hive passes an ObjectInspector instead of the object itself: the UDF code must understand the structure of the object before it can process it. Similarly, the processed output can be equally complex, so an ObjectInspector for the output value is also required, which Hive uses when you return the processed output.

ObjectInspectors are of great use within a generic UDF, and we access the values of the parameters passed to us through them. There are ObjectInspectors for virtually all types, and they are categorized into PrimitiveObjectInspector, ListObjectInspector, MapObjectInspector and StructObjectInspector.

All the specialized ObjectInspectors derive from these four; e.g. LazyDoubleObjectInspector, which helps us deal with a DoubleWritable data type, extends a class that implements PrimitiveObjectInspector. An ObjectInspector for a complex object can return ObjectInspectors for its underlying objects; e.g. myArrayObjInsp.getListElementObjectInspector() returns an inspector that can be cast to a StandardMapObjectInspector, if the array passed to the UDF contains map objects.
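To make that concrete, here is how one might drill into an ARRAY<MAP<...>> argument (a sketch; it assumes the hive-exec jar is on the classpath and that arguments[0] really does inspect a list of maps):

```java
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

// arguments[0] inspects an ARRAY<MAP<...>> column ...
ListObjectInspector listOI = (ListObjectInspector) arguments[0];

// The element inspector describes each MAP inside the array ...
MapObjectInspector mapOI =
        (MapObjectInspector) listOI.getListElementObjectInspector();

// ... and the map inspector, in turn, exposes key and value inspectors.
ObjectInspector keyOI   = mapOI.getMapKeyObjectInspector();
ObjectInspector valueOI = mapOI.getMapValueObjectInspector();
```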

initialize()

When a UDF is used in a query, Hive loads the UDF into memory. initialize() is called first, when the UDF is invoked. The purpose of this call is to check the types of the arguments that will be passed to the UDF. For each value passed to the UDF, the evaluate() method is called; so if the UDF is going to be called for 10 rows, evaluate() will be called 10 times. However, Hive calls the initialize() method of a generic UDF before any call to evaluate(). The goals of initialize() are to

  • validate the input arguments and complain if input is not as per expectation
  • save the Object Inspectors of input arguments for later use during evaluate()
  • provide an Object Inspector to Hive for the return type

You can validate the input in various ways, like checking the arguments array for size, checking the category of the input type (remember PrimitiveObjectInspector, MapObjectInspector etc.?), or checking the size of underlying objects (in the case of a map or struct etc.). Validation can go to any extent you choose, including traversing the entire object hierarchy and validating every object. When validation fails, we can throw a UDFArgumentException or one of its subtypes to indicate the error.
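As an illustration, a UDF expecting a MAP argument with primitive keys might validate it like this inside initialize() (a sketch; assumes the hive-exec jar is on the classpath):

```java
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;

// Inside initialize() ...
if (arguments.length != 1) {
    throw new UDFArgumentException("Exactly one argument is expected.");
}
// Check the category of the input type ...
if (arguments[0].getCategory() != Category.MAP) {
    throw new UDFArgumentTypeException(0, "A MAP argument is expected, got "
            + arguments[0].getTypeName() + ".");
}
// Drill one level deeper and check the key type ...
MapObjectInspector mapOI = (MapObjectInspector) arguments[0];
if (mapOI.getMapKeyObjectInspector().getCategory() != Category.PRIMITIVE) {
    throw new UDFArgumentTypeException(0, "The map key must be a primitive type.");
}
```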

The ObjectInspector for the return type should be constructed within the initialize() method and returned. We can use the factory methods of the ObjectInspectorFactory class. For example, if the UDF is going to return a MAP type, we can use the getStandardMapObjectInspector() method, which accepts information about how the map is constructed (e.g. the key type and the value type of the map).
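For example, a UDF returning MAP<STRING, INT> could build its return inspector like this (a sketch; assumes the hive-exec jar is on the classpath):

```java
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Inside initialize(): describe a MAP<STRING, INT> return type ...
ObjectInspector returnOI = ObjectInspectorFactory.getStandardMapObjectInspector(
        PrimitiveObjectInspectorFactory.javaStringObjectInspector,  // key type
        PrimitiveObjectInspectorFactory.javaIntObjectInspector);    // value type
return returnOI;
```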

The saved Object inspectors are instrumental when we try to obtain the input value in the evaluate() method.

evaluate()

SELECT GenericDouble(bonus) FROM emp;

Suppose the emp table has 10 rows in it. Then the evaluate() method will be called 10 times, once for the column value in each row. All the values passed to evaluate(), however, are serialized bytes. Hive delays the instantiation of objects until a request for the object is made, hence the name DeferredObject. Based on what type of value was passed to the UDF, the DeferredObject could represent a lazily initialized object; in the above example, it could be an instance of the LazyDouble class. When the value is requested, e.g. via LazyDouble.getWritableObject(), the bytes are deserialized into an object and returned.

However, if the same GenericUDF is called with a value provided at the command line (instead of one read from a table), the argument could be a DoubleWritable object in the first place and doesn't need deserialization. Based on the type of object we get in the input, we need to access its data accordingly and process it.
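An alternative to matching on getClass().getSimpleName(), as the example above does, is to use the ObjectInspector saved in initialize() to extract a plain Java value; the inspector hides whether the underlying object is lazy or already a Writable. A sketch of such an evaluate() body (assumes the hive-exec jar is on the classpath and the _inputObjectInspector field saved in initialize()):

```java
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Inside evaluate(): let the saved inspector do the (de)serialization work ...
PrimitiveObjectInspector poi = (PrimitiveObjectInspector) _inputObjectInspector[0];
Object raw = arguments[0].get();

switch (poi.getPrimitiveCategory()) {
    case INT:
        return new IntWritable(((IntObjectInspector) poi).get(raw) * 2);
    case DOUBLE:
        return new DoubleWritable(((DoubleObjectInspector) poi).get(raw) * 2);
    case STRING:
        String s = ((StringObjectInspector) poi).getPrimitiveJavaObject(raw);
        return new Text(s + s);
    default:
        throw new HiveException("Unsupported type: " + poi.getTypeName());
}
```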

Finally, based on the type of input we received, we want to return the same type of output, since we just doubled the input. The convertIfNecessary() method helps here: it converts the output to the same type as the input, based on the ObjectInspector we pass to it.
