streamjoin

Joins the stream data received as input with the result of a subquery by comparing the specified key fields.

Syntax

streamjoin [OPTIONS] KEY_FIELD, ... [ SUBQUERY ]
Required Parameters
KEY_FIELD, ...
Fields to use as join keys, separated by commas (,).
[ SUBQUERY ]
Subquery that returns the data to be joined with the input data, enclosed in a pair of square brackets ([ ]).
Optional Parameters
timeout=INT{s}
Maximum time to wait for the subquery to complete (default: no timeout).
type={inner|left|leftonly}
Join type (default: inner).
  • inner: The usual meaning of "join". It returns only the input records whose keys match a subquery record, combined with that record. Input records without a matching key are not returned. This is similar to a set intersection.
  • left: It returns every input record. Records with matching keys are combined with the subquery result; records without a matching key are returned as they are.
  • leftonly: It returns only the input records whose keys do not match any record in the subquery result. Records with matching keys are not returned.
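
The three join types can be illustrated with a small Python sketch of a hash join. This is not Logpresso's implementation: the function name stream_join, the sample rows, and the description values are hypothetical. The sketch also assumes that a key matching several subquery rows yields one output row per match, and that subquery fields overwrite input fields of the same name; the source does not specify either behavior.

```python
from collections import defaultdict


def stream_join(stream_rows, subquery_rows, keys, join_type="inner"):
    """Illustrative hash join over lists of dicts (hypothetical helper)."""
    if join_type not in ("inner", "left", "leftonly"):
        raise ValueError(f"unsupported join type: {join_type}")

    # Build phase: index the subquery rows by their key-field values.
    table = defaultdict(list)
    for row in subquery_rows:
        if all(k in row for k in keys):
            table[tuple(row[k] for k in keys)].append(row)

    # Probe phase: look up each input row in the hash table.
    for row in stream_rows:
        has_keys = all(k in row for k in keys)
        key = tuple(row.get(k) for k in keys)
        matches = table.get(key, []) if has_keys else []

        if join_type == "leftonly":
            # Keep only input rows that have no match.
            if not matches:
                yield dict(row)
        elif matches:
            # inner and left: combine the input row with each match
            # (assumption: subquery fields win on a name clash).
            for match in matches:
                yield {**row, **match}
        elif join_type == "left":
            # left: unmatched input rows pass through unchanged.
            yield dict(row)


# Hypothetical sample data: code 3 has no entry on the subquery side.
stream = [{"code": 1}, {"code": 2}, {"code": 3}]
codes = [
    {"code": 1, "description": "one"},
    {"code": 2, "description": "two"},
]

inner_rows = list(stream_join(stream, codes, ["code"], "inner"))
left_rows = list(stream_join(stream, codes, ["code"], "left"))
leftonly_rows = list(stream_join(stream, codes, ["code"], "leftonly"))
```

With this sample data, inner_rows holds the two records for codes 1 and 2, left_rows additionally holds the unmatched record for code 3, and leftonly_rows holds only the record for code 3.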

Description

The streamjoin command loads the results of the subquery into off-heap memory and performs a hash join, so it is faster than the join command and can also be used in stream queries. However, only the inner, left, and leftonly join types are available, and the amount of data that can be processed is limited by the capacity of the memory pool. If the subquery fails, the command writes the exception message to the _streamjoin_fail field.

You can adjust the size of the memory pool by specifying the following option when starting Logpresso (default: 500M): -Dlogpresso.streamjoin.max_buffer_size=1G

You can check the status of memory usage with the following query:

  • Status of memory pool usage: system memory pools
  • Status of memory usage by query: system memory objects

Usage

  1. Join the data imported from the database with the code field as a key (See dbquery).

    json "[ {'code':1}, {'code':2}, {'code':3} ]"
    | streamjoin code
    [ dbquery ora select code, description from tbl_codes ]
    
  2. Join the data imported from the database with the code field as a key, waiting at most 10 seconds for the subquery to complete.

    json "[ {'code':1}, {'code':2}, {'code':3} ]" 
    | streamjoin timeout=10s code
    [ dbquery ora select code, description from tbl_codes ]