Develop Data Transforms

Learn how to initialize a data transforms project and write transform functions in your chosen language.

After reading this page, you will be able to:

  • Initialize a data transforms project using the rpk CLI

  • Build transform functions that process records and write to output topics

  • Implement multi-topic routing patterns with Schema Registry integration

Prerequisites

You must have the development tools for your chosen language (such as Go, Rust, or Node.js) installed on your host machine.

Initialize a data transforms project

To initialize a data transforms project, use the following command to set up the project files in your current directory. This command adds the latest version of the SDK as a project dependency:

rpk transform init --language=<language> --name=<name>

If you do not include the --language flag, the command prompts you for the language. The supported languages are:

  • tinygo-no-goroutines (compiled without support for goroutines)

  • tinygo-with-goroutines

  • rust

  • javascript

  • typescript

For example, if you choose tinygo-no-goroutines, rpk creates the following project files:

.
├── go.mod
├── go.sum
├── README.md
├── transform.go
└── transform.yaml

The transform.go file contains a boilerplate transform function. The transform.yaml file specifies the configuration settings for the transform function.
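The generated transform.yaml uses the same keys as the configuration shown later on this page. A minimal sketch (the name and topic values are illustrative, and the file that rpk generates may contain additional fields):

```yaml
name: my-transform
input_topic: input
output_topics:
  - output
```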

Build transform functions

You can develop your transform logic with one of the available SDKs, which allow your transform code to interact with a Redpanda cluster.

  • Go

  • Rust

  • JavaScript

All transform functions must register a callback with the OnRecordWritten() method.

You should run any initialization steps in the main() function because it’s only run once when the transform function is first deployed. You can also use the standard predefined init() function.

package main

import (
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
  // Register your transform function.
  // This is a good place to perform other setup too.
  transform.OnRecordWritten(myTransform)
}
// myTransform is where you read the record that was written, and then you can
// output new records that will be written to the destination topic
func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
  return writer.Write(event.Record())
}

All transform functions must register a callback with the on_record_written() method.

You should run any initialization steps in the main() function because it’s only run once when the transform function is first deployed.

use redpanda_transform_sdk::*;
use std::error::Error;

fn main() {
  // Register your transform function.
  // This is a good place to perform other setup too.
  on_record_written(my_transform);
}

// my_transform is where you read the record that was written, and then you can
// return new records that will be written to the output topic
fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<(), Box<dyn Error>> {
  writer.write(event.record)?;
  Ok(())
}

All transform functions must register a callback with the onRecordWritten() method.

You should run any initialization steps outside of the callback so that they are only run once when the transform function is first deployed.

// src/index.js
import { onRecordWritten } from "@redpanda-data/transform-sdk";

// This is a good place to perform setup steps.

// Register your transform function.
onRecordWritten((event, writer) => {
  // This is where you read the record that was written, and then you can
  // output new records that will be written to the destination topic
  writer.write(event.record);
});

If you need to use Node.js standard modules in your transform function, you must configure the polyfillNode plugin for esbuild. This plugin allows you to polyfill Node.js APIs that are not natively available in the Redpanda JavaScript runtime environment.

esbuild.js
import * as esbuild from 'esbuild';
import { polyfillNode } from 'esbuild-plugin-polyfill-node';

await esbuild.build({
  plugins: [
    polyfillNode({
      globals: {
        buffer: true, // Allow a global Buffer variable if referenced.
        process: false, // Don't inject the process global, the Redpanda JavaScript runtime does that.
      },
      polyfills: {
        crypto: true, // Enable crypto polyfill
        // Add other polyfills as needed
      },
    }),
  ],
});

Error handling

Distinguish between recoverable and critical errors to keep your transform functions resilient. Handle recoverable errors internally to maintain continuous operation, and let critical errors escape so that the system can surface and address severe issues.

Redpanda tracks the offsets of records that transform functions have processed. If an error escapes the Wasm virtual machine (VM), the VM fails. When the Wasm engine detects the failure, it starts a new VM, and the transform function retries processing the input topic from the last processed offset, which can lead to repeated failures if the underlying issue is not resolved.

Handling errors internally by logging them and continuing to process subsequent records can help maintain continuous operation. However, this approach can result in silently discarding problematic records, which may lead to unnoticed data loss if the logs are not monitored closely.

  • Go

  • Rust

  • JavaScript

package main

import (
    "log"
    "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
    transform.OnRecordWritten(myTransform)
}

func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
  record := event.Record()
  if record.Key == nil {
    // Handle the error internally by logging it
    log.Println("Error: Record key is nil")
    // Skip this record and continue to process other records
    return nil
  }
  // Allow errors with writes to escape
  return writer.Write(record)
}

use redpanda_transform_sdk::*;
use log::error;

fn main() {
  // Set up logging
  env_logger::init();
  on_record_written(my_transform);
}

fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> anyhow::Result<()> {
  let record = event.record;
  if record.key().is_none() {
    // Handle the error internally by logging it
    error!("Error: Record key is nil");
    // Skip this record and continue to process other records
    return Ok(());
  }
  // Allow errors with writes to escape
  writer.write(record)?;
  Ok(())
}

import { onRecordWritten } from "@redpanda-data/transform-sdk";

// Register your transform function.
onRecordWritten((event, writer) => {
  const record = event.record;
  if (!record.key) {
    // Handle the error internally by logging it
    console.error("Error: Record key is nil");
    // Skip this record and continue to process other records
    return;
  }
  // Allow errors with writes to escape
  writer.write(record);
});

When you deploy this transform function and produce a message without a key, the logs show the following:

{
  "body": {
    "stringValue": "2024/06/20 08:17:33 Error: Record key is nil\n"
  },
  "timeUnixNano": 1718871455235337000,
  "severityNumber": 13,
  "attributes": [
    {
      "key": "transform_name",
      "value": {
        "stringValue": "test"
      }
    },
    {
      "key": "node",
      "value": {
        "intValue": 0
      }
    }
  ]
}

You can view logs for transform functions using the rpk transform logs <transform-function-name> command.

Redpanda also provides metrics that you can use to monitor the state of your data transforms and get notified of errors or issues.

See also:

Avoid state management

Relying on in-memory state across transform invocations can lead to inconsistencies and unpredictable behavior. Data transforms operate with at-least-once semantics, meaning a transform function might be executed more than once for a given record. Redpanda may also restart a transform function at any point, which causes its state to be lost.

Access environment variables

You can access both built-in and custom environment variables in your transform function. In this example, environment variables are checked once during initialization:

  • Go

  • Rust

  • JavaScript

package main

import (
  "fmt"
  "os"
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
  // Check environment variables before registering the transform function.
  outputTopic1, ok := os.LookupEnv("REDPANDA_OUTPUT_TOPIC_1")
  if ok {
    fmt.Printf("Output topic 1: %s\n", outputTopic1)
  } else {
    fmt.Println("Only one output topic is set")
  }

  // Register your transform function.
  transform.OnRecordWritten(myTransform)
}

func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
  return writer.Write(event.Record())
}
use redpanda_transform_sdk::*;
use std::env;
use log::error;

fn main() {
  // Set up logging
  env_logger::init();

  // Check environment variables before registering the transform function.
  match env::var("REDPANDA_OUTPUT_TOPIC_1") {
    Ok(output_topic_1) => println!("Output topic 1: {}", output_topic_1),
    Err(_) => println!("Only one output topic is set"),
  }

  // Register your transform function.
  on_record_written(my_transform);
}

fn my_transform(_event: WriteEvent, _writer: &mut RecordWriter) -> anyhow::Result<()> {
  Ok(())
}
import { onRecordWritten } from "@redpanda-data/transform-sdk";

// Check environment variables before registering the transform function.
const outputTopic1 = process.env.REDPANDA_OUTPUT_TOPIC_1;
if (outputTopic1) {
  console.log(`Output topic 1: ${outputTopic1}`);
} else {
  console.log("Only one output topic is set");
}

// Register your transform function.
onRecordWritten((event, writer) => {
  return writer.write(event.record);
});

Write to specific output topics

You can configure your transform function to write records to specific output topics based on message content, enabling powerful routing and fan-out patterns. This capability is useful for:

  • Filtering messages by criteria and routing to different topics

  • Fan-out patterns that distribute data from one input topic to multiple output topics

  • Event routing based on message type or schema

  • Data distribution for downstream consumers

Wasm transforms provide a simpler alternative to external connectors like Kafka Connect for in-broker data routing, with lower latency and no additional infrastructure to manage.

Basic JSON validation example

The following example shows a filter that outputs only valid JSON from the input topic into the output topic. The transform writes invalid JSON to a different output topic.

  • Go

  • Rust

  • JavaScript

package main

import (
	"encoding/json"
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
	transform.OnRecordWritten(filterValidJson)
}

func filterValidJson(event transform.WriteEvent, writer transform.RecordWriter) error {
	if json.Valid(event.Record().Value) {
		return writer.Write(event.Record())
	}
	// Send invalid records to separate topic
	return writer.Write(event.Record(), transform.ToTopic("invalid-json"))
}

use anyhow::Result;
use redpanda_transform_sdk::*;

fn main() {
	on_record_written(filter_valid_json);
}

fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
	let value = event.record.value().unwrap_or_default();
	if serde_json::from_slice::<serde_json::Value>(value).is_ok() {
		writer.write(event.record)?;
	} else {
		// Send invalid records to separate topic
		writer.write_with_options(event.record, WriteOptions::to_topic("invalid-json"))?;
	}
	Ok(())
}

The JavaScript SDK does not support writing records to a specific output topic.

Multi-topic fan-out with Schema Registry

This example shows how to route batched updates from a single input topic to multiple output topics based on a routing field in each message. Messages are encoded with the Schema Registry wire format for validation against the output topic schema. Consider using this pattern with Iceberg-enabled topics to fan out data directly into lakehouse tables.

Input message example
{
  "updates": [
    {"table": "orders", "data": {"order_id": "123", "amount": 99.99}},
    {"table": "inventory", "data": {"product_id": "P456", "quantity": 50}},
    {"table": "customers", "data": {"customer_id": "C789", "name": "Jane"}}
  ]
}

Configure the transform with multiple output topics:

name: event-router
input_topic: events
output_topics:
  - orders
  - inventory
  - customers

The transform extracts each update and routes it to the appropriate topic based on the table field. Schemas are registered dynamically in the main() function using the Schema Registry client, which returns the schema IDs needed for encoding messages in the wire format.

This example assumes that you have already created the output topics and have the schema definitions ready. The transform registers the schemas dynamically on startup, using the {topic-name}-value naming convention for schema subjects (for example, orders-value, inventory-value).

  • Go

  • Rust

  • JavaScript

go.mod
module fanout-example

go 1.20

require github.com/redpanda-data/redpanda/src/transform-sdk/go/transform v1.1.0 // v1.1.0+ required

transform.go:

package main

import (
    "encoding/binary"
    "encoding/json"
    "log"
    "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
    "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform/sr"
)

// Input message structure with array of updates
type BatchMessage struct {
    Updates []TableUpdate `json:"updates"`
}

// Individual table update with routing field
type TableUpdate struct {
    Table string          `json:"table"` // Routing field - determines output topic
    Data  json.RawMessage `json:"data"`  // The actual data to write
}

// Schema IDs for each output topic, registered dynamically at startup
var schemaIDs = make(map[string]int)

func main() {
    // Create Schema Registry client
    client := sr.NewClient()

    // Define schemas for each output topic
    schemas := map[string]string{
        "orders":    `{"type":"record","name":"Order","fields":[{"name":"order_id","type":"string"},{"name":"amount","type":"double"}]}`,
        "inventory": `{"type":"record","name":"Inventory","fields":[{"name":"product_id","type":"string"},{"name":"quantity","type":"int"}]}`,
        "customers": `{"type":"record","name":"Customer","fields":[{"name":"customer_id","type":"string"},{"name":"name","type":"string"}]}`,
    }

    // Register schemas and store their IDs
    for topic, schemaStr := range schemas {
        subject := topic + "-value"
        schema := sr.Schema{
            Schema: schemaStr,
            Type:   sr.TypeAvro,
        }

        result, err := client.CreateSchema(subject, schema)
        if err != nil {
            log.Fatalf("Failed to register schema for %s: %v", topic, err)
        }

        schemaIDs[topic] = result.ID
        log.Printf("Registered schema for %s with ID %d", topic, result.ID)
    }

    log.Printf("Starting fanout transform with schema IDs: %v", schemaIDs)
    transform.OnRecordWritten(routeUpdates)
}

func routeUpdates(event transform.WriteEvent, writer transform.RecordWriter) error {
    var batch BatchMessage
    if err := json.Unmarshal(event.Record().Value, &batch); err != nil {
        log.Printf("Failed to parse batch message: %v", err)
        return nil // Skip invalid records
    }

    // Process each update in the batch
    for i, update := range batch.Updates {
        schemaID, exists := schemaIDs[update.Table]
        if !exists {
            log.Printf("Unknown table in update %d: %s", i, update.Table)
            continue
        }

        if err := writeUpdate(update, schemaID, writer, event); err != nil {
            log.Printf("Failed to write update %d to %s: %v", i, update.Table, err)
        }
    }

    return nil
}

func writeUpdate(update TableUpdate, schemaID int, writer transform.RecordWriter, event transform.WriteEvent) error {
    // Create Schema Registry wire format: [magic_byte, schema_id (4 bytes BE), data...]
    value := make([]byte, 5)
    value[0] = 0 // magic byte
    binary.BigEndian.PutUint32(value[1:5], uint32(schemaID))
    value = append(value, update.Data...)

    record := transform.Record{
        Key:   event.Record().Key,
        Value: value,
    }

    return writer.Write(record, transform.ToTopic(update.Table))
}

Cargo.toml
[package]
name = "fanout-rust-example"
version = "0.1.0"
edition = "2021"

[dependencies]
redpanda-transform-sdk = "1.1.0"  # v1.1.0+ required for WriteOptions API
redpanda-transform-sdk-sr = "1.1.0"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
log = "0.4"
env_logger = "0.11"

[profile.release]
opt-level = "z"
lto = true
strip = true

src/main.rs:

use redpanda_transform_sdk::*;
use redpanda_transform_sdk_sr::{SchemaRegistryClient, Schema, SchemaFormat};
use serde::Deserialize;
use std::collections::HashMap;
use std::error::Error;
use std::sync::OnceLock;
use log::{info, error};

#[derive(Deserialize)]
struct BatchMessage {
    updates: Vec<TableUpdate>,
}

#[derive(Deserialize)]
struct TableUpdate {
    table: String,
    data: serde_json::Value,
}

// Schema IDs for each output topic, registered dynamically at startup
static SCHEMA_IDS: OnceLock<HashMap<String, i32>> = OnceLock::new();

fn main() {
    // Initialize logging
    env_logger::init();

    // Create Schema Registry client
    let mut client = SchemaRegistryClient::new();

    // Define schemas for each output topic
    let schemas = [
        ("orders", r#"{"type":"record","name":"Order","fields":[{"name":"order_id","type":"string"},{"name":"amount","type":"double"}]}"#),
        ("inventory", r#"{"type":"record","name":"Inventory","fields":[{"name":"product_id","type":"string"},{"name":"quantity","type":"int"}]}"#),
        ("customers", r#"{"type":"record","name":"Customer","fields":[{"name":"customer_id","type":"string"},{"name":"name","type":"string"}]}"#),
    ];

    let mut schema_ids = HashMap::new();

    // Register schemas and store their IDs
    for (topic, schema_str) in schemas {
        let subject = format!("{}-value", topic);
        let schema = Schema::new(schema_str.to_string(), SchemaFormat::Avro, vec![]);

        match client.create_schema(&subject, schema) {
            Ok(result) => {
                let id = result.id(); // SchemaId type
                schema_ids.insert(topic.to_string(), id.0); // Extract i32 from SchemaId wrapper
                info!("Registered schema for {} with ID {}", topic, id.0);
            }
            Err(e) => {
                error!("Failed to register schema for {}: {}", topic, e);
                panic!("Schema registration failed");
            }
        }
    }

    let _ = SCHEMA_IDS.set(schema_ids);
    info!("Starting fanout transform with schema IDs");

    on_record_written(route_updates);
}

fn write_update(
    update: &TableUpdate,
    schema_id: i32,
    writer: &mut RecordWriter,
    event: &WriteEvent,
) -> Result<(), Box<dyn Error>> {
    // Create Schema Registry wire format: [magic_byte, schema_id (4 bytes BE), data...]
    let mut value = vec![0u8; 5];
    value[0] = 0; // magic byte
    value[1..5].copy_from_slice(&schema_id.to_be_bytes());

    let data_bytes = serde_json::to_vec(&update.data)?;
    value.extend_from_slice(&data_bytes);

    let key = event.record.key().map(|k| k.to_vec());
    let record = BorrowedRecord::new(key.as_deref(), Some(&value));

    writer.write_with_options(record, WriteOptions::to_topic(&update.table))?;
    Ok(())
}

fn route_updates(event: WriteEvent, writer: &mut RecordWriter) -> Result<(), Box<dyn Error>> {
    let batch: BatchMessage = serde_json::from_slice(event.record.value().unwrap_or_default())?;
    let schema_ids = SCHEMA_IDS.get().unwrap();

    for update in batch.updates.iter() {
        if let Some(&schema_id) = schema_ids.get(&update.table) {
            write_update(update, schema_id, writer, &event)?;
        }
    }

    Ok(())
}

The JavaScript SDK does not support writing records to specific output topics. For multi-topic fan-out, use the Go or Rust SDK.

Connect to the Schema Registry

You can use the Schema Registry client library to read and write schemas as well as serialize and deserialize records. This client library is useful when working with schema-based topics in your data transforms.

See also: