Changelog

Kafka sink: topic configuration, custom partitioning, and more!

Sep 16, 2024

As more users turn to Kafka (or Kafka API-compatible brokers, like Redpanda) to build automation downstream of Materialize, we’ve been working on improving the developer experience of the Kafka sink. To keep you up to speed, here’s an overview of the sink-related improvements that shipped in the latest releases of Materialize!

Topic configuration

When you create a new Kafka sink, Materialize automatically creates the downstream topic using the broker's default configuration. You can now override specific settings for this topic, such as the partition count, replication factor, and retention policy, and also configure the schema registry compatibility types.

Example:

```sql
CREATE SINK custom_topic_sink
  FROM my_matview
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'test_avro_topic',
    TOPIC PARTITION COUNT 4,
    TOPIC REPLICATION FACTOR 2,
    TOPIC CONFIG MAP['cleanup.policy' => 'compact']
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;
```

Compression

To improve throughput and network bandwidth usage, all sinks now default to lz4 compression. To change the compression algorithm applied to messages before they're sent to your downstream broker, or to disable compression altogether, use the new COMPRESSION TYPE sink option.

Example:

```sql
CREATE SINK compression_disabled_sink
  FROM my_matview
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'sink-compression',
    COMPRESSION TYPE 'none'
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;
```

Custom partitioning

By default, the Kafka sink uses a partitioning strategy based on the key (if any!) of the messages being emitted. Depending on the nature of your data, you might want more explicit control over how messages are distributed across partitions. After all, this has performance, availability, and data governance implications! To specify a custom partitioning strategy, you can now use the PARTITION BY sink option.

```sql
CREATE SINK customer_orders
  FROM my_matview
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'customer-orders',
    -- The partition hash includes only the customer ID, so the partition
    -- will be assigned only based on the customer ID.
    PARTITION BY = seahash(customer_id::text)
  )
  -- The key includes both the customer ID and order ID, so Kafka's compaction
  -- will keep only the latest message for each order ID.
  KEY (customer_id, order_id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;
```
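Because PARTITION BY accepts a SQL expression over the sink's rows, you can also route messages by a field other than the key. As a sketch, assuming the relation has a text `region` column (hypothetical, not part of the example above), partitioning by region would co-locate each region's orders in a single partition:

```sql
CREATE SINK orders_by_region
  FROM my_matview
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'orders-by-region',
    -- Hypothetical: hash only the region column, so all orders for a
    -- given region land in the same partition.
    PARTITION BY = seahash(region)
  )
  KEY (customer_id, order_id)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE UPSERT;
```

Note that coarse partitioning expressions like this trade per-key parallelism for locality, so keep an eye on partition skew.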

That’s it for now!

Check out the updated documentation for an overview of features old and new, and reach out to our team if you have feedback or suggestions for other features you’d like to see in the Kafka sink!
