Starting in Kong Gateway 3.10, the Kafka Upstream plugin supports customizing the Kafka message format.
Using the configuration field config.message_by_lua_functions, you can define a chain of functions that manipulate both the format and the content of the Kafka message.
Prerequisites
- You have created a Kafka topic
- The kcat utility is installed
Using the plugin
The configuration field message_by_lua_functions is an array of strings, where each string is a custom Lua function. The default message generated by the Kafka Upstream plugin is reduced through these functions in order, with each function receiving the return value of the previous one:
Message(default) -> f1(Message) -> f2(Message) -> ... -> Producer(New_Message)
The Lua function must be in the following format, where the function receives one message argument, and returns the new message as a single return value:
return function(message)
  local new_message = message
  -- ... some manipulation code
  return new_message
end
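The reduction described above can be sketched in plain Lua. This is only an illustration of the chaining idea, not the plugin's actual implementation: the helper name apply_chain, the step1 and step2 fields, and the sample message are all made up for this example.

```lua
-- Illustrative sketch only; the plugin's real implementation may differ.
local compile = loadstring or load  -- Lua 5.1/LuaJIT vs. Lua 5.2+

-- Hypothetical helper: compile each configured string, then pass the
-- message through the resulting functions in order.
local function apply_chain(sources, message)
  for i, src in ipairs(sources) do
    local chunk, err = compile(src)
    if not chunk then
      error("function " .. i .. " failed to compile: " .. tostring(err))
    end
    local fn = chunk()  -- running the chunk yields the function it returns
    if type(fn) ~= "function" then
      error("function " .. i .. " must return a function")
    end
    message = fn(message)  -- the output of one step is the input of the next
  end
  return message
end

local result = apply_chain({
  'return function(message) message.step1 = true return message end',
  'return function(message) message.step2 = true return message end',
}, { method = "GET" })

print(result.method, result.step1, result.step2)
```

Because each step receives the previous step's return value, the order of the strings in the array matters: a later function sees every change made by the earlier ones.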
Example: A function that does nothing
To leave the default message unmodified, return the message argument as is:
return function(message)
  return message
end
Let’s configure this function with an example declarative config:
_format_version: "3.0"
services:
- name: kafka-service
  url: http://mock-upstream
  routes:
  - name: kafka-route
    paths:
    - /kafka
    plugins:
    - name: kafka-upstream
      config:
        bootstrap_servers:
        - host: localhost
          port: 9092
        topic: "my-topic"
        forward_method: true
        forward_uri: true
        forward_headers: true
        forward_body: true
        message_by_lua_functions:
        - 'return function(message) return message end'
Make sure the Kafka topic is created, then send a request to the route path and check the message:
curl http://localhost:8000/kafka
Response:
{"message":"message sent"}
You can use the kcat tool to check the message:
kcat -b localhost:9092 -G mygroup1 -o beginning -t my-topic -p 0 -C
You should see that the default message was sent to the topic:
{"body_base64":true,"headers":{"accept":"*\/*","host":"localhost:8000","user-agent":"curl\/8.7.1"},"uri_args":{},"uri":"\/kafka","body_args":{},"body":"","method":"GET"}
Example: A function that returns a fixed table
A custom function can return any value, as long as that value is JSON-serializable. A value that can’t be serialized to JSON can’t be sent to Kafka. For example:
return function(message)
  return {a="1", b="2"}
end
Configure this function with an example declarative config:
_format_version: "3.0"
services:
- name: kafka-service
  url: http://mock-upstream
  routes:
  - name: kafka-route
    paths:
    - /kafka
    plugins:
    - name: kafka-upstream
      config:
        bootstrap_servers:
        - host: localhost
          port: 9092
        topic: "my-topic"
        message_by_lua_functions:
        - 'return function(message) return {a="1", b="2"} end'
Make sure the Kafka topic is created, then send a request to the route path and check the message:
curl http://localhost:8000/kafka
Response:
{"message":"message sent"}
You can use the kcat tool to check the message:
kcat -b localhost:9092 -G mygroup1 -o beginning -t my-topic -p 0 -C
You should see that the new message was sent to the topic:
.....OLD MESSAGES.....
% Reached end of topic my-topic [0] at offset 8
{"b":"2","a":"1"}
% Reached end of topic my-topic [0] at offset 9
You can also return an already JSON-encoded string as the return value:
return function(message)
  return [[{"a":"1","b":"2"}]]
end
The result should be the same as the one returned by the {a="1", b="2"} table.
Note: The Kafka Upstream plugin only supports JSON encoding on the message. Other serialization methods are not supported.
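Since the returned value has to be JSON-serializable, a rough pre-check can help catch obvious problems before a function goes into the configuration. The following is a minimal sketch in plain Lua: the helper name is_json_friendly is hypothetical, and the check is only an approximation. It does not reproduce the behavior of the JSON encoder Kong uses, which may reject further cases (for example, some sparse arrays or non-finite numbers).

```lua
-- Hypothetical helper, not part of Kong or the plugin.
-- Returns true if the value only contains types that have a JSON
-- counterpart; otherwise returns false plus a short reason.
local function is_json_friendly(value, seen)
  local t = type(value)
  if t == "nil" or t == "boolean" or t == "number" or t == "string" then
    return true
  end
  if t ~= "table" then
    return false, "unsupported type: " .. t  -- function, userdata, thread
  end
  seen = seen or {}
  if seen[value] then
    return false, "circular reference"
  end
  seen[value] = true  -- mark tables on the current path to detect cycles
  for k, v in pairs(value) do
    local kt = type(k)
    if kt ~= "string" and kt ~= "number" then
      return false, "unsupported key type: " .. kt
    end
    local ok, err = is_json_friendly(v, seen)
    if not ok then
      return false, err
    end
  end
  seen[value] = nil  -- unmark so shared, non-circular tables are allowed
  return true
end

print(is_json_friendly({ a = "1", b = "2" }))
print(is_json_friendly({ callback = print }))
```

The first call accepts the fixed table from this example, while the second is rejected because a Lua function has no JSON representation.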
Example: Functions that insert consumer info and redact a sensitive request header
Let’s use the following functions, which call the kong.client.get_consumer PDK function and manipulate the request header information:
-- Function 1: injects the consumer information into the message
return function(message)
  message.consumer = kong.client.get_consumer()
  return message
end

-- Function 2: redacts the apikey header
return function(message)
  local headers = message and message.headers
  local apikey = headers and headers.apikey
  if apikey then
    headers.apikey = "[REDACTED]"
  end
  return message
end
Create an example declarative config for these two functions:
_format_version: "3.0"
services:
- name: kafka-service
  url: http://mock-upstream
  routes:
  - name: kafka-route
    paths:
    - /kafka
    plugins:
    - name: key-auth
    - name: kafka-upstream
      config:
        bootstrap_servers:
        - host: localhost
          port: 9092
        topic: "my-topic"
        forward_method: true
        forward_uri: true
        forward_headers: true
        forward_body: true
        message_by_lua_functions:
        - "return function(message) message.consumer = kong.client.get_consumer() return message end"
        - "return function(message) local headers = message and message.headers local apikey = headers and headers.apikey if apikey then headers.apikey = '[REDACTED]' end return message end"
consumers:
- username: alice
  keyauth_credentials:
  - key: "example-api-key"
Trigger the API call and check the message:
curl http://localhost:8000/kafka -H 'apikey: example-api-key'
Response:
{"message":"message sent"}
You can use the kcat tool to check the message:
kcat -b localhost:9092 -G mygroup1 -o beginning -t my-topic -p 0 -C
You should see the consumer information and the API key replaced with [REDACTED] in the message:
.....OLD MESSAGES.....
% Reached end of topic my-topic [0] at offset 20
{"body_base64":true,"method":"GET","consumer":{"id":"25f66f8c-b5e1-4bbd-9b17-2cbe733d1895","created_at":1741768764,"username":"alice","username_lower":"alice","type":0,"updated_at":1741768764},"body_args":{},"body":"","headers":{"apikey":"[REDACTED]","accept":"*\/*","host":"localhost:8000","x-consumer-id":"25f66f8c-b5e1-4bbd-9b17-2cbe733d1895","x-credential-identifier":"795fb073-f698-403d-9302-209ba3abdd55","user-agent":"curl\/8.7.1","x-consumer-username":"alice"},"uri_args":{},"uri":"\/kafka"}
% Reached end of topic my-topic [0] at offset 21
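The two functions from this example can also be tried outside of Kong before they are added to the configuration. The sketch below assumes a plain Lua interpreter and replaces the PDK with a small local stub: the stubbed kong table, the consumer it returns, and the sample message are invented for testing and carry far less data than a real request.

```lua
-- Local stand-in for the Kong PDK, for testing only.
-- Inside Kong, `kong` is provided by the runtime.
local kong = {
  client = {
    get_consumer = function()
      return { username = "alice" }  -- made-up consumer entity
    end,
  },
}

-- Function 1 from the example: injects the consumer information
local function add_consumer(message)
  message.consumer = kong.client.get_consumer()
  return message
end

-- Function 2 from the example: redacts the apikey header
local function redact_apikey(message)
  local headers = message and message.headers
  local apikey = headers and headers.apikey
  if apikey then
    headers.apikey = "[REDACTED]"
  end
  return message
end

-- Simplified sample message; a real default message has more fields.
local message = {
  method = "GET",
  headers = { apikey = "example-api-key", host = "localhost:8000" },
}

-- Apply the functions in the same order as in the configuration.
local out = redact_apikey(add_consumer(message))
print(out.consumer.username, out.headers.apikey)
```

Checking the functions this way only validates their logic. The shape of the real message and of the consumer entity should still be confirmed against the output in the Kafka topic.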