High-Performance Network Communication Framework: Mercury

2024-11-25 10:11:11

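Mercury is an RPC framework designed specifically for HPC systems. It allows asynchronous transfer of parameters and execution of requests, and it supports large data arguments directly. The network implementation is abstracted, which allows easy porting to future systems and efficient use of existing native transport mechanisms. Mercury's interface is generic and allows any function call to be serialized. Mercury is a core component of the Mochi ecosystem of microservices.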
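
Serializing a call means turning its argument and result structures into bytes that can cross the network and rebuilding them on the other side. In Mercury this is done by per-type encode/decode routines, usually generated with macros such as MERCURY_GEN_PROC (the example later in this article relies on generated types such as snappy_compress_in_t and snappy_compress_out_t). The hand-written sketch below only illustrates the idea: it packs a hypothetical result struct modeled on snappy_compress_out_t into a fixed little-endian layout and decodes it again. The type, the function names and the wire layout are assumptions, not Mercury's actual encoding.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical result type modeled on snappy_compress_out_t. */
typedef struct {
    int32_t ret;                /* status code returned by the remote call */
    uint64_t compressed_length; /* number of bytes produced */
} example_out_t;

#define EXAMPLE_OUT_WIRE_SIZE 12 /* 4 bytes for ret + 8 bytes for the length */

/* Write the low n bytes of v in little-endian order. */
static void put_le(uint8_t *buf, uint64_t v, int n)
{
    for (int i = 0; i < n; i++)
        buf[i] = (uint8_t) (v >> (8 * i));
}

/* Read n little-endian bytes back into an integer. */
static uint64_t get_le(const uint8_t *buf, int n)
{
    uint64_t v = 0;
    for (int i = 0; i < n; i++)
        v |= (uint64_t) buf[i] << (8 * i);
    return v;
}

/* Encode: returns the number of bytes written, or 0 if buf is too small. */
size_t example_out_encode(const example_out_t *out, uint8_t *buf, size_t buf_size)
{
    if (buf_size < EXAMPLE_OUT_WIRE_SIZE)
        return 0;
    put_le(buf, (uint32_t) out->ret, 4);
    put_le(buf + 4, out->compressed_length, 8);
    return EXAMPLE_OUT_WIRE_SIZE;
}

/* Decode: returns the number of bytes consumed, or 0 if buf is too short. */
size_t example_out_decode(example_out_t *out, const uint8_t *buf, size_t buf_size)
{
    if (buf_size < EXAMPLE_OUT_WIRE_SIZE)
        return 0;
    out->ret = (int32_t) (uint32_t) get_le(buf, 4);
    out->compressed_length = get_le(buf + 4, 8);
    return EXAMPLE_OUT_WIRE_SIZE;
}
```

A fixed, explicit byte order is what lets a sender and a receiver on different architectures agree on the meaning of the bytes; Mercury's generated routines take care of this (and of variable-size fields such as strings and bulk handles) for you.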

1. Supported Architectures

Architectures supported by MPI implementations are generally supported by the network abstraction layer.

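The network abstraction (NA) layer is used internally by both the RPC layer and the bulk layer. The NA layer uses a plugin mechanism so that support for various network protocols can easily be added and selected at runtime.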
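
One way to picture a runtime plugin mechanism is a table of operations, one entry per compiled-in plugin, searched by class name and protocol when the library is initialized. The sketch below is hypothetical and heavily reduced: na_sketch_ops, na_sketch_find_plugin() and the two stub plugins are illustrative names, and Mercury's real NA plugin interface has many more operations.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical, heavily reduced plugin interface. */
struct na_sketch_ops {
    const char *class_name;                      /* e.g. "ofi", "sm" */
    int (*check_protocol)(const char *protocol); /* nonzero if supported */
};

static int ofi_check(const char *protocol)
{
    return strcmp(protocol, "tcp") == 0 || strcmp(protocol, "verbs") == 0;
}

static int sm_check(const char *protocol)
{
    return strcmp(protocol, "sm") == 0;
}

/* Plugins compiled into this build; in a real build the list would depend
 * on which NA_USE_* options were enabled. */
static const struct na_sketch_ops na_sketch_plugins[] = {
    { "ofi", ofi_check },
    { "sm", sm_check },
};

/* Return the first plugin that matches class_name (any class if NULL) and
 * supports protocol, or NULL if none does. */
const struct na_sketch_ops *na_sketch_find_plugin(const char *class_name,
                                                  const char *protocol)
{
    size_t n = sizeof(na_sketch_plugins) / sizeof(na_sketch_plugins[0]);
    for (size_t i = 0; i < n; i++) {
        const struct na_sketch_ops *p = &na_sketch_plugins[i];
        if (class_name != NULL && strcmp(class_name, p->class_name) != 0)
            continue;
        if (p->check_protocol(protocol))
            return p;
    }
    return NULL;
}
```

Because callers only ever go through the operations table, adding a transport means adding one entry, without touching the RPC or bulk layers above it.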

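NA supports several backend implementations. OFI/libfabric is the recommended plugin for inter-node communication, while SM (shared memory) is recommended for intra-node communication.

On platforms where libfabric is not available or not recommended, the UCX plugin can be used as an alternative transport; the protocols it currently supports are tcp and verbs.
The MPI and BMI (tcp) plugins are still supported but are gradually being deprecated.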

2. Software Requirements

To use the OFI libfabric plugin, refer to the libfabric build instructions.

To use the UCX plugin, refer to the UCX build instructions.

To use the native NA SM (shared-memory) plugin on Linux, the cross-memory attach (CMA) feature introduced in kernel v3.2 is required.
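
A quick sanity check for this requirement is to compare the running kernel release with 3.2. The sketch below does so with uname(2); kernel_release_at_least() and running_kernel_has_cma() are hypothetical helpers. A version check is necessary but not sufficient: the kernel must also be built with CONFIG_CROSS_MEMORY_ATTACH, and security settings such as Yama's ptrace_scope can still block cross-process memory access.

```c
#include <stdio.h>
#include <sys/utsname.h>

/* Return 1 if a kernel release string such as "5.15.0-91-generic" is at
 * least major.minor, 0 if it is older, -1 if it cannot be parsed. */
int kernel_release_at_least(const char *release, int major, int minor)
{
    int k_major = 0, k_minor = 0;
    if (sscanf(release, "%d.%d", &k_major, &k_minor) != 2)
        return -1;
    return (k_major > major || (k_major == major && k_minor >= minor)) ? 1 : 0;
}

/* Check whether the running kernel is new enough for CMA (Linux 3.2). */
int running_kernel_has_cma(void)
{
    struct utsname u;
    if (uname(&u) != 0)
        return -1;
    return kernel_release_at_least(u.release, 3, 2);
}
```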

To use the BMI plugin, the most convenient way is to install it through Spack; alternatively, build it from source:

git clone https://github.com/radix-io/bmi.git && cd bmi
./prepare && ./configure --enable-shared --enable-bmi-only
make && make install

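To use the MPI plugin, Mercury requires a well-configured MPI implementation (MPICH2 v1.4.1 or later / OpenMPI v1.6 or later) that provides MPI_THREAD_MULTIPLE on targets that accept remote connections. Processes that do not accept incoming connections are not required to run at a multithreaded execution level.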

3. Building

bzip2 -dc mercury-X.tar.bz2 | tar xvf -
cd mercury-X
mkdir build
cd build
ccmake ..    # ".." is the relative path to the mercury-X directory

Press "c" multiple times, then choose suitable options. Recommended options are:

BUILD_SHARED_LIBS                ON (or OFF if the library you link against requires static libraries)
BUILD_TESTING                    ON/OFF
Boost_INCLUDE_DIR                /path/to/include/directory
CMAKE_INSTALL_PREFIX             /path/to/install/directory
MERCURY_ENABLE_DEBUG             ON/OFF
MERCURY_TESTING_ENABLE_PARALLEL  ON/OFF
MERCURY_USE_BOOST_PP             ON
MERCURY_USE_CHECKSUMS            ON/OFF
MERCURY_USE_SYSTEM_BOOST         ON/OFF
MERCURY_USE_SYSTEM_MCHECKSUM     ON/OFF
MERCURY_USE_XDR                  OFF
NA_USE_BMI                       ON/OFF
NA_USE_MPI                       ON/OFF
NA_USE_OFI                       ON/OFF
NA_USE_PSM                       ON/OFF
NA_USE_PSM2                      ON/OFF
NA_USE_SM                        ON/OFF
NA_USE_UCX                       ON/OFF

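Setting include directories and library paths may require you to toggle to advanced mode by typing "t". Once you are done and do not see any errors, type "g" to generate the makefiles. Once you exit the CMake configuration screen and are ready to build the targets, run: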

make

Verbose compile/build output can be obtained by adding VERBOSE=1 to the make command:

make VERBOSE=1

4. Installation

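Assuming that CMAKE_INSTALL_PREFIX has been set (see the previous step) and that you have write permission on the destination directory, run from the build directory: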

make install

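Tests can be run to check that basic RPC functionality (requests and bulk data transfers) works properly. CTest is used to run the tests; simply run from the build directory: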

ctest .

Verbose test output is obtained by adding -V to the ctest command:

ctest -V .

Extra verbose information can be displayed with -VV:

ctest -VV .

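5. Example Walkthrough

The following client/server pair performs one complete Mercury RPC. The client creates bulk handles for an input buffer and an output buffer and forwards an RPC; the server pulls the input, compresses it with snappy, pushes the compressed data back into the client's output buffer, and responds with the status and the compressed length. Definitions shared by both programs (NR_ITEMS, print_buf(), the snappy_compress_in_t/snappy_compress_out_t types, the argument structs, TEMP_DIRECTORY, CONFIG_FILE_NAME, HG_MAX_IDLE_TIME, the global "done" flags and forward declarations of the static functions) are assumed to come from a common header that is not shown here.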

Client

#include <limits.h> /* PATH_MAX */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <mercury.h>
#include <mercury_bulk.h>
#include <mercury_macros.h>
#include <snappy-c.h>

int
main(void)
{
    const char *info_string = NULL;
    char target_addr_string[PATH_MAX], *p;
    FILE *na_config = NULL;
    hg_class_t *hg_class;
    hg_context_t *hg_context;
    hg_addr_t hg_target_addr;
    hg_return_t hg_ret;

    /* Get info string */
    info_string = getenv("HG_PORT_NAME");
    if (!info_string) {
        fprintf(stderr, "HG_PORT_NAME environment variable must be set\n");
        exit(EXIT_FAILURE);
    }
    printf("Using %s\n", info_string);

    HG_Set_log_level("warning");

    /* Initialize Mercury with the desired network abstraction class */
    hg_class = HG_Init(info_string, HG_FALSE);

    /* Create HG context */
    hg_context = HG_Context_create(hg_class);

    /* The connection string is generated by the server with
     * HG_Addr_self()/HG_Addr_to_string() (NA_Addr_self()/NA_Addr_to_string()
     * at the NA layer); read that string and pass it to HG_Addr_lookup2() */
    na_config = fopen(TEMP_DIRECTORY CONFIG_FILE_NAME, "r");
    if (!na_config) {
        fprintf(stderr, "Could not open config file from: %s\n",
            TEMP_DIRECTORY CONFIG_FILE_NAME);
        exit(EXIT_FAILURE);
    }
    if (fgets(target_addr_string, PATH_MAX, na_config) == NULL) {
        fprintf(stderr, "Could not read target address\n");
        fclose(na_config);
        exit(EXIT_FAILURE);
    }
    p = strrchr(target_addr_string, '\n');
    if (p != NULL)
        *p = '\0';
    printf("Target address is: %s\n", target_addr_string);
    fclose(na_config);

    /* Look up the target address */
    HG_Addr_lookup2(hg_class, target_addr_string, &hg_target_addr);

    /* Register RPC */
    snappy_compress_id_g = snappy_compress_register(hg_class);

    /* Send RPC to target */
    snappy_compress_rpc(hg_class, hg_context, hg_target_addr);

    /* Poke progress engine and check for events */
    do {
        unsigned int actual_count = 0;
        do {
            hg_ret = HG_Trigger(hg_context, 0 /* timeout */, 1 /* max count */,
                &actual_count);
        } while ((hg_ret == HG_SUCCESS) && actual_count);

        /* Do not try to make progress anymore if we're done */
        if (snappy_compress_done_g)
            break;

        hg_ret = HG_Progress(hg_context, HG_MAX_IDLE_TIME);
    } while (hg_ret == HG_SUCCESS);

    /* Finalize */
    HG_Addr_free(hg_class, hg_target_addr);
    HG_Context_destroy(hg_context);
    HG_Finalize(hg_class);

    return EXIT_SUCCESS;
}

static int
snappy_compress_rpc(hg_class_t *hg_class, hg_context_t *hg_context,
    hg_addr_t hg_target_addr)
{
    int *input;
    size_t source_length = NR_ITEMS * sizeof(int);
    hg_bulk_t input_bulk_handle;
    void *compressed;
    size_t max_compressed_length;
    hg_bulk_t compressed_bulk_handle;
    snappy_compress_in_t snappy_compress_input;
    struct snappy_compress_rpc_args *snappy_compress_rpc_args;
    hg_handle_t handle;
    int i;

    /* We are going to take a buffer and send it to the server for
     * compression. */
    max_compressed_length = snappy_max_compressed_length(source_length);
    printf("Input buffer length is: %zu\n", source_length);
    printf("Max compressed length is: %zu\n", max_compressed_length);

    /* Generate input buffer */
    input = (int *) malloc(source_length);
    for (i = 0; i < NR_ITEMS; i++) {
        input[i] = rand() % 10;
    }
    print_buf(20, input);

    /* Allocate compressed buffer */
    compressed = malloc(max_compressed_length);
    memset(compressed, '\0', max_compressed_length);

    /* Create HG handle bound to target */
    HG_Create(hg_context, hg_target_addr, snappy_compress_id_g, &handle);

    /* Associate a bulk handle with each memory region. Mercury's bulk
     * transfers will get data from / put data into these regions. */
    HG_Bulk_create(hg_class, 1, (void **) &input, &source_length,
        HG_BULK_READ_ONLY, &input_bulk_handle);
    HG_Bulk_create(hg_class, 1, &compressed, &max_compressed_length,
        HG_BULK_READWRITE, &compressed_bulk_handle);

    /* Create a structure to hold the arguments, since the call will be
     * executed asynchronously */
    snappy_compress_rpc_args = (struct snappy_compress_rpc_args *) malloc(
        sizeof(struct snappy_compress_rpc_args));
    snappy_compress_rpc_args->input = input;
    snappy_compress_rpc_args->input_length = source_length;
    snappy_compress_rpc_args->input_bulk_handle = input_bulk_handle;
    snappy_compress_rpc_args->compressed = compressed;
    snappy_compress_rpc_args->compressed_bulk_handle = compressed_bulk_handle;

    /* Set input arguments that will be passed to HG_Forward */
    snappy_compress_input.input_bulk_handle = input_bulk_handle;
    snappy_compress_input.compressed_bulk_handle = compressed_bulk_handle;

    /* Forward the call */
    printf("Sending input to target\n");
    HG_Forward(handle, snappy_compress_rpc_cb, snappy_compress_rpc_args,
        &snappy_compress_input);

    /* Handle will be destroyed when call completes (reference count) */
    HG_Destroy(handle);

    return 0;
}

/* This routine runs from HG_Trigger() once the RPC has completed */
static hg_return_t
snappy_compress_rpc_cb(const struct hg_cb_info *callback_info)
{
    struct snappy_compress_rpc_args *snappy_compress_rpc_args =
        (struct snappy_compress_rpc_args *) callback_info->arg;
    hg_handle_t handle = callback_info->info.forward.handle;
    int *input;
    size_t source_length;
    void *compressed;
    size_t compressed_length;
    int *uncompressed;
    size_t uncompressed_length;
    snappy_compress_out_t snappy_compress_output;
    snappy_status ret;

    /* Get output */
    printf("Received output from target\n");
    HG_Get_output(handle, &snappy_compress_output);

    /* Get output parameters */
    ret = snappy_compress_output.ret;
    compressed_length = snappy_compress_output.compressed_length;
    compressed = snappy_compress_rpc_args->compressed;
    input = snappy_compress_rpc_args->input;
    source_length = snappy_compress_rpc_args->input_length;

    /* Check ret */
    if (ret != SNAPPY_OK) {
        fprintf(stderr, "Error: snappy_compress failed with ret %d\n", ret);
    }

    /* The output data is now in the bulk buffer */
    printf("Compressed buffer length is: %zu\n", compressed_length);
    print_buf(5, (int *) compressed);
    if (snappy_validate_compressed_buffer(compressed, compressed_length) ==
        SNAPPY_OK) {
        printf("Compressed buffer validated: compressed successfully\n");
    }

    /* source_length is already a byte count, so it is also the size of the
     * uncompressed buffer */
    uncompressed_length = source_length;
    uncompressed = (int *) malloc(uncompressed_length);

    /* Uncompress data and check uncompressed_length */
    printf("Uncompressing buffer...\n");
    snappy_uncompress(compressed, compressed_length, (char *) uncompressed,
        &uncompressed_length);
    printf("Uncompressed buffer length is: %zu\n", uncompressed_length);
    print_buf(20, uncompressed);

    /* Free output and handles */
    HG_Free_output(handle, &snappy_compress_output);
    HG_Bulk_free(snappy_compress_rpc_args->input_bulk_handle);
    HG_Bulk_free(snappy_compress_rpc_args->compressed_bulk_handle);

    /* Free data */
    free(uncompressed);
    free(compressed);
    free(input);
    free(snappy_compress_rpc_args);

    /* We're done */
    snappy_compress_done_g = HG_TRUE;

    return HG_SUCCESS;
}
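
Both programs drive Mercury from the same loop: HG_Trigger() runs the callbacks of operations that have completed, and HG_Progress() waits for network events that complete more operations. The sketch below models that split in plain C so that the control flow of the loop in main() can be run and traced without a network. Everything in it (sketch_context, sketch_post(), sketch_progress(), sketch_trigger(), sketch_run_until()) is hypothetical and does not reproduce Mercury's internals, threading or timeouts.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical single-threaded model of the progress/trigger split. */
typedef void (*sketch_cb_t)(void *arg);

struct sketch_op {
    sketch_cb_t cb;
    void *arg;
};

#define SKETCH_QUEUE_MAX 16

struct sketch_context {
    struct sketch_op pending[SKETCH_QUEUE_MAX];   /* still "on the wire" */
    size_t n_pending;
    struct sketch_op completed[SKETCH_QUEUE_MAX]; /* waiting for trigger */
    size_t n_completed;
};

/* Post an operation (stands in for HG_Forward or HG_Bulk_transfer). */
bool sketch_post(struct sketch_context *ctx, sketch_cb_t cb, void *arg)
{
    if (ctx->n_pending == SKETCH_QUEUE_MAX)
        return false;
    ctx->pending[ctx->n_pending].cb = cb;
    ctx->pending[ctx->n_pending].arg = arg;
    ctx->n_pending++;
    return true;
}

/* Pretend every pending operation finished; queue as many callbacks as fit
 * and keep the rest pending. */
void sketch_progress(struct sketch_context *ctx)
{
    size_t moved = 0;
    while (moved < ctx->n_pending && ctx->n_completed < SKETCH_QUEUE_MAX)
        ctx->completed[ctx->n_completed++] = ctx->pending[moved++];
    memmove(ctx->pending, ctx->pending + moved,
            (ctx->n_pending - moved) * sizeof(ctx->pending[0]));
    ctx->n_pending -= moved;
}

/* Run at most max_count queued callbacks in FIFO order; return how many ran. */
unsigned sketch_trigger(struct sketch_context *ctx, unsigned max_count)
{
    unsigned count = 0;
    while (count < max_count && ctx->n_completed > 0) {
        struct sketch_op op = ctx->completed[0];
        memmove(ctx->completed, ctx->completed + 1,
                (ctx->n_completed - 1) * sizeof(op));
        ctx->n_completed--;
        op.cb(op.arg); /* a callback may post new operations */
        count++;
    }
    return count;
}

/* Same shape as the loop in main(): drain callbacks, stop once *done is set,
 * give up when nothing is in flight (the analogue of a progress timeout). */
bool sketch_run_until(struct sketch_context *ctx, const bool *done)
{
    for (;;) {
        while (sketch_trigger(ctx, 1) > 0)
            ;
        if (*done)
            return true;
        if (ctx->n_pending == 0)
            return false;
        sketch_progress(ctx);
    }
}

/* Demo: the first callback posts a follow-up operation (as snappy_pull_cb
 * posts the push transfer); the second callback marks the RPC as done. */
static bool demo_done;
static int demo_calls;

static void demo_second_cb(void *arg) { (void) arg; demo_calls++; demo_done = true; }

static void demo_first_cb(void *arg)
{
    demo_calls++;
    sketch_post((struct sketch_context *) arg, demo_second_cb, NULL);
}

/* Returns the number of callbacks that ran, or -1 if the loop gave up. */
int sketch_demo(void)
{
    struct sketch_context ctx = { .n_pending = 0, .n_completed = 0 };
    demo_done = false;
    demo_calls = 0;
    sketch_post(&ctx, demo_first_cb, &ctx);
    return sketch_run_until(&ctx, &demo_done) ? demo_calls : -1;
}
```

The point the model makes is that callbacks never run on their own: they run only inside the trigger step of the loop, which is why both main() functions keep calling HG_Trigger() until the "done" flag set by the final callback is observed.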

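The client calls HG_Destroy() right after HG_Forward(), and the server destroys the handle in snappy_push_cb() right after HG_Respond(); the comments justify both by reference counting. The minimal sketch below uses hypothetical names (rc_handle, rc_create(), rc_ref(), rc_unref()) to show why that is safe, assuming, as the client comment implies, that the in-flight operation holds its own reference: the object is freed only when the last reference is dropped. It is single-threaded; a concurrent implementation would need atomic counters.

```c
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical reference-counted handle. */
struct rc_handle {
    int refcount;
    bool *freed_flag; /* set to true when the handle is released (for the demo) */
};

/* Create a handle holding one reference, owned by the caller. */
struct rc_handle *rc_create(bool *freed_flag)
{
    struct rc_handle *h = malloc(sizeof(*h));
    if (h == NULL)
        return NULL;
    h->refcount = 1;
    h->freed_flag = freed_flag;
    *freed_flag = false;
    return h;
}

/* Take an extra reference, e.g. for an operation that is now in flight. */
void rc_ref(struct rc_handle *h)
{
    h->refcount++;
}

/* Drop one reference; free the handle when none remain. */
void rc_unref(struct rc_handle *h)
{
    if (--h->refcount == 0) {
        *h->freed_flag = true;
        free(h);
    }
}
```

Sequence mirroring the client: rc_create() (HG_Create, count 1), rc_ref() (the forward in flight, count 2), rc_unref() (the caller's HG_Destroy, count 1, still alive), and finally rc_unref() on completion (count 0, freed).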
Server

#include <limits.h> /* PATH_MAX */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <mercury.h>
#include <mercury_bulk.h>
#include <mercury_macros.h>
#include <snappy-c.h>

int
main(void)
{
    const char *info_string = NULL;
    char self_addr_string[PATH_MAX];
    hg_addr_t self_addr;
    FILE *na_config = NULL;
    hg_class_t *hg_class;
    hg_context_t *hg_context;
    unsigned major;
    unsigned minor;
    unsigned patch;
    hg_return_t hg_ret;
    hg_size_t self_addr_string_size = PATH_MAX;

    HG_Version_get(&major, &minor, &patch);

    printf("Server running mercury version %u.%u.%u\n", major, minor, patch);

    /* Get info string */
    /* bmi+tcp://localhost:port */
    info_string = getenv("HG_PORT_NAME");
    if (!info_string) {
        fprintf(stderr, "HG_PORT_NAME environment variable must be set, "
                        "e.g.:\nHG_PORT_NAME=\"tcp://127.0.0.1:22222\"\n");
        exit(EXIT_FAILURE);
    }

    HG_Set_log_level("warning");

    /* Initialize Mercury with the desired network abstraction class */
    hg_class = HG_Init(info_string, HG_TRUE);

    /* Get self addr to tell client about */
    HG_Addr_self(hg_class, &self_addr);
    HG_Addr_to_string(
        hg_class, self_addr_string, &self_addr_string_size, self_addr);
    HG_Addr_free(hg_class, self_addr);
    printf("Server address is: %s\n", self_addr_string);

    /* Write addr to a file */
    na_config = fopen(TEMP_DIRECTORY CONFIG_FILE_NAME, "w+");
    if (!na_config) {
        fprintf(stderr, "Could not open config file from: %s\n",
            TEMP_DIRECTORY CONFIG_FILE_NAME);
        exit(EXIT_FAILURE);
    }
    fprintf(na_config, "%s\n", self_addr_string);
    fclose(na_config);

    /* Create HG context */
    hg_context = HG_Context_create(hg_class);

    /* Register RPC */
    snappy_compress_register(hg_class);

    /* Poke progress engine and check for events */
    do {
        unsigned int actual_count = 0;
        do {
            hg_ret = HG_Trigger(hg_context, 0 /* timeout */, 1 /* max count */,
                &actual_count);
        } while ((hg_ret == HG_SUCCESS) && actual_count);

        /* Do not try to make progress anymore if we're done */
        if (snappy_compress_done_target_g)
            break;

        hg_ret = HG_Progress(hg_context, HG_MAX_IDLE_TIME);
    } while (hg_ret == HG_SUCCESS);

    /* Finalize */
    HG_Context_destroy(hg_context);
    HG_Finalize(hg_class);

    return EXIT_SUCCESS;
}

hg_id_t
snappy_compress_register(hg_class_t *hg_class)
{
    return MERCURY_REGISTER(hg_class, "snappy_compress", snappy_compress_in_t,
        snappy_compress_out_t, snappy_compress_cb);
}

/**
 * The routine that sets up the routines that actually do the work.
 * This "handle" parameter is the only value passed to this callback, but
 * Mercury routines allow us to query information about the context in which
 * we are called.
 */
static hg_return_t
snappy_compress_cb(hg_handle_t handle)
{
    struct snappy_transfer_args *snappy_transfer_args;
    size_t input_length;

    snappy_transfer_args = (struct snappy_transfer_args *) malloc(
        sizeof(struct snappy_transfer_args));
    snappy_transfer_args->handle = handle;

    /* Get the input parameters sent by the origin through HG_Forward() */
    HG_Get_input(handle, &snappy_transfer_args->snappy_compress_input);

    /* Now set up the bulk transfer and get the input length */
    input_length = HG_Bulk_get_size(
        snappy_transfer_args->snappy_compress_input.input_bulk_handle);

    /* The bulk handle is basically a pointer, with the addition that a
     * "handle" can refer to more than one memory region. */
    HG_Bulk_create(HG_Get_info(handle)->hg_class, 1, NULL, &input_length,
        HG_BULK_READWRITE, &snappy_transfer_args->local_input_bulk_handle);

    /* Pull the data from the origin's memory into local memory */
    /* Another way is HG_Bulk_access, which would allow Mercury to avoid
     * copying the data in the "co-resident" case */
    HG_Bulk_transfer(HG_Get_info(handle)->context, snappy_pull_cb,
        snappy_transfer_args, HG_BULK_PULL, HG_Get_info(handle)->addr,
        snappy_transfer_args->snappy_compress_input.input_bulk_handle,
        0, /* origin */
        snappy_transfer_args->local_input_bulk_handle,
        0, /* local */
        input_length, HG_OP_ID_IGNORE);

    return HG_SUCCESS;
}

static hg_return_t
snappy_pull_cb(const struct hg_cb_info *hg_cb_info)
{
    struct snappy_transfer_args *snappy_transfer_args =
        (struct snappy_transfer_args *) hg_cb_info->arg;
    hg_return_t ret = HG_SUCCESS;
    void *input;
    size_t input_length;
    size_t source_length =
        HG_Bulk_get_size(snappy_transfer_args->local_input_bulk_handle);

    /* Get a pointer to the input buffer from the local handle */
    HG_Bulk_access(hg_cb_info->info.bulk.local_handle, 0, source_length,
        HG_BULK_READ_ONLY, 1, &input, &input_length, NULL);
    printf("Transferred input buffer of length: %zu\n", input_length);
    print_buf(20, (int *) input);

    /* Allocate a buffer for the compressed input data */
    snappy_transfer_args->compressed_length =
        snappy_max_compressed_length(input_length);
    snappy_transfer_args->compressed =
        malloc(snappy_transfer_args->compressed_length);

    /* Compress data */
    printf("Compressing buffer...\n");
    snappy_transfer_args->ret =
        snappy_compress(input, input_length, snappy_transfer_args->compressed,
            &snappy_transfer_args->compressed_length);
    printf("Return value of snappy_compress is: %d\n", snappy_transfer_args->ret);
    printf("Compressed buffer length is: %zu\n",
        snappy_transfer_args->compressed_length);
    print_buf(5, (int *) snappy_transfer_args->compressed);

    /* Free bulk handles */
    HG_Bulk_free(snappy_transfer_args->local_input_bulk_handle);

    if (snappy_validate_compressed_buffer(snappy_transfer_args->compressed,
            snappy_transfer_args->compressed_length) == SNAPPY_OK) {
        printf("Compressed buffer validated: compressed successfully\n");
    }

    /* Push the compressed data back to the origin */
    HG_Bulk_create(HG_Get_info(snappy_transfer_args->handle)->hg_class, 1,
        &snappy_transfer_args->compressed,
        &snappy_transfer_args->compressed_length, HG_BULK_READ_ONLY,
        &snappy_transfer_args->local_compressed_bulk_handle);

    HG_Bulk_transfer(HG_Get_info(snappy_transfer_args->handle)->context,
        snappy_push_cb, snappy_transfer_args, HG_BULK_PUSH,
        HG_Get_info(snappy_transfer_args->handle)->addr,
        snappy_transfer_args->snappy_compress_input.compressed_bulk_handle,
        0, /* origin */
        snappy_transfer_args->local_compressed_bulk_handle,
        0, /* local */
        snappy_transfer_args->compressed_length, HG_OP_ID_IGNORE);

    return ret;
}

/* Called once the compressed data has been pushed to the origin; responds to
 * the origin's HG_Forward() request */
static hg_return_t
snappy_push_cb(const struct hg_cb_info *hg_cb_info)
{
    struct snappy_transfer_args *snappy_transfer_args =
        (struct snappy_transfer_args *) hg_cb_info->arg;
    hg_return_t ret = HG_SUCCESS;
    snappy_compress_out_t snappy_compress_output;

    /* Set output parameters to inform the origin */
    snappy_compress_output.ret = snappy_transfer_args->ret;
    snappy_compress_output.compressed_length =
        snappy_transfer_args->compressed_length;
    printf("Transferred compressed buffer of length %zu\n",
        snappy_transfer_args->compressed_length);

    printf("Sending output parameters back to origin\n");
    HG_Respond(snappy_transfer_args->handle, snappy_compress_done_cb, NULL,
        &snappy_compress_output);

    /* Free bulk handles */
    printf("Freeing resources\n");
    HG_Bulk_free(snappy_transfer_args->local_compressed_bulk_handle);
    free(snappy_transfer_args->compressed);

    /* Free input */
    HG_Free_input(snappy_transfer_args->handle,
        &snappy_transfer_args->snappy_compress_input);

    /* Destroy handle (no longer need it, safe because of reference count) */
    HG_Destroy(snappy_transfer_args->handle);
    free(snappy_transfer_args);

    return ret;
}

static hg_return_t
snappy_compress_done_cb(const struct hg_cb_info *callback_info)
{
    /* We're done */
    snappy_compress_done_target_g = HG_TRUE;

    return callback_info->ret;
}
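
The server chains three asynchronous steps: snappy_compress_cb() starts an HG_BULK_PULL, snappy_pull_cb() compresses the data and starts an HG_BULK_PUSH, and snappy_push_cb() replies with HG_Respond(). The direction and offset arguments of HG_Bulk_transfer() are easy to mix up, so the sketch below models them in-process with memcpy(): PULL copies from the origin region into the local one, PUSH copies the other way. The bulk_region struct and bulk_transfer_sketch() are hypothetical; in Mercury the copy is carried out by the NA plugin (for example with RMA operations) between memory regions of different processes.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical in-process stand-in for a memory region behind a bulk handle. */
struct bulk_region {
    void *base;
    size_t size;
};

enum bulk_op { BULK_PULL, BULK_PUSH };

/* Same argument order as HG_Bulk_transfer(): origin region and offset, then
 * local region and offset, then size. PULL copies origin -> local, PUSH copies
 * local -> origin. Returns 0 on success, -1 if a range is out of bounds. */
int bulk_transfer_sketch(enum bulk_op op,
                         const struct bulk_region *origin, size_t origin_offset,
                         const struct bulk_region *local, size_t local_offset,
                         size_t size)
{
    if (origin_offset > origin->size || size > origin->size - origin_offset ||
        local_offset > local->size || size > local->size - local_offset)
        return -1;
    char *o = (char *) origin->base + origin_offset;
    char *l = (char *) local->base + local_offset;
    if (op == BULK_PULL)
        memcpy(l, o, size);
    else
        memcpy(o, l, size);
    return 0;
}
```

In the example, the origin regions are the client's input and compressed buffers (received through snappy_compress_in_t), and the local regions are the server's own buffers.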


http://www.ppmy.cn/news/515304.html
