Re: Flink 1.7 doesn't work with Kafka Table Source Descriptors


Hi Dhanuka,

From the exception, it seems you have changed the Kafka version to 'universal', but your lib folder only contains the connector jar for 0.11. You can solve the problem in either of the following ways:
- Change the Kafka version to 0.11, which matches the jar you have in your lib folder.
- Add flink-connector-kafka_2.11-1.7.0.jar to your lib folder if you want to use 'universal'.
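
To illustrate why the lookup ends with "No context matches", here is a minimal sketch of the matching idea. It is a simplified illustration, not Flink's actual TableFactoryService: the class name, the registry map, and the helper methods are hypothetical, and only the connector.type and connector.version properties are compared. The three factories listed are the Kafka factories that the exception reports as having been considered.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration only; this is not Flink's TableFactoryService.
class FactoryContextMatchSketch {

    // Hypothetical registry: factory name -> the context properties it requires.
    // Only the Kafka factories reported in the exception are listed.
    static final Map<String, Map<String, String>> FACTORIES = new LinkedHashMap<>();

    static {
        FACTORIES.put("Kafka09TableSourceSinkFactory", context("kafka", "0.9"));
        FACTORIES.put("Kafka010TableSourceSinkFactory", context("kafka", "0.10"));
        FACTORIES.put("Kafka011TableSourceSinkFactory", context("kafka", "0.11"));
    }

    // Builds a property map holding the connector type and version.
    static Map<String, String> context(String type, String version) {
        Map<String, String> ctx = new LinkedHashMap<>();
        ctx.put("connector.type", type);
        ctx.put("connector.version", version);
        return ctx;
    }

    // A factory matches when every property in its required context
    // equals the corresponding requested property.
    static List<String> findMatching(Map<String, String> requested) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> factory : FACTORIES.entrySet()) {
            boolean allMatch = true;
            for (Map.Entry<String, String> required : factory.getValue().entrySet()) {
                if (!required.getValue().equals(requested.get(required.getKey()))) {
                    allMatch = false;
                    break;
                }
            }
            if (allMatch) {
                matches.add(factory.getKey());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // 'universal' is requested, but no listed factory declares that version.
        System.out.println(findMatching(context("kafka", "universal")));
        // '0.11' is declared by the 0.11 factory, so it is found.
        System.out.println(findMatching(context("kafka", "0.11")));
    }
}
```

With 'universal' the sketch finds no factory, and with '0.11' it finds the 0.11 factory. That corresponds to the two options above: either request a version that an available factory declares, or add the jar whose factory declares 'universal'.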

Best, Hequn

On Sun, Dec 23, 2018 at 8:48 PM dhanuka ranasinghe <dhanuka.priyanath@xxxxxxxxx> wrote:
Hi Cheng,

I have removed the 1.6.1 jars, and now I get the error below.

Starting execution of program

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=localhost:2181
connector.properties.1.key=group.id
connector.properties.1.value=analytics
connector.properties.2.key=bootstrap.servers
connector.properties.2.value=localhost:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=testin
connector.type=kafka
connector.version=universal
format.fail-on-missing-field=false
format.json-schema={\n  \"type\": \"object\",\n  \"properties\": {\n    \"food\": {\n      \"type\": \"string\"\n    },\n    \"price\": {\n      \"type\": \"integer\"\n    },\n    \"processingTime\": {\n      \"type\": \"integer\"\n    }\n  }\n}
format.property-version=1
format.type=json
schema.0.type=VARCHAR
schema.1.type=DECIMAL
schema.2.name=processingTime
schema.2.proctime=true
schema.2.type=TIMESTAMP
update-mode=append

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory

at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
at org.monitoring.stream.analytics.FlinkTableSourceLatest.main(FlinkTableSourceLatest.java:97)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)


On Sat, Dec 22, 2018 at 8:41 PM Hequn Cheng <chenghequn@xxxxxxxxx> wrote:
Hi dhanuka,

I failed to reproduce your error with release-1.7.0. It seems Kafka.toConnectorProperties() should be called instead of ConnectorDescriptor.toConnectorProperties(); the latter is an abstract method, which leads to the AbstractMethodError.

From the picture you uploaded, it is strange that jars of 1.6.1 are mixed with jars of 1.7.0. This may cause class conflicts.
Furthermore, set the scope of the Flink dependencies to provided so that Flink classes are not packaged into the user jar. Packaging them can also cause class conflicts.

Best, 
Hequn


On Fri, Dec 21, 2018 at 6:24 PM dhanuka ranasinghe <dhanuka.priyanath@xxxxxxxxx> wrote:
Adding the dev group.

On Fri, Dec 21, 2018 at 6:21 PM dhanuka ranasinghe <dhanuka.priyanath@xxxxxxxxx> wrote:
Hi All,

I have tried to read data from Kafka in Flink using the Table API. It works fine with Flink 1.4, but after upgrading to 1.7 it gives me the error below. I have attached the libraries added to Flink.

Could you please help me with this?

bin/flink run stream-analytics-0.0.1-SNAPSHOT.jar --read-topic testin --write-topic testout --bootstrap.servers localhost --group.id analytics
Starting execution of program
java.lang.AbstractMethodError: org.apache.flink.table.descriptors.ConnectorDescriptor.toConnectorProperties()Ljava/util/Map;
    at org.apache.flink.table.descriptors.ConnectorDescriptor.toProperties(ConnectorDescriptor.java:58)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.toProperties(ConnectTableDescriptor.scala:107)
    at org.apache.flink.table.descriptors.StreamTableDescriptor.toProperties(StreamTableDescriptor.scala:95)
    at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:39)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
    at org.monitoring.stream.analytics.FlinkTableSourceLatest.main(FlinkTableSourceLatest.java:82)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)

Cheers,
Dhanuka

--
Nothing Impossible,Creativity is more important than knowledge.

