`
Donald_Draper
  • 浏览: 951605 次
社区版块
存档分类
最新评论

Kafka Standy模式、创建主题,生产消费消息

阅读更多
Kafka配置文件:http://donald-draper.iteye.com/blog/2397000
Quickstart:http://kafka.apache.org/quickstart
上面我们简单看了一下kafka的相关配置,今天我们启动standy模式的kafka,即单机版,并使用相关命令创建topic,
生产消息和消费消息。
1.下载kafka
[donald@Donald_Draper ~]$ ls
Desktop  Documents  Downloads  kafka_2.11-0.11.0.1.tgz  Music  Pictures  Public  Templates  Videos
[donald@Donald_Draper ~]$

解压:
[donald@Donald_Draper ~]$ tar -zxvf kafka_2.11-0.11.0.1.tgz 
kafka_2.11-0.11.0.1/
kafka_2.11-0.11.0.1/LICENSE
kafka_2.11-0.11.0.1/NOTICE
kafka_2.11-0.11.0.1/bin/
...
kafka_2.11-0.11.0.1/libs/connect-json-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/connect-file-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/kafka-streams-0.11.0.1.jar
kafka_2.11-0.11.0.1/libs/rocksdbjni-5.0.1.jar
kafka_2.11-0.11.0.1/libs/kafka-streams-examples-0.11.0.1.jar
[donald@Donald_Draper ~]$ ls
Desktop  Documents  Downloads  kafka_2.11-0.11.0.1  kafka_2.11-0.11.0.1.tgz  Music  Pictures  Public  Templates  Videos
[donald@Donald_Draper ~]$ 
[donald@Donald_Draper ~]$ cd kafka_2.11-0.11.0.1/
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ ls
bin  config  libs  LICENSE  NOTICE  site-docs


从上面来看解压后的文件夹 kafka_2.11-0.11.0.1内有bin,config,libs主要文件夹,
下面分别来看这几个文件夹下的文件:
先来看bin目录下:
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ cd bin/
[donald@Donald_Draper bin]$ ls
connect-distributed.sh            kafka-delete-records.sh              kafka-simple-consumer-shell.sh
connect-standalone.sh             kafka-mirror-maker.sh                kafka-streams-application-reset.sh
kafka-acls.sh                     kafka-preferred-replica-election.sh  kafka-topics.sh
kafka-broker-api-versions.sh      kafka-producer-perf-test.sh          kafka-verifiable-consumer.sh
kafka-configs.sh                  kafka-reassign-partitions.sh         kafka-verifiable-producer.sh
kafka-console-consumer.sh         kafka-replay-log-producer.sh         windows
kafka-console-producer.sh         kafka-replica-verification.sh        zookeeper-security-migration.sh
kafka-consumer-groups.sh          kafka-run-class.sh                   zookeeper-server-start.sh
kafka-consumer-offset-checker.sh  kafka-server-start.sh                zookeeper-server-stop.sh
kafka-consumer-perf-test.sh       kafka-server-stop.sh                 zookeeper-shell.sh

从上面来看bin主要是sh脚本,
再来看config文件夹:
[donald@Donald_Draper bin]$ cd ..
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ ls
bin  config  libs  LICENSE  NOTICE  site-docs
[donald@Donald_Draper kafka_2.11-0.11.0.1]$ cd config/
[donald@Donald_Draper config]$ ls
connect-console-sink.properties    connect-file-source.properties  log4j.properties        zookeeper.properties
connect-console-source.properties  connect-log4j.properties        producer.properties
connect-distributed.properties     connect-standalone.properties   server.properties
connect-file-sink.properties       consumer.properties             tools-log4j.properties

主要生产者消息者配置文件,日志配置文件,broker配置文件,connect和内置zookeeper配置文件;
2.启动服务器

由于kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果没有Zookeeper,可以使用kafka内部自带的Zookeeper。
[donald@Donald_Draper bin]$ ./zookeeper-server-start.sh ../config/zookeeper.properties &
[2017-10-20 08:56:08,397] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-10-20 08:56:08,418] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,418] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,419] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2017-10-20 08:56:08,419] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2017-10-20 08:56:08,539] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2017-10-20 08:56:08,539] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2017-10-20 08:56:08,550] INFO Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:host.name=Donald_Draper.server.com (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:java.version=1.8.0_91 (org.apache.zookeeper.server.ZooKeeperServer)
...

 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,550] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:os.version=3.10.0-327.22.2.el7.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:user.name=donald (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:user.home=/home/donald= (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,551] INFO Server environment:user.dir=/home/donald=/kafka_2.11-0.11.0.1/bin (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,576] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,576] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,576] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 08:56:08,648] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)



启动kafka服务器:


[donald@Donald_Draper bin]$ ./kafka-server-start.sh ../config/server.properties  &
[2] 5077
[donald@Donald_Draper bin]$ [2017-10-20 09:00:20,669] INFO KafkaConfig values: 
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        authorizer.class.name = 
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id = 0
        broker.id.generation.enable = true
        broker.rack = null
        compression.type = producer
        connections.max.idle.ms = 600000
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.socket.timeout.ms = 30000
        create.topic.policy.class.name = null
        default.replication.factor = 1
        delete.records.purgatory.purge.interval.requests = 1
        delete.topic.enable = false
        fetch.purgatory.purge.interval.requests = 1000
        group.initial.rebalance.delay.ms = 0
        group.max.session.timeout.ms = 300000
        group.min.session.timeout.ms = 6000
        host.name = 
        inter.broker.listener.name = null
        inter.broker.protocol.version = 0.11.0-IV2
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listener.security.protocol.map = SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT
        listeners = null
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /tmp/kafka-logs
        log.flush.interval.messages = 9223372036854775807
        log.flush.interval.ms = null
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.flush.start.offset.checkpoint.interval.ms = 60000
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.format.version = 0.11.0-IV2
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 168
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides = 
        message.max.bytes = 1000012
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 3
        num.partitions = 1
        num.recovery.threads.per.data.dir = 1
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 1440
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 1
        offsets.topic.segment.bytes = 104857600
        port = 9092
        principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
        producer.purgatory.purge.interval.requests = 1000
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 10000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism.inter.broker.protocol = GSSAPI
        security.inter.broker.protocol = PLAINTEXT
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        ssl.cipher.suites = null
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
        transaction.max.timeout.ms = 900000
        transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
        transaction.state.log.load.buffer.size = 5242880
        transaction.state.log.min.isr = 1
        transaction.state.log.num.partitions = 50
        transaction.state.log.replication.factor = 1
        transaction.state.log.segment.bytes = 104857600
        transactional.id.expiration.ms = 604800000
        unclean.leader.election.enable = false
        zookeeper.connect = localhost:2181
        zookeeper.connection.timeout.ms = 6000
        zookeeper.session.timeout.ms = 6000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2017-10-20 09:00:20,934] INFO starting (kafka.server.KafkaServer)
[2017-10-20 09:00:20,966] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2017-10-20 09:00:21,024] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2017-10-20 09:00:21,042] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,042] INFO Client environment:host.name=Donald_Draper.server.com (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,042] INFO Client environment:java.version=1.8.0_91 (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,042] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.91-1.b14.el7_2.x86_64/jre (org.apache.zookeeper.ZooKeeper)

...

[2017-10-20 09:00:21,043] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:os.version=3.10.0-327.22.2.el7.x86_64 (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:user.name=donald (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:user.home=/home/donald= (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,043] INFO Client environment:user.dir=/home/donald=/kafka_2.11-0.11.0.1/bin (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,044] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1139b2f3 (org.apache.zookeeper.ZooKeeper)
[2017-10-20 09:00:21,065] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2017-10-20 09:00:21,203] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2017-10-20 09:00:21,337] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2017-10-20 09:00:21,339] INFO Accepted socket connection from /127.0.0.1:35852 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-20 09:00:21,455] INFO Client attempting to establish new session at /127.0.0.1:35852 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 09:00:21,476] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
[2017-10-20 09:00:21,498] INFO Established session 0x15f374a56ac0000 with negotiated timeout 6000 for client /127.0.0.1:35852 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-20 09:00:21,501] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x15f374a56ac0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2017-10-20 09:00:21,507] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2017-10-20 09:00:21,601] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x5 zxid:0x3 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:21,669] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0xb zxid:0x7 txntype:-1 reqpath:n/a Error Path:/config Error:KeeperErrorCode = NoNode for /config (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:21,691] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x13 zxid:0xc txntype:-1 reqpath:n/a Error Path:/admin Error:KeeperErrorCode = NoNode for /admin (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:21,902] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x1d zxid:0x12 txntype:-1 reqpath:n/a Error Path:/cluster Error:KeeperErrorCode = NoNode for /cluster (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:21,912] INFO Cluster ID = IGYQ5SHuTHemtryrMBrFWA (kafka.server.KafkaServer)
[2017-10-20 09:00:22,044] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-10-20 09:00:22,177] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-10-20 09:00:22,177] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-10-20 09:00:22,555] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-10-20 09:00:22,753] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2017-10-20 09:00:22,773] INFO Loading logs. (kafka.log.LogManager)
[2017-10-20 09:00:22,785] INFO Logs loading complete in 11 ms. (kafka.log.LogManager)
[2017-10-20 09:00:24,089] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2017-10-20 09:00:24,091] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2017-10-20 09:00:24,173] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2017-10-20 09:00:24,178] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer)
[2017-10-20 09:00:24,197] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,198] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,197] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,278] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,295] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,295] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-10-20 09:00:24,308] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-10-20 09:00:24,332] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2017-10-20 09:00:24,335] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-10-20 09:00:24,340] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2017-10-20 09:00:24,350] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:setData cxid:0x29 zxid:0x16 txntype:-1 reqpath:n/a Error Path:/controller_epoch Error:KeeperErrorCode = NoNode for /controller_epoch (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:24,371] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-10-20 09:00:24,420] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2017-10-20 09:00:24,472] INFO [Transaction Coordinator 0]: Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2017-10-20 09:00:24,477] INFO [Transaction Coordinator 0]: Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2017-10-20 09:00:24,493] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2017-10-20 09:00:24,593] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2017-10-20 09:00:24,638] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:delete cxid:0x45 zxid:0x19 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:24,657] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-10-20 09:00:24,665] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x46 zxid:0x1a txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:24,668] INFO Got user-level KeeperException when processing sessionid:0x15f374a56ac0000 type:create cxid:0x47 zxid:0x1b txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-20 09:00:24,679] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-10-20 09:00:24,681] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(Donald_Draper.server.com,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2017-10-20 09:00:24,682] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-10-20 09:00:24,701] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-20 09:00:24,701] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-20 09:00:24,703] INFO [Kafka Server 0], started (kafka.server.KafkaServer)


3.创建一个主题


创建一个只有一个分区和备份的test Topic,

[donald@Donald_Draper bin]$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
[2017-10-21 17:21:29,587] INFO Accepted socket connection from /127.0.0.1:53310 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-21 17:21:29,592] INFO Client attempting to establish new session at /127.0.0.1:53310 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-21 17:21:29,598] INFO Established session 0x15f3e3b9f1b0001 with negotiated timeout 30000 for client /127.0.0.1:53310 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-21 17:21:29,762] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0001 type:setData cxid:0x4 zxid:0x27 txntype:-1 reqpath:n/a Error Path:/config/topics/test Error:KeeperErrorCode = NoNode for /config/topics/test (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,773] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0001 type:create cxid:0x6 zxid:0x28 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics (org.apache.zookeeper.server.PrepRequestProcessor)
Created topic "test".
[2017-10-21 17:21:29,795] INFO Processed session termination for sessionid: 0x15f3e3b9f1b0001 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,799] INFO Closed socket connection for client /127.0.0.1:53310 which had sessionid 0x15f3e3b9f1b0001 (org.apache.zookeeper.server.NIOServerCnxn)
[2017-10-21 17:21:29,834] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0000 type:create cxid:0x3f zxid:0x2c txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,839] INFO Got user-level KeeperException when processing sessionid:0x15f3e3b9f1b0000 type:create cxid:0x40 zxid:0x2d txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:21:29,883] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions test-0 (kafka.server.ReplicaFetcherManager)
[2017-10-21 17:21:29,935] INFO Loading producer state from offset 0 for partition test-0 with message format version 2 (kafka.log.Log)
[2017-10-21 17:21:29,944] INFO Completed load of log test-0 with 1 log segments, log start offset 0 and log end offset 0 in 35 ms (kafka.log.Log)
[2017-10-21 17:21:29,948] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 0.11.0-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2017-10-21 17:21:29,950] INFO Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition test-0 (kafka.cluster.Partition)
[2017-10-21 17:21:29,955] INFO Replica loaded for partition test-0 with initial high watermark 0 (kafka.cluster.Replica)
[2017-10-21 17:21:29,958] INFO Partition [test,0] on broker 0: test-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[donald@Donald_Draper bin]$ 

我们可以通过kafka-topics命令查看topic信息,具体如下:

[donald@Donald_Draper bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181
[2017-10-21 17:23:13,183] INFO Accepted socket connection from /127.0.0.1:53312 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2017-10-21 17:23:13,187] INFO Client attempting to establish new session at /127.0.0.1:53312 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-21 17:23:13,191] INFO Established session 0x15f3e3b9f1b0002 with negotiated timeout 30000 for client /127.0.0.1:53312 (org.apache.zookeeper.server.ZooKeeperServer)
test
[2017-10-21 17:23:13,253] INFO Processed session termination for sessionid: 0x15f3e3b9f1b0002 (org.apache.zookeeper.server.PrepRequestProcessor)
[2017-10-21 17:23:13,255] INFO Closed socket connection for client /127.0.0.1:53312 which had sessionid 0x15f3e3b9f1b0002 (org.apache.zookeeper.server.NIOServerCnxn)
[donald@Donald_Draper bin]$ 

从上面来看,有一个test主题。

另外,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时自动创建topic。

4.生产消息

Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
运行producer(生产者),然后在控制台输入几条消息到服务器。

[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>my name is donald.
>hello!


5.消费消息

Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。
重新启动一个终端,用于消费者消费消息:

[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
my name is donald.
hello!



如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。
我们在生产者终端生产消息,在消息者终端消费消息。

在生产者终端继续生产如下消息:
[donald@Donald_Draper bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>my name is donald.
>hello!
>what's your name?
>

消息者终端显示如下:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
my name is donald.
hello!
what's your name?



所有的命令行工具有很多的选项,可以运行无参数命令,查看命令文档来了解更多的功能。

比如kafka-topics命令:
[donald@Donald_Draper bin]$ ./kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description                            
------                                   -----------                            
--alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
--config <String: name=value>            A topic configuration override for the 
                                           topic being created or altered.The   
                                           following is a list of valid         
                                           configurations:                      
                                                cleanup.policy                        
                                                compression.type                      
                                                delete.retention.ms                   
                                                file.delete.delay.ms                  
                                                flush.messages                        
                                                flush.ms                              
                                                follower.replication.throttled.       
                                           replicas                             
                                                index.interval.bytes                  
                                                leader.replication.throttled.replicas 
                                                max.message.bytes                     
                                                message.format.version                
                                                message.timestamp.difference.max.ms   
                                                message.timestamp.type                
                                                min.cleanable.dirty.ratio             
                                                min.compaction.lag.ms                 
                                                min.insync.replicas                   
                                                preallocate                           
                                                retention.bytes                       
                                                retention.ms                          
                                                segment.bytes                         
                                                segment.index.bytes                   
                                                segment.jitter.ms                     
                                                segment.ms                            
                                                unclean.leader.election.enable        
                                         See the Kafka documentation for full   
                                           details on the topic configs.        
--create                                 Create a new topic.                    
--delete                                 Delete a topic                         
--delete-config <String: name>           A topic configuration override to be   
                                           removed for an existing topic (see   
                                           the list of configurations under the 
                                           --config option).                    
--describe                               List details for the given topics.     
--disable-rack-aware                     Disable rack aware replica assignment  
--force                                  Suppress console prompts               
--help                                   Print usage information.               
--if-exists                              if set when altering or deleting       
                                           topics, the action will only execute 
                                           if the topic exists                  
--if-not-exists                          if set when creating topics, the       
                                           action will only execute if the      
                                           topic does not already exist         
--list                                   List all available topics.             
--partitions <Integer: # of partitions>  The number of partitions for the topic 
                                           being created or altered (WARNING:   
                                           If partitions are increased for a    
                                           topic that has a key, the partition  
                                           logic or ordering of the messages    
                                           will be affected                     
--replica-assignment <String:            A list of manual partition-to-broker   
  broker_id_for_part1_replica1 :           assignments for the topic being      
  broker_id_for_part1_replica2 ,           created or altered.                  
  broker_id_for_part2_replica1 :                                                
  broker_id_for_part2_replica2 , ...>                                           
--replication-factor <Integer:           The replication factor for each        
  replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be create, alter or       
                                           describe. Can also accept a regular  
                                           expression except for --create option
--topics-with-overrides                  if set when describing topics, only    
                                           show topics that have overridden     
                                           configs                              
--unavailable-partitions                 if set when describing topics, only    
                                           show partitions whose leader is not  
                                           available                            
--under-replicated-partitions            if set when describing topics, only    
                                           show under replicated partitions     
--zookeeper <String: urls>               REQUIRED: The connection string for    
                                           the zookeeper connection in the form 
                                           host:port. Multiple URLS can be      
                                           given to allow fail-over

删除topic命令:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
Topic test is marked for deletion.4.生产发送消息
Note: This will have no impact if delete.topic.enable is not set to true.
[donald@Donald_Draper bin]$ 

再次浏览topic:
[donald@Donald_Draper bin]$ ./kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test
[donald@Donald_Draper bin]$ 

从上面的输出来看,我们并没有删除test topic主要是因为我们没有开启broker的如下配置:
# Switch to enable topic deletion or not, default value is false
是否可以删除topic,如果为true,我们可以在命令行删除topic,否则,不能。
#delete.topic.enable=true



上面的操作我们启动的使standy模式,下面我们来,搭建一个kafka集群。
在搭建集群之前,先关闭原先的standy kafka:
[donald@Donald_Draper bin]$ ./kafka-server-stop.sh ../config/server.properties 
[donald@Donald_Draper bin]$ ./zookeeper-server-stop.sh  ../config/zookeeper.properties 

kafka集群,我们放在下一篇来讲。
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics