-
Notifications
You must be signed in to change notification settings - Fork 14.9k
Strategy based implementation of acquire mode in SharePartition #21262
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Strategy based implementation of acquire mode in SharePartition #21262
Conversation
apoorvmittal10
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the refactor, left a comment regarding APIs in AcquireStrategy.
Though it's a good refactor and I ll invest more time going through it.
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.kafka.server.share.fetch.acquire; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the package should be fetch itself. Same for other classes in the PR.
| package org.apache.kafka.server.share.fetch.acquire; | |
| package org.apache.kafka.server.share.fetch; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only reason I created acquire package inside fetch was since these are 5 new classes already which are specific to acquire itself and with more refactors in the future, the classes under acquire will only increase, hence I thought to segregate it. I am open to changing it if you think otherwise.
| public boolean requiresSubsetMatch(InFlightBatch inFlightBatch, int maxRecordsToAcquire, int acquiredCount) { | ||
| // batch_optimized mode doesn't force subset matching based on record count alone. | ||
| // Subset matching is determined by other factors (fullMatch, offsetState, throttling). | ||
| return false; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is not needed in BatchOptimized mode then this API is not needed in interface. Remove the method from interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API is exposed to SharePartition and determines if the in-flight batch requires subset matching based on the acquire mode. In a way it decides if you need to maintain the in-flight batch at per offset level and acquire only a subset of those offsets or you should be good with the entire batch. Hence, it is needed in the interface itself.
| * creating a single batch only.</li> | ||
| * </ul> | ||
| */ | ||
| public interface AcquireStrategy { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just on a high level, not totally aligned with the APIs exposed here in the method. I was thinking that you just need one method acquire which shall process as per the Strategy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The point of the refactor is to move the code only which is different for the strategies. If we move the entire acquire logic outside SharePartition, there will be a lot of duplicate code that we need to manage. Also, for adding new strategy, we will require more code to implement it along with possible duplication of some code. Hence, the best idea to me was to decide the places in acquire which are different and acordingly design the APIs.
About
This PR performs a refactor of
SharePartition.javaacquirefunctionality. The functionality had become quite huge and adding new code in this method looks daunting. Hence, I have refactored the code to useStrategydesign pattern to distinguish code for acquire mode thereby creating a clean separation between acquisition modesTesting
The originally present unit and integrayion tests should cover this change.