kamailio-ACC_JSON模块详解【后端语言go】

devtools/2025/2/2 23:59:03/

要确认 ACC_JSON 模块是否已经成功将计费信息推送到消息队列(MQueue),以及如何从队列中取值,可以按照以下步骤进行操作:


1. 确认 ACC_JSON 已推送到队列

1.1 配置 ACC_JSON

确保 ACC_JSON 模块已正确配置并启用。以下是一个示例配置:

kamailio">loadmodule "acc_json.so"
modparam("acc_json", "log_flag", 1)  # 启用 JSON 记录
modparam("acc_json", "log_extra", "ua=$hdr(User-Agent);uuid=$avp(i:123)")  # 记录额外信息route {if (method == "INVITE") {setflag(1);  # 设置计费标志t_relay();   # 转发请求}
}
1.2 检查 Kamailio 日志
  • 启动 Kamailio 并观察日志输出。
  • 如果 ACC_JSON 模块成功将数据推送到队列,日志中会显示类似以下内容:
    INFO: acc_json: JSON accounting data pushed to MQueue
    
1.3 检查消息队列
  • ACC_JSON 模块使用 Kamailio 的消息队列(MQueue)来存储 JSON 数据。
  • 默认情况下,消息队列的数据会存储在 Kamailio 的共享内存中。
  • 你可以使用 Kamailio 的 MI(Management Interface)RPC(Remote Procedure Call) 命令来检查队列状态。

2. 从队列中取值

2.1 使用 MI 命令

Kamailio 提供了 MI 命令来管理消息队列。以下是一些常用的 MI 命令:

2.1.1 检查队列状态
kamcmd mq.stats
  • 输出示例:
    {"queues": {"acc_json_queue": {"size": 10,          # 队列中当前的消息数量"max_size": 1000,    # 队列的最大容量"dropped": 0         # 丢弃的消息数量}}
    }
    
2.1.2 从队列中读取消息
kamcmd mq.read acc_json_queue
  • 输出示例:
    {"messages": [{"method": "INVITE","from_tag": "abc123","to_tag": "xyz456","callid": "12345","sip_code": "200","sip_reason": "OK","time": "2025-02-01 12:34:56","ua": "SomeUserAgent/1.0","uuid": "12345"},...]
    }
    
2.2 使用 RPC 命令

Kamailio 也支持通过 RPC 命令管理消息队列。以下是一些常用的 RPC 命令:

2.2.1 检查队列状态
kamctl rpc mq.stats
  • 输出示例:
    {"queues": {"acc_json_queue": {"size": 10,"max_size": 1000,"dropped": 0}}
    }
    
2.2.2 从队列中读取消息
kamctl rpc mq.read acc_json_queue
  • 输出示例:
    {"messages": [{"method": "INVITE","from_tag": "abc123","to_tag": "xyz456","callid": "12345","sip_code": "200","sip_reason": "OK","time": "2025-02-01 12:34:56","ua": "SomeUserAgent/1.0","uuid": "12345"},...]
    }
    

3. 自定义队列处理

如果默认的消息队列功能无法满足需求,可以通过以下方式自定义队列处理:

3.1 使用事件路由

Kamailio 支持通过事件路由(Event Route)处理消息队列中的数据。例如:

kamailio">event_route[mq:acc_json_queue] {xlog("L_INFO", "Received JSON accounting data: $mqk($mqv)\n");# 在这里处理队列中的数据
}
3.2 编写外部脚本

可以编写外部脚本(如 Python、Shell 脚本)通过 Kamailio 的 MI 或 RPC 接口定期从队列中读取数据,并进行进一步处理。


4. 调试和验证

  • 调试日志:启用 Kamailio 的调试日志,观察 ACC_JSON 模块的行为:
    kamailio -dd -E -e
    
  • 队列监控:使用 MI 或 RPC 命令定期检查队列状态,确保数据被正确推送和读取。

总结

  • 通过 Kamailio 的 MIRPC 命令可以确认 ACC_JSON 是否成功将数据推送到队列,并从队列中读取数据。
  • 如果需要更复杂的处理,可以使用事件路由或编写外部脚本。
  • 确保 Kamailio 配置正确,并定期监控队列状态以避免数据丢失。

要实现 ACC_JSON 模块将计费信息推送到队列,并由 Go 语言从队列中取值并写入数据库,可以按照以下步骤操作:


1. Kamailio 配置

1.1 加载 ACC_JSON 模块

在 Kamailio 配置文件中加载 ACC_JSON 模块,并配置消息队列:

kamailio">loadmodule "acc_json.so"# 配置 ACC_JSON
modparam("acc_json", "log_flag", 1)  # 启用 JSON 记录
modparam("acc_json", "log_extra", "ua=$hdr(User-Agent);uuid=$avp(i:123)")  # 记录额外信息# 配置消息队列
modparam("mq", "mq_size", 1000)  # 设置队列大小
modparam("mq", "mq_name", "acc_json_queue")  # 设置队列名称route {if (method == "INVITE") {setflag(1);  # 设置计费标志t_relay();   # 转发请求}
}
1.2 验证数据推送

