codehaus


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Unable to serialize org.apache.kafka.common.config.types.Password


The exception is very clear that the SourceFunction should be serializable. Password is not serializable. You can try to set the kafka consumer properties such as this:

props.put(SaslConfigs.SASL_JAAS_CONFIG, "LoginModule required subject=\"test\" secret=\"test\";");

The String value will be parsed to Password object.(refer to the method org.apache.kafka.common.config.ConfigDef.parseType)

Regards,
Dian


在 2018年12月25日,下午11:04,tao xiao <xiaotao183@xxxxxxxxx> 写道:

Hi team,

I am passing a security enabled kafka consumer properties to FlinkKafkaConsumer but keep getting this error java.io.NotSerializableException? what is the best way to handle this?

I use Flink 1.7.1 and here is the consumer property that produces the exception

props.put(SaslConfigs.SASL_JAAS_CONFIG, new Password("LoginModule required subject=\"test\" secret=\"test\";"));

stacktrace
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397)
at org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69)
Caused by: java.io.NotSerializableException: org.apache.kafka.common.config.types.Password
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.Hashtable.writeObject(Hashtable.java:1157)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 5 more


Attachment: smime.p7s
Description: S/MIME cryptographic signature