Producer和Consumer可以直接使用logstash和fluentd针对Kafka的input/output插件,也可以基于各语言的库自行实现,比如对于go语言,可以使用Shopify提供的sarama。
假定业务场景大概是这样的:
- Broker - 来保存销售信息
- Produer - 向Broker发送销售信息
- Consumer1 - 实时地计算每个商品的已售数量
- Consumer2 - 对每天的销售信息进行合计
然后将合计的结果保存在Redis并且可以从Redis获得销售排名的信息。Redis的golang客户端使用garyburd/redigo。
启动Kafka Broker
通过Docker配置Kafka
可以准备如下的Dockerfile:
FROM java:openjdk-8-jre
ENV DEBIAN_FRONTEND noninteractive
RUN curl -s http://mirror.bit.edu.cn/apache/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz \
| tar xz -C /opt \
&& mv /opt/kafka_2.11-0.11.0.0 /opt/kafka
ADD run.sh /opt/kafka/run.sh
WORKDIR /opt/kafka
CMD ["/run.sh"]
run.sh用于启动kafka和zookeeper:
# 因为aws ec2免费实例的限制,所以降低kafka的配置
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties
然后用docker build
启动容器:
$ docker build -t nepaws/kafka .
$ docker run --rm -it --name kafka \
--publish 9092:9092 --publish 2181:2181 \
nepaws/kafka
在Windows上配置Kakfa
将从官网下载的压缩包解压后,在Kafka所在目录以管理员权限执行以下命令:
C:\kafka_2.11-0.11.0.0> mkdir \tmp
C:\kafka_2.11-0.11.0.0> mkdir \log
C:\kafka_2.11-0.11.0.0> start /b bin\windows\zookeeper-server-start.bat config\zookeeper.properties > \log\zookeeper.log
C:\kafka_2.11-0.11.0.0> bin\windows\kafka-server-start.bat config\server.properties
zookeeper和kafka就会运行在默认的2181和9092端口上了。
Producer的实现
sarama提供了AsyncProducer和SyncProducer两种Producer。两种Producer的创建过程基本相同,预先配置sarama.Config.Producer的各个属性,然后调用相应的创建方法。大部分情况下使用的AsyncProducer,以异步的方式产生消息并且用channel来就接收。SyncProducer在产生消息的时候一直会阻塞直至Kafka已经知晓消息已产生,这样的做法当然也会造成一部分性能上的损失。SyncProducer实际上的durability guarantees会依赖于创建Producer时 Producer.RequiredAcks
(等价于JVM Producer的 request.required.acks
)的设定值,而且还会有message已被SyncProducer acknowledge却仍然会丢失的情况。
AsyncProducer
func (ap *asyncProducer) InitProducer(brokers []string) {
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
panic(err)
}
ap.producer = producer
}
func (ap *asyncProducer) ProduceMessage(topic string, encoder sarama.Encoder) error {
message := &sarama.ProducerMessage{
Topic: topic,
Value: encoder,
}
ap.producer.Input() <- message
log.Println(message)
return nil
}
SyncProducer
Producer.RequiredAcks 有WaitForAll、WaitForLocal和NoResponse三种配置。
func (sp *syncProducer) InitProducer(brokers []string) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
panic(err)
}
sp.producer = producer
}
func (sp *syncProducer) ProduceMessage(topic string, encoder sarama.Encoder) error {
message := &sarama.ProducerMessage{
Topic: topic,
Value: encoder,
}
partition, offset, err := sp.producer.SendMessage(message)
if err != nil {
panic(err)
}
log.Printf("Partition: %v, Offset: %v\n", partition, offset)
return nil
}
Consumer的实现
sarama.Consumer的创建方式和Producer类似,也是先配置Config在调用NewConsumer方法。(通过NewConsumer所创建的,其实是ConsumerGroup?)然后再根据给定的topic和partition创建一个专用于这个partition的consumer。如果已经有Consumer在消费这组给定的topic/partition,创建方法会返回错误。通过这样的机制可以保证 同一个Partition的数据,只能被同一Consumer Group中的同一Consumer消费 。
func initConsumer() sarama.Consumer {
config := sarama.NewConfig()
consumer, err := sarama.NewConsumer(brokers, config)
if err != nil {
panic(err)
}
return consumer
}
func createPartitionConsumer() sarama.PartitionConsumer {
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if err != nil {
panic(err)
}
return partitionConsumer
}
func main() {
consumer = initConsumer()
defer consumer.Close()
partitionConsumer := createPartitionConsumer()
defer partitionConsumer.Close()
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
ch := make(chan struct{})
go func() {
for {
select {
case msg := <-partitionConsumer.Messages():
consumeMessage(msg)
autoIncrement++
case err := <-partitionConsumer.Errors():
log.Println(err)
case <-signals:
ch <- struct{}{}
}
}
}()
<-ch
}