As more users turn to Kafka (or Kafka API-compatible brokers, like Redpanda) to build automation downstream of Materialize, we’ve been working on improving the developer experience of the Kafka sink. To keep you up to speed, here’s an overview of the sink-related improvements that shipped in the latest releases of Materialize!
Topic configuration
When a new Kafka sink is created, Materialize automatically creates the downstream topic using the broker's default configuration. You can now override specific settings for this topic, like partition count, replication factor, and retention policy, as well as the schema registry compatibility types used for the sink's schemas.
Example:
CREATE SINK custom_topic_sink
  FROM my_matview
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'test_avro_topic',
    TOPIC PARTITION COUNT 4,
    TOPIC REPLICATION FACTOR 2,
    TOPIC CONFIG MAP['cleanup.policy' => 'compact']
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;
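To override the compatibility types, set them on the FORMAT clause. Here's a minimal sketch; the KEY COMPATIBILITY LEVEL and VALUE COMPATIBILITY LEVEL option names and accepted values are assumptions, so check the CREATE SINK documentation for your Materialize version:
CREATE SINK compatibility_sink
  FROM my_matview
  INTO KAFKA CONNECTION kafka_connection (TOPIC 'test_avro_topic')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection (
    -- Assumed option names: set the compatibility type the schema registry
    -- enforces for the sink's key and value schemas.
    KEY COMPATIBILITY LEVEL 'BACKWARD',
    VALUE COMPATIBILITY LEVEL 'BACKWARD'
  )
  ENVELOPE UPSERT;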
Compression
To improve throughput and network bandwidth usage, all sinks now default to lz4 compression. To change the compression algorithm applied to messages before they're sent out to your downstream broker, or to disable compression altogether, you can use the new COMPRESSION TYPE sink option.
Example:
CREATE SINK compression_disabled_sink
  FROM my_matview
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'sink-compression',
    COMPRESSION TYPE 'none'
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;
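To switch to a different algorithm rather than disable compression outright, pass its name instead. As a sketch, using gzip (the full list of supported values is in the CREATE SINK documentation):
CREATE SINK gzip_compression_sink
  FROM my_matview
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'sink-compression',
    -- 'gzip' is assumed to be among the supported values; others like
    -- 'snappy', 'lz4', and 'zstd' may also be available.
    COMPRESSION TYPE 'gzip'
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;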
Custom partitioning
By default, the Kafka sink uses a partitioning strategy based on the key (if any!) of the messages being emitted. Depending on the nature of your data, you might want more explicit control over how messages are distributed across partitions; after all, this has performance, availability, and data governance implications! To specify a custom partitioning strategy, you can now use the PARTITION BY sink option.
Example:
CREATE SINK customer_orders
  FROM my_matview
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'customer-orders',
    -- The partition hash includes only the customer ID, so the partition
    -- will be assigned based on the customer ID alone.
    PARTITION BY = seahash(customer_id::text)
  )
  -- The key includes both the customer ID and order ID, so Kafka's compaction
  -- will keep only the latest message for each order ID.
  KEY (customer_id, order_id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;
That’s it for now!
Check out the updated documentation for an overview of features old and new, and reach out to our team if you have feedback or suggestions for other features you’d like to see in the Kafka sink!