写一个简单的kafka 消费者
1. 依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
2. 消费者
import com.xxxx.npi.module.common.msg.enums.Topic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;import java.util.Arrays;
import java.util.Properties;public class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "xxxx.xxxx.xxxx.xxxx:9092");props.put("group.id", "gedigital");props.put("enable.auto.commit", "false");props.put("auto.offset.reset", "earliest");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 后面安全相关的配置,如果没有,可以不用配置;如果有,必须配props.put("security.protocol", "SASL_SSL");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "e:\\client_truststore.jks"); //Save the certificate (dowload client_truststore.jks) in trust store to local server directoy (only for Java client)props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "passwdconfig");props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';");//注意passwod结尾的分号一定不要漏props.put("ssl.endpoint.identification.algorithm", "");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = Topic.FBR.getTopic();consumer.subscribe(Arrays.asList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value());}consumer.commitSync();}}
}