Develop Data Transforms

Learn how to initialize a data transforms project and write transform functions in your chosen language.

After reading this page, you will be able to:

- Initialize a data transforms project using the rpk CLI
- Build transform functions that process records and write to output topics
- Implement multi-topic routing patterns with Schema Registry integration

Prerequisites

You must have the following development tools installed on your host machine:

- The rpk command-line client, configured to connect to your Redpanda cluster.
- For Go projects, Go version 1.20 or later.
- For Rust projects, the latest stable version of Rust.
- For JavaScript and TypeScript projects, the latest long-term support (LTS) release of Node.js.

Initialize a data transforms project

To initialize a data transforms project, run the following command. It sets up the project files in your current directory and adds the latest version of the SDK as a project dependency:

rpk transform init --language=<language> --name=<name>

If you do not include the --language flag, the command prompts you for the language. Supported languages include:

- tinygo-no-goroutines (does not include goroutines)
- tinygo-with-goroutines
- rust
- javascript
- typescript

For example, if you choose tinygo-no-goroutines, rpk creates the following project files:

.
├── go.mod
├── go.sum
├── README.md
├── transform.go
└── transform.yaml

The transform.go file contains a boilerplate transform function. The transform.yaml file specifies the configuration settings for the transform function.

See also: Configure Data Transforms

Build transform functions

You can develop your transform logic with one of the available SDKs, which allow your transform code to interact with a Redpanda cluster.

Go

All transform functions must register a callback with the OnRecordWritten() method.
You should run any initialization steps in the main() function because it runs only once, when the transform function is first deployed. You can also use the standard predefined init() function.

package main

import (
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
	// Register your transform function.
	// This is a good place to perform other setup too.
	transform.OnRecordWritten(myTransform)
}

// myTransform is where you read the record that was written, and then you can
// output new records that will be written to the destination topic.
func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
	return writer.Write(event.Record())
}

Rust

All transform functions must register a callback with the on_record_written() method. You should run any initialization steps in the main() function because it runs only once, when the transform function is first deployed.

use redpanda_transform_sdk::*;
use std::error::Error;

fn main() {
    // Register your transform function.
    // This is a good place to perform other setup too.
    on_record_written(my_transform);
}

// my_transform is where you read the record that was written, and then you can
// write new records to the output topic.
fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> Result<(), Box<dyn Error>> {
    writer.write(event.record)?;
    Ok(())
}

JavaScript

All transform functions must register a callback with the onRecordWritten() method. You should run any initialization steps outside of the callback so that they run only once, when the transform function is first deployed.

// src/index.js
import { onRecordWritten } from "@redpanda-data/transform-sdk";

// This is a good place to perform setup steps.

// Register your transform function.
onRecordWritten((event, writer) => {
  // This is where you read the record that was written, and then you can
  // output new records that will be written to the destination topic.
  writer.write(event.record);
});

If you need to use Node.js standard modules in your transform function, you must configure the polyfillNode plugin for esbuild. This plugin polyfills Node.js APIs that are not natively available in the Redpanda JavaScript runtime environment.

esbuild.js:

import * as esbuild from 'esbuild';
import { polyfillNode } from 'esbuild-plugin-polyfill-node';

await esbuild.build({
  // Keep the other build options from your project's esbuild.js (such as entryPoints and outfile).
  plugins: [
    polyfillNode({
      globals: {
        buffer: true, // Allow a global Buffer variable if referenced.
        process: false, // Don't inject the process global; the Redpanda JavaScript runtime does that.
      },
      polyfills: {
        crypto: true, // Enable the crypto polyfill.
        // Add other polyfills as needed.
      },
    }),
  ],
});

Error handling

By distinguishing between recoverable and critical errors, you can make your transform functions both resilient and robust. Handling recoverable errors internally helps maintain continuous operation, while allowing critical errors to escape ensures that the system can address severe issues effectively.

Redpanda tracks the offsets of records that transform functions have processed. If an error escapes the Wasm virtual machine (VM), the VM fails. When the Wasm engine detects this failure and starts a new VM, the transform function retries processing the input topic from the last processed offset. This can lead to repeated failures if the underlying issue is not resolved.

Handling errors internally by logging them and continuing to process subsequent records can help maintain continuous operation. However, this approach can silently discard problematic records, which may lead to unnoticed data loss if the logs are not monitored closely.
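The distinction between recoverable and critical errors described above can be sketched in plain Go, independent of the SDK. This is a minimal, hypothetical example. The sentinel errors errRecoverable and errWriteFailed, the helpers validate and handle, and the choice to treat an empty value as recoverable are illustrative assumptions, not part of the transform SDK. A transform callback would return the result of handle. A nil result skips the record after logging. A non-nil error escapes, so the VM fails and processing retries from the last processed offset.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errRecoverable marks problems that affect only a single record
// (hypothetical sentinel error, for illustration only).
var errRecoverable = errors.New("recoverable")

// errWriteFailed stands in for a critical failure, such as a failed write
// (hypothetical, for illustration only).
var errWriteFailed = errors.New("write failed")

// validate is a hypothetical per-record check. It wraps errRecoverable so that
// callers can tell a bad record apart from a critical failure.
func validate(value []byte) error {
	if len(value) == 0 {
		return fmt.Errorf("empty value: %w", errRecoverable)
	}
	return nil
}

// handle logs and swallows recoverable errors (returning nil skips the record)
// and passes every other error through unchanged so that it escapes.
func handle(err error) error {
	if err == nil {
		return nil
	}
	if errors.Is(err, errRecoverable) {
		log.Printf("skipping record: %v", err)
		return nil
	}
	return err
}

func main() {
	fmt.Println(handle(validate(nil)))         // logs the skip, then prints <nil>
	fmt.Println(handle(validate([]byte("x")))) // <nil>
	fmt.Println(handle(errWriteFailed))        // write failed
}
```

Wrapping with %w and checking with errors.Is keeps the recoverable/critical decision in one place, so every record check can share it.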
Go

package main

import (
	"log"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
	transform.OnRecordWritten(myTransform)
}

func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
	record := event.Record()
	if record.Key == nil {
		// Handle the error internally by logging it.
		log.Println("Error: Record key is nil")
		// Skip this record and continue to process other records.
		return nil
	}
	// Allow errors with writes to escape.
	return writer.Write(record)
}

Rust

use redpanda_transform_sdk::*;
use log::error;

fn main() {
    // Set up logging.
    env_logger::init();
    on_record_written(my_transform);
}

fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> anyhow::Result<()> {
    let record = event.record;
    if record.key().is_none() {
        // Handle the error internally by logging it.
        error!("Error: Record key is nil");
        // Skip this record and continue to process other records.
        return Ok(());
    }
    // Allow errors with writes to escape.
    writer.write(record)?;
    Ok(())
}

JavaScript

import { onRecordWritten } from "@redpanda-data/transform-sdk";

// Register your transform function.
onRecordWritten((event, writer) => {
  const record = event.record;
  if (!record.key) {
    // Handle the error internally by logging it.
    console.error("Error: Record key is nil");
    // Skip this record and continue to process other records.
    return;
  }
  // Allow errors with writes to escape.
  writer.write(record);
});

When you deploy this transform function and produce a message without a key, you'll see the following in the logs:

{
  "body": {
    "stringValue": "2024/06/20 08:17:33 Error: Record key is nil\n"
  },
  "timeUnixNano": 1718871455235337000,
  "severityNumber": 13,
  "attributes": [
    {
      "key": "transform_name",
      "value": {
        "stringValue": "test"
      }
    },
    {
      "key": "node",
      "value": {
        "intValue": 0
      }
    }
  ]
}

You can view logs for transform functions using the rpk transform logs <transform-function-name> command.
To make sure you are notified of any errors or issues in your data transforms, Redpanda provides metrics that you can use to monitor the state of your data transforms.

See also:

- View logs for transform functions
- Monitor data transforms
- Configure transform logging
- rpk transform logs reference

Avoid state management

Relying on in-memory state across transform invocations can lead to inconsistencies and unpredictable behavior. Data transforms operate with at-least-once semantics, meaning a transform function might be executed more than once for a given record. Redpanda may also restart a transform function at any point, which causes its state to be lost.

Access environment variables

You can access both built-in and custom environment variables in your transform function. In this example, environment variables are checked once during initialization:

Go

package main

import (
	"fmt"
	"os"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
	// Check environment variables before registering the transform function.
	outputTopic1, ok := os.LookupEnv("REDPANDA_OUTPUT_TOPIC_1")
	if ok {
		fmt.Printf("Output topic 1: %s\n", outputTopic1)
	} else {
		fmt.Println("Only one output topic is set")
	}

	// Register your transform function.
	transform.OnRecordWritten(myTransform)
}

func myTransform(event transform.WriteEvent, writer transform.RecordWriter) error {
	return writer.Write(event.Record())
}

Rust

use redpanda_transform_sdk::*;
use std::env;

fn main() {
    // Set up logging.
    env_logger::init();

    // Check environment variables before registering the transform function.
    match env::var("REDPANDA_OUTPUT_TOPIC_1") {
        Ok(output_topic_1) => println!("Output topic 1: {}", output_topic_1),
        Err(_) => println!("Only one output topic is set"),
    }

    // Register your transform function.
    on_record_written(my_transform);
}

// Pass each record through unchanged, like the Go and JavaScript examples.
fn my_transform(event: WriteEvent, writer: &mut RecordWriter) -> anyhow::Result<()> {
    writer.write(event.record)?;
    Ok(())
}

JavaScript

import { onRecordWritten } from "@redpanda-data/transform-sdk";

// Check environment variables before registering the transform function.
const outputTopic1 = process.env.REDPANDA_OUTPUT_TOPIC_1;
if (outputTopic1) {
  console.log(`Output topic 1: ${outputTopic1}`);
} else {
  console.log("Only one output topic is set");
}

// Register your transform function.
onRecordWritten((event, writer) => {
  return writer.write(event.record);
});

Write to specific output topics

You can configure your transform function to write records to specific output topics based on message content. This enables routing and fan-out patterns. This capability is useful for:

- Filtering messages by criteria and routing them to different topics
- Fan-out patterns that distribute data from one input topic to multiple output topics
- Event routing based on message type or schema
- Data distribution for downstream consumers

Wasm transforms provide a simpler alternative to external connectors such as Kafka Connect for in-broker data routing, with lower latency and no additional infrastructure to manage.

Basic JSON validation example

The following example shows a filter that writes only valid JSON from the input topic to the output topic. The transform writes invalid JSON to a different output topic.
Go

package main

import (
	"encoding/json"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

func main() {
	transform.OnRecordWritten(filterValidJson)
}

func filterValidJson(event transform.WriteEvent, writer transform.RecordWriter) error {
	if json.Valid(event.Record().Value) {
		return writer.Write(event.Record())
	}
	// Send invalid records to a separate topic.
	return writer.Write(event.Record(), transform.ToTopic("invalid-json"))
}

Rust

use anyhow::Result;
use redpanda_transform_sdk::*;

fn main() {
    on_record_written(filter_valid_json);
}

fn filter_valid_json(event: WriteEvent, writer: &mut RecordWriter) -> Result<()> {
    let value = event.record.value().unwrap_or_default();
    if serde_json::from_slice::<serde_json::Value>(value).is_ok() {
        writer.write(event.record)?;
    } else {
        // Send invalid records to a separate topic.
        writer.write_with_options(event.record, WriteOptions::to_topic("invalid-json"))?;
    }
    Ok(())
}

JavaScript

The JavaScript SDK does not support writing records to a specific output topic.

Multi-topic fan-out with Schema Registry

This example shows how to route batched updates from a single input topic to multiple output topics based on a routing field in each message. Messages are encoded in the Schema Registry wire format so that they can be validated against the output topic's schema. Consider using this pattern with Iceberg-enabled topics to fan out data directly into lakehouse tables.

Input message example:

{
  "updates": [
    {"table": "orders", "data": {"order_id": "123", "amount": 99.99}},
    {"table": "inventory", "data": {"product_id": "P456", "quantity": 50}},
    {"table": "customers", "data": {"customer_id": "C789", "name": "Jane"}}
  ]
}

Configure the transform with multiple output topics:

name: event-router
input_topic: events
output_topics:
  - orders
  - inventory
  - customers

The transform extracts each update and routes it to the appropriate topic based on the table field.
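The routing step can be sketched in plain Go before you look at the SDK-specific code. This minimal example does not call the transform SDK. It parses a batch in the input format shown above and groups each update's data by its table field. It skips tables that have no configured output topic. The groupByTable helper and the known-topics set are hypothetical. In a real transform, each group would be written through the SDK's RecordWriter to the matching topic.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batchMessage mirrors the input format: {"updates":[{"table":..., "data":...}]}.
type batchMessage struct {
	Updates []struct {
		Table string          `json:"table"`
		Data  json.RawMessage `json:"data"`
	} `json:"updates"`
}

// groupByTable is a hypothetical helper that returns the raw data of each
// update keyed by destination topic. Updates whose table is not in known are
// skipped, as an unknown routing field would be.
func groupByTable(value []byte, known map[string]bool) (map[string][]json.RawMessage, error) {
	var batch batchMessage
	if err := json.Unmarshal(value, &batch); err != nil {
		return nil, err
	}
	routes := make(map[string][]json.RawMessage)
	for _, u := range batch.Updates {
		if !known[u.Table] {
			continue
		}
		routes[u.Table] = append(routes[u.Table], u.Data)
	}
	return routes, nil
}

func main() {
	input := []byte(`{"updates":[
		{"table":"orders","data":{"order_id":"123","amount":99.99}},
		{"table":"inventory","data":{"product_id":"P456","quantity":50}},
		{"table":"unknown","data":{}}
	]}`)
	known := map[string]bool{"orders": true, "inventory": true, "customers": true}

	routes, err := groupByTable(input, known)
	if err != nil {
		fmt.Println("invalid batch:", err)
		return
	}
	fmt.Println(string(routes["orders"][0])) // {"order_id":"123","amount":99.99}
	fmt.Println(len(routes))                 // 2
}
```

Keeping the data as json.RawMessage avoids re-encoding each update before it is written, which matches how the Go fan-out example carries update.Data.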
Schemas are registered dynamically in the main() function using the Schema Registry client, which returns the schema IDs needed to encode messages in the wire format. This example assumes that you have created the output topics and have the schema definitions ready. The transform registers the schemas on startup using the {topic-name}-value naming convention for schema subjects (for example, orders-value and inventory-value).

Go

go.mod:

module fanout-example

go 1.20

require github.com/redpanda-data/redpanda/src/transform-sdk/go/transform v1.1.0 // v1.1.0+ required

transform.go:

package main

import (
	"encoding/binary"
	"encoding/json"
	"log"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform/sr"
)

// BatchMessage is the input message structure, with an array of updates.
type BatchMessage struct {
	Updates []TableUpdate `json:"updates"`
}

// TableUpdate is an individual table update with a routing field.
type TableUpdate struct {
	Table string          `json:"table"` // Routing field: determines the output topic
	Data  json.RawMessage `json:"data"`  // The actual data to write
}

// schemaIDs holds the schema ID for each output topic, registered dynamically at startup.
var schemaIDs = make(map[string]int)

func main() {
	// Create a Schema Registry client.
	client := sr.NewClient()

	// Define schemas for each output topic.
	schemas := map[string]string{
		"orders":    `{"type":"record","name":"Order","fields":[{"name":"order_id","type":"string"},{"name":"amount","type":"double"}]}`,
		"inventory": `{"type":"record","name":"Inventory","fields":[{"name":"product_id","type":"string"},{"name":"quantity","type":"int"}]}`,
		"customers": `{"type":"record","name":"Customer","fields":[{"name":"customer_id","type":"string"},{"name":"name","type":"string"}]}`,
	}

	// Register schemas and store their IDs.
	for topic, schemaStr := range schemas {
		subject := topic + "-value"
		schema := sr.Schema{
			Schema: schemaStr,
			Type:   sr.TypeAvro,
		}
		result, err := client.CreateSchema(subject, schema)
		if err != nil {
			log.Fatalf("Failed to register schema for %s: %v", topic, err)
		}
		schemaIDs[topic] = result.ID
		log.Printf("Registered schema for %s with ID %d", topic, result.ID)
	}

	log.Printf("Starting fanout transform with schema IDs: %v", schemaIDs)
	transform.OnRecordWritten(routeUpdates)
}

func routeUpdates(event transform.WriteEvent, writer transform.RecordWriter) error {
	var batch BatchMessage
	if err := json.Unmarshal(event.Record().Value, &batch); err != nil {
		log.Printf("Failed to parse batch message: %v", err)
		return nil // Skip invalid records.
	}

	// Process each update in the batch.
	for i, update := range batch.Updates {
		schemaID, exists := schemaIDs[update.Table]
		if !exists {
			log.Printf("Unknown table in update %d: %s", i, update.Table)
			continue
		}
		if err := writeUpdate(update, schemaID, writer, event); err != nil {
			log.Printf("Failed to write update %d to %s: %v", i, update.Table, err)
		}
	}
	return nil
}

func writeUpdate(update TableUpdate, schemaID int, writer transform.RecordWriter, event transform.WriteEvent) error {
	// Create the Schema Registry wire format: [magic_byte, schema_id (4 bytes BE), data...].
	value := make([]byte, 5)
	value[0] = 0 // Magic byte
	binary.BigEndian.PutUint32(value[1:5], uint32(schemaID))
	// Note: update.Data is appended as JSON text. Consumers that decode the
	// payload as Avro expect Avro binary encoding after this header.
	value = append(value, update.Data...)

	record := transform.Record{
		Key:   event.Record().Key,
		Value: value,
	}
	return writer.Write(record, transform.ToTopic(update.Table))
}

Rust

Cargo.toml:

[package]
name = "fanout-rust-example"
version = "0.1.0"
edition = "2021"

[dependencies]
redpanda-transform-sdk = "1.1.0" # v1.1.0+ required for WriteOptions API
redpanda-transform-sdk-sr = "1.1.0"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
log = "0.4"
env_logger = "0.11"

[profile.release]
opt-level = "z"
lto = true
strip = true

src/main.rs:

use redpanda_transform_sdk::*;
use redpanda_transform_sdk_sr::{Schema, SchemaFormat, SchemaRegistryClient};
use serde::Deserialize;
use std::collections::HashMap;
use std::error::Error;
use std::sync::OnceLock;
use log::{error, info};

#[derive(Deserialize)]
struct BatchMessage {
    updates: Vec<TableUpdate>,
}

#[derive(Deserialize)]
struct TableUpdate {
    table: String,
    data: serde_json::Value,
}

// Schema IDs for each output topic, registered dynamically at startup.
static SCHEMA_IDS: OnceLock<HashMap<String, i32>> = OnceLock::new();

fn main() {
    // Initialize logging.
    env_logger::init();

    // Create a Schema Registry client.
    let mut client = SchemaRegistryClient::new();

    // Define schemas for each output topic.
    let schemas = [
        ("orders", r#"{"type":"record","name":"Order","fields":[{"name":"order_id","type":"string"},{"name":"amount","type":"double"}]}"#),
        ("inventory", r#"{"type":"record","name":"Inventory","fields":[{"name":"product_id","type":"string"},{"name":"quantity","type":"int"}]}"#),
        ("customers", r#"{"type":"record","name":"Customer","fields":[{"name":"customer_id","type":"string"},{"name":"name","type":"string"}]}"#),
    ];

    let mut schema_ids = HashMap::new();

    // Register schemas and store their IDs.
    for (topic, schema_str) in schemas {
        let subject = format!("{}-value", topic);
        let schema = Schema::new(schema_str.to_string(), SchemaFormat::Avro, vec![]);
        match client.create_schema(&subject, schema) {
            Ok(result) => {
                let id = result.id(); // SchemaId type
                schema_ids.insert(topic.to_string(), id.0); // Extract the i32 from the SchemaId wrapper.
                info!("Registered schema for {} with ID {}", topic, id.0);
            }
            Err(e) => {
                error!("Failed to register schema for {}: {}", topic, e);
                panic!("Schema registration failed");
            }
        }
    }

    let _ = SCHEMA_IDS.set(schema_ids);
    info!("Starting fanout transform with schema IDs");
    on_record_written(route_updates);
}

fn write_update(
    update: &TableUpdate,
    schema_id: i32,
    writer: &mut RecordWriter,
    event: &WriteEvent,
) -> Result<(), Box<dyn Error>> {
    // Create the Schema Registry wire format: [magic_byte, schema_id (4 bytes BE), data...].
    let mut value = vec![0u8; 5];
    value[0] = 0; // Magic byte
    value[1..5].copy_from_slice(&schema_id.to_be_bytes());
    // Note: the data is serialized as JSON text. Consumers that decode the
    // payload as Avro expect Avro binary encoding after this header.
    let data_bytes = serde_json::to_vec(&update.data)?;
    value.extend_from_slice(&data_bytes);

    let key = event.record.key().map(|k| k.to_vec());
    let record = BorrowedRecord::new(key.as_deref(), Some(value.as_slice()));
    writer.write_with_options(record, WriteOptions::to_topic(&update.table))?;
    Ok(())
}

fn route_updates(event: WriteEvent, writer: &mut RecordWriter) -> Result<(), Box<dyn Error>> {
    let batch: BatchMessage = serde_json::from_slice(event.record.value().unwrap_or_default())?;
    let schema_ids = SCHEMA_IDS.get().unwrap();

    // Updates for tables without a registered schema are skipped.
    for update in batch.updates.iter() {
        if let Some(&schema_id) = schema_ids.get(&update.table) {
            write_update(update, schema_id, writer, &event)?;
        }
    }
    Ok(())
}

JavaScript

The JavaScript SDK does not support writing records to specific output topics. For multi-topic fan-out, use the Go or Rust SDK.

Connect to the Schema Registry

You can use the Schema Registry client library to read and write schemas, and to serialize and deserialize records. This client library is useful when working with schema-based topics in your data transforms.
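When you deserialize records, you can parse the same five-byte wire-format header that the fan-out examples build by hand. This minimal sketch in plain Go is independent of the SDK's sr package. It encodes and decodes the header: a zero magic byte, then the schema ID as a 4-byte big-endian integer, then the payload. The function names encodeWire and decodeWire are hypothetical. The Schema Registry client library may provide its own serialization helpers, so check its reference before you hand-roll this.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeWire prepends the wire-format header: magic byte 0, then the schema
// ID as a 4-byte big-endian integer.
func encodeWire(schemaID uint32, payload []byte) []byte {
	out := make([]byte, 5, 5+len(payload))
	out[0] = 0 // Magic byte
	binary.BigEndian.PutUint32(out[1:5], schemaID)
	return append(out, payload...)
}

// decodeWire splits a wire-format value into its schema ID and payload.
func decodeWire(value []byte) (uint32, []byte, error) {
	if len(value) < 5 {
		return 0, nil, errors.New("value too short for wire-format header")
	}
	if value[0] != 0 {
		return 0, nil, fmt.Errorf("unexpected magic byte %d", value[0])
	}
	return binary.BigEndian.Uint32(value[1:5]), value[5:], nil
}

func main() {
	encoded := encodeWire(7, []byte("payload"))
	fmt.Println(encoded[:5]) // [0 0 0 0 7]

	id, payload, err := decodeWire(encoded)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(id, string(payload)) // 7 payload
}
```

A consumer would use the decoded schema ID to look up the schema in Schema Registry before interpreting the payload bytes.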
See also:

- Redpanda Schema Registry
- Go Schema Registry client reference
- Rust Schema Registry client reference
- JavaScript Schema Registry client reference

Next steps

Configure Data Transforms

Suggested reading

- How Data Transforms Work
- Data Transforms SDKs
- rpk transform commands