Kafka: Expand default_msg_processor into a miniature decoding unit #306

Draft: wants to merge 1 commit into main

Conversation

@amotl (Contributor) commented on Jul 13, 2025

Pitch

Loading data from Kafka into a database destination works well, but there are no options to properly decode and break out the Kafka event value, in order to relay only that payload into the target database, without any metadata.

Observation

For example, Kafka Connect provides configuration options for similar use cases. A fragment:

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",

Solution

This patch slightly expands the existing default_msg_processor implementation, accepting a few more options which resolve our problem.

Details

  • Accept a set of decoding options via KafkaDecodingOptions
  • Provide a set of output formatting options via KafkaEvent
  • Tie both elements together using KafkaEventProcessor

The machinery is effectively the same as before, but provides a few more options: type decoding for the Kafka event's key/value slots (key_type and value_type), a selection mechanism to limit the output to specific fields only (include), a small projection mechanism to optionally drill down into a specific field (select), and an option to select the output format (format).
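
To make the moving parts concrete, here is a minimal sketch of how these pieces might fit together. The class names (KafkaDecodingOptions, KafkaEventProcessor) come from this patch, but all fields, defaults, and method signatures below are illustrative assumptions, not the actual implementation:

import json
from dataclasses import dataclass, field

@dataclass
class KafkaDecodingOptions:
    # Illustrative fields mirroring the URI options described above.
    key_type: str = "raw"                         # decoding for the event key
    value_type: str = "raw"                       # decoding for the event value
    include: list = field(default_factory=list)   # limit output to these fields
    select: str = ""                              # drill down into a single field
    format: str = "standard_v1"                   # output formatting variant

class KafkaEventProcessor:
    # Ties the decoding options to the output formatting (sketch only).
    def __init__(self, options: KafkaDecodingOptions):
        self.options = options

    def process(self, key, value):
        record = {
            "key": json.loads(key) if self.options.key_type == "json" else key,
            "value": json.loads(value) if self.options.value_type == "json" else value,
        }
        if self.options.include:
            record = {k: v for k, v in record.items() if k in self.options.include}
        if self.options.select:
            record = record[self.options.select]
        return record

# With value_type=json and select=value, only the decoded payload remains:
processor = KafkaEventProcessor(KafkaDecodingOptions(value_type="json", select="value"))
print(processor.process(None, '{"sensor_id": 1, "reading": 42.42}'))
# => {'sensor_id': 1, 'reading': 42.42}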

In combination, those decoding options allow users to relay JSON-encoded Kafka event values directly into a destination table, without any metadata wrappings. Currently, the output formatter provides three different variants out of the box: standard_v1, standard_v2, and flexible.¹ More variants can be added in the future, as other users or use cases may have different requirements in the same area.
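
Purely to illustrate the difference between the variants, not the exact schema: the row shapes below are hypothetical, only the _kafka_msg_id vs. _kafka__msg_id rename is documented (see footnote 1), and the flexible variant is assumed here to honor include/select.

# standard_v1: single-underscore metadata prefix (hypothetical row shape)
{"_kafka_msg_id": "...", "value": {"sensor_id": 1, "reading": 42.42}}

# standard_v2: double underscore separates the source prefix (see #289)
{"_kafka__msg_id": "...", "value": {"sensor_id": 1, "reading": 42.42}}

# flexible, with select=value: the payload itself becomes the row
{"sensor_id": 1, "reading": 42.42}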

Most importantly, the decoding unit is very compact, so relevant tasks don't need a corresponding transformation unit down the pipeline, keeping the whole ensemble lean, in the very spirit of ingestr.

Preview

uv pip install --upgrade 'ingestr @ git+https://github.com/crate-workbench/ingestr.git@kafka-decoder'

Example

docker run --rm --name=kafka \
  --publish=9092:9092 docker.io/apache/kafka:latest
echo '{"sensor_id":1,"ts":"2025-06-01 10:00","reading":42.42}' | \
  kcat -P -b localhost -t demo
echo '{"sensor_id":2,"ts":"2025-06-01 11:00","reading":451.00}' | \
  kcat -P -b localhost -t demo
ingestr ingest --yes \
  --source-uri "kafka://?bootstrap_servers=localhost:9092&group_id=test&value_type=json&select=value" \
  --source-table "demo" \
  --dest-uri "duckdb:///kafka.duckdb" \
  --dest-table "demo.kafka"
duckdb kafka.duckdb 'SELECT * FROM demo.kafka WHERE sensor_id>1;'
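
Assuming the JSON payload maps straight onto the table columns, the final query should return only the second event, along the lines of:

sensor_id | ts               | reading
----------+------------------+---------
        2 | 2025-06-01 11:00 |   451.0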

Backlog

  • Add software tests for non-standard decoding and output formatting options
  • Add docs and improve inline comments

Footnotes

  1. The standard_v2 output format is intended to resolve #289 (Naming things: Rename _kafka_msg_id to _kafka__msg_id).
