Kafka学习(3)——测试
首先,简单介绍下kafka。
Kafka是由LinkedIn开发的分布式的、基于发布/订阅的消息系统,以可水平扩展和高吞吐率而被广泛使用。主要设计目标如下:
- 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。
- 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展。
个人认为kafka是个万金油,不仅可以作为消息件,还可以在数据处理时当作分布式管道来运用,还可以作为蓄水池,实在是特别心动。
究竟kafka具体性能如何,还是需要亲自测一下才有底和放心。测试方法主要参考Kafka的创始人之一的Jay Kreps的bechmark。
1 测试环境
见附录。
2 测试内容
测试内容包括:
- producer吞吐率
- producer吞吐率和已存储的数据量间的关系
- producer吞吐率和partition数量的关系
- producer吞吐率和producer数量的关系
- producer吞吐率和同步复制/异步复制的关系
- producer的offset
- consumer吞吐率和consumer数量的关系
- consumer吞吐率和replica数量的关系
- consumer吞吐率和同步复制/异步复制的关系
- consumer的offset
- producer数量对consumer吞吐率的影响
- consumer数量对producer吞吐率的影响
- 消息长度对吞吐率的影响
- snappy压缩率
- 日志过期情况
2.1 producer吞吐率
该项测试只测producer的吞吐率,也就是数据只被持久化,没有consumer读数据。
(1) 吞吐率
重复10次。测试如下:
该测试在1台测试机设置1个producer,向有1个partition,1个replication的topic发送100 million的消息,消息大小100bytes。
图Producer Throughput是吞吐率图。从图中可以看到,吞吐率约500000~600000records/sec。
结论:单独producer运行时吞吐率约700000~800000records/sec。
(2) 已存储的数据量对producer吞吐率的影响
测试如下:
kafka集群,4 broker,4 partition,1 replication
该测试在1台测试机设置1个producer,向有4个partition,1个replication的topic发送1600 million的消息,消息大小1000bytes 。
图Producer Throughput VS Stored Data是吞吐率与数据量大小的关系图。可以明显看到,吞吐率存在一些偏差,kafka作者解释,偏差是由Linux I/O管理造成的,它会把数据缓存起来再批量flush。实际上从图中可以看到,当磁盘数据量达到1T时,吞吐率和磁盘数据只有几百MB时没有明显区别,吞吐率并不受磁盘上所存数据量大小的影响。
结论:producer吞吐率基本不受磁盘上所存数据量大小的影响。
(3) partition数量对producer吞吐率的影响
如果1个Topic对应1个partition ,那这个partition所在磁盘I/O将会成为这个Topic的性能瓶颈,将一个topic拆分为多个partition后,不同的消息可以并行写入不同磁盘的不同partition里,极大的提高了吞吐率。
这里需要注意的是,不同partition需要位于不同的磁盘。如果多个partition位于同一个磁盘,那么意味着有多个进程同时对一个磁盘的多个文件进行读写,使得操作系统会对磁盘读写进行频繁调度,也就是破坏了磁盘读写的连续性。
同一磁盘,测试如下:
(1) 在1台测试机设置1个producer,向有1个partition,1个replication的topic发送100 million的消息,消息大小100bytes ;
(2) partition加1,重复(1)。
不同磁盘,测试如下:
设置1台kafka为1个partition,1个replication;在另一台测试机设置1个producer,向kafka有1个partition,0个replication的topic发送100 million的消息,消息大小100bytes ;
设置2台kafka为2个partition,1个replication;在另一台测试机设置1个producer,向kafka有2个partition,0个replication的topic发送100 million的消息,消息大小100bytes ;
设置3台kafka为3个partition,1个replication;在另一台测试机设置1个producer,向kafka有3个partition,0个replication的topic发送100 million的消息,消息大小100bytes ;
图Producer Throughput VS Partition Number是吞吐率与partition数量的关系图。 在同一磁盘上增加partition,吞吐率基本上没有提升。partition位于不同磁盘,增加partition,吞吐率获得极大提高。
结论:同一磁盘增加partition,吞吐率基本无影响;不同磁盘增加partition,可极大提高吞吐率。
(4) producer数量对producer吞吐率的影响
不同机器。受限于机器数量,只做3组测试,每组重复10次。三组测试如下:
1个producer 。该测试在1台测试机设置1个producer ,向有1个partition,1个replication的topic发送50 million的消息,消息大小100bytes 。
2个producer 。该测试在2台测试机分别设置1个producer,同时向有1个partition,1个replication的topic发送50 million的消息,消息大小100bytes 。
3个producer 。该测试在3台测试机分别设置1个producer,同时向有1个partition,1个replication的topic发送50 million的消息,消息大小100bytes 。
同一机器。测试如下:
(1) 在1台测试机设置1个producer;
(2) 向有1个partition,1个replication的topic发送50 million的消息,消息大小100bytes 。
(3) 设置producer数量为2,重复(2);
(4) 设置producer数量为4,重复(2);
(5) 设置producer数量为6,重复(2);
(6) 设置producer数量为8,重复(2);
(7) 设置producer数量为10,重复(2);
(8) 设置producer数量为10,重复(2)。
图Producer Throughput VS Producer Number是吞吐率与producer数量的关系图。其中pij表示i个producer同时运行时第j个producer的吞吐率。可以看到,随着producer的增加,producer的吞吐率降低,但producer总的吞吐率获得较大提升。
图Producer Throughput VS Producer Number(at the same host)是吞吐率与producer数量的关系图。其中pi表示i个producer同时运行时各个producer的吞吐率,sum表示producer总吞吐率随producer数量的变化。可以看到,随着producer的增加,单个producer的吞吐率不断降低,但producer总的吞吐率获得较大提升。
结论:增加producer数量,单个producer的吞吐率降低,但producer总的吞吐率几乎线性增加。
(5) 同步复制/异步复制
异步replica。4组测试,每组重复10次。4组测试如下:
1个异步replica 。该测试有4个broker,在1台测试机设置1个producer,向有4个partition,1个replication的topic发送50 million的消息,消息大小100bytes 。
2个异步replica 。该测试有4个broker,在1台测试机设置1个producer,向有4个partition,2个replication的topic发送50 million的消息,消息大小100bytes 。
3个异步replica 。该测试有4个broker,在1台测试机设置1个producer,向有4个partition,3个replication的topic发送50 million的消息,消息大小100bytes 。
4个异步replica 。该测试有4个broker,在1台测试机设置1个producer,向有4个partition,4个replication的topic发送50 million的消息,消息大小100bytes 。
同步replica。4组测试,每组重复10次。4组测试如下:
1个同步replica 。该测试有4个broker,在1台测试机设置1个producer,向有4个partition,1个replication的topic发送50 million的消息,消息大小100bytes 。
2个同步replica 。该测试有4个broker,在1台测试机设置1个producer,向有4个partition,2个replication的topic发送50 million的消息,消息大小100bytes 。
3个同步replica 。该测试有4个broker,在1台测试机设置1个producer,向有4个partition,3个replication的topic发送50 million的消息,消息大小100bytes 。
4个同步replica 。该测试有4个broker,在1台测试机设置1个producer,向有4个partition,4个replication的topic发送50 million的消息,消息大小100bytes 。
增加replica后,整个集群存储能力会由于n倍的replication而只有原来的1/n。异步replica,producer吞吐率几乎无影响。同步replica,由于需要等待follower,producer吞吐率受影响。
结论:异步replica对producer吞吐率几乎无影响,同步replica对producer吞吐率有影响。
(6) offset
结论:producer的offset累加。
2.2 consumer吞吐率
Kafka由Producer向broker push消息,并由Consumer从broker pull消息。pull模式则可以根据Consumer的消费能力以适当的速率消费消息,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
(1) consumer数量
结论:有效consumer数量和partition数量相关,多于partition数量的consumer会被阻塞。
(2) replica数量
3组测试,每组重复10次。三组测试如下:
1个replica。该测试在1台测试机设置1个consumer,从有1个partition,1个replication的topic消费50 million的消息,消息大小100bytes 。
2个replica。该测试在1台测试机设置1个consumer,从有1个partition,2个replication的topic消费50 million的消息,消息大小100bytes 。
3个replica。该测试在1台测试机设置1个consumer,从有1个partition,3个replication的topic消费50 million的消息,消息大小100bytes 。
结论:replica数量不影响consumer的吞吐率。
(3) 同步复制/异步复制
结论:consumer吞吐率也与同步复制还是异步复制无关。
(4) offset
结论:consumer设置auto.offset.reset为smallest,若offset小于最早的记录,则从最早的记录消费,若offset大于最早的记录,则从offset处的记录消费;设置为largest,从最新的记录消费。
2.3 producer和consumer
(1) producer数量对consumer吞吐率的影响
3组测试,重复10次。测试如下:
1个producer,1个consumer。该测试在2台测试机分别设置1个producer和1个consumer,同时运行producer和consumer,对有1个partition,1个replication的topic,producer向其发送50 million的消息,消息大小100bytes ,consumer从其消费50 million的消息。
2个producer,1个consumer。该测试在3台测试机分别设置1个producer、1个producer和1个consumer,同时运行producer和consumer,对有1个partition,1个replication的topic,producer向其发送50 million的消息,消息大小100bytes ,consumer从其消费50 million的消息。
3个producer,1个consumer。该测试在4台测试机分别设置1个producer、1个producer、1个producer和1个consumer,同时运行producer和consumer,对有1个partition,1个replication的topic,producer向其发送50 million的消息,消息大小100bytes,consumer从其消费50 million的消息。
图Throughput VS Producer Number展示了同时运行producer和consumer时,不同数量producer情况下producer和consumer的吞吐率,其中ci表示i个producer时,consumer的吞吐率,p0表示单独producer运行时的吞吐率,pij表示i个producer时第i个producer的吞吐率。
可以看到,同时运行producer和consumer,producer吞吐率略低于单独运行producer时的吞吐率,consumer的吞吐率远低于单独运行consumer时的吞吐率,consumer吞吐率仍大于producer吞吐率。随着producer数量的增加,consumer吞吐率不断提高,最后趋于稳定。p33运行在跨网段机器上。
结论1:producer和consumer同时运行时,consumer吞吐率低于单独运行consumer时的吞吐率。
结论2:随着producer数量的增加,consumer吞吐率不断提高,最后趋于稳定。
(2) consumer数量对producer吞吐率的影响
3组测试,重复10次。测试如下:
1个producer,1个consumer。该测试在2台测试机分别设置1个producer和1个consumer,同时运行producer和consumer,对有1个partition,0个replication的topic,producer向其发送50 million的消息,消息大小100bytes ,consumer从有其消费50 million的消息。
1个producer,2个consumer。该测试在3台测试机分别设置1个producer、1个consumer和1个consumer,同时运行producer和consumer,对有1个partition,1个replication的topic,producer向其发送50 million的消息,消息大小100bytes ,consumer从其消费50 million的消息。
1个producer,3个consumer。该测试在4台测试机分别设置1个producer、1个consumer、1个consumer和1个consumer,同时运行producer和consumer,对有1个partition,1个replication的topic,producer向其发送50 million的消息,消息大小100bytes ,consumer从其消费50 million的消息。
图Throughput VS Consumer Number展示了同时运行producer和consumer时,不同数量consumer情况下producer和consumer的吞吐率,其中pi表示i个consumer时,producer的吞吐率,c0表示单独运行consumer时的吞吐率,cij表示i个consumer时第j个consumer的吞吐率。
可以看到, 同时运行producer和consumer,producer吞吐率略低于单独运行producer时的吞吐率,consumer的吞吐率远低于单独运行consumer时的吞吐率,consumer吞吐率仍大于producer吞吐率 。 随着consumer数量的增加,producer吞吐率不断下降,但下降速度不快。c33运行在跨网段机器上。
结论1:producer和consumer同时运行时相互影响,producer和consumer吞吐率均低于单独运行时的吞吐率。
结论2:producer吞吐率和consumer数量成反比。
2.4 消息长度对吞吐率的影响
5组测试如下:
10bytes
100bytes
1000bytes(1kb)
10000bytes(10kb)
100000bytes(100kb)
图Producer Throughput(record/sec) VS Record Size中,随着消息长度的增加,每秒钟所能发送的消息的数量逐渐减小。
图Producer Throughput(MB/sec) VS Record Size中,随着消息长度的增加,每秒钟发送的消息的总大小逐渐增大,从10000Bytes开始,吞吐率趋于饱和。
结论:消息大小10k,吞吐性能最好,消息大小大于10k,吞吐性能提升缓慢。
2.5 snappy压缩率
10G压缩到1G,压缩率10%
结论:snappy压缩率约10%,压缩对kafka吞吐率基本无影响。
2.6 日志过期
Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。
Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。
基于时间删除
测试一如下:
kafka集群,4 broker,4 partition,1 replication, 过期时间为5小时
在1台测试机设置1个producer,T时刻开始向有4个partition,1个replication的topic发送1600 million的消息,消息大小1000bytes 。
测试二如下:
kafka集群,1 broker,4 partition,1 replication, 过期时间为5小时
在1台测试机设置1个producer,T时刻向有4个partition,1个replication的topic发送100 million的消息,消息大小10000bytes 。
T+2(小时)时刻,停止kafka集群,修改过期时间为1小时,重启
测试一,T时刻开始写数据,T+4(小时)后,数据写入完成,T+5(小时)后,数据开始删除,T+9(小时)后,数据全部删除。
测试二,T时刻开始写数据,T+12(分钟)后数据写入完成,T+3(小时)观察发现数据已完全删除。
基于partition文件大小删除
未测试。
结论:基于时间删除日志速度足够快。日志过期时间以当前集群的过期时间配置为准。
3 commands
producer
./kafka-producer-perf-test.sh --topic test1 --num-records 10000000 --record-size 100 --throughput 10000000 --producer-props bootstrap.servers=192.168.0.2:9092
./kafka-consumer-perf-test.sh --zookeeper 192.168.0.1:2181/kafka --messages 50000000 --topic test1 --threads 1
producer和consumer
./kafka_2.10-0.9.0.1/bin/kafka-producer-perf-test.sh --topic test1 --num-records 50000000 --record-size 100 --throughput 50000000 --producer-props bootstrap.servers=192.168.0.2:9092
./kafka_2.10-0.9.0.1/bin/kafka-consumer-perf-test.sh --zookeeper 192.168.0.1:2181/kafka --messages 50000000 --topic test1 --threads 1
附录
附录一 测试环境
机器配置
usage | ip | cpu | disk | RAM | ethernet |
---|---|---|---|---|---|
zookeeper、生成测试数据 | 192.168.0.1 | Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz, 8 processors with 8 cores | 500G | 16G | 1Gb/s |
生成测试数据 | 192.168.0.6 | Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz, 8 processors with 8 cores | 500G | 16G | 1Gb/s |
broker1 of kafka | 192.168.0.2 | Intel(R) Xeon(R) CPU E5-2670 v3 @ 2.30GHz, 8 processors with 4 cores | 500G | 8G | 1Gb/s |
broker2 of kafka | 192.168.0.3 | Intel(R) Xeon(R) CPU E5-2670 v3 @ 2.30GHz, 8 processors with 4 cores | 500G | 8G | 1Gb/s |
broker3 of kafka | 192.168.0.4 | Intel(R) Xeon(R) CPU E5-2670 v3 @ 2.30GHz, 8 processors with 4 cores | 500G | 8G | 1Gb/s |
broker4 of kafka | 192.168.0.5 | Intel(R) Xeon(R) CPU E5-2670 v3 @ 2.30GHz, 8 processors with 4 cores | 500G | 8G | 1Gb/s |
主机名映射
ip | 主机名 |
---|---|
192.168.0.1 | silkworm-test-zookeeper |
192.168.0.2 | silkworm-test-kafka01 |
192.168.0.3 | silkworm-test-kafka02 |
192.168.0.4 | silkworm-test-kafka03 |
192.168.0.5 | silkworm-test-kafka04 |
kafka配置
broker.id=1
listeners=PLAINTEXT://:9092
port=9092
host.name=silkworm-test-kafka01
advertised.host.name=192.168.0.1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/admin/kafka/data/logs
num.partitions=4
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=silkworm-test-zookeeper:2181/kafka
zookeeper.connection.timeout.ms=6000
其他
vim bin/kafka-server-start.sh
==>
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
fi
还没有评论,来说两句吧...