1. flink通常整合kafka实现消费和生产。在很大原因上是由于kafka很适合流处理
在我们平常的业务场景中,仅读取,写入和存储数据流是不够的,更多目的是启用流的实时处理。在Kafka中,流处理器是指从输入topic获取连续数据流,对该输入执行一些处理,并生成连续数据流以输出topic的任何内容。例如,零售应用程序可能会接受销售和装运的输入流,并输出一系列重新排序和根据此数据计算出的价格调整。
可以直接使用生产者API和消费者API进行简单的处理。然而,对于更复杂的转换,Kafka提供完全集成的Streams API。这允许构建应用程序进行非平凡的处理,从而计算聚合关闭流或将流连接在一起。在这过程中,fiink整合kafka来实现对流数据的处理是一个非常好的选择。
2. 采用flink的Api,实现消费者。
往kafka的“flinktest”这个topic不断发布消息,然后经过flink的消费之后,输出处理的时间和处理的字符串。采用的是flink1.4.2 和kafka1.0.0
pom文件如下
业务逻辑如下
- 没有运用到集群,所以只需要添加一个ip地址就可以。如果是集群,则把其他的地址加入到这里。
- 我采用的是flink1.4.2。所以这里使用123456789101112131415161718192021222324252627**注意这里有个坑,之前我不是采用这种打包方式,导致会产生如下异常**```javajava.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010at com.maskwang.flink.ReadFromKafka.main(ReadFromKafka.java:22)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:497)at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$8/989447607.run(Unknown Source)at java.security.AccessController.doPrivileged(Native Method)at javax.security.auth.Subject.doAs(Subject.java:422)at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010at java.net.URLClassLoader.findClass(URLClassLoader.java:381)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)... 19 more
运行状态如下:
- 向kafka的topic推送数据
- flink作为客户对订阅topic实现消费并输出
3. flink作为生产者向topic推送数据
|
|
在kafka上的finkwrite(与消费的队列不同,可以自定义) topic上可以看到,数据确实被生产了
运行结果:
参考文章
https://www.zhihu.com/question/28925721
https://github.com/tgrall/kafka-flink-101