netcore Kafka

devtools/2024/11/17 15:56:42/

一、新建项目KafakDemo

  <ItemGroup><PackageReference Include="Confluent.Kafka" Version="2.6.0" /></ItemGroup>

二、Program.cs

using Confluent.Kafka;
using System;
using System.Threading;
using System.Threading.Tasks;namespace KafakDemo
{internal class Program{static void Main(string[] args){Task.Run(() =>{Publish();});Task.Run(() =>{Consumer();});Console.ReadKey();}static void Publish() {var conf = new ProducerConfig { BootstrapServers = "192.168.31.135:9092" };Action<DeliveryReport<Null, string>> handler = r =>Console.WriteLine(!r.Error.IsError? $"Delivered message to {r.TopicPartitionOffset}": $"Delivery Error: {r.Error.Reason}");while (true){using (var p = new ProducerBuilder<Null, string>(conf).Build()){for (int i = 0; i < 1; ++i){p.Produce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);}p.Flush();}Thread.Sleep(TimeSpan.FromSeconds(2));}}static void Consumer(){var conf = new ConsumerConfig{GroupId = "test-consumer-group",BootstrapServers = "192.168.31.135:9092"// Note: The AutoOffsetReset property determines the start offset in the event// there are not yet any committed offsets for the consumer group for the// topic/partitions of interest. By default, offsets are committed// automatically, so in this example, consumption will only start from the// earliest message in the topic 'my-topic' the first time you run the program.//AutoOffsetReset = AutoOffsetReset.Earliest};using (var c = new ConsumerBuilder<Ignore, string>(conf).Build()){c.Subscribe("my-topic");CancellationTokenSource cts = new CancellationTokenSource();Console.CancelKeyPress += (_, e) => {// Prevent the process from terminating.e.Cancel = true;cts.Cancel();};try{while (true){try{var cr = c.Consume(cts.Token);Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");}catch (ConsumeException e){Console.WriteLine($"Error occured: {e.Error.Reason}");}}}catch (OperationCanceledException){// Ensure the consumer leaves the group cleanly and final offsets are committed.c.Close();}}}}
}

运行效果:

 


http://www.ppmy.cn/devtools/134728.html

相关文章

计算机毕业设计Hadoop+Spark高考推荐系统 高考分数线预测 知识图谱 高考数据分析可视化 高考大数据 大数据毕业设计 Hadoop 深度学习

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

【视频讲解】Python深度神经网络DNNs-K-Means(K-均值)聚类方法在MNIST等数据可视化对比分析...

全文链接&#xff1a;https://tecdat.cn/?p38289 分析师&#xff1a;Cucu Sun 近年来&#xff0c;由于诸如自动编码器等深度神经网络&#xff08;DNN&#xff09;的高表示能力&#xff0c;深度聚类方法发展迅速。其核心思想是表示学习和聚类可以相互促进&#xff1a;好的表示会…

UniApp 应用、页面与组件的生命周期详解

UniApp 应用、页面与组件的生命周期详解 在uni-app中包含了 应用生命周期、页面生命周期、和组件生命周期&#xff08; Vue.js的&#xff09;函数。 应用生命周期 应用生命周期仅可在App.vue中监听&#xff0c;在其它页面监听无效。 <script>export default {onLaunc…

vue2项目中在线预览csv文件

简介 希望在项目中&#xff0c;在线预览.csv文件&#xff0c;本以为插件很多&#xff0c;结果都只是支持excel&#xff08;.xls、.xlsx&#xff09;一到.csv就歇菜。。。 关于文件预览 vue-office&#xff1a;文档、 查看在线演示demo&#xff0c;支持docx、.xlsx、pdf、ppt…

游戏引擎学习第13天

视频参考:https://www.bilibili.com/video/BV1QQUaYMEEz/ 改代码的地方尽量一张图说清楚吧,懒得浪费时间 game.h #pragma once #include <cmath> #include <cstdint> #include <malloc.h>#define internal static // 用于定义内翻译单元内部函数 #…

SQL 语句优化及编程方法

DBMS生成的执行计划在很大程度上要受到代码外部结构的影响。因此要想优化查询性能&#xff0c;就必须要知道如何写代码才能使优化器的执行效率更高。 但是&#xff0c;不能为了“效率”牺牲代码的可读性&#xff0c;要让代码清晰。 1 查询优化 在解决SQL造成的性能问题时&am…

小白进!QMK 键盘新手入门指南

经常玩键盘的伙伴应该都知道&#xff0c;现在的键盘市场可谓是百花齐放&#xff0c;已经不是之前的单一功能产品化时代。我们可以看到很多诸如&#xff1a;机械轴键盘、磁轴键盘、光轴键盘、电感轴键盘&#xff0c;以及可能会上市的光磁轴键盘&#xff0c;更有支持屏幕的、带旋…

分布式微服务项目,同一个controller不同方法间的转发导致cookie丢失,报错null pointer异常

源码&#xff1a; /***添加商品进入购物车*/ GetMapping("/addToCart") public String addToCart(RequestParam("num") Integer num, RequestParam("skuId") Long skuId, RedirectAttributes redirectAttributes) {System.out.println("nu…