Details
Type: Wish
Status: Patch Available
Priority: Major
Resolution: Unresolved
Environment: Using the Ubuntu/Kafka Docker image for testing purposes.
Description
Hello,
I am currently working on an implementation of the Kafka protocol.
So far, all my code works as intended, serialising requests and deserialising responses, as long as I do not use the flexible requests system.
I am now trying to implement the flexible requests system, but the documentation is scarce on the subject of tagged fields.
If we take the Request Header v2:
Request Header v2 => request_api_key request_api_version correlation_id client_id TAG_BUFFER
  request_api_key => INT16
  request_api_version => INT16
  correlation_id => INT32
  client_id => NULLABLE_STRING
Here, the BNF seems violated: TAG_BUFFER is not a value in this position, it appears to be a type, yet it is not expanded in the detailed description below the production.
TAG_BUFFER also does not refer to any type declared elsewhere in the documentation, although it seems to denote the tagged fields.
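If that reading is correct, then, and this is my assumption pieced together from KIP-482 rather than anything the protocol guide states, an empty tagged-fields section serialises to a single zero byte (the unsigned varint count of tagged fields), so a header with no tagged fields set would simply end in 00:

client_id (NULLABLE_STRING "web-api"): 00 07 77 65 62 2d 61 70 69
TAG_BUFFER (no tagged fields set):     00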
Now when looking at tagged fields, the only mention of them within the documentation is:
Note that KIP-482 tagged fields can be added to a request without incrementing the version number. This offers an additional way of evolving the message schema without breaking compatibility. Tagged fields do not take up any space when the field is not set. Therefore, if a field is rarely used, it is more efficient to make it a tagged field than to put it in the mandatory schema. However, tagged fields are ignored by recipients that don't know about them, which could pose a challenge if this is not the behavior that the sender wants. In such cases, a version bump may be more appropriate.
This leads to KIP-482, which does not clearly and explicitly detail the process of writing and reading those tagged fields.
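For the record, here is my current understanding of the wire format, pieced together from KIP-482: a tagged-fields section is an unsigned varint count of fields, followed, for each field, by its tag (unsigned varint), its size in bytes (unsigned varint) and its raw bytes. A minimal sketch in Java, with names of my own invention, not taken from the client:

import java.io.ByteArrayOutputStream;

// Sketch of the KIP-482 tagged-fields encoding as I understand it.
public class TaggedFieldsWriter {

    // Base-128 unsigned varint: 7 bits per byte, high bit set on
    // every byte except the last (the same encoding Protobuf uses).
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Writes a section containing a single tagged field.
    static byte[] writeOneTaggedField(int tag, byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(1, out);           // number of tagged fields
        writeUnsignedVarint(tag, out);         // this field's tag
        writeUnsignedVarint(data.length, out); // this field's size in bytes
        out.write(data, 0, data.length);       // this field's raw bytes
        return out.toByteArray();
    }
}

Note that, unlike Protobuf, there seems to be no wire type packed into the tag: the tag and the size are two separate varints.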
I decided to look at existing clients to understand how they handle tagged fields. I notably looked at KafkaJS (JavaScript client) and librdkafka (C client) and, to my surprise, neither has implemented tagged fields. In fact, they rely on a hack to skip and ignore them completely.
I also had a look at the Java client bundled with Kafka, specifically at the part that handles tagged fields.
Now, I am not a Java developer, so I may not understand this code exactly. I read the comments and implemented the logic following Google's Protobuf specification. The problem is that this produces a request that triggers a stack trace within Kafka (as an aside, it would be appreciated if errors were handled gracefully instead of just dumping stack traces).
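As far as I can tell from reading it, the skipping behaviour the other clients use boils down to something like the following sketch (my own reconstruction in Java, not the actual client code):

import java.nio.ByteBuffer;

// Reconstruction of how a reader can skip over a tagged-fields
// section, ignoring fields it does not know about.
public class TaggedFieldsReader {

    // Base-128 unsigned varint: 7 bits per byte, continuation bit
    // in the high bit.
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
        }
    }

    // Skips a whole tagged-fields section.
    static void skipTaggedFields(ByteBuffer buf) {
        int count = readUnsignedVarint(buf);
        for (int i = 0; i < count; i++) {
            int tag = readUnsignedVarint(buf);   // this field's tag (unused here)
            int size = readUnsignedVarint(buf);  // this field's size in bytes
            buf.position(buf.position() + size); // skip this field's bytes
        }
    }
}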
As a reference, I tried to send an ApiVersions request (key: 18, version: 3).
My request reads as follows when converted to hexadecimal:
Request header: 00 12 00 03 00 00 00 00 00 07 77 65 62 2d 61 70 69 00
Request body:   01 08 77 65 62 2d 61 70 69 01 06 30 2e 30 2e 31 00
Full request:   00 00 00 23 00 12 00 03 00 00 00 00 00 07 77 65 62 2d 61 70 69 00 01 08 77 65 62 2d 61 70 69 01 06 30 2e 30 2e 31 00
This creates a buffer underflow error within Kafka:
[2023-05-31 14:14:31,132] ERROR Exception while processing request from 172.21.0.5:9092-172.21.0.3:59228-21 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: API_VERSIONS, apiVersion: 3, connectionId: 172.21.0.5:9092-172.21.0.3:59228-21, listenerName: ListenerName(PLAINTEXT), principal: User:ANONYMOUS
Caused by: java.nio.BufferUnderflowException
    at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:182)
    at java.base/java.nio.ByteBuffer.get(ByteBuffer.java:770)
    at org.apache.kafka.common.protocol.ByteBufferAccessor.readArray(ByteBufferAccessor.java:58)
    at org.apache.kafka.common.protocol.Readable.readUnknownTaggedField(Readable.java:53)
    at org.apache.kafka.common.message.ApiVersionsRequestData.read(ApiVersionsRequestData.java:133)
    at org.apache.kafka.common.message.ApiVersionsRequestData.<init>(ApiVersionsRequestData.java:74)
    at org.apache.kafka.common.requests.ApiVersionsRequest.parse(ApiVersionsRequest.java:119)
    at org.apache.kafka.common.requests.AbstractRequest.doParseRequest(AbstractRequest.java:207)
    at org.apache.kafka.common.requests.AbstractRequest.parseRequest(AbstractRequest.java:165)
    at org.apache.kafka.common.requests.RequestContext.parseRequest(RequestContext.java:95)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:101)
    at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:1030)
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
    at kafka.network.Processor.processCompletedReceives(SocketServer.scala:1008)
    at kafka.network.Processor.run(SocketServer.scala:893)
    at java.base/java.lang.Thread.run(Thread.java:829)
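For comparison, if I read KIP-482 correctly, a flexible request body should use COMPACT_STRINGs (length + 1 as an unsigned varint) and no Protobuf-style field keys at all, which for the same payload would give the following bytes (my reconstruction, not verified against a working client):

Request header: 00 12 00 03 00 00 00 00 00 07 77 65 62 2d 61 70 69 00
Request body:   08 77 65 62 2d 61 70 69 06 30 2e 30 2e 31 00
Full request:   00 00 00 21 00 12 00 03 00 00 00 00 00 07 77 65 62 2d 61 70 69 00 08 77 65 62 2d 61 70 69 06 30 2e 30 2e 31 00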
I attempted many different approaches but I have to admit that I am making absolutely no progress.
I am creating this issue for two reasons:
- Obviously, I would very much appreciate a detailed explanation of how to write and read tagged fields. What bytes are expected? What format do the values take?
- Since mainstream clients do not implement this feature, it is wasted on most users. Let's face it: very few people actually implement the binary protocol. It is a shame that this feature is not widely available to everyone, especially since it can reduce request sizes. I think improving the documentation so that tagged fields are clearly explained, with at least one example, would greatly benefit the community.
Thanks in advance for your answers!