启动 Kamailio 并验证数据是否成功推送到队列:

kamcmd mq.stats
  • 如果队列中有数据,说明 ACC_JSON 模块已成功推送。

2. Go 语言实现

Go 语言程序需要从 Kamailio 的消息队列中读取数据,并将其写入数据库。以下是详细实现思路和代码示例。

2.1 实现思路
  1. 连接 Kamailio:通过 Kamailio 的 RPC 接口连接到消息队列。
  2. 读取队列数据:定期从队列中读取 JSON 格式的计费信息。
  3. 解析 JSON 数据:将读取的 JSON 数据解析为 Go 结构体。
  4. 写入数据库:将解析后的数据写入数据库(如 MySQL、PostgreSQL 等)。
2.2 代码示例
2.2.1 安装依赖

首先,安装 Go 语言的相关依赖:

go get github.com/zero-os/gorpc        # Kamailio RPC 客户端
go get github.com/go-sql-driver/mysql  # MySQL 驱动
2.2.2 Go 代码实现

以下是一个完整的 Go 程序示例:

package mainimport ("database/sql""encoding/json""fmt""log""time""github.com/zero-os/gorpc"_ "github.com/go-sql-driver/mysql"
)// 定义计费信息结构体
type AccountingRecord struct {Method     string `json:"method"`FromTag    string `json:"from_tag"`ToTag      string `json:"to_tag"`CallID     string `json:"callid"`SipCode    string `json:"sip_code"`SipReason  string `json:"sip_reason"`Time       string `json:"time"`UserAgent  string `json:"ua"`UUID       string `json:"uuid"`
}// 数据库配置
const (dbDriver = "mysql"dbUser   = "root"dbPass   = "password"dbName   = "kamailio_acc"
)func main() {// 连接 Kamailio RPCclient := gorpc.NewClient("tcp", "127.0.0.1:2049") // Kamailio RPC 地址defer client.Close()// 连接数据库db, err := sql.Open(dbDriver, fmt.Sprintf("%s:%s@/%s", dbUser, dbPass, dbName))if err != nil {log.Fatalf("Failed to connect to database: %v", err)}defer db.Close()// 定期从队列中读取数据for {// 从队列中读取消息var result map[string]interface{}err := client.Call("mq.read", "acc_json_queue", &result)if err != nil {log.Printf("Failed to read from queue: %v", err)time.Sleep(5 * time.Second) // 等待 5 秒后重试continue}// 解析 JSON 数据messages, ok := result["messages"].([]interface{})if !ok {log.Println("No messages in queue")time.Sleep(5 * time.Second)continue}// 处理每条消息for _, msg := range messages {msgJSON, err := json.Marshal(msg)if err != nil {log.Printf("Failed to marshal message: %v", err)continue}var record AccountingRecordif err := json.Unmarshal(msgJSON, &record); err != nil {log.Printf("Failed to unmarshal message: %v", err)continue}// 将数据写入数据库if err := saveToDatabase(db, record); err != nil {log.Printf("Failed to save record to database: %v", err)}}time.Sleep(1 * time.Second) // 每隔 1 秒检查一次队列}
}// 将数据写入数据库
func saveToDatabase(db *sql.DB, record AccountingRecord) error {query := `INSERT INTO acc_records (method, from_tag, to_tag, callid, sip_code, sip_reason, time, ua, uuid) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`_, err := db.Exec(query,record.Method,record.FromTag,record.ToTag,record.CallID,record.SipCode,record.SipReason,record.Time,record.UserAgent,record.UUID,)return err
}

