streamjoin
Merges input data with subquery results using a hash join. Because subquery results are loaded into off-heap memory, this command is faster than the join command and can also be used in stream queries.
Command properties
| Item | Description |
|---|---|
| Command type | Transforming |
| Required permission | None |
| License usage | N/A |
| Parallel execution | Supported |
| Distributed execution | Runs on Data Node (mapper) |
Syntax
    streamjoin [type={inner|left|leftonly}] [timeout=INT{s|m|h|d|mon}] FIELD, ... [ SUBQUERY ]
Options
- `type={inner|left|leftonly}`: Join type (default: `inner`).
  - `inner`: Outputs only records where the join key matches on both sides.
  - `left`: Outputs all left (input) records; merges right (subquery) fields when the join key matches.
  - `leftonly`: Outputs only left records where no matching key exists on the right.
- `timeout=INT{s|m|h|d|mon}`: Subquery timeout. Specify the value with a unit: `s` (second), `m` (minute), `h` (hour), `d` (day), or `mon` (month). If the subquery does not complete within the specified time, it is forcibly terminated.
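To make the `INT{s|m|h|d|mon}` format concrete, the following Python sketch converts a timeout literal into seconds. It only illustrates the syntax and is not the product's parser; `parse_timeout` is a hypothetical helper, and treating `mon` as 30 days is an assumption made for the example.

```python
import re

# Seconds per unit. Treating "mon" as 30 days is an assumption for this sketch.
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "mon": 30 * 86400}


def parse_timeout(text: str) -> int:
    """Convert a literal such as '30s' or '2h' into a number of seconds."""
    match = re.fullmatch(r"(\d+)(s|m|h|d|mon)", text)
    if match is None:
        raise ValueError("invalid timeout literal: " + text)
    return int(match.group(1)) * UNIT_SECONDS[match.group(2)]


thirty_seconds = parse_timeout("30s")
two_hours = parse_timeout("2h")
one_month = parse_timeout("1mon")
```

A literal without a unit, such as `30`, is rejected in this sketch because the format shown above always pairs the number with a unit.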
Target
- `FIELD, ...`: Join key fields. Specify multiple fields separated by commas (`,`).
- `[ SUBQUERY ]`: Subquery enclosed in brackets (`[]`). The subquery result is used as the right-side dataset.
Error codes
Parse errors
| Error code | Message | Description |
|---|---|---|
| 23200 | Unsupported stream join type. Use one of inner, left, or leftonly. | An unsupported join type is specified. |
| 23201 | Specify a stream join key. | No join key field is specified. |
| 90204 | Unmatched [. | The subquery brackets are unbalanced. |
| 90206 | Subquery is missing. | No subquery is specified. |
Runtime errors
N/A
Description
The streamjoin command first executes the subquery, loads its results into off-heap memory, and then performs the hash join. Unlike the join command, it keeps the subquery results in memory, so it can be used in stream queries.
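The build-then-probe flow described above can be sketched in Python. This is a conceptual model, not the command's implementation: `build_table` and `stream_join` are hypothetical names, an ordinary dict stands in for the off-heap memory pool, and the handling of duplicate keys and conflicting field names is an assumption.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

Record = Dict[str, object]


def build_table(right: Iterable[Record], keys: List[str]) -> Dict[Tuple, Record]:
    """Build phase: index the subquery (right-side) rows by join key."""
    table: Dict[Tuple, Record] = {}
    for row in right:
        # Assumption: one row per key; a later row replaces an earlier one.
        table[tuple(row.get(k) for k in keys)] = row
    return table


def stream_join(left: Iterable[Record], right: Iterable[Record],
                keys: List[str], join_type: str = "inner") -> Iterator[Record]:
    """Probe phase: look up each input row in the prebuilt table."""
    if join_type not in ("inner", "left", "leftonly"):
        raise ValueError("unsupported join type: " + join_type)
    table = build_table(right, keys)  # the right side is fully loaded first
    for row in left:  # the input side is consumed one record at a time
        match = table.get(tuple(row.get(k) for k in keys))
        if join_type == "leftonly":
            if match is None:
                yield dict(row)
        elif match is not None:
            # Assumption: right-side fields win when names collide.
            yield {**row, **match}
        elif join_type == "left":
            yield dict(row)


left_rows = [{"src_ip": "192.0.2.1", "bytes": 1024},
             {"src_ip": "192.0.2.2", "bytes": 2048}]
right_rows = [{"src_ip": "192.0.2.1", "hostname": "web-01"},
              {"src_ip": "192.0.2.3", "hostname": "db-01"}]

inner_result = list(stream_join(left_rows, right_rows, ["src_ip"]))
left_result = list(stream_join(left_rows, right_rows, ["src_ip"], "left"))
leftonly_result = list(stream_join(left_rows, right_rows, ["src_ip"], "leftonly"))
```

Because the right side is indexed once and each input record needs only a single lookup, the input can be processed as it arrives, which is the property that makes this approach suitable for stream queries.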
The amount of data that can be joined is limited by the memory pool size. The default memory pool size is 500 MB, and you can change it using the logpresso.streamjoin.max_buffer_size system property.
If the subquery fails, the exception message is added to the input record in the _streamjoin_fail field. If the timeout option is specified and the subquery does not complete within the time limit, it is forcibly terminated.
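The failure behavior can be modeled with a short Python sketch. It assumes that input records pass through unjoined, each carrying the `_streamjoin_fail` field, when the subquery raises an error; the text above only states that the field is added, so treat the pass-through as an assumption. `join_or_flag` and `failing_subquery` are hypothetical names, and the timeout is simulated by raising `TimeoutError`.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Record = Dict[str, object]


def join_or_flag(left: Iterable[Record],
                 run_subquery: Callable[[], List[Record]],
                 key: str) -> Iterator[Record]:
    """Inner join on one key; flag every input record if the subquery fails."""
    try:
        table = {row[key]: row for row in run_subquery()}
    except Exception as exc:
        # Assumption: records are emitted unjoined with the failure message.
        for row in left:
            yield {**row, "_streamjoin_fail": str(exc)}
        return
    for row in left:
        match = table.get(row.get(key))
        if match is not None:
            yield {**row, **match}


def failing_subquery() -> List[Record]:
    # Stand-in for a subquery that exceeded its timeout.
    raise TimeoutError("subquery timed out")


flagged = list(join_or_flag([{"src_ip": "192.0.2.1", "bytes": 1024}],
                            failing_subquery, "src_ip"))
```

In a downstream stage, checking whether `_streamjoin_fail` is present is one way to tell a failed join apart from a join that simply found no match.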
Examples
- Inner join

      json "[{'src_ip': '192.0.2.1', 'bytes': 1024}, {'src_ip': '192.0.2.2', 'bytes': 2048}]" | streamjoin src_ip [ json "[{'src_ip': '192.0.2.1', 'hostname': 'web-01'}, {'src_ip': '192.0.2.3', 'hostname': 'db-01'}]" ]

  Performs an inner join on the `src_ip` field. Only the `192.0.2.1` record, which matches on both sides, is output.

- Left join

      json "[{'src_ip': '192.0.2.1', 'bytes': 1024}, {'src_ip': '192.0.2.2', 'bytes': 2048}]" | streamjoin type=left src_ip [ json "[{'src_ip': '192.0.2.1', 'hostname': 'web-01'}]" ]

  Outputs all left records and merges matching right fields. The `192.0.2.2` record is also output but does not have a `hostname` field.

- Left-only join

      table duration=1h web_logs | streamjoin type=leftonly src_ip [ table duration=1d allowlist ]

  Outputs only web log records with IPs not in the allowlist.

- Specify a subquery timeout

      table duration=1h web_logs | streamjoin timeout=30s src_ip [ table duration=1d ip_assets ]

  Forcibly terminates the subquery if it does not complete within 30 seconds.

- Use multiple join keys

      table duration=1h web_logs | streamjoin src_ip, dst_port [ table duration=1d service_map ]

  Merges records using both `src_ip` and `dst_port` as join keys.
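To illustrate what joining on several keys means, the Python sketch below treats the key fields as one composite value, so a record joins only when every key field matches. The sample rows and the `service` field are hypothetical, and the lookup dict is a stand-in for the command's internal hash table.

```python
# Hypothetical sample rows for the service_map and web_logs tables.
service_map = [
    {"src_ip": "192.0.2.1", "dst_port": 443, "service": "https"},
    {"src_ip": "192.0.2.1", "dst_port": 22, "service": "ssh"},
]
web_logs = [
    {"src_ip": "192.0.2.1", "dst_port": 443, "bytes": 1024},
    {"src_ip": "192.0.2.1", "dst_port": 8080, "bytes": 2048},
]
keys = ("src_ip", "dst_port")


def key_of(row):
    """Combine all join key fields into one hashable composite key."""
    return tuple(row.get(k) for k in keys)


# Index the right side by composite key, then probe with each left record.
lookup = {key_of(row): row for row in service_map}
joined = [{**row, **lookup[key_of(row)]}
          for row in web_logs if key_of(row) in lookup]
```

The second web log record shares its `src_ip` with both `service_map` rows but has a different `dst_port`, so in this sketch it is dropped from the inner join.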