Chainlit快速实现AI对话应用并将聊天数据的AWS S3 和 Azure Blob云服务中

ops/2024/10/18 5:48:08/

自定义数据层

Literal AI 提供了最简单的方法来保存、分析和监控您的数据。

如果您正在考虑实现自定义数据层,请查看此处的示例以获取一些启发。

此外,我们非常希望看到社区主导的开源数据层实现并将其列在这里。如果您有兴趣做出贡献,请通过 Discord 与我们联系。

您需要在 chainlit 应用程序中导入自定义数据层,并将其分配给数据层变量,如下所示:

import chainlit.data as cl_dataclass CustomDataLayer(cl_data.BaseDataLayer):# TODO: implement all methods from cl_data.BaseDataLayercl_data._data_layer = CustomDataLayer()

SQL Alchemy数据层

该自定义层已针对 PostgreSQL 进行了测试,但是由于使用了 SQL Alchemy 数据库,它应该可以支持更多的 SQL 数据库。

该数据层还支持BaseStorageClient将元素存储到 Azure Blob 存储或 AWS S3 中。

以下是用于创建该数据层模式的 SQL:


CREATE TABLE users ("id" UUID PRIMARY KEY,"identifier" TEXT NOT NULL UNIQUE,"metadata" JSONB NOT NULL,"createdAt" TEXT
);CREATE TABLE IF NOT EXISTS threads ("id" UUID PRIMARY KEY,"createdAt" TEXT,"name" TEXT,"userId" UUID,"userIdentifier" TEXT,"tags" TEXT[],"metadata" JSONB,FOREIGN KEY ("userId") REFERENCES users("id") ON DELETE CASCADE
);CREATE TABLE IF NOT EXISTS steps ("id" UUID PRIMARY KEY,"name" TEXT NOT NULL,"type" TEXT NOT NULL,"threadId" UUID NOT NULL,"parentId" UUID,"disableFeedback" BOOLEAN NOT NULL DEFAULT false,"streaming" BOOLEAN NOT NULL,"waitForAnswer" BOOLEAN,"isError" BOOLEAN,"metadata" JSONB,"tags" TEXT[],"input" TEXT,"output" TEXT,"createdAt" TEXT,"start" TEXT,"end" TEXT,"generation" JSONB,"showInput" TEXT,"language" TEXT,"indent" INT
);CREATE TABLE IF NOT EXISTS elements ("id" UUID PRIMARY KEY,"threadId" UUID,"type" TEXT,"url" TEXT,"chainlitKey" TEXT,"name" TEXT NOT NULL,"display" TEXT,"objectKey" TEXT,"size" TEXT,"page" INT,"language" TEXT,"forId" UUID,"mime" TEXT
);CREATE TABLE IF NOT EXISTS feedbacks ("id" UUID PRIMARY KEY,"forId" UUID NOT NULL,"threadId" UUID NOT NULL,"value" INT NOT NULL,"comment" TEXT
);

例子

以下是使用 Azure 存储客户端在 PostgreSQL 数据库上设置此数据层的示例。首先安装所需的依赖项:

pip install asyncpg SQLAlchemy azure-identity azure-storage-file-datalake

导入自定义数据层和存储客户端,并cl_data._data_layerChainlit 应用程序开头设置变量。

import chainlit.data as cl_data
from chainlit.data.sql_alchemy import SQLAlchemyDataLayer
from chainlit.data.storage_clients import AzureStorageClientstorage_client = AzureStorageClient(account_url="<your_account_url>", container="<your_container>")cl_data._data_layer = SQLAlchemyDataLayer(conninfo="<your conninfo>", storage_provider=storage_client)

请注意,您需要+asyncpg在字符串中添加协议conninfo,以便它使用 asyncpg 库。

DynamoDB数据层

该数据层还支持BaseStorageClient将元素存储到 AWS S3Azure Blob 存储中。

例子

以下是设置此数据层的示例。首先安装 boto3

pip install boto3

导入自定义数据层和存储客户端,并cl_data._data_layerChainlit 应用程序开头设置变量。

import chainlit.data as cl_data
from chainlit.data.dynamodb import DynamoDBDataLayer
from chainlit.data.storage_clients import S3StorageClientstorage_client = S3StorageClient(bucket="<Your Bucket>")cl_data._data_layer = DynamoDBDataLayer(table_name="<Your Table>", storage_provider=storage_client)


