streamjoin

Merges input data with subquery results using a hash join. Because subquery results are loaded into off-heap memory, this command is faster than the join command and can also be used in stream queries.

Command properties

Item                    Description
Command type            Transforming
Required permission     None
License usage           N/A
Parallel execution      Supported
Distributed execution   Runs on Data Node (mapper)

Syntax

streamjoin [type={inner|left|leftonly}] [timeout=INT{s|m|h|d|mon}] FIELD, ... [ SUBQUERY ]

Options

type={inner|left|leftonly}
Join type (Default: inner)
  • inner: Outputs only records where the join key matches on both sides
  • left: Outputs all left (input) records; merges right (subquery) fields when the join key matches
  • leftonly: Outputs only left records where no matching key exists on the right
timeout=INT{s|m|h|d|mon}
Subquery timeout. Specify using s (second), m (minute), h (hour), d (day), or mon (month) units. If the subquery does not complete within the specified time, it is forcibly terminated.
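The three join types can be modeled with a small in-memory hash join. The Python sketch below is a conceptual model, not Logpresso's implementation. The function name stream_join is hypothetical, and the sketch assumes two behaviors the docs do not state: a left record that matches several right records produces one output record per match, and on a field-name clash the right-side value overwrites the left-side value. The sample data mirrors the inner join example in this section.

```python
from collections import defaultdict


def stream_join(left, right, keys, join_type="inner"):
    """Conceptual model of streamjoin's join types (hypothetical helper).

    Assumptions, not confirmed by the Logpresso docs:
    - a left record matching several right records yields one output per match;
    - on a field-name clash, the right-side value overwrites the left-side value.
    """
    if join_type not in ("inner", "left", "leftonly"):
        raise ValueError("join_type must be inner, left, or leftonly")

    # Build phase: index the right (subquery) records by their join-key tuple.
    index = defaultdict(list)
    for r in right:
        index[tuple(r.get(k) for k in keys)].append(r)

    # Probe phase: look up each left (input) record in the index.
    out = []
    for rec in left:
        matches = index.get(tuple(rec.get(k) for k in keys), [])
        if join_type == "leftonly":
            if not matches:
                out.append(dict(rec))  # keep only unmatched left records
        elif matches:
            out.extend({**rec, **m} for m in matches)  # merge right fields
        elif join_type == "left":
            out.append(dict(rec))  # unmatched, kept without right fields
    return out


left = [{"src_ip": "192.0.2.1", "bytes": 1024}, {"src_ip": "192.0.2.2", "bytes": 2048}]
right = [{"src_ip": "192.0.2.1", "hostname": "web-01"},
         {"src_ip": "192.0.2.3", "hostname": "db-01"}]

print(stream_join(left, right, ["src_ip"], "inner"))
# [{'src_ip': '192.0.2.1', 'bytes': 1024, 'hostname': 'web-01'}]
print(stream_join(left, right, ["src_ip"], "leftonly"))
# [{'src_ip': '192.0.2.2', 'bytes': 2048}]
```

With type=left, the same data would yield both left records, and only the 192.0.2.1 record would carry a hostname field.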

Target

FIELD, ...
Join key fields. Specify multiple fields separated by commas (,).
[ SUBQUERY ]
Subquery enclosed in brackets ([]). The subquery result is used as the right-side dataset.
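When several key fields are listed, one way to picture the match is as a tuple of their values that is compared as a whole, so every listed field must match. In this sketch, join_key is a hypothetical helper. Treating a record with a missing key field as never matching is an assumption, because the docs do not say how missing keys are handled.

```python
def join_key(record, fields):
    """Build a hashable composite key from the join-key fields, in order.

    Hypothetical helper: returns None when any key field is missing or null,
    assuming (not confirmed by the docs) that such records never match.
    """
    values = tuple(record.get(f) for f in fields)
    return None if any(v is None for v in values) else values


fields = ["src_ip", "dst_port"]
a = {"src_ip": "192.0.2.1", "dst_port": 443, "bytes": 10}
b = {"src_ip": "192.0.2.1", "dst_port": 443, "service": "https"}
c = {"src_ip": "192.0.2.1", "dst_port": 80, "service": "http"}

print(join_key(a, fields) == join_key(b, fields))  # True: both fields match
print(join_key(a, fields) == join_key(c, fields))  # False: dst_port differs
print(join_key({"src_ip": "192.0.2.1"}, fields))   # None: dst_port missing
```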

Error codes

Parse errors
Error code   Message                                                              Description
23200        Unsupported stream join type. Use one of inner, left, or leftonly.   An unsupported join type is specified.
23201        Specify a stream join key.                                           No join key field is specified.
90204        The [ bracket is not matched.                                        The subquery brackets are unbalanced.
90206        There is no subquery.                                                No subquery is specified.
Runtime errors

N/A

Description

The streamjoin command first executes the subquery, loads its results into off-heap memory, and then performs the hash join. Unlike the join command, it keeps the subquery results in memory, so it can be used in stream queries.
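The build-then-probe order is what allows the input side to be unbounded. The subquery result is materialized once, and each input record is then joined and emitted as it arrives. The Python generator sketch below models that idea only. The names build_index and probe are hypothetical, an ordinary dict stands in for the off-heap memory pool, and only an inner join on a single key is shown.

```python
import itertools
from collections import defaultdict


def build_index(subquery_rows, key):
    """Build phase: runs once, before any input record is processed."""
    index = defaultdict(list)
    for row in subquery_rows:
        index[row[key]].append(row)
    return index


def probe(records, index, key):
    """Probe phase: a generator, so each input record is joined and emitted
    as soon as it arrives instead of buffering the whole input."""
    for rec in records:
        for match in index.get(rec.get(key), []):
            yield {**rec, **match}


index = build_index([{"src_ip": "192.0.2.1", "hostname": "web-01"}], "src_ip")

# Unbounded input: even sequence numbers use 192.0.2.1, odd ones 192.0.2.2.
stream = ({"src_ip": "192.0.2.1" if n % 2 == 0 else "192.0.2.2", "seq": n}
          for n in itertools.count())

# Take the first three joined records; the infinite stream is never exhausted.
for row in itertools.islice(probe(stream, index, "src_ip"), 3):
    print(row)
# {'src_ip': '192.0.2.1', 'seq': 0, 'hostname': 'web-01'}
# {'src_ip': '192.0.2.1', 'seq': 2, 'hostname': 'web-01'}
# {'src_ip': '192.0.2.1', 'seq': 4, 'hostname': 'web-01'}
```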

The amount of subquery result data that can be held in memory for the join is limited by the memory pool size. The default memory pool size is 500 MB; you can change it with the logpresso.streamjoin.max_buffer_size system property.

If the subquery fails, its exception message is added to the input record as the _streamjoin_fail field. If the timeout option is specified and the subquery does not complete within the time limit, the subquery is forcibly terminated.
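The failure path can be sketched in the same way. This hypothetical Python model (join_or_annotate is not a Logpresso API) assumes that when the subquery raises, every input record is passed through unjoined with the exception message in _streamjoin_fail; the docs state only that the message is added to the input record. The timeout option is not modeled, since a Python thread cannot be forcibly terminated.

```python
def join_or_annotate(records, run_subquery, key):
    """Hypothetical model of streamjoin's failure handling (inner join, one key).

    Assumption: if the subquery raises, each input record is emitted unjoined
    with the exception message stored in _streamjoin_fail.
    """
    try:
        # Build phase: materialize the subquery result before probing.
        index = {}
        for row in run_subquery():
            index.setdefault(row[key], []).append(row)
    except Exception as exc:
        # Subquery failed: pass input records through, annotated with the error.
        for rec in records:
            yield {**rec, "_streamjoin_fail": str(exc)}
        return
    # Probe phase: normal inner join.
    for rec in records:
        for match in index.get(rec.get(key), []):
            yield {**rec, **match}


def failing_subquery():
    raise RuntimeError("simulated subquery failure")


print(list(join_or_annotate([{"src_ip": "192.0.2.1"}], failing_subquery, "src_ip")))
# [{'src_ip': '192.0.2.1', '_streamjoin_fail': 'simulated subquery failure'}]
```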

Examples

  1. Inner join

    json "[{'src_ip': '192.0.2.1', 'bytes': 1024}, {'src_ip': '192.0.2.2', 'bytes': 2048}]"
    | streamjoin src_ip [
        json "[{'src_ip': '192.0.2.1', 'hostname': 'web-01'}, {'src_ip': '192.0.2.3', 'hostname': 'db-01'}]"
      ]
    

    Performs an inner join on the src_ip field. Only the 192.0.2.1 record, which matches on both sides, is output, with the hostname field (web-01) merged in.

  2. Left join

    json "[{'src_ip': '192.0.2.1', 'bytes': 1024}, {'src_ip': '192.0.2.2', 'bytes': 2048}]"
    | streamjoin type=left src_ip [
        json "[{'src_ip': '192.0.2.1', 'hostname': 'web-01'}]"
      ]
    

    Outputs all left records and merges matching right fields. The 192.0.2.2 record is also output but does not have a hostname field.

  3. Left-only join

    table duration=1h web_logs
    | streamjoin type=leftonly src_ip [
        table duration=1d allowlist
      ]
    

    Outputs only web_logs records from the last hour whose src_ip does not appear in the allowlist table.

  4. Specify a subquery timeout

    table duration=1h web_logs
    | streamjoin timeout=30s src_ip [
        table duration=1d ip_assets
      ]
    

    Forcibly terminates the subquery if it does not complete within 30 seconds.

  5. Use multiple join keys

    table duration=1h web_logs
    | streamjoin src_ip, dst_port [
        table duration=1d service_map
      ]
    

    Uses src_ip and dst_port together as the join key, so records are merged only when both fields match.