Kafka: Expand `default_msg_processor` into a miniature decoding unit
#306 (+238 −41)
Pitch
Loading data from Kafka into a database destination works well, but there is currently no option to decode the Kafka event value and relay only its payload into the target database, stripped of metadata.
Observation
For comparable use cases, Kafka Connect provides dedicated configuration options; the options introduced here cover a fragment of that functionality.
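For illustration, a Kafka Connect sink can unwrap an event value with a single message transform (SMT). This is a hedged sketch: the transform alias (`extract`) and the field name (`payload`) are placeholders, not taken from the PR.

```json
{
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "transforms": "extract",
  "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extract.field": "payload"
}
```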
Solution
This patch builds upon and expands the existing `default_msg_processor` implementation to accept a few more options, which resolves our problem.

Details
New components:

- `KafkaDecodingOptions`
- `KafkaEvent`
- `KafkaEventProcessor`

The machinery is effectively the same as before, but provides a few more options:

- type decoding for the Kafka event's key/value slots (`key_type` and `value_type`),
- a selection mechanism to limit the output to specific fields only (`include`),
- a small projection mechanism to optionally drill down into a specific field (`select`),
- an option to choose the output format (`format`).

In combination, these decoding options allow users to relay JSON-encoded Kafka event values directly into a destination table, without any metadata wrappings. Currently, the output formatter provides three variants out of the box (`standard_v1`, `standard_v2`, `flexible`)¹. More variants can be added in the future, as other users or use cases may have different requirements in this area. Most importantly, the decoding unit is very compact, so relevant tasks do not need a corresponding transformation unit further down the pipeline, keeping the whole ensemble lean, in the very spirit of ingestr.
Preview
uv pip install --upgrade 'ingestr @ git+https://github.com/crate-workbench/ingestr.git@kafka-decoder'
Example
duckdb kafka.duckdb 'SELECT * FROM demo.kafka WHERE sensor_id>1;'
Footnotes
1. The `standard_v2` output format is intended to resolve "Naming things: Rename `_kafka_msg_id` to `_kafka__msg_id`" #289. ↩