Home:ALL Converter>Deserialisation error and logging the partition, topic and offset

Deserialisation error and logging the partition, topic and offset

Ask Time:2021-07-19T15:30:23         Author:Kinght007

Json Formatter

I am handling deserialisation error using the ErrorHandlingDeserialiser sent on my DefaultKafkaConsumerFactory.

I have code a custom

try (ErrorHandlingDeserializer<MyEvent> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(theRealDeserialiser)) {
            errorHandlingDeserializer.setFailedDeserializationFunction(myCustomFunction::apply);
            return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), consumerKeyDeserializer, errorHandlingDeserializer);
        }

My custom function does some processing and publishes to a poison pill topic and returns null.

When a deserialisation error occurs, I would like to log the topic, partition and offset. The only way I can think of doing this is to stop returning null in the function and return a new sub type of MyEvent. My KafkaListener could then interrogate the new sub type.

I have a @KafkaListener component, which listens for the ConsumerRecord as follows:

 @KafkaListner(....)
 public void onMessage(ConsumerRecord<String, MyEvent> record) {
   ...
   ...
   
// if record.value instance of MyNewSubType
//   I have access to the topic, partition and offset here, so I could log it here
// I'd have to check that the instance of MyEvent is actually my sub type representing a failed record.

 }

Is this the way to do it? I know null has special meaning Kafka.

The downside of this sub type approach is, I'd have to create a subtype every type using the ErrorHandlingDeserialiser.

Author:Kinght007,eproduced under the CC 4.0 BY-SA copyright license with a link to the original source and this disclaimer.
Link to original article:https://stackoverflow.com/questions/68436679/deserialisation-error-and-logging-the-partition-topic-and-offset
yy