表结构
以下是用于创建 dynamo 表的 Cloudformation:


{"AWSTemplateFormatVersion": "2010-09-09","Resources": {"DynamoDBTable": {"Type": "AWS::DynamoDB::Table","Properties": {"TableName": "<YOUR-TABLE-NAME>","AttributeDefinitions": [{"AttributeName": "PK","AttributeType": "S"},{"AttributeName": "SK","AttributeType": "S"},{"AttributeName": "UserThreadPK","AttributeType": "S"},{"AttributeName": "UserThreadSK","AttributeType": "S"}],"KeySchema": [{"AttributeName": "PK","KeyType": "HASH"},{"AttributeName": "SK","KeyType": "RANGE"}],"GlobalSecondaryIndexes": [{"IndexName": "UserThread","KeySchema": [{"AttributeName": "UserThreadPK","KeyType": "HASH"},{"AttributeName": "UserThreadSK","KeyType": "RANGE"}],"Projection": {"ProjectionType": "INCLUDE","NonKeyAttributes": ["id", "name"]}}],"BillingMode": "PAY_PER_REQUEST"}}}
}

日志记录

DynamoDB 数据层定义了 chainlit 记录器的子级。

import logging
from chainlit import loggerlogger.getChild("DynamoDB").setLevel(logging.DEBUG)

限制

不支持通过正/负反馈进行过滤。

数据层方法不是异步的。Boto3 不是异步的,因此数据层使用非异步阻塞 io

设计

此实现采用单表设计。一个表中有 4 种不同的实体类型,由 PK 和 SK 中的前缀标识。

以下是实体类型:


type User = {PK: "USER#{user.identifier}"SK: "USER"// ...PersistedUser
}type Thread = {PK: f"THREAD#{thread_id}"SK: "THREAD"// GSI: UserThread for querying in list_threadsUserThreadPK: f"USER#{user_id}"UserThreadSK: f"TS#{ts}"// ...ThreadDict
}type Step = {PK: f"THREAD#{threadId}"SK: f"STEP#{stepId}"// ...StepDict// feedback is stored as part of step. // NOTE: feedback.value is stored as Decimal in dynamo which is not json serializablefeedback?: Feedback
}type Element = {"PK": f"THREAD#{threadId}""SK": f"ELEMENT#{element.id}"// ...ElementDict
}

如何实现自定义数据层?

该类BaseDataLayerChainlit 框架内数据持久性操作的抽象基础。该类概述了在聊天机器人应用程序中管理用户、反馈、元素、步骤和线程的方法。

方法

  • async get_user(self, identifier: str) Coroutine
    根据用户标识符获取用户。返回类型可选为PersistedUser

  • async create_user(self, user: User) Coroutine
    根据User提供的实例创建新用户。返回类型可选为PersistedUser。

  • async upsert_feedback(self, feedback: Feedback) Coroutine
    插入或更新反馈。接受一个Feedback实例并返回一个字符串作为持久反馈的标识符。

  • async delete_feedback(self, feedback_id: str) Coroutine
    通过 删除反馈。如果成功则feedback_id返回。True

  • async create_element(self, element_dict: ElementDict) Coroutine
    向数据层添加新元素。ElementDict作为参数接受。

  • async get_element(self, thread_id: str, element_id: str) Coroutine
    thread_id通过和检索元素element_id。返回类型可选为ElementDict。

  • async delete_element(self, element_id: str) Coroutine
    根据标识符删除元素element_id。

  • async create_step(self, step_dict: StepDict) Coroutine
    在数据层中创建新步骤。接受StepDict为参数。

  • async update_step(self, step_dict: StepDict) Coroutine
    更新现有步骤。StepDict作为参数接受。

  • async delete_step(self, step_id: str) Coroutine
    根据标识符删除步骤step_id。

  • async get_thread_author(self, thread_id: str) Coroutine
    通过 获取给定主题的作者thread_id。返回表示作者标识符的字符串。

  • async delete_thread(self, thread_id: str) Coroutine
    根据给定的标识符删除一个线程thread_id。

  • async list_threads(self, pagination: Pagination, filters: ThreadFilter) Coroutine
    根据pagination和filters参数列出线程。返回一个PaginatedResponse[ThreadDict]。

  • async get_thread(self, thread_id: str) Coroutine
    通过标识符检索线程thread_id。返回类型可选为ThreadDict。

  • async update_thread(self, thread_id: str, name: Optional[str] = None, user_id: Optional[str] = None, metadata: Optional[Dict] = None, tags: Optional[List[str]] = None) Coroutine
    更新主题的详细信息,如名称、用户 ID、元数据和标签。参数大多是可选的。

  • async delete_user_session(self, id: str) Coroutine
    根据标识符删除用户会话id。返回表示成功的布尔值。

