七 月
16
日曜日

Kafka Producer和Consumer的go语言实现

Producer和Consumer可以直接使用logstashfluentd针对Kafka的input/output插件,也可以基于各语言的库自行实现,比如对于go语言,可以使用Shopify提供的sarama

假定业务场景大概是这样的:

  • Broker - 来保存销售信息
  • Produer - 向Broker发送销售信息
  • Consumer1 - 实时地计算每个商品的已售数量
  • Consumer2 - 对每天的销售信息进行合计

然后将合计的结果保存在Redis并且可以从Redis获得销售排名的信息。Redis的golang客户端使用garyburd/redigo


启动Kafka Broker

通过Docker配置Kafka

可以准备如下的Dockerfile:

FROM java:openjdk-8-jre

ENV DEBIAN_FRONTEND noninteractive

RUN curl -s http://mirror.bit.edu.cn/apache/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz \
    | tar xz -C /opt \
    && mv /opt/kafka_2.11-0.11.0.0 /opt/kafka

ADD run.sh /opt/kafka/run.sh

WORKDIR /opt/kafka

CMD ["/run.sh"]

run.sh用于启动kafka和zookeeper:

# 因为aws ec2免费实例的限制,所以降低kafka的配置
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties

然后用docker build启动容器:

$ docker build -t nepaws/kafka .
$ docker run --rm -it --name kafka \
    --publish 9092:9092 --publish 2181:2181 \
    nepaws/kafka

在Windows上配置Kakfa

将从官网下载的压缩包解压后,在Kafka所在目录以管理员权限执行以下命令:

C:\kafka_2.11-0.11.0.0> mkdir \tmp
C:\kafka_2.11-0.11.0.0> mkdir \log
C:\kafka_2.11-0.11.0.0> start /b bin\windows\zookeeper-server-start.bat config\zookeeper.properties > \log\zookeeper.log
C:\kafka_2.11-0.11.0.0> bin\windows\kafka-server-start.bat config\server.properties

zookeeper和kafka就会运行在默认的2181和9092端口上了。

Producer的实现

sarama提供了AsyncProducer和SyncProducer两种Producer。两种Producer的创建过程基本相同,预先配置sarama.Config.Producer的各个属性,然后调用相应的创建方法。大部分情况下使用的AsyncProducer,以异步的方式产生消息并且用channel来就接收。SyncProducer在产生消息的时候一直会阻塞直至Kafka已经知晓消息已产生,这样的做法当然也会造成一部分性能上的损失。SyncProducer实际上的durability guarantees会依赖于创建Producer时 Producer.RequiredAcks (等价于JVM Producer的 request.required.acks )的设定值,而且还会有message已被SyncProducer acknowledge却仍然会丢失的情况。

AsyncProducer

func (ap *asyncProducer) InitProducer(brokers []string) {
	
	config := sarama.NewConfig()

	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		panic(err)
	}
	
	ap.producer = producer
}

func (ap *asyncProducer) ProduceMessage(topic string, encoder sarama.Encoder) error {

	message := &sarama.ProducerMessage{
		Topic: topic,
		Value: encoder,
	}

	ap.producer.Input() <- message

	log.Println(message)
	
	return nil
}

SyncProducer

Producer.RequiredAcks 有WaitForAll、WaitForLocal和NoResponse三种配置。

func (sp *syncProducer) InitProducer(brokers []string) {
	
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		panic(err)
	}
	
	sp.producer = producer
}

func (sp *syncProducer) ProduceMessage(topic string, encoder sarama.Encoder) error {
	
	message := &sarama.ProducerMessage{
		Topic: topic,
		Value: encoder,
	}

	partition, offset, err := sp.producer.SendMessage(message)
	if err != nil {
		panic(err)
	}

	log.Printf("Partition: %v, Offset: %v\n", partition, offset)

	return nil
}

Consumer的实现

sarama.Consumer的创建方式和Producer类似,也是先配置Config在调用NewConsumer方法。(通过NewConsumer所创建的,其实是ConsumerGroup?)然后再根据给定的topic和partition创建一个专用于这个partition的consumer。如果已经有Consumer在消费这组给定的topic/partition,创建方法会返回错误。通过这样的机制可以保证 同一个Partition的数据,只能被同一Consumer Group中的同一Consumer消费

func initConsumer() sarama.Consumer {
	config := sarama.NewConfig()

	consumer, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		panic(err)
	}

	return consumer
}

func createPartitionConsumer() sarama.PartitionConsumer {
	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}

	return partitionConsumer
}

func main() {
	
	consumer = initConsumer()
	defer consumer.Close()

	partitionConsumer := createPartitionConsumer()
	defer partitionConsumer.Close()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	ch := make(chan struct{})

	go func() {
		for {
			select {
			case msg := <-partitionConsumer.Messages():
				consumeMessage(msg)
				autoIncrement++
			case err := <-partitionConsumer.Errors():
				log.Println(err)
			case <-signals:
				ch <- struct{}{}
			}
		}
	}()

	<-ch
}