3. 数据库表设计

在 MySQL 中创建用于存储计费信息的表:

CREATE TABLE acc_records (id INT AUTO_INCREMENT PRIMARY KEY,method VARCHAR(50) NOT NULL,from_tag VARCHAR(100) NOT NULL,to_tag VARCHAR(100) NOT NULL,callid VARCHAR(100) NOT NULL,sip_code VARCHAR(10) NOT NULL,sip_reason VARCHAR(100) NOT NULL,time DATETIME NOT NULL,ua VARCHAR(255),uuid VARCHAR(100)
);

4. 运行程序

  1. 启动 Kamailio,确保 ACC_JSON 模块正常运行。
  2. 运行 Go 程序:
    go run main.go
    
  3. 程序会定期从 Kamailio 的消息队列中读取数据,并将其写入数据库。

5. 验证结果

  • 检查数据库中的 acc_records 表,确认数据是否成功写入。
  • 观察 Go 程序的日志输出,确保没有错误。

总结

通过以上步骤,可以实现 ACC_JSON 模块将计费信息推送到队列,并由 Go 语言程序从队列中读取数据并写入数据库。这种方法适用于需要将 Kamailio 的计费信息与外部系统集成的场景。

Gin框架代码实际
如果使用 Gin 框架,可以将 Go 程序组织成一个更结构化的 Web 服务,同时保留从 Kamailio 消息队列中读取数据并写入数据库的功能。以下是重新整理的代码示例:


1. 项目结构

kamailio-acc/
├── main.go
├── handlers/
│   └── acc_handler.go
├── models/
│   └── acc_model.go
├── services/
│   └── queue_service.go
├── config/
│   └── config.go
└── go.mod

2. 代码实现

2.1 main.go

主程序入口,初始化 Gin 框架并启动服务。

package mainimport ("kamailio-acc/config""kamailio-acc/handlers""kamailio-acc/services""log""time""github.com/gin-gonic/gin"
)func main() {// 加载配置cfg, err := config.LoadConfig()if err != nil {log.Fatalf("Failed to load config: %v", err)}// 初始化数据库db, err := config.InitDB(cfg)if err != nil {log.Fatalf("Failed to initialize database: %v", err)}// 初始化 Kamailio RPC 客户端client := services.NewKamailioClient(cfg.KamailioRPCAddr)defer client.Close()// 启动队列监听服务go services.StartQueueListener(client, db)// 初始化 Gin 框架r := gin.Default()// 注册路由handlers.RegisterRoutes(r, db)// 启动 Web 服务if err := r.Run(cfg.ServerAddr); err != nil {log.Fatalf("Failed to start server: %v", err)}
}

2.2 config/config.go

配置文件加载和数据库初始化。

