1、kafka是什么
-
Apache Kafka是一个开源消息系统,由Scala写成
-
Kafka最初是由LinkedIn开发,并于2011年初开源
-
Kafka是一个分布式消息队列:生产者消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现
-
Kafka对消息保存时根据Topic进行分类,发送消息者称为producer,消息接受者称为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例称为broker
-
无论是Kafka集群还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统的可用性
JMS:jms是Java提供的一套技术规范。
可以用来异构系统集成通信,缓解系统瓶颈,提高系统的伸缩性增强系统用户体验,使得系统模块化和组件化变得可行并更加灵活。
类JMS消息队列,结合JMS中的两种模式,可以有多个消费者主动拉取数据,在JMS中只有点对点模式才有消费者主动拉取数据。
kafka是一个生产-消费模型。
01.Producer:生产者
只负责数据生产,生产者的代码可以集成到任务系统中。 数据的分发策略由producer决定,默认是defaultPartition Utils.abs(key.hashCode) % numPartitions
02.Broker:
当前服务器上的Kafka进程,俗称拉皮条。只管数据存储,不管是谁生产,不管是谁消费。在集群中每个broker都有一个唯一brokerid,不得重复。
03.Topic:
目标发送的目的地,这是一个逻辑上的概念,落到磁盘上是一个partition的目录。partition的目录中有多个segment组合(index,log)
一个Topic对应多个partition[0,1,2,3],一个partition对应多个segment组合。一个segment有默认的大小是1G。
每个partition可以设置多个副本(replication-factor 1),会从所有的副本中选取一个leader出来。所有读写操作都是通过leader来进行的。
特别强调,和mysql中主从有区别,mysql做主从是为了读写分离,在kafka中读写操作都是leader。
04.ConsumerGroup:
数据消费者组,ConsumerGroup可以有多个,每个ConsumerGroup消费的数据都是一样的。
可以把多个consumer线程划分为一个组,组里面所有成员共同消费一个topic的数据,组员之间不能重复消费。
(在下面代码配置文件中,可以设置groupID和读取的位置)
05.zookeeper
依赖集群保存meta信息(每次读取到哪的信息)。
2、kafka生产数据时的分组策略
默认是defaultPartition Utils.abs(key.hashCode) % numPartitions
上文中的key是producer在发送数据时传入的,produer.send(KeyedMessage(topic,myPartitionKey,messageContent))
3、kafka如何保证数据的完全生产
ack机制:broker表示发来的数据已确认接收无误,表示数据已经保存到磁盘。
0:不等待broker返回确认消息
1:等待topic中某个partition leader保存成功的状态反馈
-1:等待topic中某个partition 所有副本都保存成功的状态反馈
4、broker如何保存数据
在理论环境下,broker按照顺序读写的机制,可以每秒保存600M的数据。主要通过pagecache机制,尽可能的利用当前物理机器上的空闲内存来做缓存。
当前topic所属的broker,必定有一个该topic的partition,partition是一个磁盘目录。partition的目录中有多个segment组合(index,log)
5、partition如何分布在不同的broker上
int i = 0
list{kafka01,kafka02,kafka03}
for(int i=0;i<5;i++){
brIndex = i%broker;
hostName = list.get(brIndex)
}
6、consumerGroup的组员和partition之间如何做负载均衡
最好是一一对应,一个partition对应一个consumer。
如果consumer的数量过多,必然有空闲的consumer。
算法:
假如topic1,具有如下partitions: P0,P1,P2,P3
加入group中,有如下consumer: C1,C2
首先根据partition索引号对partitions排序: P0,P1,P2,P3
根据consumer.id排序: C0,C1
计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
7、如何保证kafka消费者消费数据是全局有序的
伪命题
如果要全局有序的,必须保证生产有序,存储有序,消费有序。
由于生产可以做集群,存储可以分片,消费可以设置为一个consumerGroup,要保证全局有序,就需要保证每个环节都有序。
只有一个可能,就是一个生产者,一个partition,一个消费者。这种场景和大数据应用场景相悖。
8.kafka生产数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; import java.util.UUID; /** * 这是一个简单的Kafka producer代码 * 包含两个功能: * 1、数据发送 * 2、数据按照自定义的partition策略进行发送 * KafkaSpout的类 */ public class KafkaProducerSimple { public static void main(String[] args) { //1、指定当前kafka producer生产的数据的目的地 //创建topic可以输入以下命令,在kafka集群的任一节点进行创建。 //bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test String TOPIC = "orderMq" ; //2、读取配置文件 Properties props = new Properties(); //key.serializer.class默认为serializer.class props.put( "serializer.class" , "kafka.serializer.StringEncoder" ); //kafka broker对应的主机,格式为host1:port1,host2:port2 props.put( "metadata.broker.list" , "kafka01:9092,kafka02:9092,kafka03:9092" ); // request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1 // 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。 // 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。 // -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失 props.put( "request.required.acks" , "1" ); // 可选配置,如果不配置,则使用默认的partitioner partitioner.class // 默认值:kafka.producer.DefaultPartitioner // 用来把消息分到各个partition中,默认行为是对key进行hash。 props.put( "partitioner.class" , "cn.my.storm.kafka.MyLogPartitioner" ); // props.put("partitioner.class", "kafka.producer.DefaultPartitioner"); //3、通过配置文件,创建生产者 Producer<String, String> producer = new Producer<String, String>( new ProducerConfig(props)); //4、通过for循环生产数据 for ( int messageNo = 1 ; messageNo < 100000 ; messageNo++) { // String messageStr = new String(messageNo + "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey," + // "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" + // "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" + // "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" + // "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" + // "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" + // "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" + // "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" + // "用来配合自定义的MyLogPartitioner进行数据分发"); // 5、调用producer的send方法发送数据 // 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发 producer.send( new KeyedMessage<String, String>(TOPIC, messageNo + "" , "appid" + UUID.randomUUID() + "itcast" )); } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; import org.apache.log4j.Logger; public class MyLogPartitioner implements Partitioner { private static Logger logger = Logger.getLogger(MyLogPartitioner. class ); public MyLogPartitioner(VerifiableProperties props) { } public int partition(Object obj, int numPartitions) { return Integer.parseInt(obj.toString())%numPartitions; // return 1; } } |
9.kafka消费数据(低阶)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class KafkaConsumerSimple implements Runnable { public String title; public KafkaStream< byte [], byte []> stream; public KafkaConsumerSimple(String title, KafkaStream< byte [], byte []> stream) { this .title = title; this .stream = stream; } @Override public void run() { System.out.println( "开始运行 " + title); ConsumerIterator< byte [], byte []> it = stream.iterator(); /** * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞 * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false * */ while (it.hasNext()) { MessageAndMetadata< byte [], byte []> data = it.next(); String topic = data.topic(); int partition = data.partition(); long offset = data.offset(); String msg = new String(data.message()); System.out.println(String.format( "Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]" , title, topic, partition, offset, msg)); } System.out.println(String.format( "Consumer: [%s] exiting ..." , title)); } public static void main(String[] args) throws Exception{ Properties props = new Properties(); props.put( "group.id" , "dashujujiagoushi" ); props.put( "zookeeper.connect" , "zk01:2181,zk02:2181,zk03:2181" ); props.put( "auto.offset.reset" , "largest" ); props.put( "auto.commit.interval.ms" , "1000" ); props.put( "partition.assignment.strategy" , "roundrobin" ); ConsumerConfig config = new ConsumerConfig(props); String topic1 = "orderMq" ; String topic2 = "paymentMq" ; //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出 ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config); //定义一个map Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topic1, 3 ); //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流 Map<String, List<KafkaStream< byte [], byte []>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap); //取出 `kafkaTest` 对应的 streams List<KafkaStream< byte [], byte []>> streams = topicStreamsMap.get(topic1); //创建一个容量为4的线程池 ExecutorService executor = Executors.newFixedThreadPool( 3 ); //创建20个consumer threads for ( int i = 0 ; i < streams.size(); i++) executor.execute( new KafkaConsumerSimple( "消费者" + (i + 1 ), streams.get(i))); } } |
10.kafka和zookeeper使用JavaAPI能够拉取到数据(高阶消费)
properties配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | ###zookeeper\u548ckafka\u914d\u7f6e\u5730\u5740 zk.connect=xxxxx #zk.connect=xxxxx ###kafka\u6d88\u8d39\u7684group\u5fc5\u987b\u8c03\u6574\u4e3a\u72ec\u5360 adinfo.log.group.name=qinbin_ad_interfaceLog_20171218 ###kafka\u7684topic.\u9700\u8981\u548cadstat\u6a21\u5757\u7684kafka topic\u4e00\u81f4 adinfo.log.topic.name=ad_interfaceLog adinfo.log.queue.max= 10000 adinfo.log.list.size= 1 ###\u4e2d\u95f4\u7ed3\u679c\u4fdd\u5b58\u65e5\u5fd7 adinfo.log.pathFile=E:/opt/realtime/avro/file/ ###\u9ed8\u8ba4\u4e0d\u8981\u52a8 adinfo.statistics.time= 120000 adinfo.statistics.commitSize= 3000 |
kafka配置文件(注意groupID)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | <?xml version= "1.0" encoding= "UTF-8" ?> <beans xmlns= "http://www.springframework.org/schema/beans" xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance" xmlns: int = "http://www.springframework.org/schema/integration" xmlns: int -kafka= "http://www.springframework.org/schema/integration/kafka" xmlns:task= "http://www.springframework.org/schema/task" xsi:schemaLocation="http: //www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd http: //www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http: //www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http: //www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> < int :channel id= "inputFromAdinfo" > < int :queue/> </ int :channel> < int -kafka:inbound-channel-adapter id= "kafkaInboundChannelAdinfo" kafka-consumer-context-ref= "consumerContextAdinfo" auto-startup= "true" channel= "inputFromAdinfo" > < int :poller fixed-delay= "10" time-unit= "MILLISECONDS" max-messages-per-poll= "5" /> </ int -kafka:inbound-channel-adapter> <bean id= "consumerPropertiesAdinfo" class = "org.springframework.beans.factory.config.PropertiesFactoryBean" > <property name= "properties" > <props> <prop key= "auto.offset.reset" >smallest</prop> <prop key= "socket.receive.buffer.bytes" > 314572 </prop> <!-- 5M --> <prop key= "fetch.min.bytes" > 26214 </prop><!-- 256k --> <prop key= "fetch.message.max.bytes" > 104857 </prop><!-- 3M --> <prop key= "fetch.wait.max.ms" > 5000 </prop> <prop key= "auto.commit.interval.ms" > 2000 </prop> <prop key= "rebalance.backoff.ms" > 5000 </prop> <prop key= "rebalance.max.retries" > 5 </prop> </props> </property> </bean> < int -kafka:consumer-context id= "consumerContextAdinfo" consumer-timeout= "4000" zookeeper-connect= "zookeeperConnectAdinfo" consumer-properties= "consumerPropertiesAdinfo" > < int -kafka:consumer-configurations> <!-- 需要注意如果两个线程同时互不相干去消费通一个topic,则需要注意group-id不能重复 --> < int -kafka:consumer-configuration group-id= "${adinfo.log.group.name}" max-messages= "500" > < int -kafka:topic id= "${adinfo.log.topic.name}" streams= "1" /> </ int -kafka:consumer-configuration> </ int -kafka:consumer-configurations> </ int -kafka:consumer-context> < int -kafka:zookeeper-connect id= "zookeeperConnectAdinfo" zk-connect= "${zk.connect}" zk-connection-timeout= "6000" zk-session-timeout= "6000" zk-sync-time= "2000" /> </beans> |
然后在spring配置文件中import kafka的配置文件
Java接收:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | import java.io.UnsupportedEncodingException; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Resource; import org.apache.avro.io.DatumReader; import org.apache.avro.specific.SpecificDatumReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.integration.channel.QueueChannel; import org.springframework.messaging.Message; import com.ElasticSearchServiceImpl; import com.IElasticSearchService; import com.AdInfoRealTimeThread; import com.ConfigUtil; import com.AdInfo; public class AdInfoConsumer { // DatumReader<AdInfo> adInfoDatumReader = new // SpecificDatumReader<AdInfo>(AdInfoOld.getClassSchema(),AdInfo.getClassSchema()); DatumReader<AdInfo> adInfoDatumReader = new SpecificDatumReader<AdInfo>(AdInfo. class ); private Logger logger = LoggerFactory.getLogger(AdInfoConsumer. class ); @Resource (type = ElasticSearchServiceImpl. class ) private IElasticSearchService elasticSearchServiceImpl; @Resource (type = ConfigUtil. class ) private ConfigUtil configUtil; private QueueChannel queueChannel; public QueueChannel getQueueChannel() { return queueChannel; } public void setQueueChannel(QueueChannel queueChannel) { this .queueChannel = queueChannel; } private AdInfoRealTimeThread adInfoRealTimeThread; public AdInfoRealTimeThread getAdInfoRealTimeThread() { return adInfoRealTimeThread; } public void setAdInfoRealTimeThread(AdInfoRealTimeThread adInfoRealTimeThread) { this .adInfoRealTimeThread = adInfoRealTimeThread; } public void consumerLog() throws UnsupportedEncodingException { @SuppressWarnings ( "rawtypes" ) Message msg; while ((msg = queueChannel.receive()) != null ) { // msg = queueChannel.receive(); try { Map<String, Object> map = (Map<String, Object>) msg.getPayload(); Set<Entry<String, Object>> set = map.entrySet(); for (Map.Entry<String, Object> entry : set) { String topic = entry.getKey(); ConcurrentHashMap<Integer, List< byte []>> messages = (ConcurrentHashMap<Integer, List< byte []>>) entry .getValue(); Collection<List< byte []>> values = messages.values(); for (Iterator<List< byte []>> iterator = values.iterator(); iterator.hasNext();) { List< byte []> list = iterator.next(); for ( byte [] object : list) { String message = new String(object, "UTF-8" ); StringBuilder megJson = new StringBuilder(message); megJson.delete( 0 , megJson.indexOf( "=" ) + 1 ); // logger.info("json:"+megJson.toString()); // adinfoToSaveES.saveAdLogToEs(megJson.toString()); elasticSearchServiceImpl.executeSearch(configUtil.clusterName,megJson.toString()); //System.out.println(megJson.toString()); } } } } catch (Exception ex) { logger.error( "===AdInfoConsumer consumer is exception" , ex); } } logger.error( "====AdInfoConsumer receive is interrupted====" ); } /* * public void consumerLog() throws UnsupportedEncodingException { * * @SuppressWarnings("rawtypes") Message msg; while ((msg = * queueChannel.receive()) != null) { * * try { * * Map<String, Object> map = (Map<String, Object>) msg.getPayload(); * Set<Entry<String, Object>> set = map.entrySet(); for (Map.Entry<String, * Object> entry : set) { // String topic = entry.getKey(); * ConcurrentHashMap<Integer, List<byte[]>> messages = * (ConcurrentHashMap<Integer, List<byte[]>>) entry .getValue(); * Collection<List<byte[]>> values = messages.values(); for * (Iterator<List<byte[]>> iterator = values.iterator(); iterator.hasNext();) { * List<byte[]> list = iterator.next(); for (byte[] object : list) { * * try { Decoder decoder = DecoderFactory.get().binaryDecoder(object, null); * * AdInfo adInfo = adInfoDatumReader.read(null, decoder); * * String json=adInfo.toString(); System.out.println("*************"+json); * //logger.info("json:"+json); //adInfoRealTimeThread.statistics(json); * * } catch (Exception e) { * logger.error("===AdInfoConsumer consumer one is exception", e); } * * * } } } } catch (Exception ex) { * logger.error("===AdInfoConsumer consumer is exception", ex); } } * * logger.error("====AdInfoConsumer receive is interrupted===="); } */ } |