一、消息队列场景
1.1、异步
1.2、解耦
1.3、削峰
1.4、缓冲
二、springboot整合kafka
导入pom依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
修改配置
spring.kafka.bootstrap-servers=192.168.200.1:9092
#配置序列化
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
发送消息:
@SpringBootTest
class Boot3KafkaApplicationTests {@Autowiredprivate KafkaTemplate kafkaTemplate;@Testvoid contextLoads() {//计时StopWatch stopWatch = new StopWatch();CompletableFuture[] completableFuture = new CompletableFuture[1000];stopWatch.start();for (int i = 0; i < 1000; i++) {CompletableFuture send = kafkaTemplate.send("timi", "timi1", "haha");completableFuture[i] = send;}CompletableFuture.allOf(completableFuture).join();stopWatch.stop();//统计花费时间long totalTimeMillis = stopWatch.getTotalTimeMillis();System.out.println("1000条消息发送时间:"+ totalTimeMillis);}@Testvoid testKafka(){Person person = new Person();person.setName("张三");person.setAge(12);CompletableFuture send = kafkaTemplate.send("timi", "person", person);send.join();}
}
创建主题
@Configuration
public class KafkaConfig {//创建主题@Beanpublic NewTopic topic(){return TopicBuilder.name("ax").partitions(1).compact().build();}
}
获取消息
@Component
@Log4j2
public class TimiKafkaListener {//默认获取最后一条消息@KafkaListener(topics = "timi",groupId = "timi")public void timiKafka(ConsumerRecord record){Object key = record.key();Object value = record.value();log.info("接收到消息的key {},value:{}",key,value);}//获取所有消息@KafkaListener(groupId = "ya",topicPartitions = {@TopicPartition(topic = "timi",partitionOffsets = {@PartitionOffset(partition = "0",initialOffset = "0")})})public void timiKafka2(ConsumerRecord record){Object key = record.key();Object value = record.value();log.info("接收到消息的key2 {},value2:{}",key,value);}
}