UDF Instance Limiting

This section explains how and when to limit the number of UDF instances per node and UDF call.

This feature is only available for Java, Python, and R programming languages. It is not available for Lua.

The main memory available to your UDFs is limited. If a query allows the database to parallelize a UDF across many instances, the combined memory consumption of all instances on a data node may exceed this limit, and the query may fail. In many cases, you can avoid this by limiting the number of UDF instances per node and UDF call. In some scenarios, however, limiting does not help.
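As a rough illustration of why the number of instances matters, the sketch below estimates the peak UDF memory of one query on one node as calls × instances per call × memory per instance. It assumes, as the examples later in this section suggest, that without a limit each call may start up to one instance per core, and that a limit above the core count has no further effect. The function name and the per-instance figures are hypothetical; real memory use depends on the UDF and the data.

```python
from typing import Optional


def peak_udf_memory_mb(calls: int, cores: int, mem_per_instance_mb: int,
                       instance_limit: Optional[int] = None) -> int:
    """Hypothetical upper bound for the UDF memory of one query on one node."""
    # Assumption: without a limit, each call may start one instance per core.
    # With %perNodeAndCallInstanceLimit N, at most N per call, assumed capped at cores.
    per_call = cores if instance_limit is None else min(instance_limit, cores)
    return calls * per_call * mem_per_instance_mb


# Hypothetical numbers: 30 calls, 8 cores, 100 MB per instance
print(peak_udf_memory_mb(30, 8, 100))                    # 24000
print(peak_udf_memory_mb(30, 8, 100, instance_limit=1))  # 3000
```

Under these assumptions, a limit of 1 divides the estimate by the number of cores, which is why limiting helps when many instances would otherwise run side by side.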

Limiting the number of instances can help you stay within the memory limit, for example, in the following scenarios:

  • A transformation implemented as a Python, Java, or R UDF that is applied to many columns.
  • A scalar UDF where each instance requires a lot of main memory.
  • A set UDF that is called for many groups and needs a lot of memory, or that is called for many columns.

In the following scenarios, limiting will not help:

  • A single instance of the UDF already consumes too much main memory.
  • The query contains so many UDF calls that the combined main memory consumption of a single instance per call already exceeds the memory limit.
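The two lists above can be condensed into a simple check. The hypothetical helper below uses a rough model (calls × instances per call × memory per instance, with at most one instance per core and call) to return the largest per-node-and-call limit that keeps the estimate within a given memory budget, or None when even a limit of 1 would exceed it. The None case covers both scenarios in which limiting does not help: a single instance that is too large, and too many calls. All names and numbers are illustrative assumptions.

```python
from typing import Optional


def suggested_instance_limit(calls: int, cores: int, mem_per_instance_mb: int,
                             memory_budget_mb: int) -> Optional[int]:
    """Hypothetical helper: largest instance limit that fits the budget, or None."""
    # With a limit of N, the estimate is calls * N * mem_per_instance_mb
    largest = memory_budget_mb // (calls * mem_per_instance_mb)
    if largest < 1:
        # Even one instance per call exceeds the budget: limiting cannot help
        return None
    # A limit above the core count is assumed to have no further effect
    return min(largest, cores)


print(suggested_instance_limit(30, 8, 100, 6000))  # 2: limiting helps
print(suggested_instance_limit(50, 8, 100, 4000))  # None: too many calls
print(suggested_instance_limit(1, 8, 5000, 4000))  # None: one instance too large
```

A result equal to the core count means that, under this model, no limit is needed at all.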

To specify a limit, add the option %perNodeAndCallInstanceLimit to the UDF, as in the following example:

CREATE OR REPLACE PYTHON3 SET SCRIPT process(i VARCHAR(2000000)) 
EMITS (o VARCHAR(2000000)) AS

%perNodeAndCallInstanceLimit 4;

def run(ctx):
    df = ctx.get_dataframe(100000)
    ctx.emit(df)
/
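The example above reads a single batch of at most 100,000 rows per group. If a group can be larger, one way to process all of it is to read and emit the group in batches until no rows remain, which roughly bounds the memory each instance needs to one batch. The sketch below assumes that get_dataframe returns None once the group is exhausted. FakeContext is a hypothetical stand-in so the loop can be run outside the database; inside a real UDF, only run() would be used.

```python
class FakeContext:
    """Hypothetical stand-in for the UDF context, for running the sketch locally.

    It hands out rows in lists of at most num_rows items and returns None once
    no rows remain, which is the behavior the loop in run() relies on.
    """

    def __init__(self, rows):
        self._rows = list(rows)
        self._pos = 0
        self.batches_served = 0
        self.emitted = []

    def get_dataframe(self, num_rows):
        if self._pos >= len(self._rows):
            return None
        batch = self._rows[self._pos:self._pos + num_rows]
        self._pos += len(batch)
        self.batches_served += 1
        return batch

    def emit(self, batch):
        # The real context emits every row of a DataFrame; here we collect them
        self.emitted.extend(batch)


BATCH_SIZE = 100000  # rows per batch, as in the example above


def run(ctx):
    # Read and emit the group batch by batch until get_dataframe returns None
    while True:
        df = ctx.get_dataframe(BATCH_SIZE)
        if df is None:
            break
        ctx.emit(df)


ctx = FakeContext(range(250000))
run(ctx)
print(ctx.batches_served, len(ctx.emitted))  # 3 250000
```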

Examples

The following examples show cases where this option can be applied, where it helps, and where it does not.

-- confirm that the version is 7.1 and that the database has at least 3 GB of RAM
select dbms_version, db_ram_size
from exa_system_events
order by measure_time desc
limit 1;


-- confirm that there are at least 2 cores
--/
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT nr_of_cores() RETURNS INTEGER AS
def run(ctx):
    import multiprocessing
    return multiprocessing.cpu_count()
/

select nr_of_cores();


drop schema if exists nodelimit cascade;

create schema nodelimit;
-- simple UDF WITHOUT instance limit
--/
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT oom_udf (input_value INTEGER)
    RETURNS INTEGER AS
def run(ctx):
    return ctx.input_value + 1
/
-- works well for a small number of calls on a small data set:

select oom_udf(1), oom_udf(2), oom_udf(3);

-- create a larger data set
create or replace table oom_table as select 1 as c0 from values between 1 and 100000; -- 100,000 rows

-- this makes 30 calls of the UDF (UDFs are treated as functions with side effects,
-- so UDF calls with the same input are not optimized away);
-- each call may start up to <number of cores> instances per node
SELECT distinct (
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0)
                    )
from oom_table;
-- This query fails with an out-of-memory error

-- after the failure, reconnect and open the schema again
open schema nodelimit;

-- re-create the UDF with an instance limit of 1
--/
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT oom_udf(input_value INTEGER) RETURNS INTEGER AS
%perNodeAndCallInstanceLimit 1;
def run(ctx):
    return ctx.input_value + 1
/
SELECT distinct (
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0)
                    )
from oom_table;
-- With the instance limit, the same 30 UDF calls that failed before now succeed

-- For comparison, re-create the UDF in Lua and run the same SELECT again
-- to confirm that the out-of-memory issue does not occur with Lua;
-- Lua is also much faster than Python for this simple task
--/
CREATE OR REPLACE lua SCALAR SCRIPT oom_udf(input_value number)
RETURNS number AS
function run(ctx)
    return ctx.input_value + 1
end
/
SELECT distinct (
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0)
                    )
from oom_table;
-- This query also succeeds with the 30 UDF calls and is much faster than the Python UDF

-- However, there are also cases where the instance limit cannot help,
-- for example, when a query contains too many UDF calls. The previous examples
-- used 30 calls, which worked fine, but if we increase the number of UDF calls
-- to 50 or more, the query reaches the memory limit even with the instance limit.
-- Re-create the Python UDF with an instance limit of 1:
--/
CREATE OR REPLACE PYTHON3 SCALAR SCRIPT oom_udf(input_value INTEGER) RETURNS INTEGER AS
%perNodeAndCallInstanceLimit 1;
def run(ctx):
    return ctx.input_value + 1
/
-- this makes 50 calls of the UDF;
-- each call starts exactly one instance per node, but the query still runs out of memory
SELECT distinct (
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0), oom_udf(c0),
                 oom_udf(c0), oom_udf(c0)
                    )
from oom_table;
-- This query fails with an out-of-memory error
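The instance counts behind the three Python runs in this script can be worked out with a rough model: without a limit, each call may start up to one instance per core, and with %perNodeAndCallInstanceLimit 1, each call starts one instance per node. The sketch assumes the minimum of 2 cores that the script checks for; the memory per instance is not stated in this section, so only instance counts are computed. The function name is hypothetical.

```python
from typing import Optional


def instances_per_node(calls: int, cores: int, instance_limit: Optional[int] = None) -> int:
    """Hypothetical upper bound of concurrent UDF instances on one node."""
    # Assumption: up to one instance per core and call, or at most instance_limit
    per_call = cores if instance_limit is None else min(instance_limit, cores)
    return calls * per_call


CORES = 2  # the minimum the script checks for with nr_of_cores()

runs = {
    "30 calls, no limit (fails)": instances_per_node(30, CORES),
    "30 calls, limit 1 (succeeds)": instances_per_node(30, CORES, 1),
    "50 calls, limit 1 (fails)": instances_per_node(50, CORES, 1),
}
for label, count in runs.items():
    print(f"{label}: up to {count} instances per node")
```

Under this model, the run that succeeds is the one with the fewest instances per node (30, compared with 60 and 50).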