Kclient
KClient是一个简单易用,有效集成,高性能,高稳定的Kafka Java客户端。
Install / Use
/learn @henrypfhu/KclientREADME
Kafka客户端(KClient)
KClient是一个简单易用,有效集成,高性能,高稳定的Kafka Java客户端。
此文档包含了背景介绍、功能特性、使用指南、API简介、后台监控和管理、消息处理机模板项目、架构设计以及性能压测相关章节。如果你想使用KClient快速的构建Kafka处理机服务,请参考消息处理机模板项目章节; 如果你想了解KClient的其他使用方式、功能特性、监控和管理等,请参考背景介绍、功能恶性、使用指南、API简介、后台监控和管理等章节; 如果你想更深入的理解KClient的架构设计和性能Benchmark,请参考架构设计和性能压测章节。
背景介绍
消息队列在互联网领域里得到了广泛的应用,它多应用在异步处理,模块之间的解偶和高并发的消峰等场景,消息队列中表现最好的当属Apache开源项目Kafka,Kafka使用支持高并发的Scala语言开发,利用操作系统的缓存原理达到高性能,并且天生具有可分区,分布式的特点,而且有不同语言的客户端,使用起来非常的方便。KClient提供了一个简单易用,有效集成,高性能,高稳定的Kafka Java客户端。
在继续介绍KClient的功能特性,使用方法和架构设计之前,读者需要对Kafka进行基本的学习和了解。如果你是Kafka的初学者或者从未接触过Kafka,请参考我的博客文章:Kafka的那些事儿。如果你英文还不错,也可以直接参考Kafka官方在线文档:Kafka 0.8.2 Documentation。
功能特性
1.简单易用
简化了Kafka客户端API的使用方法, 特别是对消费端开发,消费端开发者只需要实现MessageHandler接口或者相关子类,在实现中处理消息完成业务逻辑,并且在主线程中启动封装的消费端服务器即可。它提供了各种常用的MessageHandler,框架自动转换消息到领域对象模型或者JSON对象等数据结构,让开发者更专注于业务处理。如果使用服务源码注解的方式声明消息处理机的后台,可以将一个通用的服务方法直接转变成具有完善功能的处理Kafka消息队列的处理机,使用起来极其简单,代码看起来一目了然,在框架级别通过多种线程池技术保证了处理机的高性能。
在使用方面,它提供了多种使用方式:1. 直接使用Java API; 2. 与Spring环境无缝集成; 3. 服务源码注解,通过注解声明方式启动Kafka消息队列的处理机。除此之外,它基于注解提供了消息处理机的模板项目,可以根据模板项目通过配置快速开发Kafka的消息处理机。
2.高性能
为了在不同的业务场景下实现高性能, 它提供不同的线程模型: 1. 适合轻量级服务的同步线程模型; 2. 适合IO密集型服务的异步线程模型(细分为所有消费者流共享线程池和每个流独享线程池)。另外,异步模型中的线程池也支持确定数量线程的线程池和线程数量可伸缩的线程池。
3.高稳定性
框架级别处理了通用的异常,计入错误日志,可用于错误手工恢复或者洗数据,并实现了优雅关机和重启等功能。
使用指南
KClient提供了三种使用方法,对于每一种方法,按照下面的步骤可快速构建Kafka生产者和消费者程序。
前置步骤
1).下载源代码后在项目根目录执行如下命令安装打包文件到你的Maven本地库。
mvn install
2).在你的项目pom.xml文件中添加对KClient的依赖。
<dependency>
<groupId>com.robert.kafka</groupId>
<artifactId>kclient-core</artifactId>
<version>0.0.1</version>
</dependency>
3).根据Kafka官方文档搭建Kafka环境,并创建两个Topic, test1和test2。
4).然后,从Kafka安装目录的config目录下拷贝kafka-consumer.properties和kafka-producer.properties到你的项目类路径下,通常是src/main/resources目录。
1.Java API
Java API提供了最直接,最简单的使用KClient的方法。
构建Producer示例:
KafkaProducer kafkaProducer = new KafkaProducer("kafka-producer.properties", "test");
for (int i = 0; i < 10; i++) {
Dog dog = new Dog();
dog.setName("Yours " + i);
dog.setId(i);
kafkaProducer.sendBean2Topic("test", dog);
System.out.format("Sending dog: %d \n", i + 1);
Thread.sleep(100);
}
构建Consumer示例:
DogHandler mbe = new DogHandler();
KafkaConsumer kafkaConsumer = new KafkaConsumer("kafka-consumer.properties", "test", 1, mbe);
try {
kafkaConsumer.startup();
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
} finally {
kafkaConsumer.shutdownGracefully();
}
public class DogHandler extends BeanMessageHandler<Dog> {
public DogHandler() {
super(Dog.class);
}
protected void doExecuteBean(Dog dog) {
System.out.format("Receiving dog: %s\n", dog);
}
}
2.Spring环境集成
KClient可以与Spring环境无缝集成,你可以像使用Spring Bean一样来使用KafkaProducer和KafkaConsumer。
构建Producer示例:
ApplicationContext ac = new ClassPathXmlApplicationContext("kafka-producer.xml");
KafkaProducer kafkaProducer = (KafkaProducer) ac.getBean("producer");
for (int i = 0; i < 10; i++) {
Dog dog = new Dog();
dog.setName("Yours " + i);
dog.setId(i);
kafkaProducer.send2Topic("test", JSON.toJSONString(dog));
System.out.format("Sending dog: %d \n", i + 1);
Thread.sleep(100);
}
<bean name="producer" class="com.robert.kafka.kclient.core.KafkaProducer" init-method="init">
<property name="propertiesFile" value="kafka-producer.properties"/>
<property name="defaultTopic" value="test"/>
</bean>
构建Consumer示例:
ApplicationContext ac = new ClassPathXmlApplicationContext(
"kafka-consumer.xml");
KafkaConsumer kafkaConsumer = (KafkaConsumer) ac.getBean("consumer");
try {
kafkaConsumer.startup();
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
} finally {
kafkaConsumer.shutdownGracefully();
}
public class DogHandler extends BeanMessageHandler<Dog> {
public DogHandler() {
super(Dog.class);
}
protected void doExecuteBean(Dog dog) {
System.out.format("Receiving dog: %s\n", dog);
}
}
<bean name="dogHandler" class="com.robert.kafka.kclient.sample.api.DogHandler" />
<bean name="consumer" class="com.robert.kafka.kclient.core.KafkaConsumer" init-method="init">
<property name="propertiesFile" value="kafka-consumer.properties" />
<property name="topic" value="test" />
<property name="streamNum" value="1" />
<property name="handler" ref="dogHandler" />
</bean>
3.服务源码注解
KClient提供了类似Spring声明式的编程方法,使用注解声明Kafka处理器方法,所有的线程模型、异常处理、服务启动和关闭等都由后台服务自动完成,极大程度的简化了API的使用方法,提高了开发者的工作效率。
注解声明Kafka消息处理器:
@KafkaHandlers
public class AnnotatedDogHandler {
@InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1)
@OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1")
public Cat dogHandler(Dog dog) {
System.out.println("Annotated dogHandler handles: " + dog);
return new Cat(dog);
}
@InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1)
public void catHandler(Cat cat) throws IOException {
System.out.println("Annotated catHandler handles: " + cat);
throw new IOException("Man made exception.");
}
@ErrorHandler(exception = IOException.class, topic = "test1")
public void ioExceptionHandler(IOException e, String message) {
System.out.println("Annotated excepHandler handles: " + e);
}
}
注解启动程序:
public static void main(String[] args) {
ApplicationContext ac = new ClassPathXmlApplicationContext(
"annotated-kafka-consumer.xml");
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
注解Spring环境配置:
<bean name="kClientBoot" class="com.robert.kafka.kclient.boot.KClientBoot" init-method="init"/>
<context:component-scan base-package="com.robert.kafka.kclient.sample.annotation" />
API简介
1.Producer API
KafkaProducer类提供了丰富的API来发送不同类型的消息,它支持发送字符串消息,发送一个普通的Bean,以及发送JSON对象等。在这些API中可以指定发送到某个Topic,也可以不指定而使用默认的Topic。对于发送的数据,支持带Key值的消息和不带Key值的消息。
发送字符串消息:
public void send(String message);
public void send2Topic(String topicName, String message);
public void send(String key, String message);
public void send2Topic(String topicName, String key, String message);
public void send(Collection<String> messages);
public void send2Topic(String topicName, Collection<String> messages);
public void send(Map<String, String> messages);
public void send2Topic(String topicName, Map<String, String> messages);
发送Bean消息:
public <T> void sendBean(T bean);
public <T> void sendBean2Topic(String topicName, T bean);
public <T> void sendBean(String key, T bean);
public <T> void sendBean2Topic(String topicName, String key, T bean);
public <T> void sendBeans(Collection<T> beans);
public <T> void sendBeans2Topic(String topicName, Collection<T> beans);
public <T> void sendBeans(Map<String, T> beans);
public <T> void sendBeans2Topic(String topicName, Map<String, T> beans);
发送JSON对象消息:
public void sendObject(JSONObject jsonObject);
public void sendObject2Topic(String topicName, JSONObject jsonObject);
public void sendObject(String key, JSONObject jsonObject);
public void sendObject2Topic(String topicName, String key, JSONObject jsonObject);
public void sendObjects(JSONArray jsonArray);
public void sendObjects2Topic(String topicName, JSONArray jsonArray);
public void sendObjects(Map<String, JSONObject> jsonObjects);
public void sendObjects2Topic(String topicName, Map<String, JSONObject> jsonObjects);
2.Consumer API
KafkaConsumer类提供了丰富的构造函数用来指定Kafka消费者服务器的各项参数,包括线程池策略,线程池类型,流数量等等。
使用PROPERTIES文件初始化:
public KafkaConsumer(String propertiesFile, String topic, int streamNum, MessageHandler handler);
public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, MessageHandler handler);
public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler);
public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler);
public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool,MessageHandler handler);
使用PROPERTIES对象初始化:
public KafkaConsumer(Properties properties, String topic, int streamNum, MessageHandler handler);
public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, MessageHandler handler);
public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler);
public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler);
public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool,MessageHandler handler);
3.消息处理器
消息处理器结构提供了一个基本接口,并且提供了不同的抽象类实现不同层次的功能,让功能得到最大化的重用,并且互相解偶,开发者可以根据需求选择某一个抽象类来继承和使用。
接口定义:
public interface MessageHandler {
public void execute(String message);
}
安全处理异常抽象类:
public abstract class SafelyMessageHandler implements MessageHandler {
public void execute(String message) {
try {
doExecute(message);
} catch (Throwable t) {
handleException(t, message);
}
}
protected void handleException(Throwable t, String message) {
for (ExceptionHandler excepHandler : excepHandlers) {
if (t.getClass() == IllegalStateException.class
&& t.getCause() != null
&& t.getCause().getClass() == InvocationTargetException.class
&& t.getCause().getCause() != null)
t = t.getCause().getCause();
if (excepHandler.support(t)) {
try {
excepHandler.handle(t, message);
} catch (Exception e) {
log.error(
"Exception hanp
