KAFKA-19012 Fix rare producer message corruption, don't reuse buffers… #21288
+165
−37
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
… on the client in certain error cases (#21065)
Client versions 2.8.0 and later are affected by a
change that exposes a latent bug in how BufferPool is used (BufferPool is a class used on the client side to allocate memory in ByteBuffers, for performance it will reuse them with the caller of the class doing manual memory management by calling free when they are done with the memory). The bug is that a pooled ByteBuffer can be freed while it is still in use by the network sending thread - this early freeing can happen when batches expire / brokers are disconnecting from clients. This bug has existed for more than a decade (since Kafka 0.x it seems), but never manifested because prior to 2.8.0 the pooled ByteBuffer (which contained record data aka your publishes) was copied into a freshly allocated ByteBuffer before any potential reuse and that fresh ByteBuffer was what got written over the network to the broker. With a change included in 2.8.0, the pooled ByteBuffer remains as-is inside of a MemoryRecords instance and this pooled ByteBuffer (which in some cases can be reused and overwritten with other data) is written over the network. Two contributing factors are that the checksum for Kafka records only includes the key/value/headers/etc and not the topic so there is no protection there, and also an implementation detail is that, also newly in the commit that exposed the bug, the produce request header (which includes the topic and partition of a group of message batches) is serialized in a buffer separately from the messages themselves (and the latter is what gets put in the pooled ByteBuffer) which allows you to get messages misrouted to a random recently used topic as opposed to simple duplicate messages on their intended topic.
The key change is in Sender.sendProducerData, we cannot allow the pooled ByteBuffer to be reused for expired in-flight batches until the request completes. For these batches we avoid deallocating the buffer in the normal failBatch call, deferring it until we call completeBatch (or a different path of failBatch).
There are some automated tests to cover this, and also manual testing done to reproduce the issue from KAFKA-19012 and verify that this is sufficient to stop it.
Reviewers: Justine Olshan jolshan@confluent.io, Jun Rao
junrao@gmail.com, Chia-Ping Tsai chia7712@gmail.com