Donald_Draper

Kafka Cluster Setup

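Kafka directory structure: http://donald-draper.iteye.com/blog/2396760
Kafka configuration files: http://donald-draper.iteye.com/blog/2397000
Kafka standalone mode, creating a topic, producing and consuming messages: http://donald-draper.iteye.com/blog/2397170
In the previous article we started Kafka in standalone mode and used the command-line tools to create a topic and to produce and consume messages. That setup ran a single broker, which is not very interesting. Today we build a Kafka cluster. To Kafka, a single broker is just a cluster of size one, so we now expand to three nodes. Everything runs on one machine, and the only settings we change per broker are the listener port, the broker id, and the log directory. A multi-machine cluster works the same way, except that each broker has its own IP address.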

First, create a configuration file for each broker:

Last login: Sun Oct 22 21:43:13 2017
[donald@Donald_Draper ~]$ ls
Desktop  Documents  Downloads  kafka_2.11-0.11.0.1  kafka_2.11-0.11.0.1.tgz  Music  Pictures  Public  Templates  Videos
[donald@Donald_Draper ~]$ cd kafka_2.11-0.11.0.1/
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ cd config/
[donald@Donald_Draper config]$ ls
connect-console-sink.properties    connect-file-sink.properties    connect-standalone.properties  producer.properties     zookeeper.properties
connect-console-source.properties  connect-file-source.properties  consumer.properties            server.properties
connect-distributed.properties     connect-log4j.properties        log4j.properties               tools-log4j.properties
[donald@Donald_Draper config]$ cp server.properties server1.properties 
[donald@Donald_Draper config]$ cp server.properties server2.properties 
[donald@Donald_Draper config]$ ls
connect-console-sink.properties    connect-file-sink.properties    connect-standalone.properties  producer.properties  server.properties
connect-console-source.properties  connect-file-source.properties  consumer.properties            server1.properties   tools-log4j.properties
connect-distributed.properties     connect-log4j.properties        log4j.properties               server2.properties   zookeeper.properties

There are now three broker configuration files. Next, edit the files for broker 1 and broker 2, namely server1.properties and server2.properties:

[donald@Donald_Draper config]$ vim server1.properties 
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs1
[donald@Donald_Draper config]$ vim server2.properties 
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs2

[donald@Donald_Draper config]$ vim server.properties 
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
[donald@Donald_Draper config]$ ^C
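
The three manual edits above can also be scripted. The following is a minimal sketch, not part of the Kafka distribution: the helper name make_broker_config and the temporary working directory are assumptions made for illustration, and the stub template only contains the three keys we care about. It rewrites broker.id, listeners, and log.dirs with sed.

```shell
#!/usr/bin/env bash
# Sketch: derive a per-broker config from a template by rewriting
# broker.id, listeners and log.dirs. make_broker_config is a hypothetical helper.
make_broker_config() {
  local template="$1" id="$2" port="$3" logdir="$4" out="$5"
  # The listeners line may be commented out in the template, so an optional
  # leading '#' is accepted and dropped.
  sed -e "s|^broker\.id=.*|broker.id=${id}|" \
      -e "s|^#\{0,1\}listeners=.*|listeners=PLAINTEXT://:${port}|" \
      -e "s|^log\.dirs=.*|log.dirs=${logdir}|" \
      "$template" > "$out"
}

# Stand-in for config/server.properties so the sketch runs anywhere.
workdir="$(mktemp -d)"
cat > "$workdir/server.properties" <<'EOF'
broker.id=0
#listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
EOF

make_broker_config "$workdir/server.properties" 1 9093 /tmp/kafka-logs1 "$workdir/server1.properties"
make_broker_config "$workdir/server.properties" 2 9094 /tmp/kafka-logs2 "$workdir/server2.properties"
cat "$workdir/server2.properties"
```

Generating the files in a loop makes it harder to end up with two brokers sharing a port or a log directory, which is the mistake this step is most prone to.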

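broker.id is the unique and permanent name of each node in the cluster. We change the port and the log directory only because all brokers run on the same machine here, and we must keep them from registering on the same port or overwriting each other's data. Note that the log directories used in this test sit under the temporary directory /tmp. In production, choose a different location, because files under /tmp can be lost when the machine goes down or reboots. I have seen this happen with a Hadoop cluster; I have not verified it for Kafka, but I expect the same risk applies. Also, we are not using a ZooKeeper ensemble here; ZooKeeper runs in standalone mode. In production ZooKeeper should be highly available, which means running it as an ensemble. That is not the focus of this article, so we skip it; the ensemble setup is covered in the following article:
Hadoop2.7.1 high-availability environment setup: http://donald-draper.iteye.com/blog/2302217
The main change is in the ZooKeeper configuration file zoo.cfg: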
server.1=192.168.126.126:2888:3888  
server.2=192.168.126.127:2888:3888  
server.3=192.168.126.128:2888:3888 

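Then create the ZooKeeper data directory zdata and, inside it, a myid file whose content on each of the three machines is the id that follows "server." in the entries above.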
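
A minimal sketch of the myid step follows. The helper name write_myid is hypothetical, and the three hosts are simulated with three directories under a temporary path so the snippet can run anywhere. On a real ensemble each host writes only its own id, into the directory that dataDir in zoo.cfg points to.

```shell
#!/usr/bin/env bash
# Sketch: write the myid file for one ZooKeeper server.
# write_myid is a hypothetical helper; $1 is the data directory, $2 the server id.
write_myid() {
  mkdir -p "$1" && printf '%s\n' "$2" > "$1/myid"
}

base="$(mktemp -d)"
# Simulate the three hosts with three directories; the ids match server.1..server.3.
for id in 1 2 3; do
  write_myid "$base/host$id/zdata" "$id"
done
cat "$base/host2/zdata/myid"   # prints 2
```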
Then change the following option in each broker's server configuration file:
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.126.126:2181,192.168.126.127:2181,192.168.126.128:2181


Now start the cluster: first ZooKeeper, then the three brokers.
[donald@Donald_Draper bin]$ ./zookeeper-server-start.sh ../config/zookeeper.properties  &
[1] 5334
[donald@Donald_Draper bin]$ 
...
[2017-10-22 22:25:12,828] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-22 22:25:12,828] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-22 22:25:12,879] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

[donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server.properties &
[1] 5597
[donald@Donald_Draper bin]$ 
...
[2017-10-22 22:27:04,624] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(Donald_Draper.server.com,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2017-10-22 22:27:04,625] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-10-22 22:27:05,047] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-22 22:27:05,047] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-22 22:27:05,048] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

[donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server1.properties &
[1] 5900
[donald@Donald_Draper bin]$ 
...
[2017-10-22 22:27:04,624] INFO Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(Donald_Draper.server.com,9093,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2017-10-22 22:27:04,625] WARN No meta.properties file under dir /tmp/kafka-logs1/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-10-22 22:27:05,047] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-22 22:27:05,047] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-22 22:27:05,048] INFO [Kafka Server 1], started (kafka.server.KafkaServer)

[donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server2.properties &
[1] 6611
[donald@Donald_Draper bin]$ 
...
[2017-10-22 22:31:18,442] INFO Creating /brokers/ids/2 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-10-22 22:31:18,496] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-10-22 22:31:18,497] INFO Registered broker 2 at path /brokers/ids/2 with addresses: EndPoint(Donald_Draper.server.com,9094,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2017-10-22 22:31:18,550] WARN No meta.properties file under dir /tmp/kafka-logs2/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-10-22 22:31:19,288] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-22 22:31:19,288] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-22 22:31:19,288] INFO [Kafka Server 2], started (kafka.server.KafkaServer)


[donald@Donald_Draper bin]$ netstat -ntlp
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN      -                   
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      -                   
tcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN      -                   
tcp        0      0 0.0.0.0:25              0.0.0.0:*               LISTEN      -                   
tcp6       0      0 :::3306                 :::*                    LISTEN      -                   
tcp6       0      0 :::22                   :::*                    LISTEN      -                   
tcp6       0      0 :::36055                :::*                    LISTEN      5597/java           
tcp6       0      0 ::1:631                 :::*                    LISTEN      -                   
tcp6       0      0 :::45464                :::*                    LISTEN      5334/java           
tcp6       0      0 :::45979                :::*                    LISTEN      5900/java           
tcp6       0      0 :::34402                :::*                    LISTEN      6611/java           
tcp6       0      0 :::9092                 :::*                    LISTEN      5597/java           
tcp6       0      0 :::9093                 :::*                    LISTEN      5900/java           
tcp6       0      0 :::2181                 :::*                    LISTEN      5334/java           
tcp6       0      0 :::9094                 :::*                    LISTEN      6611/java           
[donald@Donald_Draper bin]$
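
The netstat listing can also be checked mechanically instead of by eye. This is a small sketch under our own naming: missing_ports is a hypothetical helper that reads a netstat-style listing on stdin and prints every expected port that is not listening. The sample below deliberately omits the 9094 line to show what a missing broker looks like.

```shell
#!/usr/bin/env bash
# Sketch: report which expected ports are absent from `netstat -ntl` style output.
# missing_ports is a hypothetical helper; it reads the listing on stdin.
missing_ports() {
  local listing port
  listing="$(cat)"
  for port in "$@"; do
    # Match ":<port>" followed by whitespace, as it appears in the Local Address column.
    printf '%s\n' "$listing" | grep -Eq ":${port}[[:space:]]" || printf '%s\n' "$port"
  done
}

sample='tcp6       0      0 :::9092                 :::*                    LISTEN      5597/java
tcp6       0      0 :::9093                 :::*                    LISTEN      5900/java
tcp6       0      0 :::2181                 :::*                    LISTEN      5334/java'

printf '%s\n' "$sample" | missing_ports 2181 9092 9093 9094   # prints 9094
```

Against a live machine the same check would be run as `netstat -ntl | missing_ports 2181 9092 9093 9094`; no output means ZooKeeper and all three brokers are listening.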


With the cluster running, create a topic with a replication factor of 3:

[donald@Donald_Draper bin]$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
[2017-10-22 22:36:23,175] INFO Accepted socket connection from /127.0.0.1:41964 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-22 22:36:23,214] INFO Client attempting to establish new session at /127.0.0.1:41964 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-22 22:36:23,236] INFO Established session 0x15f44792df10005 with negotiated timeout 30000 for client /127.0.0.1:41964 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-22 22:36:23,754] INFO Got user-level KeeperException when processing sessionid:0x15f44792df10005 type:setData cxid:0x6 zxid:0x122 txntype:-1 reqpath:n/a Error Path:/config/topics/my-replicated-topic Error:KeeperErrorCode = NoNode for /config/topics/my-replicated-topic (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-22 22:36:23,764] INFO Got user-level KeeperException when processing sessionid:0x15f44792df10005 type:create cxid:0x8 zxid:0x123 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics (org.apache.zookeeper.server.PrepRequestProcessor)
Created topic "my-replicated-topic".
[2017-10-22 22:36:23,834] INFO Processed session termination for sessionid: 0x15f44792df10005 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-22 22:36:23,838] INFO Closed socket connection for client /127.0.0.1:41964 which had sessionid 0x15f44792df10005 (org.apache.zookeeper.server.NIOServerCnxn)
[2017-10-22 22:36:23,887] INFO Got user-level KeeperException when processing sessionid:0x15f44792df10000 type:create cxid:0x186 zxid:0x127 txntype:-1 reqpath:n/a Error Path:/brokers/topics/my-replicated-topic/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic/partitions/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-22 22:36:23,889] INFO Got user-level KeeperException when processing sessionid:0x15f44792df10000 type:create cxid:0x187 zxid:0x128 txntype:-1 reqpath:n/a Error Path:/brokers/topics/my-replicated-topic/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic/partitions (org.apache.zookeeper.server.PrepRequestProcessor)

List the topics:
[donald@Donald_Draper bin]$  ./kafka-topics.sh --list --zookeeper localhost:2181  
[2017-10-22 22:36:34,017] INFO Accepted socket connection from /127.0.0.1:41970 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-22 22:36:34,021] INFO Client attempting to establish new session at /127.0.0.1:41970 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-22 22:36:34,024] INFO Established session 0x15f44792df10006 with negotiated timeout 30000 for client /127.0.0.1:41970 (org.apache.zookeeper.server.ZooKeeperServer)
__consumer_offsets
my-replicated-topic
test
[2017-10-22 22:36:34,065] INFO Processed session termination for sessionid: 0x15f44792df10006 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-22 22:36:34,067] INFO Closed socket connection for client /127.0.0.1:41970 which had sessionid 0x15f44792df10006 (org.apache.zookeeper.server.NIOServerCnxn)
[donald@Donald_Draper bin]$ 

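The output shows that my-replicated-topic was created successfully; test is the topic left over from the standalone-mode run.
We now have a cluster, but how do we know which broker is doing what? Run the "describe topics" command: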
	
[donald@Donald_Draper bin]$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
[2017-10-22 22:39:34,232] INFO Accepted socket connection from /127.0.0.1:41972 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-22 22:39:34,233] INFO Client attempting to establish new session at /127.0.0.1:41972 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-22 22:39:34,237] INFO Established session 0x15f44792df10007 with negotiated timeout 30000 for client /127.0.0.1:41972 (org.apache.zookeeper.server.ZooKeeperServer)
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
[2017-10-22 22:39:35,852] INFO Processed session termination for sessionid: 0x15f44792df10007 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-22 22:39:35,860] INFO Closed socket connection for client /127.0.0.1:41972 which had sessionid 0x15f44792df10007 (org.apache.zookeeper.server.NIOServerCnxn)
[donald@Donald_Draper bin]$ 

The two lines that matter are:
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1


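Here is what the output means. The first line is a summary of all partitions, and each additional line describes one partition.
Because this topic has only one partition, there is only one such line.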

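1. "leader": the node responsible for all reads and writes for the given partition. Each node is the leader for a randomly selected share of the partitions.
2. "replicas": the list of nodes that replicate the log for this partition, shown regardless of whether a node is the leader or is even currently alive.
3. "isr": the set of in-sync replicas, that is, the subset of the replicas list that is currently alive and caught up to the leader.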

As shown above, the leader is broker 2.
Run the same command again to look at the test topic we created at the very beginning:


[donald@Donald_Draper bin]$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
[2017-10-22 22:46:54,560] INFO Accepted socket connection from /127.0.0.1:41974 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-22 22:46:54,579] INFO Client attempting to establish new session at /127.0.0.1:41974 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-22 22:46:54,619] INFO Established session 0x15f44792df10008 with negotiated timeout 30000 for client /127.0.0.1:41974 (org.apache.zookeeper.server.ZooKeeperServer)
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
[2017-10-22 22:46:55,069] INFO Processed session termination for sessionid: 0x15f44792df10008 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-22 22:46:55,071] INFO Closed socket connection for client /127.0.0.1:41974 which had sessionid 0x15f44792df10008 (org.apache.zookeeper.server.NIOServerCnxn)
[donald@Donald_Draper bin]$ 


The two lines that matter are:
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0



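The test topic was created while we were running in standalone mode, so its replication factor is 1. "Replicas: 0" does not mean zero replicas; it means the single replica lives on broker 0, which is therefore also the leader and the only member of the ISR.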
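
When the describe output has to be read by a script rather than a person, the three fields can be pulled out of the partition line with sed. This is only a sketch: describe_field is a hypothetical helper, and it assumes the "Name: value" layout printed by this Kafka version, which may differ in other releases.

```shell
#!/usr/bin/env bash
# Sketch: pull one field (Leader, Replicas or Isr) out of a partition line
# printed by `kafka-topics.sh --describe`. describe_field is a hypothetical helper.
describe_field() {
  # $1 = field name, $2 = partition line; prints the digits and commas after "<name>:".
  printf '%s\n' "$2" | sed -n "s/.*$1: *\([0-9,]*\).*/\1/p"
}

line='        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1'
echo "leader=$(describe_field Leader "$line")"
echo "replicas=$(describe_field Replicas "$line")"
echo "isr=$(describe_field Isr "$line")"
```

For the partition line of my-replicated-topic this prints leader=2, replicas=2,0,1 and isr=2,0,1; for the test topic every field is 0, the id of the only broker that holds it.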

Now open two terminals, one to produce messages and one to consume them.
Producer:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>

Consumer:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic


In the producer terminal, type some messages:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>hello everyone!   
>how about ?
>


The consumer terminal prints:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
hello everyone!
how about ?




Now test the cluster's fault tolerance by killing the leader. Broker 2 is the current leader, so we kill broker 2.


[donald@Donald_Draper bin]$ ps aux | grep server2.properties
donald     6611  3.3 15.9 3676844 297348 pts/3  Sl   22:31   0:49 java -Xmx1G -Xms1G -server -XX:+UseG1GC ... kafka.Kafka ../config/server2.properties
donald     9171  0.0  0.0 112784   988 pts/0    S+   22:55   0:00 grep --color=auto server2.properties
[donald@Donald_Draper bin]$ kill -9 6611
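
Because the ps line is so long, it is easier to extract the PID than to read it off the screen. The sketch below uses a hypothetical helper, broker_pid, that reads ps-style output on stdin; the sample lines are shortened copies of the output above, with the JVM arguments elided.

```shell
#!/usr/bin/env bash
# Sketch: extract the PID of the broker started with a given config file
# from `ps aux` style output. broker_pid is a hypothetical helper reading stdin.
broker_pid() {
  # Keep lines mentioning the config file, drop the grep process itself,
  # then print column 2, which is the PID in `ps aux` output.
  grep -F -- "$1" | grep -v 'grep' | awk '{print $2}'
}

sample='donald     6611  3.3 15.9 3676844 297348 pts/3  Sl   22:31   0:49 java -Xmx1G ... kafka.Kafka ../config/server2.properties
donald     9171  0.0  0.0 112784   988 pts/0    S+   22:55   0:00 grep --color=auto server2.properties'

printf '%s\n' "$sample" | broker_pid server2.properties   # prints 6611
```

On a live system this would be `ps aux | broker_pid server2.properties`; where pgrep is available, `pgrep -f server2.properties` is a shorter alternative.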

On Windows:

> wmic process get processid,caption,commandline | find "java.exe" | find "server2.properties"
java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.11-0.11.0.1.jar"  kafka.Kafka config\server2.properties    6611
> taskkill /pid 6611 /f


Describe my-replicated-topic again:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
[2017-10-22 22:59:55,370] INFO Accepted socket connection from /127.0.0.1:42250 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-22 22:59:55,375] INFO Client attempting to establish new session at /127.0.0.1:42250 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-22 22:59:55,378] INFO Established session 0x15f44792df10009 with negotiated timeout 30000 for client /127.0.0.1:42250 (org.apache.zookeeper.server.ZooKeeperServer)
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 2,0,1 Isr: 0,1
[2017-10-22 22:59:55,630] INFO Processed session termination for sessionid: 0x15f44792df10009 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-22 22:59:55,643] INFO Closed socket connection for client /127.0.0.1:42250 which had sessionid 0x15f44792df10009 (org.apache.zookeeper.server.NIOServerCnxn)
[donald@Donald_Draper bin]$ 


The output shows that the replica on broker 0 has become the new leader, and broker 2 is no longer in the in-sync replica set.
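
The brokers that have fallen out of sync are simply the entries of Replicas that no longer appear in Isr. A minimal sketch of that comparison, using a hypothetical helper named out_of_sync and the values from the describe output above:

```shell
#!/usr/bin/env bash
# Sketch: list replicas that are assigned to a partition but absent from its ISR.
# out_of_sync is a hypothetical helper; both arguments are comma-separated broker ids.
out_of_sync() {
  local replicas="$1" isr=",$2," id result=""
  local old_ifs="$IFS"
  IFS=','                                   # split the replica list on commas
  for id in $replicas; do
    case "$isr" in
      *",$id,"*) ;;                         # still in sync
      *) result="${result:+$result,}$id" ;; # assigned but missing from the ISR
    esac
  done
  IFS="$old_ifs"
  printf '%s\n' "$result"
}

out_of_sync "2,0,1" "0,1"   # prints 2
```

An empty result means every assigned replica is in sync, which is what the same check returns for the output taken before broker 2 was killed.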

Start a second consumer (consumer terminal 2):
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
hello everyone!
how about ?

The output shows that no messages were lost.

Produce some more messages:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>hello everyone!   
>how about ?
>[2017-10-22 22:58:33,385] WARN Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

>who is living ? 
>

The consumer terminals print:
Consumer terminal 1:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
hello everyone!
how about ?
...
[2017-10-22 22:58:39,207] WARN Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

who is living ?


Consumer terminal 2:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
hello everyone!
how about ?

who is living ?


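Judging by the warnings in the producer terminal and in consumer terminal 1, both clients have noticed that broker 2 is no longer available.
That completes the setup and testing of the Kafka cluster. Now shut it down. Note that in this Kafka version kafka-server-stop.sh ignores its arguments and signals every running broker process, so the first call already stops all remaining brokers: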
[donald@Donald_Draper bin]$ ./kafka-server-stop.sh ../config/server.properties 
[donald@Donald_Draper bin]$ ./kafka-server-stop.sh ../config/server1.properties 
[donald@Donald_Draper bin]$ ./kafka-server-stop.sh ../config/server2.properties 
[donald@Donald_Draper bin]$ ./zookeeper-server-stop.sh  ../config/zookeeper.properties 
[donald@Donald_Draper bin]$ netstat -ntlp
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN      -                   
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      -                   
tcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN      -                   
tcp        0      0 0.0.0.0:25              0.0.0.0:*               LISTEN      -                   
tcp6       0      0 :::3306                 :::*                    LISTEN      -                   
tcp6       0      0 :::35219                :::*                    LISTEN      9858/java           
tcp6       0      0 :::35286                :::*                    LISTEN      8893/java           
tcp6       0      0 :::22                   :::*                    LISTEN      -                   
tcp6       0      0 ::1:631                 :::*                    LISTEN      -                   
[donald@Donald_Draper bin]$ 


The next article covers Kafka Connect.