An Introduction to LiveKit Agents

2025/1/15 15:23:24

Concepts

LiveKit core concepts:

  • Room
  • Participant
  • Track (a stream of media or data)

[Figure: Agent architecture diagram]

[Figure: Subscribing to tracks]

[Figure: Agent interaction flow]

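Client-side operations

Joining a room

Ways to create a room

Manual

Grant the user permission to create rooms; the client then creates the room when it joins.

Automatic

The client specifies ws_url and a token and joins the room named in that token; the room is created on the first join.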

val room = LiveKit.create(appContext = applicationContext)
// connect() is a suspend function, so call it from a coroutine
lifecycleScope.launch {
    room.connect(wsUrl, token)
}

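Leaving a room

Call Room.disconnect() to notify LiveKit of the leave event. If the application closes without notifying LiveKit, the participant will continue to appear in the Room for 15 seconds.

On Swift, Room.disconnect is called automatically when the application exits.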

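Sending messages

How to send

The client uses the LocalParticipant.publishData API to publish arbitrary data messages to any participant in the room. Room data is published to the SFU over a WebRTC data channel; the LiveKit server then forwards it to one or more participants in the room.

To send a message only to specific users, set destinationIdentities to the identities of those participants.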

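// Sending messages
coroutineScope.launch {
    val data: ByteArray = //...
    // Send a lossy message to everyone. LOSSY means the data is sent once with no ordering guarantee,
    // which is ideal for real-time updates where delivery speed comes first.
    room.localParticipant.publishData(data, DataPublishReliability.LOSSY)

    // Send a reliable message to specific participants. RELIABLE means the data is retried up to 3 times
    // and delivered in order, suited to cases where delivery matters more than low latency, e.g. in-room chat.
    val identities = listOf(
        Participant.Identity("alice"),
        Participant.Identity("bob"),
    )
    room.localParticipant.publishData(data, DataPublishReliability.RELIABLE, identities)
}

// Handling received messages
coroutineScope.launch {
    room.events.collect { event ->
        if (event is RoomEvent.DataReceived) {
            // Process data
        }
    }
}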
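The Kotlin calls above leave the actual routing to the server. To make the rule concrete, here is a minimal Python model of how recipients could be resolved: no destination identities means everyone else in the room, otherwise only the listed identities that are present. `resolve_recipients` is hypothetical, and excluding the sender is an assumption of this sketch; it is not LiveKit's SFU code.

```python
from typing import Iterable, List, Optional


def resolve_recipients(
    sender: str,
    participants: Iterable[str],
    destination_identities: Optional[List[str]] = None,
) -> List[str]:
    """Hypothetical model of data-packet routing inside a room.

    - No destination identities: broadcast to every participant except the sender.
    - Otherwise: deliver only to listed identities that are currently in the room.
    """
    others = [p for p in participants if p != sender]
    if not destination_identities:
        return others
    wanted = set(destination_identities)
    return [p for p in others if p in wanted]


room = ["agent", "alice", "bob", "carol"]
print(resolve_recipients("agent", room))                    # ['alice', 'bob', 'carol']
print(resolve_recipients("agent", room, ["alice", "bob"]))  # ['alice', 'bob']
print(resolve_recipients("agent", room, ["dave"]))          # []
```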

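Message size limits

Because of limitations in the SCTP protocol, it is impractical to use data channels for messages larger than 16 KiB, including LiveKit's protocol wrapper. We recommend keeping messages under 15 KiB. See the LiveKit documentation for more on data channel size limits.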
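If a payload can exceed that limit, one common workaround is to split it on the sending side and reassemble it on the receiving side. The sketch below uses a hypothetical 8-byte header of our own design (message id, chunk index, chunk count); it is not a LiveKit wire format, and the 15 KiB ceiling is taken from the recommendation above.

```python
import struct
from typing import Dict, List, Optional

MAX_MESSAGE_BYTES = 15 * 1024          # recommended ceiling from the text above
HEADER = struct.Struct(">IHH")         # hypothetical header: msg_id, index, total (8 bytes)
MAX_CHUNK_PAYLOAD = MAX_MESSAGE_BYTES - HEADER.size


def split_message(msg_id: int, payload: bytes) -> List[bytes]:
    """Split payload into chunks that each fit, header included, in MAX_MESSAGE_BYTES."""
    pieces = [payload[i:i + MAX_CHUNK_PAYLOAD]
              for i in range(0, len(payload), MAX_CHUNK_PAYLOAD)] or [b""]
    total = len(pieces)
    if total > 0xFFFF:
        raise ValueError("payload too large for a 16-bit chunk counter")
    return [HEADER.pack(msg_id, idx, total) + piece for idx, piece in enumerate(pieces)]


class Reassembler:
    """Collects chunks per message id and returns the full payload once complete."""

    def __init__(self) -> None:
        self._parts: Dict[int, Dict[int, bytes]] = {}

    def feed(self, chunk: bytes) -> Optional[bytes]:
        msg_id, idx, total = HEADER.unpack_from(chunk)
        parts = self._parts.setdefault(msg_id, {})
        parts[idx] = chunk[HEADER.size:]
        if len(parts) < total:
            return None  # still waiting for more chunks
        del self._parts[msg_id]
        return b"".join(parts[i] for i in range(total))


data = bytes(range(256)) * 200          # 51,200 bytes, well over the limit
chunks = split_message(1, data)
print(len(chunks), max(len(c) for c in chunks))   # 4 15360
```

Reassembly only completes if every chunk arrives, so this scheme pairs with RELIABLE delivery; a single dropped LOSSY chunk would leave the message incomplete.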

Message topics

A message can carry a topic, and the receiver uses the topic to filter out the messages it is interested in.
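On the receiving side, topic filtering usually amounts to a lookup from topic to handler. A minimal, library-independent sketch: the `TopicRouter` class and the `(topic, payload)` shape are hypothetical stand-ins for whatever the SDK's data-received event delivers.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List, Optional

Handler = Callable[[bytes], None]


class TopicRouter:
    """Dispatches received data to the handlers registered for its topic."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[Optional[str], List[Handler]] = defaultdict(list)

    def on(self, topic: Optional[str], handler: Handler) -> None:
        self._handlers[topic].append(handler)

    def dispatch(self, topic: Optional[str], payload: bytes) -> int:
        """Call every handler for `topic` and return how many ran.
        Messages whose topic has no handler are silently dropped."""
        handlers = self._handlers.get(topic, [])
        for handler in handlers:
            handler(payload)
        return len(handlers)


chat_log: List[str] = []
router = TopicRouter()
router.on("chat", lambda p: chat_log.append(p.decode("utf-8")))

router.dispatch("chat", "hello".encode("utf-8"))
router.dispatch("cursor", b"\x01\x02")   # no handler for this topic: ignored
print(chat_log)   # ['hello']
```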

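Publishing tracks

By default LiveKit supports three tracks, camera, microphone and screen share, and it also lets users configure custom tracks.

Audio and video tracks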

// Turns camera track on
room.localParticipant.setCameraEnabled(true)
// Turns microphone track on
room.localParticipant.setMicrophoneEnabled(true)

Screen share track

// Create an intent launcher for screen capture
// This *must* be registered prior to onCreate(), ideally as an instance val
val screenCaptureIntentLauncher = registerForActivityResult(
    ActivityResultContracts.StartActivityForResult()
) { result ->
    val resultCode = result.resultCode
    val data = result.data
    if (resultCode != Activity.RESULT_OK || data == null) {
        return@registerForActivityResult
    }
    lifecycleScope.launch {
        room.localParticipant.setScreenShareEnabled(true, data)
    }
}

// When it's time to enable the screen share, perform the following
val mediaProjectionManager =
    getSystemService(MEDIA_PROJECTION_SERVICE) as MediaProjectionManager
screenCaptureIntentLauncher.launch(mediaProjectionManager.createScreenCaptureIntent())

Custom track configuration

// Option 1: set room defaults
val options = RoomOptions(
    audioTrackCaptureDefaults = LocalAudioTrackOptions(
        noiseSuppression = true,
        echoCancellation = true,
        autoGainControl = true,
        highPassFilter = true,
        typingNoiseDetection = true,
    ),
    videoTrackCaptureDefaults = LocalVideoTrackOptions(
        deviceId = "",
        position = CameraPosition.FRONT,
        captureParams = VideoPreset169.H1080.capture,
    ),
    audioTrackPublishDefaults = AudioTrackPublishDefaults(
        audioBitrate = 20_000,
        dtx = true,
    ),
    videoTrackPublishDefaults = VideoTrackPublishDefaults(
        videoEncoding = VideoPreset169.H1080.encoding,
    )
)
var room = LiveKit.create(
    ...
    roomOptions = options,
)

// Option 2: create tracks manually (publish calls are suspend functions; call them from a coroutine)
val localParticipant = room.localParticipant
val audioTrack = localParticipant.createAudioTrack("audio")
localParticipant.publishAudioTrack(audioTrack)

// Named arguments, matching the LocalVideoTrackOptions fields used in Option 1
val videoTrack = localParticipant.createVideoTrack(
    "video",
    LocalVideoTrackOptions(
        position = CameraPosition.FRONT,
        captureParams = VideoPreset169.H1080.capture,
    )
)
localParticipant.publishVideoTrack(videoTrack)

Subscribing to tracks

By default, a user who joins a room is subscribed to all tracks in it.

coroutineScope.launch {
    room.events.collect { event ->
        when (event) {
            is RoomEvent.TrackSubscribed -> {
                // Audio tracks are automatically played.
                val videoTrack = event.track as? VideoTrack ?: return@collect
                videoTrack.addRenderer(videoRenderer)
            }
            else -> {}
        }
    }
}

Listening for events

Events fall into two groups: room events and participant events. The full list:

| Event | Description | Room event | Participant event |
|---|---|---|---|
| ParticipantConnected | A RemoteParticipant joins after the local participant. | ✔️ | |
| ParticipantDisconnected | A RemoteParticipant leaves. | ✔️ | |
| Reconnecting | The connection to the server has been interrupted and it is attempting to reconnect. | ✔️ | |
| Reconnected | Reconnection has been successful. | ✔️ | |
| Disconnected | Disconnected from the room because the room closed or an unrecoverable failure occurred. | ✔️ | |
| TrackPublished | A new track is published to the room after the local participant has joined. | ✔️ | ✔️ |
| TrackUnpublished | A RemoteParticipant has unpublished a track. | ✔️ | ✔️ |
| TrackSubscribed | The LocalParticipant has subscribed to a track. | ✔️ | ✔️ |
| TrackUnsubscribed | A previously subscribed track has been unsubscribed. | ✔️ | ✔️ |
| TrackMuted | A track was muted; fires for both local and remote tracks. | ✔️ | ✔️ |
| TrackUnmuted | A track was unmuted; fires for both local and remote tracks. | ✔️ | ✔️ |
| LocalTrackPublished | A local track was published successfully. | ✔️ | ✔️ |
| LocalTrackUnpublished | A local track was unpublished. | ✔️ | ✔️ |
| ActiveSpeakersChanged | The current active speakers have changed. | ✔️ | |
| IsSpeakingChanged | The current participant has changed speaking status. | | ✔️ |
| ConnectionQualityChanged | Connection quality changed for a participant. | ✔️ | ✔️ |
| ParticipantMetadataChanged | A participant's metadata was updated via the server API. | ✔️ | ✔️ |
| RoomMetadataChanged | Metadata associated with the room has changed. | ✔️ | |
| DataReceived | Data received from another participant or the server. | ✔️ | ✔️ |
| TrackStreamStateChanged | Indicates whether a subscribed track has been paused due to bandwidth. | ✔️ | ✔️ |
| TrackSubscriptionPermissionChanged | One of the subscribed tracks has changed track-level permissions for the current participant. | ✔️ | ✔️ |
| ParticipantPermissionsChanged | The current participant's permissions have changed. | ✔️ | ✔️ |

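Server-side operations

Generating user tokens

Tokens are JWTs generated through the LiveKit API, which requires the LiveKit server's API_KEY and API_SECRET.

The user's information is read from their login JWT, and the LiveKit token is built as follows: identity = user_id + scene, name = the user's nickname (default value), room name = scene name (user_id).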

# server.py
import os
from livekit import api
from flask import Flask

app = Flask(__name__)


@app.route('/getToken')
def getToken():
    token = api.AccessToken(os.getenv('LIVEKIT_API_KEY'), os.getenv('LIVEKIT_API_SECRET')) \
        .with_identity("identity") \
        .with_name("my name") \
        .with_grants(api.VideoGrants(
            room_join=True,
            room="my-room",
        ))
    return token.to_jwt()

In development, you can quickly create a token with the CLI:

livekit-cli token create   --api-key devkey --api-secret secret   --join --room test_room --identity test_user   --valid-for 24h
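The naming convention described earlier in this section (identity = user_id + scene, name = nickname, room = scene name plus user_id) can be captured in a small helper. This is a sketch under assumptions: the `_` separator, the `UserClaims` fields, reading "scene name (user_id)" as one private room per user and scene, and the premise that the login JWT has already been verified and decoded are all hypothetical choices, not something LiveKit prescribes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UserClaims:
    """Fields assumed to come from the application's own (already verified) login JWT."""
    user_id: str
    nickname: str


@dataclass(frozen=True)
class TokenSubject:
    identity: str
    name: str
    room: str


def token_subject_for(claims: UserClaims, scene: str, sep: str = "_") -> TokenSubject:
    """Hypothetical mapping: identity = user_id + scene, name = nickname,
    room = scene + user_id (one private room per user and scene)."""
    return TokenSubject(
        identity=f"{claims.user_id}{sep}{scene}",
        name=claims.nickname,
        room=f"{scene}{sep}{claims.user_id}",
    )


print(token_subject_for(UserClaims("u42", "Alice"), "tutor"))
# TokenSubject(identity='u42_tutor', name='Alice', room='tutor_u42')
```

The resulting fields would then be fed to `.with_identity()`, `.with_name()` and `VideoGrants(room=...)` in the Flask handler.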

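Token attributes

The token is a JWT that carries the user's identity, the room name, capabilities, permissions, and so on. Tokens are issued per scene, i.e., per corresponding room.

Room permissions are specified in the video field of the decoded join token. It may contain one or more of the following properties: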

| Field | Type | Description |
|---|---|---|
| roomCreate | bool | Permission to create or delete rooms |
| roomList | bool | Permission to list available rooms |
| roomJoin | bool | Permission to join a room |
| roomAdmin | bool | Permission to moderate a room |
| roomRecord | bool | Permission to use the Egress service |
| ingressAdmin | bool | Permission to use the Ingress service |
| room | string | Name of the room; required if join or admin is set |
| canPublish | bool | Allow the participant to publish tracks |
| canPublishData | bool | Allow the participant to publish data to the room |
| canPublishSources | string[] | When set, only the listed sources can be published (camera, microphone, screen_share, screen_share_audio) |
| canSubscribe | bool | Allow the participant to subscribe to tracks |
| canUpdateOwnMetadata | bool | Allow the participant to update its own metadata |
| hidden | bool | Hide the participant from others in the room |
| kind | string | Type of participant (standard, ingress, egress, sip, or agent); this field is typically set by LiveKit internals |

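Handling session shutdown

When the user leaves the room the session ends, and a callback registered with add_shutdown_callback can handle any follow-up work, for example sending a chat-ended event.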

from livekit.agents import JobContext


async def entrypoint(ctx: JobContext):
    async def my_shutdown_hook():
        # save user state...
        pass

    ctx.add_shutdown_callback(my_shutdown_hook)
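The pattern is a list of coroutines that runs once the session is over. The toy `FakeJobContext` below is hypothetical; it only mimics the shape of `add_shutdown_callback` to show the ordering (hooks are registered in the entrypoint and run after the session ends, here emitting a hypothetical "chat ended" event). It says nothing about how the real JobContext schedules its callbacks.

```python
import asyncio
from typing import Awaitable, Callable, List


class FakeJobContext:
    """Hypothetical stand-in for JobContext: stores shutdown hooks and runs them later."""

    def __init__(self) -> None:
        self._shutdown_callbacks: List[Callable[[], Awaitable[None]]] = []

    def add_shutdown_callback(self, callback: Callable[[], Awaitable[None]]) -> None:
        self._shutdown_callbacks.append(callback)

    async def shutdown(self) -> None:
        # In this toy, hooks run in registration order once the session has ended.
        for callback in self._shutdown_callbacks:
            await callback()


events: List[str] = []


async def entrypoint(ctx: FakeJobContext) -> None:
    async def send_chat_ended() -> None:
        events.append("chat_ended")   # e.g. notify a backend that the conversation is over

    ctx.add_shutdown_callback(send_chat_ended)
    events.append("session_started")


async def main() -> None:
    ctx = FakeJobContext()
    await entrypoint(ctx)
    # ... the user talks to the agent, then leaves the room ...
    await ctx.shutdown()


asyncio.run(main())
print(events)  # ['session_started', 'chat_ended']
```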

Agent operations

Creating an agent worker

At the time of writing, LiveKit's Agent framework only supports the Python SDK. Documentation: https://docs.livekit.io/agents/quickstart/

This is the official demo:

import asyncio

from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli, llm
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero


# This function is the entrypoint for the agent.
async def entrypoint(ctx: JobContext):
    # Create an initial chat context with a system prompt
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoiding usage of unpronouncable punctuation."
        ),
    )

    # Connect to the LiveKit room
    # indicating that the agent will only subscribe to audio tracks
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # VoiceAssistant is a class that creates a full conversational AI agent.
    # See https://github.com/livekit/agents/tree/main/livekit-agents/livekit/agents/voice_assistant
    # for details on how it works.
    assistant = VoiceAssistant(
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
    )

    # Start the voice assistant with the LiveKit room
    assistant.start(ctx.room)

    await asyncio.sleep(1)

    # Greets the user with an initial message
    await assistant.say("Hey, how can I help you today?", allow_interruptions=True)


if __name__ == "__main__":
    # Initialize the worker with the entrypoint
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

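Agent lifecycle

  1. When the worker program starts, it connects to the LiveKit server over WebSocket and registers itself as a worker. A worker runs multiple child processes (agents) to handle requests.
  2. When a user joins a room, the LiveKit server uses load balancing to pick a worker to serve that user.
  3. The child process handles messages from the user and replies to them.
  4. When the user leaves the room, the room is closed and its connection to the agent is dropped.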
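The register-then-dispatch flow can be modelled in a few lines. This is a toy simulation, not LiveKit's scheduler: the least-loaded selection rule and the per-worker capacity are assumptions chosen for illustration, since the text only says that the server load-balances across workers.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Worker:
    name: str
    capacity: int                                  # max concurrent jobs (child processes)
    rooms: List[str] = field(default_factory=list)

    @property
    def load(self) -> float:
        return len(self.rooms) / self.capacity


class Server:
    """Toy model of steps 1-4: register workers, assign rooms, release on close."""

    def __init__(self) -> None:
        self.workers: Dict[str, Worker] = {}

    def register(self, worker: Worker) -> None:          # step 1
        self.workers[worker.name] = worker

    def user_joined(self, room: str) -> Optional[str]:   # step 2
        available = [w for w in self.workers.values() if len(w.rooms) < w.capacity]
        if not available:
            return None                                  # no worker can take the job
        chosen = min(available, key=lambda w: w.load)    # assumed least-loaded rule
        chosen.rooms.append(room)                        # step 3: its child process serves the room
        return chosen.name

    def room_closed(self, room: str) -> None:            # step 4
        for w in self.workers.values():
            if room in w.rooms:
                w.rooms.remove(room)


server = Server()
server.register(Worker("w1", capacity=2))
server.register(Worker("w2", capacity=1))
assigned = [server.user_joined(r) for r in ("room-a", "room-b", "room-c", "room-d")]
print(assigned)  # ['w1', 'w2', 'w1', None]
```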

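Agent internal execution flow

When handling a request, an agent goes through these stages:

  1. request handler: decides whether this worker can take the request; if it cannot, LiveKit hands the job to another worker.
  2. entrypoint: initialization that runs before the agent joins the room.
  3. prewarm function: called when the agent process starts; use it for slow operations such as loading models.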
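How the three hooks relate can be sketched with plain functions and a toy runner: prewarm runs once per process, the request handler runs once per incoming job, and the entrypoint runs once per accepted job. The `Job` type, the language-based acceptance rule, and `run_process` are hypothetical; in the real framework the hooks are registered through `WorkerOptions` (the demo above passes `entrypoint_fnc`).

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Job:
    room: str
    language: str


log: List[str] = []


def prewarm(userdata: Dict[str, object]) -> None:
    # Runs once when the process starts: load expensive resources here.
    userdata["model"] = "loaded-model"            # hypothetical placeholder for e.g. a VAD model
    log.append("prewarm")


def request_handler(job: Job) -> bool:
    # Decide whether this worker should take the job; False lets another worker have it.
    return job.language == "zh"                    # hypothetical acceptance rule


def entrypoint(job: Job, userdata: Dict[str, object]) -> None:
    # Per-job initialization before joining the room, reusing prewarmed resources.
    log.append(f"join {job.room} with {userdata['model']}")


def run_process(jobs: List[Job],
                accept: Callable[[Job], bool] = request_handler) -> List[str]:
    userdata: Dict[str, object] = {}
    prewarm(userdata)                              # once per process
    rejected = []
    for job in jobs:
        if accept(job):
            entrypoint(job, userdata)              # once per accepted job
        else:
            rejected.append(job.room)              # would be offered to another worker
    return rejected


rejected = run_process([Job("r1", "zh"), Job("r2", "en"), Job("r3", "zh")])
print(rejected)  # ['r2']
print(log)       # ['prewarm', 'join r1 with loaded-model', 'join r3 with loaded-model']
```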

Worker types

opts = WorkerOptions(
    ...
    # when omitted, the default is JobType.JT_ROOM
    worker_type=JobType.JT_ROOM,
)

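The JobType enum has two options:

  • JT_ROOM: a new agent instance is created for each room.
  • JT_PUBLISHER: a new agent instance is created for each participant (publisher) in the room.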
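The practical difference between the two types is how many agent jobs a room produces. A small model, assuming a room is described by the list of participants who publish tracks (a hypothetical representation), with plain string constants standing in for the enum values:

```python
from typing import Dict, List, Tuple

JT_ROOM = "JT_ROOM"
JT_PUBLISHER = "JT_PUBLISHER"


def jobs_for(rooms: Dict[str, List[str]], worker_type: str = JT_ROOM) -> List[Tuple[str, ...]]:
    """Return one tuple per agent job that would be created.

    JT_ROOM: one job per room. JT_PUBLISHER: one job per publisher in each room.
    """
    if worker_type == JT_ROOM:
        return [(room,) for room in rooms]
    if worker_type == JT_PUBLISHER:
        return [(room, who) for room, publishers in rooms.items() for who in publishers]
    raise ValueError(f"unknown worker type: {worker_type}")


rooms = {"support": ["alice", "bob"], "demo": ["carol"]}
print(len(jobs_for(rooms)))                 # 2 jobs: one per room
print(len(jobs_for(rooms, JT_PUBLISHER)))   # 3 jobs: one per publisher
```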

Handling requests in the agent

Processing audio tracks

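import asyncio

from livekit import rtc

# This code lives inside entrypoint(ctx: JobContext), where ctx is available.


async def process_audio(track: rtc.Track):
    # Iterate over the audio frames of the subscribed track
    audio_stream = rtc.AudioStream(track)
    async for event in audio_stream:
        do_something(event.frame)


# Event callbacks are regular (sync) functions, so hand the async work to a task
@ctx.room.on("track_subscribed")
def on_track_subscribed(
    track: rtc.Track,
    publication: rtc.TrackPublication,
    participant: rtc.RemoteParticipant,
):
    # Only listen to audio tracks
    if track.kind == rtc.TrackKind.KIND_AUDIO:
        asyncio.create_task(process_audio(track))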
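As one concrete choice for `do_something`, the helper below computes the RMS level of a frame in dBFS, which is handy for a crude voice-activity check. It assumes the frame's samples are interleaved signed 16-bit little-endian PCM passed as raw bytes (how you get those bytes from the SDK's frame object is left out), and the -50 dBFS silence threshold is an arbitrary example value.

```python
import array
import math
import struct
import sys

SILENCE_DBFS = -50.0   # arbitrary example threshold


def frame_dbfs(pcm: bytes) -> float:
    """RMS level of little-endian int16 PCM in dB relative to full scale (0 dBFS = max)."""
    samples = array.array("h")
    samples.frombytes(pcm)
    if sys.byteorder == "big":
        samples.byteswap()           # input is assumed little-endian
    if len(samples) == 0:
        return float("-inf")
    rms = math.sqrt(sum(s * s for s in samples) / len(samples))
    return 20 * math.log10(rms / 32768) if rms > 0 else float("-inf")


def is_silent(pcm: bytes) -> bool:
    return frame_dbfs(pcm) < SILENCE_DBFS


loud = struct.pack("<480h", *([16384, -16384] * 240))   # 10 ms at 48 kHz, half scale
print(round(frame_dbfs(loud), 2), is_silent(loud))       # -6.02 False
print(is_silent(bytes(960)))                             # True (all-zero frame)
```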

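Publishing an audio track

Publishing audio involves splitting the stream into fixed-length audio frames. An internal buffer holds a 50 ms queue of audio that is sent out in real time. The capture_frame method used to send new frames blocks (you await it) until the buffer has accepted the entire frame, which makes interruptions easier to handle.

To publish an audio track you must decide the sample rate, the number of channels, and the length of each frame (in samples) in advance. The example below sends a constant 16-bit sine wave at 48 kHz in 10 ms frames: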

import numpy as np
from livekit import rtc
from livekit.agents import JobContext

SAMPLE_RATE = 48000
NUM_CHANNELS = 1  # mono audio
AMPLITUDE = 2 ** 8 - 1
SAMPLES_PER_CHANNEL = 480  # 10ms at 48kHz


async def entrypoint(ctx: JobContext):
    await ctx.connect()

    source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS)
    track = rtc.LocalAudioTrack.create_audio_track("example-track", source)
    # since the agent is a participant, our audio I/O is its "microphone"
    options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    # ctx.agent is an alias for ctx.room.local_participant
    publication = await ctx.agent.publish_track(track, options)

    frequency = 440

    async def _sinewave():
        audio_frame = rtc.AudioFrame.create(SAMPLE_RATE, NUM_CHANNELS, SAMPLES_PER_CHANNEL)
        # view the frame's buffer as int16 samples so it can be filled in place
        audio_data = np.frombuffer(audio_frame.data, dtype=np.int16)

        total_samples = 0
        while True:
            # the running sample counter keeps the phase continuous across frames
            time = (total_samples + np.arange(SAMPLES_PER_CHANNEL)) / SAMPLE_RATE
            sinewave = (AMPLITUDE * np.sin(2 * np.pi * frequency * time)).astype(np.int16)
            np.copyto(audio_data, sinewave)

            # send this frame to the track (waits until the buffer accepts it)
            await source.capture_frame(audio_frame)
            total_samples += SAMPLES_PER_CHANNEL

    # keep generating audio for as long as the job runs
    await _sinewave()
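The sizing rules in that paragraph are simple arithmetic: samples per frame = sample rate × frame duration, bytes per frame = samples × channels × 2 for int16, and the 50 ms buffer therefore holds a fixed number of 10 ms frames. The stdlib-only sketch below checks those numbers and generates the same 440 Hz int16 sine frames without NumPy or the SDK, so the phase-continuity logic (`total_samples`) can be inspected on its own; `sine_frames` is a hypothetical helper.

```python
import math
import struct
from itertools import islice
from typing import Iterator

SAMPLE_RATE = 48000
NUM_CHANNELS = 1
FRAME_MS = 10
BUFFER_MS = 50
AMPLITUDE = 2 ** 8 - 1

SAMPLES_PER_CHANNEL = SAMPLE_RATE * FRAME_MS // 1000          # 480
BYTES_PER_FRAME = SAMPLES_PER_CHANNEL * NUM_CHANNELS * 2      # int16 = 2 bytes -> 960
FRAMES_IN_BUFFER = BUFFER_MS // FRAME_MS                      # 5


def sine_frames(frequency: float = 440.0) -> Iterator[bytes]:
    """Yield consecutive mono int16 frames; the running sample counter keeps
    the phase continuous across frame boundaries."""
    total_samples = 0
    while True:
        samples = [
            int(AMPLITUDE * math.sin(2 * math.pi * frequency * (total_samples + i) / SAMPLE_RATE))
            for i in range(SAMPLES_PER_CHANNEL)
        ]
        yield struct.pack(f"<{SAMPLES_PER_CHANNEL}h", *samples)
        total_samples += SAMPLES_PER_CHANNEL


first, second = islice(sine_frames(), 2)
print(SAMPLES_PER_CHANNEL, BYTES_PER_FRAME, FRAMES_IN_BUFFER)   # 480 960 5
```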

Handling text messages

Listen for the data_received event to handle messages sent by the user, and use publish_data() to send messages to the user.

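import logging

from livekit import rtc

# room is the agent's rtc.Room, e.g. ctx.room inside entrypoint


@room.on("data_received")
def on_data_received(data: rtc.DataPacket):
    logging.info("received data from %s: %s", data.participant.identity, data.data)


# Must run inside an async function such as entrypoint;
# a string payload will be encoded to bytes with UTF-8
await room.local_participant.publish_data(
    "my payload",
    reliable=True,
    destination_identities=["identity1", "identity2"],
    topic="topic1",
)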