package configimport ("database/sql""fmt""log"_ "github.com/go-sql-driver/mysql""github.com/spf13/viper"
)type Config struct {ServerAddr      string `mapstructure:"SERVER_ADDR"`KamailioRPCAddr string `mapstructure:"KAMAILIO_RPC_ADDR"`DBDriver        string `mapstructure:"DB_DRIVER"`DBUser          string `mapstructure:"DB_USER"`DBPassword      string `mapstructure:"DB_PASSWORD"`DBName          string `mapstructure:"DB_NAME"`
}func LoadConfig() (*Config, error) {viper.SetConfigFile(".env")if err := viper.ReadInConfig(); err != nil {return nil, fmt.Errorf("failed to read config file: %v", err)}var cfg Configif err := viper.Unmarshal(&cfg); err != nil {return nil, fmt.Errorf("failed to unmarshal config: %v", err)}return &cfg, nil
}func InitDB(cfg *Config) (*sql.DB, error) {dsn := fmt.Sprintf("%s:%s@/%s", cfg.DBUser, cfg.DBPassword, cfg.DBName)db, err := sql.Open(cfg.DBDriver, dsn)if err != nil {return nil, fmt.Errorf("failed to connect to database: %v", err)}if err := db.Ping(); err != nil {return nil, fmt.Errorf("failed to ping database: %v", err)}log.Println("Connected to database")return db, nil
}

2.3 models/acc_model.go

定义数据模型和数据库操作方法。

package modelsimport ("database/sql""log"
)type AccountingRecord struct {Method    string `json:"method"`FromTag   string `json:"from_tag"`ToTag     string `json:"to_tag"`CallID    string `json:"callid"`SipCode   string `json:"sip_code"`SipReason string `json:"sip_reason"`Time      string `json:"time"`UserAgent string `json:"ua"`UUID      string `json:"uuid"`
}func SaveRecord(db *sql.DB, record AccountingRecord) error {query := `INSERT INTO acc_records (method, from_tag, to_tag, callid, sip_code, sip_reason, time, ua, uuid) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`_, err := db.Exec(query,record.Method,record.FromTag,record.ToTag,record.CallID,record.SipCode,record.SipReason,record.Time,record.UserAgent,record.UUID,)if err != nil {log.Printf("Failed to save record: %v", err)return err}log.Printf("Saved record: %+v", record)return nil
}

2.4 services/queue_service.go

从 Kamailio 消息队列中读取数据的服务。

package servicesimport ("encoding/json""kamailio-acc/models""log""time""github.com/zero-os/gorpc"
)type KamailioClient struct {client *gorpc.Client
}func NewKamailioClient(addr string) *KamailioClient {return &KamailioClient{client: gorpc.NewClient("tcp", addr),}
}func (kc *KamailioClient) Close() {kc.client.Close()
}func (kc *KamailioClient) ReadQueue(queueName string) ([]models.AccountingRecord, error) {var result map[string]interface{}if err := kc.client.Call("mq.read", queueName, &result); err != nil {return nil, err}messages, ok := result["messages"].([]interface{})if !ok {return nil, nil}var records []models.AccountingRecordfor _, msg := range messages {msgJSON, err := json.Marshal(msg)if err != nil {log.Printf("Failed to marshal message: %v", err)continue}var record models.AccountingRecordif err := json.Unmarshal(msgJSON, &record); err != nil {log.Printf("Failed to unmarshal message: %v", err)continue}records = append(records, record)}return records, nil
}func StartQueueListener(client *KamailioClient, db *sql.DB) {for {records, err := client.ReadQueue("acc_json_queue")if err != nil {log.Printf("Failed to read from queue: %v", err)time.Sleep(5 * time.Second)continue}for _, record := range records {if err := models.SaveRecord(db, record); err != nil {log.Printf("Failed to save record: %v", err)}}time.Sleep(1 * time.Second)}
}

2.5 handlers/acc_handler.go

定义 HTTP 路由和处理程序。