装饰器

  • @queue_until_user_message() Decorator

将某些方法排在队列中,仅在接收到第一个用户消息后执行,这对于WebsocketSessions特别有用。


http://www.ppmy.cn/ops/93150.html

相关文章

element时间段选择器或时间选择器 只设置默认起始时间或者结束时间,不显示问题

element时间段选择器或时间选择器 只设置默认起始时间或者结束时间&#xff0c;不显示问题 <div v-for"(item,index) in [a,b]":key"item"><el-date-pickerv-if"b"v-model"value1[item]"type"datetimerange"value-…

编程-设计模式 31:服务定位器模式(Service Locator Pattern)

设计模式 31&#xff1a;服务定位器模式&#xff08;Service Locator Pattern&#xff09; 定义与目的 定义&#xff1a;服务定位器模式是一种用于查找和获取服务对象的模式。它提供了一个全局的服务目录&#xff0c;使得客户端可以在不知道具体服务实现的情况下获取服务实例…

8月8日笔记

8月8日笔记 msf常见命令 启动MSF控制台 msfconsole: 启动MSF控制台。msfconsole -h: 显示帮助菜单。msfconsole -q: 启动MSF控制台并立即退出。 导航和管理 back: 返回上一级菜单。exit: 退出MSF控制台。banner: 显示MSF的横幅。cd: 更改工作目录。color: 开启或关闭彩色输…

20240812 每日AI必读资讯

黑匣子被打开了&#xff01;能玩的Transformer可视化解释工具&#xff1a;Transformer Explainer - 佐治亚理工学院和 IBM 研究院开发一款基于 web 的开源交互式可视化工具「Transformer Explainer」&#xff0c;帮助非专业人士了解 Transformer 的高级模型结构和低级数学运算…

二叉树中的深搜

&#x1f3a5; 个人主页&#xff1a;Dikz12&#x1f525;个人专栏&#xff1a;算法(Java)&#x1f4d5;格言&#xff1a;吾愚多不敏&#xff0c;而愿加学欢迎大家&#x1f44d;点赞✍评论⭐收藏 目录 1. 计算布尔二叉树的值 1.1 题目描述 1.2 题解 1.3 代码实现 2. 求根节…

算法与数据结构面试宝典:详解算法效率评估方法及示例(C/C++)

文章目录 一、算法效率评估概述二、时间复杂度评估三、空间复杂度评估四、实际性能五、总结 在面试过程中&#xff0c;算法效率评估是衡量候选人能力的重要环节。本文将详细介绍如何进行算法效率评估&#xff0c;并通过C/C语言示例&#xff0c;帮助大家更好地备战面试。 一、算…

LVS是什么?以及LVS-NAT以及DR模式实验

目录 NAT LVS LVS集群的类型&#xff1a; LVS-NAT模式实验 环境准备&#xff1a; 实验步骤&#xff1a; LVS-DR模式实验 题目&#xff1a; 环境准备&#xff1a; 实验步骤&#xff1a; LVS-防火墙标签解决轮询调度问题 环境准备&#xff1a; 实验步骤&#xff1…

嵌入式人工智能(OpenCV-图像的基本操作)

1、OpenCV简介 人工智能一个重要方面的应用就是计算机视觉&#xff0c;而OpenCV正是基于BSD许可发行的开源、跨平台的计算机视觉库。它可以运行在Linux、Windows、Android和Mac OS操作系统上。 [1]它轻量级而且高效——由一系列 C 函数和C 类构成&#xff0c;同时提供了Python…