Kafka configuration files: http://donald-draper.iteye.com/blog/2397000
Quickstart:http://kafka.apache.org/quickstart
In the previous post we walked briefly through Kafka's configuration. Today we start Kafka in standalone mode (a single broker) and use the command-line tools to create a topic, produce messages, and consume them.
1. Download Kafka
[donald@Donald_Draper ~]$ ls
Desktop  Documents  Downloads  kafka_2.11-0.11.0.1.tgz  Music  Pictures  Public  Templates  Videos
[donald@Donald_Draper ~]$
Unpack the archive:
[donald@Donald_Draper ~]$ tar -zxvf kafka_2.11-0.11.0.1.tgz
kafka_2.11-0.11.0.1/
kafka_2.11-0.11.0.1/LICENSE
kafka_2.11-0.11.0.1/NOTICE
kafka_2.11-0.11.0.1/bin/
...
kafka_2.11-0.11.0.1/libs/connect-json-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/connect-file-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/kafka-streams-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/rocksdbjni-5.0.1.jar
kafka_2.11-0.11.0.1/libs/kafka-streams-examples-0.11.0.1.jar
[donald@Donald_Draper ~]$ ls
Desktop  Documents  Downloads  kafka_2.11-0.11.0.1  kafka_2.11-0.11.0.1.tgz  Music  Pictures  Public  Templates  Videos
[donald@Donald_Draper ~]$ cd kafka_2.11-0.11.0.1/
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ ls
bin  config  libs  LICENSE  NOTICE  site-docs
After unpacking, the kafka_2.11-0.11.0.1 directory contains three main subdirectories: bin, config, and libs. Let's look at each in turn, starting with bin:
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ cd bin/
[donald@Donald_Draper bin]$ ls
connect-distributed.sh               kafka-delete-records.sh              kafka-simple-consumer-shell.sh
connect-standalone.sh                kafka-mirror-maker.sh                kafka-streams-application-reset.sh
kafka-acls.sh                        kafka-preferred-replica-election.sh  kafka-topics.sh
kafka-broker-api-versions.sh         kafka-producer-perf-test.sh          kafka-verifiable-consumer.sh
kafka-configs.sh                     kafka-reassign-partitions.sh         kafka-verifiable-producer.sh
kafka-console-consumer.sh            kafka-replay-log-producer.sh         windows
kafka-console-producer.sh            kafka-replica-verification.sh        zookeeper-security-migration.sh
kafka-consumer-groups.sh             kafka-run-class.sh                   zookeeper-server-start.sh
kafka-consumer-offset-checker.sh     kafka-server-start.sh                zookeeper-server-stop.sh
kafka-consumer-perf-test.sh          kafka-server-stop.sh                 zookeeper-shell.sh
As you can see, bin holds the shell scripts for all the command-line tools. Next, the config directory:
[donald@Donald_Draper bin]$ cd ..
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ ls
bin  config  libs  LICENSE  NOTICE  site-docs
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ cd config/
[donald@Donald_Draper config]$ ls
connect-console-sink.properties    connect-file-source.properties  log4j.properties       zookeeper.properties
connect-console-source.properties  connect-log4j.properties        producer.properties
connect-distributed.properties     connect-standalone.properties   server.properties
connect-file-sink.properties       consumer.properties             tools-log4j.properties
It mainly contains the producer and consumer configuration files, the log4j configuration files, the broker configuration (server.properties), and the Connect and bundled ZooKeeper configuration files.
2. Start the servers
Kafka depends on ZooKeeper, so ZooKeeper must be started first. If you don't have a ZooKeeper installation, you can use the one bundled with Kafka:
[donald@Donald_Draper bin]$ ./zookeeper-server-start.sh ../config/zookeeper.properties &
[2017-10-20 08:56:08,397] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-10-20 08:56:08,418] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,418] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,419] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,419] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2017-10-20 08:56:08,539] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2017-10-20 08:56:08,550] INFO Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:java.version=1.8.0_91 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
...
[2017-10-20 08:56:08,576] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,576] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,576] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,648] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
Start the Kafka broker:
[donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server.properties &
[2] 5077
[2017-10-20 09:00:20,669] INFO KafkaConfig values:
	advertised.host.name = null
	advertised.listeners = null
	auto.create.topics.enable = true
	broker.id = 0
	delete.topic.enable = false
	listeners = null
	log.dirs = /tmp/kafka-logs
	log.retention.hours = 168
	log.segment.bytes = 1073741824
	num.io.threads = 8
	num.network.threads = 3
	num.partitions = 1
	offsets.topic.replication.factor = 1
	port = 9092
	zookeeper.connect = localhost:2181
	zookeeper.connection.timeout.ms = 6000
	... (kafka.server.KafkaConfig)
[2017-10-20 09:00:20,934] INFO starting (kafka.server.KafkaServer)
[2017-10-20 09:00:20,966] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
...
[2017-10-20 09:00:21,501] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x15f374a56ac0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2017-10-20 09:00:21,912] INFO Cluster ID = IGYQ5SHuTHemtryrMBrFWA (kafka.server.KafkaServer)
[2017-10-20 09:00:22,753] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2017-10-20 09:00:22,773] INFO Loading logs. (kafka.log.LogManager)
[2017-10-20 09:00:22,785] INFO Logs loading complete in 11 ms. (kafka.log.LogManager)
[2017-10-20 09:00:24,173] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2017-10-20 09:00:24,178] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer)
[2017-10-20 09:00:24,340] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2017-10-20 09:00:24,477] INFO [Transaction Coordinator 0]: Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
...
[2017-10-20 09:00:24,681] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(Donald_Draper.server.com,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2017-10-20 09:00:24,701] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-20 09:00:24,701] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-20 09:00:24,703] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
3. Create a topic
Create a topic named test with a single partition and a single replica:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
...
Created topic "test".
...
[2017-10-21 17:21:29,883] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions test-0 (kafka.server.ReplicaFetcherManager)
[2017-10-21 17:21:29,935] INFO Loading producer state from offset 0 for partition test-0 with message format version 2 (kafka.log.Log)
[2017-10-21 17:21:29,944] INFO Completed load of log test-0 with 1 log segments, log start offset 0 and log end offset 0 in 35 ms (kafka.log.Log)
[2017-10-21 17:21:29,948] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties {...} (kafka.log.LogManager)
[2017-10-21 17:21:29,950] INFO Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition test-0 (kafka.cluster.Partition)
[2017-10-21 17:21:29,958] INFO Partition [test,0] on broker 0: test-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[donald@Donald_Draper bin]$
We can list topics with the kafka-topics script:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181
...
test
...
[donald@Donald_Draper bin]$
The output shows a single topic, test.
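Besides --list, the same script can show a topic's partition layout. A sketch (this assumes the broker and ZooKeeper started above are still running):

```shell
# Show partition count, leader, replicas and ISR for the test topic.
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
```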
Alternatively, instead of creating topics by hand, you can configure the broker to create a topic automatically the first time a message is published to a topic that does not exist.
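Auto-creation is controlled by the broker setting auto.create.topics.enable (the startup log above shows it defaults to true in 0.11.x). A minimal server.properties fragment:

```properties
# Create a topic automatically the first time a producer or consumer references it.
auto.create.topics.enable=true
# Defaults applied to auto-created topics:
num.partitions=1
default.replication.factor=1
```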
4. Produce messages
Kafka ships with a command-line producer that reads messages from a file or from standard input and sends them to the Kafka cluster; each line becomes a separate message.
Run the producer and type a few messages at the console:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>my name is donald.
>hello!
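To read from a file instead of the console, redirect standard input; each line of the file is sent as one message. A sketch (messages.txt is a hypothetical file, and the broker above must be running):

```shell
printf 'first message\nsecond message\n' > /tmp/messages.txt
./kafka-console-producer.sh --broker-list localhost:9092 --topic test < /tmp/messages.txt
```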
5. Consume messages
Kafka also provides a command-line consumer that dumps the stored messages to standard output.
Open a new terminal and start a consumer:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
my name is donald.
hello!
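Note that --from-beginning replays the topic from the earliest offset; without it, the console consumer starts at the latest offset and only shows messages produced after it starts. A sketch:

```shell
# No --from-beginning: only messages produced from now on are shown.
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
```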
If you run the two commands in separate terminals, messages typed into the producer appear in the consumer as they are sent: we produce in the producer terminal and consume in the consumer terminal.
Continue producing in the producer terminal:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>my name is donald.
>hello!
>what's your name?
>
The consumer terminal shows:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
my name is donald.
hello!
what's your name?
Each command-line tool has many options; run it without arguments to print its usage documentation. For example, kafka-topics:
[donald@Donald_Draper bin]$ ./kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions, replica assignment, and/or configuration for the topic.
--config <String: name=value>            A topic configuration override for the topic being created or altered (cleanup.policy, compression.type, retention.ms, segment.bytes, min.insync.replicas, ...; see the Kafka documentation for full details on the topic configs).
--create                                 Create a new topic.
--delete                                 Delete a topic.
--delete-config <String: name>           A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment.
--force                                  Suppress console prompts.
--help                                   Print usage information.
--if-exists                              If set when altering or deleting topics, the action will only execute if the topic exists.
--if-not-exists                          If set when creating topics, the action will only execute if the topic does not already exist.
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic being created or altered (WARNING: if partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected).
--replica-assignment <String: ...>       A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer>           The replication factor for each partition in the topic being created.
--topic <String: topic>                  The topic to be created, altered or described. Can also accept a regular expression, except for --create.
--topics-with-overrides                  If set when describing topics, only show topics that have overridden configs.
--unavailable-partitions                 If set when describing topics, only show partitions whose leader is not available.
--under-replicated-partitions            If set when describing topics, only show under replicated partitions.
--zookeeper <String: urls>               REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLs can be given to allow fail-over.
Delete a topic:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[donald@Donald_Draper bin]$
List the topics again:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test
[donald@Donald_Draper bin]$
The output shows the test topic was not actually deleted. That is because we have not enabled the following broker setting in config/server.properties:

# Switch to enable topic deletion or not, default value is false
# (if true, topics can be deleted from the command line; otherwise delete requests are ignored)
#delete.topic.enable=true
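To make deletion work, the commented-out line must be uncommented and the broker restarted before re-running the --delete command. A minimal offline sketch of flipping the setting with sed (using a throwaway copy so the real config file is untouched):

```shell
# Work on a throwaway file, not the real config/server.properties.
printf '#delete.topic.enable=true\n' > /tmp/server.properties.demo
# Uncomment the setting.
sed -i 's/^#delete\.topic\.enable=true/delete.topic.enable=true/' /tmp/server.properties.demo
grep '^delete.topic.enable' /tmp/server.properties.demo
```

After editing the real config/server.properties this way, restart the broker with kafka-server-start.sh and re-issue the delete command.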
So far we have been running Kafka in standalone mode; next we will set up a Kafka cluster.
Before building the cluster, shut down the standalone Kafka and its ZooKeeper:
[donald@Donald_Draper bin]$ ./kafka-server-stop.sh ../config/server.properties
[donald@Donald_Draper bin]$ ./zookeeper-server-stop.sh ../config/zookeeper.properties
Setting up the Kafka cluster is covered in the next post.