Mercury是一个专为HPC系统设计的RPC框架,支持参数的异步传输和请求的异步执行,并直接支持大块数据(bulk data)参数。其网络实现是抽象的,既便于移植到未来的系统,又能高效利用现有的本地传输机制。Mercury的接口是通用的,可以对任意函数调用进行序列化。Mercury是Mochi微服务生态系统的核心组成部分。
一、支持的架构
MPI实现支持的架构通常由网络抽象层支持。
网络抽象(NA)层在内部由RPC层和批量层使用。NA层使用插件机制,以便在运行时轻松添加和选择对各种网络协议的支持。
NA支持多种后端实现。推荐使用OFI/libfabric插件进行节点间通信,使用SM(共享内存)插件进行节点内通信。
在libfabric不可用或不建议使用的平台上,UCX插件也可用作替代传输,目前支持的协议是tcp和verbs。
MPI和BMI(tcp)插件仍然受支持,但正逐步被弃用。
二、软件需求
要使用OFI libfabric插件,请参考后面的libfabric构建说明。
要使用UCX插件,参考后面的UCX构建说明。
要在Linux上使用原生的NA SM(共享内存)插件,需要内核v3.2引入的跨内存附加(CMA, Cross Memory Attach)功能。
要使用BMI插件,最方便的方法是通过spack安装;也可以按如下步骤手动构建:
git clone https://github.com/radix-io/bmi.git && cd bmi
./prepare && ./configure --enable-shared --enable-bmi-only
make && make install
要使用MPI插件,Mercury需要一个配置良好的MPI实现(MPICH2 v1.4.1或更高版本/OpenMPI v1.6或更高),并在接受远程连接的目标上提供MPI_THREAD_MULTIPLE。不接受传入连接的进程不需要具有多线程执行级别。
三、构建
bzip2 -dc mercury-X.tar.bz2 | tar xvf -
cd mercury-X
mkdir build
cd build
ccmake ..    (".." 是mercury-X目录的相对路径)
多次按“c”,然后选择合适的选项。建议的选项有:
BUILD_SHARED_LIBS ON (or OFF if the library you link against requires static libraries)
BUILD_TESTING ON/OFF
Boost_INCLUDE_DIR /path/to/include/directory
CMAKE_INSTALL_PREFIX /path/to/install/directory
MERCURY_ENABLE_DEBUG ON/OFF
MERCURY_TESTING_ENABLE_PARALLEL ON/OFF
MERCURY_USE_BOOST_PP ON
MERCURY_USE_CHECKSUMS ON/OFF
MERCURY_USE_SYSTEM_BOOST ON/OFF
MERCURY_USE_SYSTEM_MCHECKSUM ON/OFF
MERCURY_USE_XDR OFF
NA_USE_BMI ON/OFF
NA_USE_MPI ON/OFF
NA_USE_OFI ON/OFF
NA_USE_PSM ON/OFF
NA_USE_PSM2 ON/OFF
NA_USE_SM ON/OFF
NA_USE_UCX ON/OFF
设置头文件目录和库路径时,可能需要键入“t”切换到高级模式。配置完成后,如果没有看到任何错误,请键入“g”生成Makefile。退出CMake配置界面、准备构建目标时,执行:
make
如需详细的编译/构建输出,可在make命令中加入VERBOSE=1:
make VERBOSE=1
四、安装
假设已设置CMAKE_INSTALL_PREFIX(见上一步),并且对目标目录具有写入权限,请从构建目录执行:
make install
可以运行测试来检查基本的RPC功能(请求和批量数据传输)是否正常工作(需在配置时将BUILD_TESTING设为ON)。测试通过CTest运行,只需在构建目录中执行:
ctest .
如需详细的测试输出,可在ctest命令中加入-V:
ctest -V .
通过插入-VV可以显示额外的详细信息:
ctest -VV .
五、实例分析
客户端
int
main(void)
{const char *info_string = NULL;char target_addr_string[PATH_MAX], *p;FILE *na_config = NULL;hg_class_t *hg_class;hg_context_t *hg_context;hg_addr_t hg_target_addr;hg_return_t hg_ret;/* Get info string */info_string = getenv("HG_PORT_NAME");if (!info_string) {fprintf(stderr, "HG_PORT_NAME environment variable must be set\n");exit(0);}printf("Using %s\n", info_string);HG_Set_log_level("warning");/* 使用所需的网络抽象类初始化Mercury */hg_class = HG_Init(info_string, HG_FALSE);/* Create HG context */hg_context = HG_Context_create(hg_class);/* 连接字符串是在NA_Addr_self()/NA_Addr_to_string()后生成, 得到字符串并将其传递给 NA_Addr_lookup() */na_config = fopen(TEMP_DIRECTORY CONFIG_FILE_NAME, "r");if (!na_config) {fprintf(stderr, "Could not open config file from: %s\n",TEMP_DIRECTORY CONFIG_FILE_NAME);exit(0);}fgets(target_addr_string, PATH_MAX, na_config);p = strrchr(target_addr_string, '\n');if (p != NULL)*p = '\0';printf("Target address is: %s\n", target_addr_string);fclose(na_config);/* 查找 target address */HG_Addr_lookup2(hg_class, target_addr_string, &hg_target_addr);/* Register RPC */snappy_compress_id_g = snappy_compress_register(hg_class);/* Send RPC to target */snappy_compress_rpc(hg_class, hg_context, hg_target_addr);/* Poke progress engine and check for events */do {unsigned int actual_count = 0;do {hg_ret = HG_Trigger(hg_context, 0 /* timeout */, 1 /* max count */, &actual_count);} while ((hg_ret == HG_SUCCESS) && actual_count);/* Do not try to make progress anymore if we're done */if (snappy_compress_done_g)break;hg_ret = HG_Progress(hg_context, HG_MAX_IDLE_TIME);} while (hg_ret == HG_SUCCESS);/* Finalize */HG_Addr_free(hg_class, hg_target_addr);HG_Context_destroy(hg_context);HG_Finalize(hg_class);return EXIT_SUCCESS;
}static int
snappy_compress_rpc(hg_class_t *hg_class, hg_context_t *hg_context, hg_addr_t hg_target_addr)
{int *input;size_t source_length = NR_ITEMS * sizeof(int);hg_bulk_t input_bulk_handle;void *compressed;size_t max_compressed_length;hg_bulk_t compressed_bulk_handle;snappy_compress_in_t snappy_compress_input;struct snappy_compress_rpc_args *snappy_compress_rpc_args;hg_handle_t handle;int i;/*** 我们将取一个缓冲区并将其发送到服务器进行压缩。*/max_compressed_length = snappy_max_compressed_length(source_length);printf("Input buffer length is: %zu\n", source_length);printf("Max compressed length is: %zu\n", max_compressed_length);/* 生成 input buffer */input = (int *) malloc(source_length);for (i = 0; i < NR_ITEMS; i++) {input[i] = rand() % 10;}print_buf(20, input);/* Allocate compressed buffer */compressed = malloc(max_compressed_length);memset(compressed, '\0', max_compressed_length);/* Create HG handle bound to target */HG_Create(hg_context, hg_target_addr, snappy_compress_id_g, &handle);/*** 将“handle”与内存区域关联。Mercury的批量传输将从该区域获取/输入数据*/HG_Bulk_create(hg_class, 1, (void **) &input, &source_length,HG_BULK_READ_ONLY, &input_bulk_handle);HG_Bulk_create(hg_class, 1, &compressed, &max_compressed_length,HG_BULK_READWRITE, &compressed_bulk_handle);/* 创建结构以保存参数,因为调用将异步执行 */snappy_compress_rpc_args = (struct snappy_compress_rpc_args *) malloc(sizeof(struct snappy_compress_rpc_args));snappy_compress_rpc_args->input = input;snappy_compress_rpc_args->input_length = source_length;snappy_compress_rpc_args->input_bulk_handle = input_bulk_handle;snappy_compress_rpc_args->compressed = compressed;snappy_compress_rpc_args->compressed_bulk_handle = compressed_bulk_handle;/* Set input arguments that will be passed to HG_Forward */snappy_compress_input.input_bulk_handle = input_bulk_handle;snappy_compress_input.compressed_bulk_handle = compressed_bulk_handle;/* Forward the call */printf("Sending input to target\n");HG_Forward(handle, snappy_compress_rpc_cb, snappy_compress_rpc_args,&snappy_compress_input);/* Handle will be destroyed when call completes (reference count) */HG_Destroy(handle);return 0;
}/* 该例程在调用HG_Trigger和RPC完成后执行 */
static hg_return_t
snappy_compress_rpc_cb(const struct hg_cb_info *callback_info)
{struct snappy_compress_rpc_args *snappy_compress_rpc_args =(struct snappy_compress_rpc_args *) callback_info->arg;hg_handle_t handle = callback_info->info.forward.handle;int *input;size_t source_length;void *compressed;size_t compressed_length;int *uncompressed;size_t uncompressed_length;snappy_compress_out_t snappy_compress_output;snappy_status ret;/* 获取 output */printf("Received output from target\n");HG_Get_output(handle, &snappy_compress_output);/*获取 output parameters */ret = snappy_compress_output.ret;compressed_length = snappy_compress_output.compressed_length;compressed = snappy_compress_rpc_args->compressed;input = snappy_compress_rpc_args->input;source_length = snappy_compress_rpc_args->input_length;/* Check ret */if (ret != SNAPPY_OK) {fprintf(stderr, "Error: snappy_compressed failed with ret %d\n", ret);}/* 输出数据现在在bulk缓冲区中 */printf("Compressed buffer length is: %zu\n", compressed_length);print_buf(5, (int *) compressed);if (snappy_validate_compressed_buffer(compressed, compressed_length) ==SNAPPY_OK) {printf("Compressed buffer validated: compressed successfully\n");}uncompressed_length = source_length * sizeof(int);uncompressed = (int *) malloc(uncompressed_length);/* Uncompress data and check uncompressed_length */printf("Uncompressing buffer...\n");snappy_uncompress(compressed, compressed_length, (char *) uncompressed,&uncompressed_length);printf("Uncompressed buffer length is: %zu\n", uncompressed_length);print_buf(20, uncompressed);/* Free output and handles */HG_Free_output(handle, &snappy_compress_output);HG_Bulk_free(snappy_compress_rpc_args->input_bulk_handle);HG_Bulk_free(snappy_compress_rpc_args->compressed_bulk_handle);/* Free data */free(uncompressed);free(compressed);free(input);free(snappy_compress_rpc_args);/* We're done */snappy_compress_done_g = HG_TRUE;return HG_SUCCESS;
}
服务端
int
main(void)
{const char *info_string = NULL;char self_addr_string[PATH_MAX];hg_addr_t self_addr;FILE *na_config = NULL;hg_class_t *hg_class;hg_context_t *hg_context;unsigned major;unsigned minor;unsigned patch;hg_return_t hg_ret;hg_size_t self_addr_string_size = PATH_MAX;HG_Version_get(&major, &minor, &patch);printf("Server running mercury version %u.%u.%u\n", major, minor, patch);/* Get info string *//* bmi+tcp://localhost:port */info_string = getenv("HG_PORT_NAME");if (!info_string) {fprintf(stderr, "HG_PORT_NAME environment variable must be set, ""e.g.:\nHG_PORT_NAME=\"tcp://127.0.0.1:22222\"\n");exit(0);}HG_Set_log_level("warning");/* Initialize Mercury with the desired network abstraction class */hg_class = HG_Init(info_string, HG_TRUE);/* Get self addr to tell client about */HG_Addr_self(hg_class, &self_addr);HG_Addr_to_string(hg_class, self_addr_string, &self_addr_string_size, self_addr);HG_Addr_free(hg_class, self_addr);printf("Server address is: %s\n", self_addr_string);/* Write addr to a file */na_config = fopen(TEMP_DIRECTORY CONFIG_FILE_NAME, "w+");if (!na_config) {fprintf(stderr, "Could not open config file from: %s\n",TEMP_DIRECTORY CONFIG_FILE_NAME);exit(0);}fprintf(na_config, "%s\n", self_addr_string);fclose(na_config);/* Create HG context */hg_context = HG_Context_create(hg_class);/* Register RPC */snappy_compress_register(hg_class);/* Poke progress engine and check for events */do {unsigned int actual_count = 0;do {hg_ret = HG_Trigger(hg_context, 0 /* timeout */, 1 /* max count */, &actual_count);} while ((hg_ret == HG_SUCCESS) && actual_count);/* Do not try to make progress anymore if we're done */if (snappy_compress_done_target_g)break;hg_ret = HG_Progress(hg_context, HG_MAX_IDLE_TIME);} while (hg_ret == HG_SUCCESS);/* Finalize */HG_Context_destroy(hg_context);HG_Finalize(hg_class);return EXIT_SUCCESS;
}hg_id_t
snappy_compress_register(hg_class_t *hg_class)
{return MERCURY_REGISTER(hg_class, "snappy_compress", snappy_compress_in_t,snappy_compress_out_t, snappy_compress_cb);
}/*** 设置实际执行工作的例程的例程.* 这个“handle”参数是传递给这个回调的唯一值,但Mercury例程允许我们查询有关调用上下文的信息. */
static hg_return_t
snappy_compress_cb(hg_handle_t handle)
{struct snappy_transfer_args *snappy_transfer_args;size_t input_length;snappy_transfer_args = (struct snappy_transfer_args *) malloc(sizeof(struct snappy_transfer_args));snappy_transfer_args->handle = handle;/* 获取从源经过HG_Forward() 发送过来的输入参数 */HG_Get_input(handle, &snappy_transfer_args->snappy_compress_input);/* Now set up the bulk transfer and get the input length */input_length = HG_Bulk_get_size(snappy_transfer_args->snappy_compress_input.input_bulk_handle);/* bulk handle 基本上是一个指针,另外“handle”可以引用多个内存区域. */HG_Bulk_create(HG_Get_info(handle)->hg_class, 1, NULL, &input_length,HG_BULK_READWRITE, &snappy_transfer_args->local_input_bulk_handle);/* 将数据从origin的内存中拉到本端的内存中 *//* 另一种方法是通过HG_Bulk_access,这将允许mercury在“co-resident”的情况下避免复制数据 */HG_Bulk_transfer(HG_Get_info(handle)->context, snappy_pull_cb,snappy_transfer_args, HG_BULK_PULL, HG_Get_info(handle)->addr,snappy_transfer_args->snappy_compress_input.input_bulk_handle,0, /* origin */snappy_transfer_args->local_input_bulk_handle, 0, /* local */input_length, HG_OP_ID_IGNORE);return HG_SUCCESS;
}static hg_return_t
snappy_pull_cb(const struct hg_cb_info *hg_cb_info)
{struct snappy_transfer_args *snappy_transfer_args =(struct snappy_transfer_args *) hg_cb_info->arg;hg_return_t ret = HG_SUCCESS;void *input;size_t input_length;size_t source_length =HG_Bulk_get_size(snappy_transfer_args->local_input_bulk_handle);/* 从本地handle获取指向输入缓冲区的指针input */HG_Bulk_access(hg_cb_info->info.bulk.local_handle, 0, source_length,HG_BULK_READ_ONLY, 1, &input, &input_length, NULL);printf("Transferred input buffer of length: %zu\n", input_length);print_buf(20, (int *) input);/* 为压缩输入数据分配压缩缓冲区 */snappy_transfer_args->compressed_length =snappy_max_compressed_length(input_length);snappy_transfer_args->compressed =malloc(snappy_transfer_args->compressed_length);/* Compress data */printf("Compressing buffer...\n");snappy_transfer_args->ret =snappy_compress(input, input_length, snappy_transfer_args->compressed,&snappy_transfer_args->compressed_length);printf("Return value of snappy_compress is: %d\n", snappy_transfer_args->ret);printf("Compressed buffer length is: %zu\n",snappy_transfer_args->compressed_length);print_buf(5, (int *) snappy_transfer_args->compressed);/* Free bulk handles */HG_Bulk_free(snappy_transfer_args->local_input_bulk_handle);if (snappy_validate_compressed_buffer(snappy_transfer_args->compressed,snappy_transfer_args->compressed_length) == SNAPPY_OK) {printf("Compressed buffer validated: compressed successfully\n");}/* 将压缩后的数据push回源节点 */HG_Bulk_create(HG_Get_info(snappy_transfer_args->handle)->hg_class, 1,&snappy_transfer_args->compressed,&snappy_transfer_args->compressed_length, HG_BULK_READ_ONLY,&snappy_transfer_args->local_compressed_bulk_handle);HG_Bulk_transfer(HG_Get_info(snappy_transfer_args->handle)->context,snappy_push_cb, snappy_transfer_args, HG_BULK_PUSH,HG_Get_info(snappy_transfer_args->handle)->addr,snappy_transfer_args->snappy_compress_input.compressed_bulk_handle,0, /* origin */snappy_transfer_args->local_compressed_bulk_handle, 0, /* local */snappy_transfer_args->compressed_length, HG_OP_ID_IGNORE);return ret;
}/* 压缩数据推送到源节点后回调,给源节点HG_Forward请求回消息*/static hg_return_t
snappy_push_cb(const struct hg_cb_info *hg_cb_info)
{struct snappy_transfer_args *snappy_transfer_args =(struct snappy_transfer_args *) hg_cb_info->arg;hg_return_t ret = HG_SUCCESS;snappy_compress_out_t snappy_compress_output;/* 设置输出参数以通知源节点 */snappy_compress_output.ret = snappy_transfer_args->ret;snappy_compress_output.compressed_length =snappy_transfer_args->compressed_length;printf("Transferred compressed buffer of length %zu\n",snappy_transfer_args->compressed_length);printf("Sending output parameters back to origin\n");HG_Respond(snappy_transfer_args->handle, snappy_compress_done_cb, NULL,&snappy_compress_output);/* Free bulk handles */printf("Freeing resources\n");HG_Bulk_free(snappy_transfer_args->local_compressed_bulk_handle);free(snappy_transfer_args->compressed);/* Free input */HG_Free_input(snappy_transfer_args->handle,&snappy_transfer_args->snappy_compress_input);/* Destroy handle (no longer need it, safe because of reference count) */HG_Destroy(snappy_transfer_args->handle);free(snappy_transfer_args);return ret;
}static hg_return_t
snappy_compress_done_cb(const struct hg_cb_info *callback_info)
{/* We're done */snappy_compress_done_target_g = HG_TRUE;return callback_info->ret;
}