When trying to consume using flink's Kafka consumer, I get the error "java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign"
I am trying to write a Kafka Consumer which consumes data from a topic. However, whenever I try to run it, I get the following error.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:39)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:391)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:229)
The Java classes are:
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class KafkaConsumer {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<String>("rdf-new", new SimpleStringSchema(), parameterTool.getProperties()));
stream.print();
env.execute();
}}
I have created a standalone project in intellij (with its own pom) using the same code and it works fine, but since I need the code from another project, I created a new maven module in the existing project , then try to run it, now it shows me this error.
The dependencies in the pom.xml of the maven module are:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.4.2</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
The only thing I noticed is that inside the maven module I set the KafkaVersion to 1.1.0 but the pom has the KafkaConnector "flink-connector-kafka-0.9_2.11"
2018-05-18 11:14:56,105 - AbstractConfig [WARN] - ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
2018-05-18 11:14:56,105 - AppInfoParser$AppInfo [INFO] - AppInfoParser - Kafka version : 1.1.0
2018-05-18 11:14:56,105 - AppInfoParser$AppInfo [INFO] - AppInfoParser - Kafka commitId : fdcf75ea326b8e07
In the standalone project (where the user works fine) the Kafka version is 0.9.0.1.
11:32:19,537 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration zookeeper.connect = localhost:2181 was supplied but isn't a known config.
11:32:19,537 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.1
11:32:19,538 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
It would be of great help if someone could tell me what could be the problem? This is probably due to the dependencies in the pom file, but in the standalone project it's also the same as the dependencies I gave. Thanks.
As you already know, the problem is that the kafka version (1.0) in the module does not match the version expected by the flink connector (0.9).
you can do:
mvn dependency:tree
Find out the source of the kafka client dependency version on the command line.
In your module's pom, you can add a dependencyManagement section to replace the dependency version of the kafka client library with the desired version, like this:
<dependencyManagement>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencyManagement>