windows C#-生成和使用异步流(上)

news/2024/11/25 10:45:03/

异步流建立流式处理数据源模型。 数据流经常异步检索或生成元素。 它们为异步流式处理数据源提供了自然编程模型。

先决条件

需要将计算机设置为运行 .NET,包括 C# 编译器。 C# 编译器随附于 Visual Studio 2022 或 .NET SDK。

将需要创建 GitHub 访问令牌,以便可以访问 GitHub GraphQL 终结点。 为 GitHub 访问令牌选择以下权限:

  • repo:status
  • public_repo

将访问令牌保存在安全位置,以便可以使用它来访问 GitHub API 终结点。保护个人访问令牌。 任何带有你的个人访问令牌的软件都可以使用你的访问权限进行 GitHub API 调用。

运行初学者应用程序

可以从 asynchronous-programming/snippets 文件夹中的 dotnet/docs 存储库中获取本文中使用的初学者应用程序的代码。

初学者应用程序是一个控制台应用程序,它使用 GitHub GraphQL 接口检索最近在 dotnet/docs 存储库中编写的问题。 首先来看一下以下初学者应用 Main 方法的代码:

static async Task Main(string[] args)
{//Follow these steps to create a GitHub Access Token// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token//Select the following permissions for your GitHub Access Token:// - repo:status// - public_repo// Replace the 3rd parameter to the following code with your GitHub access token.var key = GetEnvVariable("GitHubKey","You must store your GitHub key in the 'GitHubKey' environment variable","");var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo")){Credentials = new Octokit.Credentials(key)};var progressReporter = new progressStatus((num) =>{Console.WriteLine($"Received {num} issues in total");});CancellationTokenSource cancellationSource = new CancellationTokenSource();try{var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",cancellationSource.Token, progressReporter);foreach(var issue in results)Console.WriteLine(issue);}catch (OperationCanceledException){Console.WriteLine("Work has been cancelled");}
}

可以将 GitHubKey 环境变量设置为个人访问令牌,也可以将对 GetEnvVariable 的调用中的最后一个参数替换为个人访问令牌。 如果要与其他人共享源,请不要将访问代码放在源代码中。 不要将访问代码上传到共享源存储库。

在创建 GitHub 客户端后,Main 中的代码将创建一个进度报告对象和一个取消令牌。 创建这些对象之后,Main 调用 RunPagedQueryAsync 来检索最近创建的 250 个问题。 任务完成后,将显示结果。

在运行初学者应用程序时,可以对该应用程序的运行方式进行一些重要观察。 将看到从 GitHub 返回的每个页面的进度报告。 在 GitHub 返回问题的每个新页面之前,可以观察到明显的停顿。 最后,只有在从 GitHub 检索到所有 10 个页面之后,问题才会显示出来。

检查实现情况 

该实现揭示了你观察到上一部分中讨论的行为的原因。 检查 RunPagedQueryAsync 的代码:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{var issueAndPRQuery = new GraphQLRequest{Query = queryText};issueAndPRQuery.Variables["repo_name"] = repoName;JArray finalResults = new JArray();bool hasMorePages = true;int pagesReturned = 0;int issuesReturned = 0;// Stop with 10 pages, because these are large repos:while (hasMorePages && (pagesReturned++ < 10)){var postBody = issueAndPRQuery.ToJsonText();var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),postBody, "application/json", "application/json");JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);int totalCount = (int)issues(results)["totalCount"]!;hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();issuesReturned += issues(results)["nodes"]!.Count();finalResults.Merge(issues(results)["nodes"]!);progress?.Report(issuesReturned);cancel.ThrowIfCancellationRequested();}return finalResults;JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

此方法执行的第一个操作是使用 GraphQLRequest 类创建 POST 对象:

public class GraphQLRequest
{[JsonProperty("query")]public string? Query { get; set; }[JsonProperty("variables")]public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();public string ToJsonText() =>JsonConvert.SerializeObject(this);
}

这有助于构成 POST 对象正文,并使用 ToJsonText 方法将其正确转换为以单个字符串呈现的 JSON,该方法从请求正文中删除所有换行符,并用 \(反斜杠)转义字符对其进行标记。

让我们集中讨论前面代码的分页算法和异步结构。 (有关 GitHub GraphQL API 的详细信息,可以参考 GitHub GraphQL 文档。)RunPagedQueryAsync 方法按从最新到最旧的顺序枚举问题。 它每页请求 25 个问题,并检查响应的 pageInfo 结构以继续上一页的操作。 这遵循了 GraphQL 对多页响应的标准分页支持。 响应包括 pageInfo 对象,该对象包含用于请求上一页的 hasPreviousPages 值和 startCursor 值。 问题在 nodes 数组中。 RunPagedQueryAsync 方法将这些节点追加到一个数组中,其中包含所有页面的所有结果。

在检索和还原结果页之后,RunPagedQueryAsync 将报告进度并检查是否取消。 如果已请求取消,RunPagedQueryAsync 将引发 OperationCanceledException。

此代码中有几个可以改进的元素。 最重要的是,RunPagedQueryAsync 必须为返回的所有问题分配存储空间。 该示例在 250 个问题处停止,因为检索所有未决问题需要更多的内存来存储所有检索到的问题。 支持进度报告和取消的协议使得算法在第一次读取时更加难以理解。 涉及更多类型和 API。 必须通过 CancellationTokenSource 及其关联的 CancellationToken 跟踪通信,以了解在何处请求取消,以及在何处授予取消。


http://www.ppmy.cn/news/1549797.html

相关文章

[系统安全]PE文件头中的重定位表

PE加载的过程 任何一个EXE程序会被分配4GB的内存空间&#xff0c;用户层处理低2G的内存&#xff0c;驱动处理高2G的内存。 1、双击EXE程序&#xff0c;操作系统开辟一个4GB的空间。2、从ImageBase决定了加载后的基址&#xff0c;ImageSize决定了程序有多大。3、然后加载DLL 大…

Python Flask快速开发网站

在Web开发领域,Python拥有多种优秀的Web框架,例如Django、Flask、Pyramid等。其中Flask是一个微型框架,核心代码非常精简,只包含了Web应用最基本的功能。这使得Flask非常轻量级,容易上手,适合快速开发小型Web应用。本文将介绍如何使用Flask快速搭建一个简单的网站。 © ivw…

[2024.11.17-24] 一周科技速报

2024 世界青年科学家峰会全体大会 时间地点&#xff1a;11 月 17 日在浙江温州举行。 参会人员&#xff1a;吸引了来自 71 个国家和地区以及 63 个国际科技组织的近 800 名科学家、企业家、教育家和青年科技人才代表。 会议内容&#xff1a;以 “汇聚天下英才共创美好未来”…

云计算期中作业:Spark机器学习问题解决

在原有pdf教程教程上&#xff0c;做一个补充 idea内搭建环境 导入依赖 就直接利用之前的作业工程项目里直接写&#xff0c;所以依赖基本上不用再导入了&#xff0c;如果要导入&#xff0c;看自己依赖的版本号&#xff0c;不要直接复制教程&#xff0c;比如我的&#xff1a; …

Java 8 Stream API 在数据转换中的应用 —— 将列表转换为映射

文章目录 背景原因1. 数据库设计或约束问题2. 业务逻辑问题3. 测试数据4. 数据库同步问题5. 编程错误 如何避免和处理键冲突1. 数据库层面2. 业务逻辑层面3. 测试数据管理4. 代码层面示例代码 总结 背景 本文实际生产案例讲解配套文章&#xff1a;sysUserList 中为何会出现多个…

NVR管理平台EasyNVR多个NVR同时管理:全方位安防监控视频融合云平台方案

EasyNVR是基于端-边-云一体化架构的安防监控视频融合云平台&#xff0c;具有简单轻量的部署方式与多样的功能&#xff0c;支持多种协议&#xff08;如GB28181、RTSP、Onvif、RTMP&#xff09;和设备类型&#xff08;IPC、NVR等&#xff09;&#xff0c;提供视频直播、录像、回放…

C++从零到满绩——类和对象(中)

目录 1>>前言 2>>构造函数&#xff08;我称之为初始化函数&#xff09; 3>>析构函数&#xff08;我称之为销毁函数&#xff09; 4>>拷贝构造函数&#xff08;我称之为复制函数&#xff09; 5>>运算符重载 5.2>>赋值运算符重载 ​编辑…

低温存储开关机问题

问题&#xff1a; 某消费电子产品在进行可靠性实验室&#xff0c;在低温-30C存储两个小时后&#xff0c;上电不开机。在常温25C时&#xff0c;开关机正常。 分析&#xff1a; 1、接串口抓log信息&#xff0c;从打印信息可以看出uboot可以起来&#xff0c;在跑kernel时&#x…