I am handling deserialisation error using the ErrorHandlingDeserialiser sent on my DefaultKafkaConsumerFactory.
I have code a custom
try (ErrorHandlingDeserializer<MyEvent> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(theRealDeserialiser)) {
errorHandlingDeserializer.setFailedDeserializationFunction(myCustomFunction::apply);
return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), consumerKeyDeserializer, errorHandlingDeserializer);
}
My custom function does some processing and publishes to a poison pill topic and returns null.
When a deserialisation error occurs, I would like to log the topic, partition and offset. The only way I can think of doing this is to stop returning null in the function and return a new sub type of MyEvent. My KafkaListener could then interrogate the new sub type.
I have a @KafkaListener component, which listens for the ConsumerRecord as follows:
@KafkaListner(....)
public void onMessage(ConsumerRecord<String, MyEvent> record) {
...
...
// if record.value instance of MyNewSubType
// I have access to the topic, partition and offset here, so I could log it here
// I'd have to check that the instance of MyEvent is actually my sub type representing a failed record.
}
Is this the way to do it? I know null has special meaning Kafka.
The downside of this sub type approach is, I'd have to create a subtype every type using the ErrorHandlingDeserialiser.