Kafka directory structure: http://donald-draper.iteye.com/blog/2396760
Kafka configuration files: http://donald-draper.iteye.com/blog/2397000
Kafka standalone mode, creating topics, producing and consuming messages: http://donald-draper.iteye.com/blog/2397170
Kafka cluster setup: http://donald-draper.iteye.com/blog/2397276
In the previous article we set up the Kafka cluster environment; today we use Kafka Connect to import and export data.
First, start the Kafka cluster:
[donald@Donald_Draper bin]$ ./zookeeper-server-start.sh ../config/zookeeper.properties &
[1] 4145
[donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server.properties &
[2] 4401
[donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server1.properties &
[3] 4947
[donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server2.properties &
[4] 5246
[donald@Donald_Draper ~]$ netstat -ntlp
(Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address        Foreign Address   State    PID/Program name
tcp   0      0      192.168.122.1:53     0.0.0.0:*         LISTEN   -
...
tcp6  0      0      :::9092              :::*              LISTEN   4401/java
tcp6  0      0      :::9093              :::*              LISTEN   4947/java
tcp6  0      0      :::2181              :::*              LISTEN   4145/java
tcp6  0      0      :::9094              :::*              LISTEN   5246/java
[donald@Donald_Draper ~]$
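As an extra sanity check, we can also ask the cluster which topics it knows about; a minimal sketch, assuming the same ZooKeeper address used throughout this series:

[donald@Donald_Draper bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181
# prints the topics currently known to the cluster

The connect-test topic used below does not need to exist yet; with the default broker setting auto.create.topics.enable=true it will be created automatically when the source connector first writes to it.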
Writing data from the console and reading it back out is easy, but you will probably want to import data from other sources or export data to other systems.
For most systems you can use Kafka Connect for this instead of writing custom integration code. Kafka Connect is a tool for importing and exporting data. It is extensible: it runs connectors, which implement the custom logic for interacting with external systems. Below we will see how to run Kafka Connect with simple connectors that import data from a file into a Kafka topic, and then export data from that topic back out to a file.
First, let's take a quick look at the Kafka Connect file source and sink configurations:
[donald@Donald_Draper config]$ more connect-file-source.properties
...
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
[donald@Donald_Draper config]$ more connect-file-sink.properties
...
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
[donald@Donald_Draper config]$
From these source and sink configurations we can already guess how the file import/export works: data written to the source file test.txt is picked up by Kafka Connect, formatted, and written to the topic connect-test; Kafka Connect then writes the topic's data out to the sink file.
We first create test.txt, the file data source for the source connector:
[donald@Donald_Draper bin]$ vim test.txt
:wq
Next we start the two connectors in standalone mode, which means they run in a single, local, dedicated process. We pass three configuration files as parameters. The first is always the configuration for the Kafka Connect worker process itself, covering settings such as the Kafka brokers to connect to and the serialization format for data; each remaining file specifies a connector to create, and contains a unique connector name, the connector class to instantiate, and any other configuration the connector requires.
Start Kafka Connect in standalone mode:
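Before starting, it helps to see what that first file contains. A minimal sketch of connect-standalone.properties, assuming the stock file shipped with Kafka 0.11 (the values match the StandaloneConfig dump in the startup log below):

# worker config: which brokers to talk to and how to serialize data
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# wrap each record in a {"schema": ..., "payload": ...} envelope
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# standalone mode tracks source offsets in a local file
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

The schemas.enable settings are why the messages we will see later in connect-test are wrapped in a JSON envelope rather than being plain lines.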
[donald@Donald_Draper bin]$ ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties &
[1] 7583
[donald@Donald_Draper bin]$
[2017-10-23 09:10:27,387] INFO Registered loader: sun.misc.Launcher$AppClassLoader@764c12b6 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-23 09:10:27,390] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,394] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,394] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,394] INFO Added plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,394] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,394] INFO Added plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,395] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,395] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,396] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,396] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,396] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,396] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,396] INFO Added aliases 'MockSourceConnector' and 'MockSource' to plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'SchemaSourceConnector' and 'SchemaSource' to plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'VerifiableSinkConnector' and 'VerifiableSink' to plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'VerifiableSourceConnector' and 'VerifiableSource' to plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,398] INFO Added alias 'RegexRouter' to plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-10-23 09:10:27,399] INFO Added alias 'TimestampRouter' to plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-10-23 09:10:27,399] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
Connect standalone (worker) configuration:
[2017-10-23 09:10:27,423] INFO StandaloneConfig values:
    access.control.allow.methods =
    access.control.allow.origin =
    bootstrap.servers = [localhost:9092]
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    offset.flush.interval.ms = 10000
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = /tmp/connect.offsets
    plugin.path = null
    rest.advertised.host.name = null
    rest.advertised.port = null
    rest.host.name = null
    rest.port = 8083
    task.shutdown.graceful.timeout.ms = 5000
    value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:223)
[2017-10-23 09:10:27,684] INFO Logging initialized @5264ms (org.eclipse.jetty.util.log:186)
[2017-10-23 09:10:27,877] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:49)
[2017-10-23 09:10:27,877] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
[2017-10-23 09:10:27,877] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:144)
[2017-10-23 09:10:27,878] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:59)
[2017-10-23 09:10:27,908] INFO Worker started (org.apache.kafka.connect.runtime.Worker:149)
[2017-10-23 09:10:27,908] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-10-23 09:10:27,908] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-10-23 09:10:27,999] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Oct 23, 2017 9:10:28 AM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
Source connector configuration:
[2017-10-23 09:10:28,752] INFO Started o.e.j.s.ServletContextHandler@393bd750{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-10-23 09:10:28,812] INFO Started ServerConnector@4ff4478{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-10-23 09:10:28,812] INFO Started @6392ms (org.eclipse.jetty.server.Server:379)
[2017-10-23 09:10:28,813] INFO REST server listening at http://192.168.126.128:8083/, advertising URL http://192.168.126.128:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-10-23 09:10:28,813] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2017-10-23 09:10:28,819] INFO ConnectorConfig values:
    connector.class = FileStreamSource
    key.converter = null
    name = local-file-source
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-23 09:10:28,819] INFO EnrichedConnectorConfig values:
    connector.class = FileStreamSource
    key.converter = null
    name = local-file-source
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:28,819] INFO Creating connector local-file-source of type FileStreamSource (org.apache.kafka.connect.runtime.Worker:204)
[2017-10-23 09:10:28,820] INFO Instantiated connector local-file-source with version 0.11.0.1 of type class org.apache.kafka.connect.file.FileStreamSourceConnector (org.apache.kafka.connect.runtime.Worker:207)
[2017-10-23 09:10:28,825] INFO Finished creating connector local-file-source (org.apache.kafka.connect.runtime.Worker:225)
[2017-10-23 09:10:28,826] INFO SourceConnectorConfig values:
    connector.class = FileStreamSource
    key.converter = null
    name = local-file-source
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:223)
[2017-10-23 09:10:28,826] INFO EnrichedConnectorConfig values:
    connector.class = FileStreamSource
    key.converter = null
    name = local-file-source
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:28,829] INFO Creating task local-file-source-0 (org.apache.kafka.connect.runtime.Worker:358)
[2017-10-23 09:10:28,829] INFO ConnectorConfig values:
    connector.class = FileStreamSource
    key.converter = null
    name = local-file-source
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-23 09:10:28,830] INFO EnrichedConnectorConfig values:
    connector.class = FileStreamSource
    key.converter = null
    name = local-file-source
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:28,835] INFO TaskConfig values:
    task.class = class org.apache.kafka.connect.file.FileStreamSourceTask
 (org.apache.kafka.connect.runtime.TaskConfig:223)
[2017-10-23 09:10:28,837] INFO Instantiated task local-file-source-0 with version 0.11.0.1 of type org.apache.kafka.connect.file.FileStreamSourceTask (org.apache.kafka.connect.runtime.Worker:373)
[2017-10-23 09:10:28,874] INFO ProducerConfig values:
    acks = all
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 9223372036854775807
    max.in.flight.requests.per.connection = 1
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 2147483647
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig:223)
Sink connector configuration:
[2017-10-23 09:10:28,947] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-10-23 09:10:28,948] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-10-23 09:10:28,975] INFO Created connector local-file-source (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-10-23 09:10:28,978] INFO Source task WorkerSourceTask{id=local-file-source-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
[2017-10-23 09:10:28,995] INFO ConnectorConfig values:
    connector.class = FileStreamSink
    key.converter = null
    name = local-file-sink
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-23 09:10:28,996] INFO EnrichedConnectorConfig values:
    connector.class = FileStreamSink
    key.converter = null
    name = local-file-sink
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:28,997] INFO Creating connector local-file-sink of type FileStreamSink (org.apache.kafka.connect.runtime.Worker:204)
[2017-10-23 09:10:28,997] INFO Instantiated connector local-file-sink with version 0.11.0.1 of type class org.apache.kafka.connect.file.FileStreamSinkConnector (org.apache.kafka.connect.runtime.Worker:207)
[2017-10-23 09:10:28,997] INFO Finished creating connector local-file-sink (org.apache.kafka.connect.runtime.Worker:225)
[2017-10-23 09:10:28,998] INFO SinkConnectorConfig values:
    connector.class = FileStreamSink
    key.converter = null
    name = local-file-sink
    tasks.max = 1
    topics = [connect-test]
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.SinkConnectorConfig:223)
[2017-10-23 09:10:28,999] INFO EnrichedConnectorConfig values:
    connector.class = FileStreamSink
    key.converter = null
    name = local-file-sink
    tasks.max = 1
    topics = [connect-test]
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:29,000] INFO Creating task local-file-sink-0 (org.apache.kafka.connect.runtime.Worker:358)
[2017-10-23 09:10:29,000] INFO ConnectorConfig values:
    connector.class = FileStreamSink
    key.converter = null
    name = local-file-sink
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-23 09:10:29,000] INFO EnrichedConnectorConfig values:
    connector.class = FileStreamSink
    key.converter = null
    name = local-file-sink
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:29,001] INFO TaskConfig values:
    task.class = class org.apache.kafka.connect.file.FileStreamSinkTask
 (org.apache.kafka.connect.runtime.TaskConfig:223)
[2017-10-23 09:10:29,001] INFO Instantiated task local-file-sink-0 with version 0.11.0.1 of type org.apache.kafka.connect.file.FileStreamSinkTask (org.apache.kafka.connect.runtime.Worker:373)
[2017-10-23 09:10:29,027] INFO ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = connect-local-file-sink
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:223)
[2017-10-23 09:10:29,065] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-10-23 09:10:29,065] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-10-23 09:10:29,071] INFO Created connector local-file-sink (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-10-23 09:10:29,074] INFO Sink task WorkerSinkTask{id=local-file-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:247)
[2017-10-23 09:10:29,214] INFO Discovered coordinator Donald_Draper.server.com:9092 (id: 2147483647 rack: null) for group connect-local-file-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:607)
[2017-10-23 09:10:29,217] INFO Revoking previously assigned partitions [] for group connect-local-file-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-10-23 09:10:29,217] INFO (Re-)joining group connect-local-file-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:442)
[2017-10-23 09:10:29,261] INFO Successfully joined group connect-local-file-sink with generation 4 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:409)
[2017-10-23 09:10:29,262] INFO Setting newly assigned partitions [connect-test-0] for group connect-local-file-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
The configuration files above use the default local worker configuration and create two connectors: the first is a source connector that reads lines from the source file and publishes each one as a message to a Kafka topic; the second is a sink connector that reads messages from a Kafka topic and writes each one as a line to an external sink file. During startup you will see a number of log messages, including some noting that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should begin reading lines from test.txt and writing them to the topic connect-test, and the sink connector should begin reading messages from connect-test and writing them to the output file test.sink.txt. We can verify that the data has made it through the whole pipeline by examining the contents of the output file.
Now write some seed data into test.txt, the source connector's data file, to test the pipeline:
[donald@Donald_Draper ~]$ echo -e "name\ndonald" > test.txt
[donald@Donald_Draper ~]$
We can then see output like the following on the Kafka Connect console:
[2017-10-23 09:04:52,885] INFO WorkerSinkTask{id=local-file-sink-0} Committing offsets asynchronously using sequence number 56: {connect-test-0=OffsetAndMetadata{offset=2, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:288)
[2017-10-23 09:04:53,079] INFO Finished WorkerSourceTask{id=local-file-source-0} commitOffsets successfully in 74 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:373)
The output shows that the connectors are already processing the file data stream.
Now check the sink output file:
[donald@Donald_Draper bin]$ ls | grep test
kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh
test.sink.txt
test.txt
[donald@Donald_Draper bin]$ cat test.sink.txt
name
donald
[donald@Donald_Draper bin]$
Note that the imported data is also sitting in the Kafka topic connect-test, so we can run the console consumer command (or process it with our own consumer code) to view the data in the topic:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"schema":{"type":"string","optional":false},"payload":"name"}
{"schema":{"type":"string","optional":false},"payload":"donald"}
...
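The warning notes that the ZooKeeper-based consumer is deprecated; the equivalent command with the new consumer, as the warning suggests, would look like this (a sketch, assuming a broker on localhost:9092 as configured above):

[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Also note that the {"schema": ..., "payload": ...} envelope around each line comes from the worker's JsonConverter with schemas enabled, not from the file itself.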
The connectors keep processing data continuously, so we can append more data to the file and watch it move through the pipeline:
[donald@Donald_Draper bin]$ echo "your name?" >> test.txt
Connector console output:
[donald@Donald_Draper bin]$
[2017-10-23 11:51:59,932] INFO Finished WorkerSourceTask{id=local-file-source-0} commitOffsets successfully in 122 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:373)
[2017-10-23 11:52:09,004] INFO WorkerSinkTask{id=local-file-sink-0} Committing offsets asynchronously using sequence number 970: {connect-test-0=OffsetAndMetadata{offset=3, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:288)
Check the sink file:
[donald@Donald_Draper bin]$ cat test.sink.txt
name
donald
your name?
[donald@Donald_Draper bin]$
Consumer terminal:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"schema":{"type":"string","optional":false},"payload":"name"}
{"schema":{"type":"string","optional":false},"payload":"donald"}
{"schema":{"type":"string","optional":false},"payload":"your name?"}
Both the sink file and the consumer terminal show that Kafka Connect processed the file data successfully.
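As one more check, the Connect worker exposes a REST API (listening on port 8083, per the startup log above), and listing the running connectors is a one-liner. A sketch, assuming the worker is reachable on localhost:

[donald@Donald_Draper bin]$ curl http://localhost:8083/connectors
# expected to return a JSON array such as ["local-file-source","local-file-sink"] (order may vary)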
Now let's take a quick look at the Kafka Connect console source and sink configurations:
[donald@Donald_Draper config]$ more connect-console-source.properties
...
name=local-console-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
topic=connect-test
[donald@Donald_Draper config]$ more connect-console-sink.properties
...
name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=connect-test
[donald@Donald_Draper config]$
Compared with the file source and sink configurations, these reference the connector classes by their fully qualified names and omit the file setting, so the data source is whatever is typed into the console (and the sink writes back to the console); a sketch of running them follows.
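A minimal sketch of running this console pipeline in standalone mode, assuming the same worker configuration as before:

[donald@Donald_Draper bin]$ ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-console-source.properties ../config/connect-console-sink.properties
# each line typed into this terminal is published to connect-test by the source connector,
# and printed back by the sink connector as it is consumed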
In addition, Kafka ships with Kafka Streams, a client library for real-time processing of data stored in Kafka clusters. Quoting the official description:
Kafka Streams is a client library for building mission-critical real-time applications and microservices, where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's
server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed, and much more. This quickstart example will demonstrate how to run a streaming application coded in this library.
The examples live in kafka-streams-examples-0.11.0.1.jar under Kafka's lib folder; they are worth studying if you are interested, and a sketch of launching one is shown below.
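A sketch of running the bundled WordCount demo from that jar, assuming the class name used by the official quickstart (org.apache.kafka.streams.examples.wordcount.WordCountDemo); the input and output topic names the demo expects are listed in the quickstart docs for your Kafka version:

[donald@Donald_Draper bin]$ ./kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
# continuously reads lines from the demo's input topic and writes running word counts to its output topic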
A Chinese-language article also gives a brief walkthrough, if you are interested: http://orchome.com/6