package handlersimport ("database/sql""net/http""github.com/gin-gonic/gin""kamailio-acc/models"
)func RegisterRoutes(r *gin.Engine, db *sql.DB) {r.GET("/records", func(c *gin.Context) {// 示例:从数据库中获取所有记录var records []models.AccountingRecordrows, err := db.Query("SELECT * FROM acc_records")if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})return}defer rows.Close()for rows.Next() {var record models.AccountingRecordif err := rows.Scan(&record.Method,&record.FromTag,&record.ToTag,&record.CallID,&record.SipCode,&record.SipReason,&record.Time,&record.UserAgent,&record.UUID,); err != nil {c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})return}records = append(records, record)}c.JSON(http.StatusOK, records)})
}

3. 配置文件 .env

SERVER_ADDR=:8080
KAMAILIO_RPC_ADDR=127.0.0.1:2049
DB_DRIVER=mysql
DB_USER=root
DB_PASSWORD=password
DB_NAME=kamailio_acc

4. 运行程序

  1. 启动 Kamailio,确保 ACC_JSON 模块正常运行。
  2. 运行 Go 程序:
    go run main.go
    
  3. 访问 http://localhost:8080/records,查看数据库中的计费记录。

总结

通过 Gin 框架,将 Kamailio 的计费信息处理逻辑组织成一个结构化的 Web 服务。Go 程序从 Kamailio 的消息队列中读取数据并写入数据库,同时提供 HTTP 接口供外部系统查询数据。


http://www.ppmy.cn/devtools/155572.html

相关文章

10:预处理

预处理 1、宏替换2、头文件包含3、条件编译4、typedef和#define的区别5、#define中的注意点5.1、使用do....while(0)5.2、#和##的含义 C语言编译器在编译程序之前,会先使用预处理器(预处理器)处理代码,代码经过预处理之后再送入编译器进行编译。预处理器…

嵌入式硬件篇---CPUGPUTPU

文章目录 第一部分:处理器CPU(中央处理器)1.通用性2.核心数3.缓存4.指令集5.功耗和发热 GPU(图形处理器)1.并行处理2.核心数量3.内存带宽4.专门的应用 TPU(张量处理单元)1.为深度学习定制2.低精…

Linux环境变量

查看所有环境变量 printenv env cat /proc/self/environ 这个命令会显示所有环境变量,但变量之间用 \0(空字符)分隔,适合程序读取而不是直接查看。 P A T H 特殊的环境变量, PATH特殊的环境变量, PATH特殊的…

STM32 AD多通道

接线图: 代码配置: 与单通道相比,将多路选择从初始化函数,调用到功能函数里,在功能函数里以此调用需要使用的通道 整体代码: //AD多通道 void AD_Init2(void) {//定义结构体变量GPIO_InitTypeDef GPIO_In…

亚博microros小车-原生ubuntu支持系列:14雷达跟踪与雷达守卫

背景知识 激光雷达的数据格式参见: 亚博microros小车-原生ubuntu支持系列:13 激光雷达避障-CSDN博客 本节体验雷达跟踪跟守卫 PID控制 从百度百科摘一段介绍 比例积分微分控制(proportional-integral-derivative control)&am…

网络安全实战指南:攻防技术与防御策略

📝个人主页🌹:一ge科研小菜鸡-CSDN博客 🌹🌹期待您的关注 🌹🌹 1. 引言 随着数字化转型的加速,网络安全已成为各行业不可忽视的重要领域。从数据泄露到勒索软件攻击,网络…

SpringCloud篇 微服务架构

1. 工程架构介绍 1.1 两种工程架构模型的特征 1.1.1 单体架构 上面这张图展示了单体架构(Monolithic Architecture)的基本组成和工作原理。单体架构是一种传统的软件架构模式,其中所有的功能都被打包在一个单一的、紧密耦合的应用程序中。 …

受击反馈HitReact、死亡效果Death Dissolve、Floating伤害值Text(末尾附 客户端RPC )

受击反馈HitReact 设置角色受击标签 (GameplayTag基本了解待补充) 角色监听标签并设置移动速度 创建一个受击技能,并应用GE 实现设置角色的受击蒙太奇动画 实现角色受击时播放蒙太奇动画,为了保证通用性,将其设置为一个函数,并…