Commit 4cb0fd25 by zhengcheng.wang

Initial commit

parents
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/gRPC-Raisound-Base-second-pass.iml" filepath="$PROJECT_DIR$/.idea/gRPC-Raisound-Base-second-pass.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="SqlDialectMappings">
<file url="file://$PROJECT_DIR$/command.sql" dialect="MariaDB" />
<file url="file://$PROJECT_DIR$/log.sql" dialect="MariaDB" />
</component>
</project>
\ No newline at end of file
syntax = "proto3";
//执行命令
//protoc --go_out=plugins=grpc:. ./Raisound-protobuf/*.proto
option go_package = "./Raisound-protobuf;raisound";
// 初始化语音流的设置
message InitConfig {
string token = 1;
string devid = 2;
string codec = 3;
int32 sample_rate = 4;
int32 sample_bits = 5;
int32 bitrate = 6;
string scene = 7;
string lang = 8;
int32 nbest = 9;
int32 vad_level = 10;
int32 max_speech_duration = 11;
string meeting_id = 12;
string app_name = 13;
string conn_id = 14;
bool oral_flag = 15;
bool sensitive_flag = 16;
bool punc_flag = 17;
bool num_convert_flag = 18;
string user = 19;
string passwd = 20;
int32 inter_outer_flag = 21; //0表示对内,1表示对外
}
// 初始化语音流的返回
message InitResponse {
int32 code = 1;
string session_id = 2;
string description = 3;
}
// 发送语音流数据的设置
message SendAudioConfig {
string session_id = 1;
bytes audio_data = 2;
int32 end_flag = 3;
int32 data_count = 4;
}
// 发送语音流的返回
message SendAudioResponse {
int32 code = 1;
bool is_result = 2;
bool is_final = 3;
string session_id = 4;
string results = 5;
string description = 6;
string json_data = 7;
int32 speech_duration = 8;
int32 data_index = 9;
}
// 发送心跳的设置
message HeartBeatConfig {
string session_id = 1;
}
// 发送心跳的返回
message HeartBeatResponse {
int32 code = 1;
string session_id = 2;
string description = 3;
}
// 释放语音流的设置
message RelConfig {
string session_id = 1;
}
// 释放语音流的返回
message RelResponse {
int32 code = 1;
string session_id = 2;
string description = 3;
}
service RaisoundSpeech {
rpc InitStreamingASR(InitConfig) returns(InitResponse);
rpc StreamingASR(stream SendAudioConfig) returns(stream SendAudioResponse);
rpc RelStreamingASR(RelConfig) returns(RelResponse);
rpc HeartBeat(HeartBeatConfig) returns(HeartBeatResponse);
}
### 实体定义SQL
# 实体主体结构
CREATE TABLE `entity`
(
`id` INT(16) PRIMARY KEY AUTO_INCREMENT COMMENT '自增长ID',
`entity_id` VARCHAR(64) UNIQUE NOT NULL COMMENT '实体ID',
`entity_name` VARCHAR(128) UNIQUE NOT NULL COMMENT '实体ID',
`entity_number` int(8) NOT NULL COMMENT '实体条目数量',
`platform` VARCHAR(32) NOT NULL DEFAULT 'voice' COMMENT '使用平台',
`created_at` TIMESTAMP DEFAULT now() COMMENT '创建时间',
`updated_at` TIMESTAMP DEFAULT now() COMMENT '创建时间'
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
# 实体条目内容,和实体主体结构是一对多的关联关心,关联字段是`entity_id`
CREATE TABLE `entity_items`
(
`id` INT(16) PRIMARY KEY AUTO_INCREMENT COMMENT '自增长ID',
`entity_id` VARCHAR(64) NOT NULL COMMENT '实体ID,和`entity`的`entity_id`关联',
`entity_item_index` int(8) NOT NULL COMMENT '实体内容条目的索引',
`entity_item` VARCHAR(128) NOT NULL COMMENT '实体内容条目',
`created_at` TIMESTAMP DEFAULT now() COMMENT '创建时间',
`updated_at` TIMESTAMP DEFAULT now() COMMENT '创建时间'
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
# 查询SQL(参考)
SELECT `t1`.`entity_id`, `t1`.`entity_name`, `t1`.`entity_number`, `t2`.`entity_item_index`, `t2`.`entity_item`
FROM `entity` AS `t1`
JOIN `entity_items` AS `t2` ON `t1`.`entity_id` = `t2`.`entity_id`;
### 命令模板结构
# 命令模板结构
CREATE TABLE `command_structure`
(
`id` INT(16) PRIMARY KEY AUTO_INCREMENT COMMENT '自增长ID',
`command_intent_id` VARCHAR(64) UNIQUE NOT NULL COMMENT '命令意图模板ID',
`command_intent_name` VARCHAR(128) UNIQUE NOT NULL COMMENT '命令意图模板名称',
`command_entity_number` int(2) NOT NULL COMMENT '命令实体项中条目数',
`platform` VARCHAR(32) NOT NULL DEFAULT 'voice' COMMENT '使用平台',
`created_at` TIMESTAMP DEFAULT now() COMMENT '创建时间',
`updated_at` TIMESTAMP DEFAULT now() COMMENT '创建时间'
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
# 命令实体项结构
CREATE TABLE `command_entity`
(
`id` INT(16) PRIMARY KEY AUTO_INCREMENT COMMENT '自增长ID',
`command_intent_id` VARCHAR(64) NOT NULL COMMENT '命令意图模板ID,和`command_structure`的`command_intent_id`关联',
`command_entity_tag` VARCHAR(128) NOT NULL COMMENT '命令实体项条目的标识',
`entity_id` VARCHAR(64) NOT NULL COMMENT '实体ID,和`entity`的`entity_id`关联',
`created_at` TIMESTAMP DEFAULT now() COMMENT '创建时间',
`updated_at` TIMESTAMP DEFAULT now() COMMENT '创建时间'
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
# 命令样板项结构
CREATE TABLE `command_template`
(
`id` INT(16) PRIMARY KEY AUTO_INCREMENT COMMENT '自增长ID',
`command_intent_id` VARCHAR(64) NOT NULL COMMENT '命令意图模板ID,和`command_structure`的`command_intent_id`关联',
`command_template_index` int(2) NOT NULL COMMENT '命令样板项索引',
`command_template_item` VARCHAR(512) NOT NULL COMMENT '命令样板项内容',
`created_at` TIMESTAMP DEFAULT now() COMMENT '创建时间',
`updated_at` TIMESTAMP DEFAULT now() COMMENT '创建时间'
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
SELECT command_intent_id,command_intent_name FROM command_structure;
SELECT * FROM command_entity WHERE command_intent_id='24508051244799537092493312';
SELECT * FROM command_template WHERE command_intent_id='24508051244799537092493312';
SELECT * FROM entity_items WHERE entity_id='1111111111';
USE bot_know_db;
SELECT t1.command_intent_name, t3.command_entity_tag, t4.entity_name, t5.entity_item, t2.command_template_item
FROM command_structure AS t1
JOIN command_template AS t2 ON t1.command_intent_id = t2.command_intent_id
JOIN command_entity AS t3 ON t1.command_intent_id = t3.command_intent_id
JOIN entity AS t4 ON t3.entity_id = t4.entity_id
JOIN entity_items AS t5 ON t5.entity_id = t4.entity_id WHERE t1.command_intent_id='24508051244799537092493312';
# # 命令模板样板项SQL查询(参考)
# SELECT `t1`.`command_intent_id`,
# `t1`.`command_intent_name`,
# `t1`.`command_template_id`,
# `t2`.`command_template_index`,
# `t2`.`command_template_item`
# FROM `command_structure` AS `t1`
# JOIN `command_template` AS `t2` ON `t1`.`command_template_id` = `t2`.`command_template_id`;
#
#
# ### 训练模板需要的数据查询SQL(参考)
# SELECT `t1`.command_intent_name,
# `t1`.command_entity_id,
# `t2`.command_entity_tag,
# `t2`.entity_id,
# `t4`.entity_name,
# `t5`.entity_item,
# `t3`.command_template_index,
# `t3`.command_template_item
# FROM command_structure AS `t1`
# JOIN command_entity AS `t2` ON `t1`.command_entity_id = `t2`.command_entity_id
# JOIN command_template AS `t3` ON `t1`.command_template_id = `t3`.command_template_id
# JOIN entity AS `t4` ON `t2`.entity_id = `t4`.entity_id
# JOIN entity_items AS `t5` ON `t4`.entity_id = `t5`.entity_id;
\ No newline at end of file
[common]
port = 20065 #端口
defaultLanguage = ZH-CN #默认语种,目前只有三个值ZH-CN(普通话)、ZH-HK(粤语)、EN(英语)
checkTokenURL = http:// #校验token以认证连接
redisURL = huisheng_redis:6379 #redis连接地址,格式为【ip:port】
redisChannel = send audio info #redis的public的通道
hotWordURL = http://192.168.0.61/huisheng_api_gm/hotwords/new/personal/model/get #通过user_id获取热词文本的地址
[mysql]
mysqlUse = false # false为禁用,即禁用日志数据保存在mysql。true即否则则否
mysqlIp = 172.16.5.29
mysqlPort = 3320
mysqlUser = root
mysqlPasswd = imslimsl # 禁止带有有`#`
mysqlDatabase = bot_know_db
[ZH-CN]
websocketArr = ws://huisheng_offline:20086 # 英气地址,可多个,多个是用英文逗号”,“隔开
defaultHotWordFile = #默认热词完整地址名称
digitConvertURL = http://huisheng_itn:8305/encode #转数字
sensitiveWordURL = http://huisheng_itn:8306/encode #敏感词
punctuationURL = http://huisheng_itn:8125/encode #标点符号处理
postURL = http://
[ZH-HK]
websocketArr = ws://huisheng_offline_cantonese:20086/ # 英气地址,可多个,多个是用英文逗号”,“隔开
defaultHotWordFile = /opt/huisheng/data/hotword/a.txt #默认热词完整地址名称
digitConvertURL = http://huisheng_itn:8305/encode #转数字
sensitiveWordURL = http://huisheng_itn:8306/encode #敏感词
punctuationURL = http://huisheng_itn:8125/encode #标点符号处理
postURL = http://
[EN]
websocketArr = ws://huisheng_offline_english:20086/ # 英气地址,可多个,多个是用英文逗号”,“隔开
defaultHotWordFile = /opt/huisheng/data/hotword/a.txt #默认热词完整地址名称
e2nURL = http://huisheng_itn_english:8120/e2n
punctuationURL = http://huisheng_itn_english:8126/encode
[fileConf]
logName = /opt/huisheng/data/log/grpc-golang-base/log/server.log #日志地址
audioPath = /opt/huisheng/data/wav/grpc-golang-base/audio #音频地址
\ No newline at end of file
package initServer
import (
"flag"
"gRPC-Raisound-Base-second-pass/otherHandler"
"github.com/go-ini/ini"
_ "github.com/go-sql-driver/mysql"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"log"
"os"
"strings"
"sync"
"time"
)
// 配置文件数据结构
var (
Version bool
Config = flag.String("config", "./config.ini", "the server config")
ConfigMap = make(map[string]interface{})
ZhCn = make(map[string]map[string]interface{})
ZhCnLock sync.Mutex
ZhHk = make(map[string]map[string]interface{})
ZhHkLock sync.Mutex
EN = make(map[string]map[string]interface{})
ENLock sync.Mutex
Log *Logger
LogData *otherHandler.LogDataHandler
Snow *otherHandler.SnowflakeBase
)
// TODO: 配置文件处理
func GetConfig() {
var (
conf *ini.File
err error
)
if conf, err = ini.Load(*Config); err != nil {
panic(err.Error())
}
if conf != nil {
//common
ConfigMap["port"] = conf.Section("common").Key("port").String()
ConfigMap["checkTokenURL"] = conf.Section("common").Key("checkTokenURL").String()
ConfigMap["defaultLanguage"] = conf.Section("common").Key("defaultLanguage").String()
ConfigMap["redisURL"] = conf.Section("common").Key("redisURL").String()
ConfigMap["redisChannel"] = conf.Section("common").Key("redisChannel").String()
ConfigMap["hotWordURL"] = conf.Section("common").Key("hotWordURL").String()
if ConfigMap["defaultLanguage"].(string) == "" {
ConfigMap["defaultLanguage"] = "ZH-CN"
}
// mysql
if isMysqlUse, e := conf.Section("mysql").Key("mysqlUse").Bool(); e != nil {
logrus.Fatalln(e)
} else {
ConfigMap["mysqlUse"] = isMysqlUse
}
ConfigMap["mysqlIp"] = conf.Section("mysql").Key("mysqlIp").String()
ConfigMap["mysqlPort"] = conf.Section("mysql").Key("mysqlPort").String()
ConfigMap["mysqlUser"] = conf.Section("mysql").Key("mysqlUser").String()
ConfigMap["mysqlPasswd"] = conf.Section("mysql").Key("mysqlPasswd").String()
ConfigMap["mysqlDatabase"] = conf.Section("mysql").Key("mysqlDatabase").String()
// ZH-CN
ConfigMap["websocketArrZH-CN"] = strings.Split(conf.Section("ZH-CN").Key("websocketArr").String(), ",")
ConfigMap["hotWord-ZH-CN"] = conf.Section("ZH-CN").Key("defaultHotWordFile").String()
ConfigMap["digitConvertURL-ZH-CN"] = conf.Section("ZH-CN").Key("digitConvertURL").String()
ConfigMap["sensitiveWordURL-ZH-CN"] = conf.Section("ZH-CN").Key("sensitiveWordURL").String()
ConfigMap["punctuationURL-ZH-CN"] = conf.Section("ZH-CN").Key("punctuationURL").String()
ConfigMap["postURL-ZH-CN"] = conf.Section("ZH-CN").Key("postURL").String()
// ZH-HK
ConfigMap["websocketArrZH-HK"] = strings.Split(conf.Section("ZH-HK").Key("websocketArr").String(), ",")
ConfigMap["hotWord-ZH-HK"] = conf.Section("ZH-HK").Key("defaultHotWordFile").String()
ConfigMap["digitConvertURL-ZH-HK"] = conf.Section("ZH-HK").Key("digitConvertURL").String()
ConfigMap["sensitiveWordURL-ZH-HK"] = conf.Section("ZH-HK").Key("sensitiveWordURL").String()
ConfigMap["punctuationURL-ZH-HK"] = conf.Section("ZH-HK").Key("punctuationURL").String()
ConfigMap["postURL-ZH-HK"] = conf.Section("ZH-HK").Key("postURL").String()
// EN
ConfigMap["websocketArrEN"] = strings.Split(conf.Section("EN").Key("websocketArr").String(), ",")
ConfigMap["hotWord-EN"] = conf.Section("EN").Key("defaultHotWordFile").String()
ConfigMap["e2nURL"] = conf.Section("EN").Key("e2nURL").String()
ConfigMap["EnPunctuationURL"] = conf.Section("EN").Key("punctuationURL").String()
// fileConf
ConfigMap["logName"] = conf.Section("fileConf").Key("logName").String()
ConfigMap["audioPath"] = conf.Section("fileConf").Key("audioPath").String()
} else {
panic("config error")
}
go func() {
// 创建文件夹
_, e := os.Stat(ConfigMap["audioPath"].(string))
if os.IsNotExist(e) {
if e = os.MkdirAll(ConfigMap["audioPath"].(string), os.ModePerm); e != nil {
log.Println(e.Error())
}
}
}()
// 中文迎请探针
go func(arr []string) {
for _, ip := range arr {
value := make(map[string]interface{})
value["num"] = 0
value["alive"] = true
ZhCn[ip] = value
}
for {
var group sync.WaitGroup
ZhCnLock.Lock()
for _, ip := range arr {
group.Add(1)
go func(ipStr string) {
if c, _, e := websocket.DefaultDialer.Dial(ipStr, nil); e != nil {
ZhCn[ipStr]["alive"] = false
} else {
if c != nil {
_ = c.Close()
ZhCn[ipStr]["alive"] = true
} else {
ZhCn[ipStr]["alive"] = false
}
}
group.Done()
}(ip)
}
group.Wait()
ZhCnLock.Unlock()
time.Sleep(10 * time.Second)
}
}(ConfigMap["websocketArrZH-CN"].([]string))
// 粤语迎请探针
go func(arr []string) {
for _, ip := range arr {
value := make(map[string]interface{})
value["num"] = 0
value["alive"] = true
ZhHk[ip] = value
}
for {
var group sync.WaitGroup
ZhHkLock.Lock()
for _, ip := range arr {
group.Add(1)
go func(ipStr string) {
if c, _, e := websocket.DefaultDialer.Dial(ipStr, nil); e != nil {
ZhHk[ipStr]["alive"] = false
} else {
if c != nil {
_ = c.Close()
ZhHk[ipStr]["alive"] = true
} else {
ZhHk[ipStr]["alive"] = false
}
}
group.Done()
}(ip)
}
group.Wait()
ZhHkLock.Unlock()
time.Sleep(10 * time.Second)
}
}(ConfigMap["websocketArrZH-HK"].([]string))
// 英语迎请探针
go func(arr []string) {
for _, ip := range arr {
value := make(map[string]interface{})
value["num"] = 0
value["alive"] = true
EN[ip] = value
}
for {
var group sync.WaitGroup
ENLock.Lock()
for _, ip := range arr {
group.Add(1)
go func(ipStr string) {
if c, _, e := websocket.DefaultDialer.Dial(ipStr, nil); e != nil {
EN[ipStr]["alive"] = false
} else {
if c != nil {
_ = c.Close()
EN[ipStr]["alive"] = true
} else {
EN[ipStr]["alive"] = false
}
}
group.Done()
}(ip)
}
group.Wait()
ENLock.Unlock()
time.Sleep(10 * time.Second)
}
}(ConfigMap["websocketArrEN"].([]string))
}
func init() {
flag.BoolVar(&Version, "version", false, "")
flag.Parse()
if !Version {
logrus.SetReportCaller(true)
GetConfig()
Log, _ = InitLogger(ConfigMap["logName"].(string), "info")
if ConfigMap["mysqlUse"].(bool) {
LogData = otherHandler.InitLog(ConfigMap["mysqlIp"].(string), ConfigMap["mysqlPort"].(string), ConfigMap["mysqlUser"].(string), ConfigMap["mysqlPasswd"].(string), ConfigMap["mysqlDatabase"].(string), 1024)
Snow = otherHandler.NewSnowFlake(1)
}
}
}
package initServer
import (
"fmt"
"github.com/sirupsen/logrus"
"os"
"path"
)
type Logger struct {
Log *logrus.Logger
}
func InitLogger(fileName, level string) (log *Logger, err error) {
var (
logger *logrus.Logger
file *os.File
fileDir string
)
logger = logrus.New()
logger.SetReportCaller(true)
logger.SetFormatter(&logrus.JSONFormatter{})
fileDir = path.Dir(fileName)
fmt.Println(fileDir)
_, err = os.Stat(fileDir)
if os.IsNotExist(err) {
if err = os.MkdirAll(fileDir, os.ModePerm); err != nil {
logger.Fatal(err)
}
}
if file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0766); err != nil {
logger.Error(err)
}
if level == "error" {
logger.SetLevel(logrus.ErrorLevel)
}
if level == "info" {
logger.SetLevel(logrus.InfoLevel)
}
if level == "warn" {
logger.SetLevel(logrus.WarnLevel)
}
if level == "fatal" {
logger.SetLevel(logrus.FatalLevel)
}
if level == "panic" {
logger.SetLevel(logrus.PanicLevel)
}
if level == "debug" {
logger.SetLevel(logrus.DebugLevel)
}
logger.Out = file
log = &Logger{
Log: logger,
}
return
}
//func (logger *Logger) Info(data, name string) {
// log.Println(runtime.Caller(0))
// logger.log.WithFields(logrus.Fields{"source": name, "status": 200}).Info(data)
//}
//
//func (logger *Logger) Error(data, name string) {
// logger.log.WithFields(logrus.Fields{"source": name, "status": 400}).Error(data)
//}
## 实时转写结果语法
CREATE TABLE `transfer_log`
(
`id` VARCHAR(64) PRIMARY KEY COMMENT '正式结果的id',
`user` VARCHAR(64) DEFAULT NULL COMMENT '用户名',
`sys_type` int(2) DEFAULT 0 COMMENT '1.语音操控 2.APP',
`meeting_id` VARCHAR(64) NOT NULL COMMENT '每个长连接连接的id',
`session_id` VARCHAR(64) NOT NULL COMMENT '服务端赋予每个长连接连接的id',
`texts` VARCHAR(1024) DEFAULT NULL COMMENT '正式结果',
`audio_file` VARCHAR(512) UNIQUE DEFAULT NULL COMMENT '对应音频文件的完整地址',
`started_at` TIMESTAMP DEFAULT NULL COMMENT '开始时间',
`created_at` TIMESTAMP DEFAULT NULL COMMENT '创建时间',
`updated_at` TIMESTAMP DEFAULT NULL COMMENT '更新时间'
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
INSERT INTO `transfer_log` (`id`, `user`,`sys_type`, `meeting_id`, `session_id`, `texts`,`audio_file`,`started_at`,`created_at`,`updated_at`) VALUES ('1234567890','test1',0,'meeting_id1','sessiong_id1','从北京到上海的航线情况','/opt/huisheng/wav/audio/test1/meeting_id1/sessiong_id100000001.wav','2023-06-12 12:00:00','2023-06-12 12:00:00','2023-06-12 12:00:00');
INSERT INTO `transfer_log` (`id`, `user`,`sys_type`, `meeting_id`, `session_id`, `texts`,`audio_file`,`started_at`,`created_at`,`updated_at`) VALUES ('1234567891','test1',0,'meeting_id1','sessiong_id1','从北京到广州的航线情况','/opt/huisheng/wav/audio/test1/meeting_id1/sessiong_id100000002.wav','2023-06-12 12:00:20','2023-06-12 12:00:20','2023-06-12 12:00:20');
INSERT INTO `transfer_log` (`id`, `user`,`sys_type`, `meeting_id`, `session_id`, `texts`,`audio_file`,`started_at`,`created_at`,`updated_at`) VALUES ('1234567892','test1',0,'meeting_id1','sessiong_id1','从北京到深圳的航线情况','/opt/huisheng/wav/audio/test1/meeting_id1/sessiong_id100000003.wav','2023-06-12 12:00:30','2023-06-12 12:00:30','2023-06-12 12:00:30');
INSERT INTO `transfer_log` (`id`, `user`,`sys_type`, `meeting_id`, `session_id`, `texts`,`audio_file`,`started_at`,`created_at`,`updated_at`) VALUES ('1234567893','test1',0,'meeting_id1','sessiong_id1','从北京到成都的航线情况','/opt/huisheng/wav/audio/test1/meeting_id1/sessiong_id100000004.wav','2023-06-12 12:00:40','2023-06-12 12:00:40','2023-06-12 12:00:40');
## 需求解析结果sql
CREATE TABLE `parse_log`
(
`transfer_id` VARCHAR(64) NOT NULL UNIQUE COMMENT '正式结果的id,跟`transfer_log`的`id`匹配',
`parse_result` VARCHAR(1024) DEFAULT NULL COMMENT '需求解析结果,一般设置json格式',
`created_at` TIMESTAMP DEFAULT NULL COMMENT '创建时间',
`updated_at` TIMESTAMP DEFAULT NULL COMMENT '创建时间'
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
INSERT INTO `parse_log` (`transfer_id`, `parse_result`,`created_at`,`updated_at`) VALUES ('1234567890','"{"intent":"","entities":[{"entity":"city","value":"北京"},{"entity":"city","value":"上海"}]}"','2023-06-12 12:00:00','2023-06-12 12:00:00');
INSERT INTO `parse_log` (`transfer_id`, `parse_result`,`created_at`,`updated_at`) VALUES ('1234567891','"{"intent":"","entities":[{"entity":"city","value":"北京"},{"entity":"city","value":"广州"}]}"','2023-06-12 12:00:20','2023-06-12 12:00:20');
INSERT INTO `parse_log` (`transfer_id`, `parse_result`,`created_at`,`updated_at`) VALUES ('1234567892','"{"intent":"","entities":[{"entity":"city","value":"北京"},{"entity":"city","value":"深圳"}]}"','2023-06-12 12:00:30','2023-06-12 12:00:30');
INSERT INTO `parse_log` (`transfer_id`, `parse_result`,`created_at`,`updated_at`) VALUES ('1234567893','"{"intent":"","entities":[{"entity":"city","value":"北京"},{"entity":"city","value":"成都"}]}"','2023-06-12 12:00:40','2023-06-12 12:00:40');
package main
import (
"fmt"
"gRPC-Raisound-Base-second-pass/initServer"
"gRPC-Raisound-Base-second-pass/server"
"github.com/gin-gonic/gin"
"github.com/soheilhy/cmux"
"log"
"net"
"time"
)
var Version = "This is the basic version of gRPC, the latest version updated on October 10, 2022.\nUpdated an issue where audio and text do not match."
func main() {
if initServer.Version {
log.Println(Version)
}
if !initServer.Version {
var (
err error
lis net.Listener
)
//server.ConfigMap["empty"] = b
if lis, err = net.Listen("tcp", fmt.Sprintf(":%v", initServer.ConfigMap["port"].(string))); err != nil {
log.Fatalf("failed to listen: %v", err)
}
log.Println("listen: ", fmt.Sprintf(":%v ,time: %v", initServer.ConfigMap["port"].(string), time.Now().String()))
cm := cmux.New(lis)
grpcL := cm.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
grpcServer := server.GrpcServer()
go func() {
if e := grpcServer.Serve(grpcL); e != nil {
log.Fatalf("failed to serve: %v", e)
}
}()
// http
router := gin.Default()
go func() {
httpL := cm.Match(cmux.HTTP1Fast())
router.GET("/getEnd/:meeting_id", server.GetEnd)
router.GET("/getAudioFile", server.GetAudioFile)
router.POST("/removeMeetingId", server.RemoveFile)
router.GET("/getAudioPath/:token/:aid/:bid", server.LoadFile)
router.GET("/loadAudioPath/:aid/:bid", server.LoadFile1)
_ = router.RunListener(httpL)
}()
_ = cm.Serve()
}
}
package otherHandler
import (
"errors"
"github.com/sirupsen/logrus"
"time"
)
type LogData struct {
Id interface{}
User interface{}
SysType interface{}
MeetingId interface{}
SessionId interface{}
Texts interface{}
AudioFile interface{}
StartedAt interface{}
CreatedAt interface{}
UpdatedAt interface{}
End bool
}
type LogDataHandler struct {
dataCache chan LogData
mysqlCon *MysqlConn
}
func InitLog(mysqlIp, mysqlPort, mysqlUser, mysqlPasswd, mysqlDatabase string, dataCacheNum int) (log *LogDataHandler) {
var (
conn *MysqlConn
)
conn = InitMysqlDb(mysqlIp, mysqlPort, mysqlUser, mysqlPasswd, mysqlDatabase)
log = &LogDataHandler{
dataCache: make(chan LogData, dataCacheNum),
mysqlCon: conn,
}
go func() {
for {
log.insertDataToMysql()
}
}()
return
}
func (Log *LogDataHandler) SetData(data LogData) (err error) {
logrus.Println(data)
select {
case Log.dataCache <- data:
default:
err = errors.New("the database has crashed and the cache space is full")
}
return
}
func (Log *LogDataHandler) insertDataToMysql(data ...LogData) {
var (
getData LogData
cacheData []LogData
sql = "INSERT INTO `transfer_log` (`id`,`user`,`sys_type`,`meeting_id`,`session_id`,`texts`,`audio_file`,`started_at`,`created_at`,`updated_at`) VALUES (?,?,?,?,?,?,?,?,?,?); "
columnData [][]interface{}
err error
)
if len(data) > 0 {
for _, val := range data {
//if idx == len(data)-1 {
// sql = sql + "(?,?,?,?,?,?,?,?,?,?);"
//} else {
// sql = sql + "(?,?,?,?,?,?,?,?,?,?),"
//}
columnData = append(columnData, []interface{}{val.Id, val.User, val.SysType, val.MeetingId, val.SessionId, val.Texts, val.AudioFile, val.StartedAt, val.CreatedAt, val.UpdatedAt})
}
if err = Log.mysqlCon.InsertData(sql, columnData...); err != nil {
logrus.Errorln(err)
time.Sleep(5 * time.Second)
Log.insertDataToMysql(data...)
}
} else {
for {
select {
case getData = <-Log.dataCache:
}
if len(cacheData) == 10 || getData.End {
break
}
//logrus.Println(getData)
cacheData = append(cacheData, getData)
}
Log.insertDataToMysql(cacheData...)
}
return
}
package otherHandler
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/sirupsen/logrus"
"log"
"sync"
"time"
)
type MysqlConn struct {
dbConn *sql.DB
ip string
port string
user string
passwd string
database string
restartConn bool
lock sync.Mutex
}
func InitMysqlDb(ip, port, user, passwd, database string) (conn *MysqlConn) {
var (
db *sql.DB
err error
dsn string
)
dsn = fmt.Sprintf("%v:%v@tcp(%v:%v)/%v?charset=utf8mb4&parseTime=True", user, passwd, ip, port, database)
log.Println(dsn)
if db, err = sql.Open("mysql", dsn); err != nil {
logrus.Errorln(err)
time.Sleep(5 * time.Second)
return InitMysqlDb(ip, port, user, passwd, database)
}
if err = db.Ping(); err != nil {
logrus.Errorln(err)
time.Sleep(5 * time.Second)
return InitMysqlDb(ip, port, user, passwd, database)
}
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(2)
conn = &MysqlConn{
dbConn: db,
ip: ip,
port: port,
user: user,
passwd: passwd,
database: database,
}
log.Println("HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH")
return
}
func (db *MysqlConn) InsertData(insertSQL string, data ...[]interface{}) (err error) {
//var (
// result sql.Result
//)
if db.dbConn != nil {
if err = db.dbConn.Ping(); err != nil {
db.lock.Lock()
if !db.restartConn {
db.restartConn = true
dbTemp := InitMysqlDb(db.ip, db.port, db.user, db.passwd, db.database)
db.dbConn = dbTemp.dbConn
}
db.lock.Unlock()
go func() {
time.Sleep(3 * time.Second)
db.restartConn = false
}()
return db.InsertData(insertSQL, data...)
}
tx, e := db.dbConn.Begin()
if e != nil {
logrus.Errorln(e)
time.Sleep(1 * time.Second)
return db.InsertData(insertSQL, data...)
}
stmt, e1 := tx.Prepare(insertSQL)
if e1 != nil {
logrus.Errorln(e1)
time.Sleep(1 * time.Second)
return db.InsertData(insertSQL, data...)
}
var errData [][]interface{}
for _, val := range data {
if _, err = stmt.Exec(val...); err != nil {
logrus.Errorln(err)
//time.Sleep(1 * time.Second)
errData = append(errData, val)
}
//if row, _ := result.LastInsertId(); row == 0 {
// logrus.Errorln("cannot insert data")
// errData = append(errData, val)
//}
}
_ = tx.Commit()
if len(errData) > 0 {
return db.InsertData(insertSQL, errData...)
}
} else {
db.lock.Lock()
if !db.restartConn {
db.restartConn = true
dbTemp := InitMysqlDb(db.ip, db.port, db.user, db.passwd, db.database)
db.dbConn = dbTemp.dbConn
}
db.lock.Unlock()
go func() {
time.Sleep(3 * time.Second)
db.restartConn = false
}()
return db.InsertData(insertSQL, data...)
}
return
}
package otherHandler
import (
"crypto/md5"
"fmt"
"github.com/garyburd/redigo/redis"
"github.com/sirupsen/logrus"
)
type RdbStruct struct {
rDbConn redis.Conn
channel string
server string
}
func InitRdb(channel, server string) (Db *RdbStruct, err error) {
var (
rDb redis.Conn
)
if rDb, err = redis.Dial("tcp", server); err != nil {
logrus.Errorln("connecting redis error")
return
}
Db = &RdbStruct{
rDbConn: rDb,
channel: channel,
server: server,
}
return
}
func (rdb *RdbStruct) Set(key string, value int) (err error) {
key = "meetingId:" + fmt.Sprintf("%x", md5.Sum([]byte(key)))
if _, err = rdb.rDbConn.Do("SET", key, value); err != nil {
logrus.Errorln("Publish data redis channel error")
var (
rDb redis.Conn
)
if rDb, err = redis.Dial("tcp", rdb.server); err != nil {
return
}
if _, err = rDb.Do("SET", key, value); err != nil {
return
}
rdb.rDbConn = rDb
}
return
}
func (rdb *RdbStruct) PubMsg(msg string) (err error) {
if _, err = rdb.rDbConn.Do("Publish", rdb.channel, msg); err != nil {
logrus.Errorln("Publish data redis channel error")
var (
rDb redis.Conn
)
if rDb, err = redis.Dial("tcp", rdb.server); err != nil {
return
}
if _, err = rDb.Do("Publish", rdb.channel, msg); err != nil {
return
}
rdb.rDbConn = rDb
}
return
}
func (rdb *RdbStruct) LeftPubMsg(msg string) (err error) {
if _, err = rdb.rDbConn.Do("lpush", "audioQueue", msg); err != nil {
logrus.Errorln("lPublish data redis channel error")
var (
rDb redis.Conn
)
if rDb, err = redis.Dial("tcp", rdb.server); err != nil {
return
}
if _, err = rdb.rDbConn.Do("lpush", "audioQueue", msg); err != nil {
return
}
rdb.rDbConn = rDb
}
return
}
func (rdb *RdbStruct) RedisClose() {
var err error
if err = rdb.rDbConn.Close(); err != nil {
logrus.Errorln("closing redis connecting error")
}
}
package otherHandler
import (
"fmt"
"io/ioutil"
"os"
)
type WavStruct struct {
file *os.File
meetingId string
EFile string
audioLength int
isClose bool
}
// InitSaveWave TODO:初始化wave文件
func InitSaveWave(meetingId, audioPath string) (wav *WavStruct, err error) {
var (
file *os.File
audioLength int
)
// 创建文件夹
pcmPath := audioPath
// 确定文件格式
fileDir := fmt.Sprintf("%v/%v", pcmPath, meetingId)
eName := fmt.Sprintf("%v/%vf.wav", fileDir, meetingId)
if exist, _ := ExistsFile(eName); exist {
content, _ := ioutil.ReadFile(eName)
audioLength = len(content)
}
_, err = os.Stat(fileDir)
if os.IsNotExist(err) {
if err = os.MkdirAll(fileDir, os.ModePerm); err != nil {
return
}
}
// 将数据存入总文件
if file, err = os.OpenFile(eName, os.O_RDONLY|os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0766); err != nil {
return
}
wav = &WavStruct{
file: file,
meetingId: meetingId,
EFile: eName,
audioLength: audioLength,
}
return
}
func (w *WavStruct) SaveData(data []byte) (filePath string, err error) {
if _, err = w.file.Write(data); err != nil {
if w.file, err = os.OpenFile(w.EFile, os.O_RDONLY|os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0766); err != nil {
return
}
if _, err = w.file.Write(data); err != nil {
return
}
}
filePath = w.EFile
w.audioLength = w.audioLength + len(data)
return
}
func (w *WavStruct) CloseWav() (err error) {
//_ ,_ = w.file.Seek(44,0)
//w.file.WriteAt()
if !w.isClose {
w.isClose = true
if err = w.file.Close(); err != nil {
}
file1, _ := os.OpenFile(w.EFile, os.O_RDWR, os.ModePerm)
l, _ := file1.Seek(0, 0)
waveHeader := pcmToWavHeader(1, 16000, 16, w.audioLength)
_, _ = file1.WriteAt(waveHeader, l)
_ = file1.Close()
}
return
}
/*
*
dst:二进制字符串
numChannel:1=单声道,2=多声道
sampleRate:采样率 8000/16000
*/
func pcmToWavHeader(numChannel, sampleRate, bitsPerSample, audioLen int) (wavHeader []byte) {
longSampleRate := sampleRate
byteRate := bitsPerSample * sampleRate * numChannel / 8
totalAudioLen := audioLen
totalDataLen := totalAudioLen + 36
var header = make([]byte, 44)
// RIFF/WAVE header
header[0] = 'R'
header[1] = 'I'
header[2] = 'F'
header[3] = 'F'
header[4] = byte(totalDataLen & 0xff)
header[5] = byte((totalDataLen >> 8) & 0xff)
header[6] = byte((totalDataLen >> 16) & 0xff)
header[7] = byte((totalDataLen >> 24) & 0xff)
//WAVE
header[8] = 'W'
header[9] = 'A'
header[10] = 'V'
header[11] = 'E'
// 'fmt ' chunk
header[12] = 'f'
header[13] = 'm'
header[14] = 't'
header[15] = ' '
// 4 bytes: size of 'fmt ' chunk
header[16] = 16
header[17] = 0
header[18] = 0
header[19] = 0
// format = 1
header[20] = 1
header[21] = 0
header[22] = byte(numChannel)
header[23] = 0
header[24] = byte(longSampleRate & 0xff)
header[25] = byte((longSampleRate >> 8) & 0xff)
header[26] = byte((longSampleRate >> 16) & 0xff)
header[27] = byte((longSampleRate >> 24) & 0xff)
header[28] = byte(byteRate & 0xff)
header[29] = byte((byteRate >> 8) & 0xff)
header[30] = byte((byteRate >> 16) & 0xff)
header[31] = byte((byteRate >> 24) & 0xff)
// block align
header[32] = byte(numChannel * bitsPerSample / 8)
header[33] = 0
// bits per sample
header[34] = byte(bitsPerSample)
header[35] = 0
//data
header[36] = 'd'
header[37] = 'a'
header[38] = 't'
header[39] = 'a'
header[40] = byte(totalAudioLen & 0xff)
header[41] = byte((totalAudioLen >> 8) & 0xff)
header[42] = byte((totalAudioLen >> 16) & 0xff)
header[43] = byte((totalAudioLen >> 24) & 0xff)
wavHeader = header
return
}
package otherHandler
import (
"github.com/bwmarrin/snowflake"
"github.com/sirupsen/logrus"
"time"
)
type SnowflakeBase struct {
node *snowflake.Node
}
func NewSnowFlake(machineId int64) (snow *SnowflakeBase) {
var (
err error
node *snowflake.Node
)
snowflake.Epoch = time.Now().UnixMicro() / 1e6
if node, err = snowflake.NewNode(machineId); err != nil {
logrus.Fatalln(err.Error())
}
snow = &SnowflakeBase{
node: node,
}
return
}
func (snow *SnowflakeBase) GetIds(number int) (ids []string, err error) {
for i := 0; i < number; i++ {
ids = append(ids, snow.node.Generate().String())
}
return
}
package server
import (
"encoding/json"
"fmt"
serverPb "gRPC-Raisound-Base-second-pass/Raisound-protobuf"
"gRPC-Raisound-Base-second-pass/initServer"
"gRPC-Raisound-Base-second-pass/otherHandler"
"gRPC-Raisound-Base-second-pass/serverHandler"
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"io/ioutil"
"os"
"regexp"
"strconv"
"strings"
"time"
)
func GrpcServer() (grpcServer *grpc.Server) {
var keep = keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
//grpc.KeepaliveEnforcementPolicy(kaep),
var kasp = keepalive.ServerParameters{
//MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
//MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
}
grpcServer = grpc.NewServer(grpc.KeepaliveEnforcementPolicy(keep), grpc.KeepaliveParams(kasp))
if server, err := serverHandler.InitServer(); err != nil {
return
} else {
serverPb.RegisterRaisoundSpeechServer(grpcServer, server)
}
return
}
func GetAudioFile(ctx *gin.Context) {
token := ctx.Request.Header.Get("token")
if token == "" {
ctx.JSONP(400, gin.H{
"code": 400,
"description": "token is empty.",
},
)
} else {
if yes, err := otherHandler.CheckToken(token, initServer.ConfigMap["checkTokenURL"].(string)); yes && err == nil {
meetingID := ctx.Query("meeting_id")
if meetingID == "" {
ctx.JSONP(400, gin.H{
"code": 400,
"description": "The parameter value is missing.",
},
)
} else {
audioFile := fmt.Sprintf("%v/%v/%vf.wav", initServer.ConfigMap["audioPath"].(string), meetingID, meetingID)
if exit, _ := otherHandler.ExistsFile(audioFile); exit {
ctx.JSONP(200, gin.H{
"code": 200,
"data": fmt.Sprintf("http://%v/getAudioPath/%v/%v/%vf.wav", ctx.Request.Host, token, meetingID, meetingID),
"description": "succeed",
},
)
} else {
ctx.JSONP(404, gin.H{
"code": 404,
"description": "The file does not exist.",
},
)
}
}
} else if !yes && err == nil {
ctx.JSONP(401, gin.H{
"code": 401,
"description": "token error.",
},
)
} else if err != nil {
ctx.JSONP(404, gin.H{
"code": 404,
"description": err.Error(),
},
)
}
}
}
func GetEnd(ctx *gin.Context) {
meetingId := ctx.Param("meeting_id")
if meetingId == "" {
ctx.JSONP(
400,
gin.H{
"code": 400,
"description": "the meeting_id is empty,please check the value of meeting_id.",
})
} else {
reCompile, _ := regexp.Compile("_\\d{0,3}$")
var (
tempMeetingId string
tempSpeaker string
)
if reCompile.Match([]byte(meetingId)) {
tempMeetingId = reCompile.ReplaceAllString(meetingId, "")
tempSpeaker = strings.Replace(reCompile.FindString(meetingId), "_", "", -1)
} else {
tempMeetingId = meetingId
tempSpeaker = "false"
}
audioFile := fmt.Sprintf("%v/%v/%vf.wav", initServer.ConfigMap["audiioPath"].(string), meetingId, meetingId)
if exit, _ := otherHandler.ExistsFile(audioFile); exit {
var (
publicMsg = make(map[string]interface{})
endMsg = make(map[string]interface{})
wavStruct *otherHandler.WavStruct
rb *otherHandler.RdbStruct
err error
)
content, _ := ioutil.ReadFile(audioFile)
duration := len(content) - 44
if content[0] == 'R' && content[1] == 'I' && content[2] == 'F' && content[3] == 'F' {
} else {
if wavStruct, err = otherHandler.InitSaveWave(meetingId, initServer.ConfigMap["audioPath"].(string)); err != nil {
ctx.JSONP(400, gin.H{
"code": 400,
"description": err.Error(),
},
)
}
if wavStruct != nil {
_ = wavStruct.CloseWav()
}
}
if rb, err = otherHandler.InitRdb(initServer.ConfigMap["redisChannel"].(string), initServer.ConfigMap["redisURL"].(string)); err != nil {
ctx.JSONP(400, gin.H{
"code": 400,
"description": err.Error(),
},
)
}
if rb != nil {
timestamp, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", float64(time.Now().UnixMilli())/float64(32000)), 64)
durationTime, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", float64(duration)/float64(1000)), 64)
publicMsg["timestamp"] = timestamp
publicMsg["url"] = ""
publicMsg["raw_text"] = ""
publicMsg["end_of_speech"] = 1
publicMsg["pcmtime"] = 0.0
publicMsg["duration"] = durationTime
publicMsg["meeting_id"] = tempMeetingId
publicMsg["session_id"] = ""
publicMsg["entire_file_path"] = audioFile
publicMsg["end"] = 1
publicMsg["speaker"] = fmt.Sprintf("%v", tempSpeaker) //speaker
data, _ := json.Marshal(publicMsg)
_ = rb.PubMsg(string(data))
endMsg["speaker"] = tempSpeaker
endMsg["meeting_id"] = tempMeetingId
endMsg["file_path"] = audioFile
jsonQueueData, _ := json.Marshal(endMsg)
_ = rb.LeftPubMsg(string(jsonQueueData))
rb.RedisClose()
ctx.JSONP(200, gin.H{
"code": 200,
"description": "succeed",
"meeting_id": meetingId,
})
}
} else {
ctx.JSONP(403, gin.H{
"code": 403,
"description": "can find the meeting",
},
)
}
}
}
func LoadFile1(ctx *gin.Context) {
fileIdA := ctx.Param("aid")
fileIdB := ctx.Param("bid")
audioFile := fmt.Sprintf("%v/%v/%v", initServer.ConfigMap["audioPath"].(string), fileIdA, fileIdB)
ok, _ := otherHandler.ExistsFile(audioFile)
if !ok {
ctx.JSONP(404, gin.H{
"code": 404,
"description": "the file is not existing.",
})
return
}
ctx.Header("Content-Type", "application/octet-stream")
ctx.Header("Content-Disposition", "attachment; filename="+audioFile)
ctx.Header("Content-Transfer-Encoding", "binary")
ctx.FileAttachment(audioFile, fileIdB)
}
func LoadFile(ctx *gin.Context) {
token := ctx.Param("token")
fileIdA := ctx.Param("aid")
fileIdB := ctx.Param("bid")
if yes, err := otherHandler.CheckToken(token, initServer.ConfigMap["checkTokenURL"].(string)); yes && err == nil {
audioFile := fmt.Sprintf("%v/%v/%v", initServer.ConfigMap["audioPath"].(string), fileIdA, fileIdB)
ctx.Header("Content-Type", "application/octet-stream")
ctx.Header("Content-Disposition", "attachment; filename="+audioFile)
ctx.Header("Content-Transfer-Encoding", "binary")
ctx.FileAttachment(audioFile, fileIdB)
} else if !yes && err == nil {
ctx.JSONP(404, gin.H{
"code": 404,
"description": "file path error.",
},
)
} else {
if err != nil {
if err.Error() != "checking token error, please check server" {
ctx.JSONP(403, gin.H{
"code": 403,
"description": "The address is invalid",
},
)
} else {
ctx.JSONP(400, gin.H{
"code": 400,
"description": err.Error(),
},
)
}
}
}
}
type reqBody struct {
MeetingId string `json:"meeting_id"`
}
func RemoveFile(ctx *gin.Context) {
var (
reqData = &reqBody{}
err error
)
if err = ctx.BindJSON(reqData); err != nil {
ctx.JSONP(400, gin.H{
"status": 400,
"desc": err.Error(),
})
return
}
//if len(strings.Split(initServer.ConfigMap["audioPath"].(string),""))
if reqData.MeetingId == "" || initServer.ConfigMap["audioPath"].(string) == "" {
ctx.JSONP(400, gin.H{
"status": 400,
"desc": "wrong root directory or meeting_id.",
})
return
}
meetingIdName := fmt.Sprintf("%v/%v", initServer.ConfigMap["audioPath"].(string), reqData.MeetingId)
if err = os.RemoveAll(meetingIdName); err != nil {
ctx.JSONP(400, gin.H{
"status": 400,
"desc": err.Error(),
})
return
}
ctx.JSONP(200, gin.H{
"status": 200,
"desc": "meeting data is removed successfully",
})
return
}
package serverHandler
import (
"context"
"errors"
serverPb "gRPC-Raisound-Base-second-pass/Raisound-protobuf"
)
func (server *Service) HeartBeat(cxt context.Context, req *serverPb.HeartBeatConfig) (resp *serverPb.HeartBeatResponse, err error) {
if ok := server.checkSessionId(req.GetSessionId()); ok {
//_ = server.delete(req.GetSessionId())
resp = &serverPb.HeartBeatResponse{
Code: 200,
SessionId: req.GetSessionId(),
Description: "HeartBeat",
}
} else {
resp = &serverPb.HeartBeatResponse{
Code: 400,
SessionId: req.GetSessionId(),
Description: "Pool connections are not logged",
}
err = errors.New("pool connections are not logged")
}
return
}
package serverHandler
import (
"context"
"errors"
"fmt"
serverPb "gRPC-Raisound-Base-second-pass/Raisound-protobuf"
"gRPC-Raisound-Base-second-pass/initServer"
"gRPC-Raisound-Base-second-pass/otherHandler"
"github.com/gofrs/uuid"
"github.com/sirupsen/logrus"
"log"
"strings"
"time"
)
// InitStreamingASR(context.Context, *raisound.InitConfig) (*raisound.InitResponse, error)
func (server *Service) InitStreamingASR(cxt context.Context, req *serverPb.InitConfig) (resp *serverPb.InitResponse, err error) {
info := make(map[string]interface{})
if req.GetLang() != "" {
if req.GetLang() == "ZH-CN" || req.GetLang() == "ZH-HK" || req.GetLang() == "EN" {
info["language"] = req.GetLang()
} else {
info["language"] = initServer.ConfigMap["defaultLanguage"].(string)
}
} else {
info["language"] = initServer.ConfigMap["defaultLanguage"].(string)
}
if req.GetConnId() == "" {
info["userId"] = initServer.ConfigMap["hotWord-ZH-CN"].(string)
} else {
initServer.Log.Log.WithFields(logrus.Fields{"source": "get user_id"}).Info(req.GetConnId())
if hotWordPath, e := otherHandler.GetHotWordPath(req.GetConnId(), initServer.ConfigMap["hotWordURL"].(string)); e != nil {
initServer.Log.Log.WithFields(logrus.Fields{"source": "get hot word path error"}).Error(fmt.Sprintf(`{"user_id":"%v","error":"%v"}`, req.GetConnId(), e.Error()))
log.Println("get hot word path error:", e.Error())
info["userId"] = initServer.ConfigMap["hotWord-ZH-CN"].(string)
} else {
initServer.Log.Log.WithFields(logrus.Fields{"source": "hot word Path"}).Info(hotWordPath)
log.Println("hot word Path:", hotWordPath)
if hotWordPath != "" {
info["userId"] = hotWordPath
} else {
if req.GetLang() == "ZH-CN" {
info["userId"] = initServer.ConfigMap["hotWord-ZH-CN"].(string)
} else if req.GetLang() == "ZH-HK" {
info["userId"] = initServer.ConfigMap["hotWord-ZH-HK"].(string)
} else if req.GetLang() == "EN" {
info["userId"] = initServer.ConfigMap["hotWord-EN"].(string)
} else {
info["userId"] = initServer.ConfigMap["hotWord-ZH-CN"].(string)
}
}
}
}
sessionId, _ := uuid.NewV1()
if req.GetMeetingId() != "" {
log.Println(req.GetMeetingId())
splitMeeting := strings.Split(req.GetMeetingId(), "@")
info["meetingId"] = splitMeeting[0]
info["user"] = ""
info["sysType"] = 1
if len(splitMeeting) == 2 {
if splitMeeting[1] == "app" {
info["sysType"] = 2
}
}
if len(splitMeeting) > 2 {
if splitMeeting[1] == "app" {
info["sysType"] = 2
}
info["user"] = splitMeeting[2]
}
} else {
meetingId, _ := uuid.NewV4()
info["meetingId"] = meetingId.String()
info["user"] = ""
info["sysType"] = 1
}
if req.GetInterOuterFlag() != 1 {
if req.GetUser() == "admin" && req.GetPasswd() == "123456" {
info["time"] = time.Now().Unix()
info["sessionId"] = sessionId.String()
info["sensitiveFlag"] = req.GetSensitiveFlag()
info["puncFlag"] = req.GetPuncFlag()
info["numConvertFlag"] = req.GetNumConvertFlag()
_ = server.Add(sessionId.String(), info)
resp = &serverPb.InitResponse{
Code: 200,
SessionId: sessionId.String(),
Description: fmt.Sprintf(`{"status":200,"description":"Initialization succeeded","meeting_id":"%v"}`, info["meetingId"].(string)),
}
} else {
resp = &serverPb.InitResponse{
Code: 400,
SessionId: sessionId.String(),
Description: `{"status":400,"description":"user or the password is incorrect or invalid"}`,
}
err = errors.New("user or the password is incorrect or invalid")
}
} else {
log.Println("get the token:", req.Token)
if ok, _ := otherHandler.CheckToken(req.Token, initServer.ConfigMap["checkTokenURL"].(string)); ok {
info["time"] = time.Now().Unix()
info["sessionId"] = sessionId.String()
info["sensitiveFlag"] = req.GetSensitiveFlag()
info["puncFlag"] = req.GetPuncFlag()
info["numConvertFlag"] = req.GetNumConvertFlag()
_ = server.Add(sessionId.String(), info)
resp = &serverPb.InitResponse{
Code: 200,
SessionId: sessionId.String(),
Description: fmt.Sprintf(`{"status":200,"description":"Initialization succeeded","meeting_id":"%v"}`, info["meetingId"].(string)),
}
} else {
resp = &serverPb.InitResponse{
Code: 400,
SessionId: "",
Description: `{"status":400,"description":"token is incorrect or invalid"}`,
}
err = errors.New("token is incorrect or invalid")
}
}
log.Println("get Session:", sessionId.String())
log.Println("session info:", server.ConnInfo)
return
}
package serverHandler
import (
"context"
"errors"
serverPb "gRPC-Raisound-Base-second-pass/Raisound-protobuf"
)
func (server *Service) RelStreamingASR(cxt context.Context, req *serverPb.RelConfig) (resp *serverPb.RelResponse, err error) {
if ok := server.checkSessionId(req.GetSessionId()); ok {
_ = server.delete(req.GetSessionId())
resp = &serverPb.RelResponse{
Code: 200,
SessionId: req.GetSessionId(),
Description: "Release successfully",
}
//err = errors.New("release successfully")
} else {
resp = &serverPb.RelResponse{
Code: 400,
SessionId: req.GetSessionId(),
Description: "Pool connections are not logged",
}
err = errors.New("pool connections are not logged")
}
return
}
package serverHandler
import (
"fmt"
serverPb "gRPC-Raisound-Base-second-pass/Raisound-protobuf"
"gRPC-Raisound-Base-second-pass/initServer"
"github.com/sirupsen/logrus"
"time"
)
func (server *Service) StreamingASR(req serverPb.RaisoundSpeech_StreamingASRServer) (err error) {
var (
stream *Stream
isFirst bool
isClose bool
reqData *serverPb.SendAudioConfig
errMsg error
connInfoData map[string]interface{}
)
go func() {
var (
respData *serverPb.SendAudioResponse
e error
)
for {
if !isClose {
if stream != nil {
if respData, e = stream.ReadMsg(); e != nil {
goto ERR
}
go func(data *serverPb.SendAudioResponse) {
sendStr := fmt.Sprintf(`{"session_id":"%v","meeting_id":"%v","is_final":%v,"result":"%v"}`, stream.sessionId, stream.meetingId, data.IsFinal, data.Results)
initServer.Log.Log.WithFields(logrus.Fields{"source": "data sent to client"}).Info(sendStr)
}(respData)
if e = req.Send(respData); e != nil {
goto ERR
}
} else {
time.Sleep(500 * time.Millisecond)
}
} else {
time.Sleep(500 * time.Millisecond)
}
}
ERR:
if stream != nil {
stream.CloseChan()
}
}()
for {
if reqData, errMsg = req.Recv(); errMsg != nil {
goto ERR
}
if !isFirst {
if connInfoData, err = server.checkInfo(reqData.GetSessionId()); err != nil {
initServer.Log.Log.WithFields(logrus.Fields{"source": "check data from connection error"}).Error(err.Error())
goto ERR
}
if stream, err = InitStream(connInfoData); err != nil {
initServer.Log.Log.WithFields(logrus.Fields{"source": "nit stream error"}).Error(err.Error())
goto ERR
}
isFirst = true
}
if stream != nil {
if reqData.EndFlag == 1 {
if errMsg = stream.WriteMsg([]byte(`{"signal" : "stop"}`)); errMsg != nil {
goto ERR
}
continue
}
if errMsg = stream.WriteMsg(reqData.GetAudioData()); errMsg != nil {
goto ERR
}
}
}
ERR:
if stream != nil {
sendData := []byte(`{"signal" : "end"}`)
_ = stream.WriteMsg(sendData)
time.Sleep(500 * time.Millisecond)
stream.CloseChan()
}
time.Sleep(500 * time.Millisecond)
isClose = true
return
}
package serverHandler
import (
"errors"
serverPb "gRPC-Raisound-Base-second-pass/Raisound-protobuf"
"sync"
"time"
)
type Service struct {
serverPb.UnimplementedRaisoundSpeechServer
ConnInfo map[string]map[string]interface{}
lock sync.Mutex
}
func InitServer() (server *Service, err error) {
server = &Service{
ConnInfo: make(map[string]map[string]interface{}),
}
go server.checkAlive()
return
}
func (server *Service) Add(key string, value map[string]interface{}) (err error) {
server.lock.Lock()
if _, ok := server.ConnInfo[key]; !ok {
server.ConnInfo[key] = value
}
server.lock.Unlock()
return
}
func (server *Service) delete(key string) (err error) {
server.lock.Lock()
if _, ok := server.ConnInfo[key]; ok {
delete(server.ConnInfo, key)
}
server.lock.Unlock()
return
}
func (server *Service) checkInfo(key string) (value map[string]interface{}, err error) {
server.lock.Lock()
if firstData, ok1 := server.ConnInfo[key]; ok1 {
value = firstData
} else {
err = errors.New("session_id is error")
}
server.lock.Unlock()
return
}
func (server *Service) check(firstKey, secondKey string) (value interface{}, err error) {
server.lock.Lock()
if firstData, ok1 := server.ConnInfo[firstKey]; ok1 {
if getValue, ok2 := firstData[secondKey]; ok2 {
value = getValue
} else {
err = errors.New(secondKey + " is error")
}
} else {
err = errors.New("session_id is error")
}
server.lock.Unlock()
return
}
func (server *Service) checkSessionId(sessionId string) (isOk bool) {
server.lock.Lock()
_, isOk = server.ConnInfo[sessionId]
server.lock.Unlock()
return
}
func (server *Service) checkAlive() {
group := sync.WaitGroup{}
for {
server.lock.Lock()
for k, v := range server.ConnInfo {
key := k
value := v
group.Add(1)
go func() {
if value["time"].(int64)+12*60*60 <= time.Now().Unix() {
if _, ok := server.ConnInfo[key]; ok {
delete(server.ConnInfo, key)
}
}
group.Done()
}()
}
group.Wait()
server.lock.Unlock()
time.Sleep(5 * time.Minute)
}
}
#### 说明
#### 说明
- 该程序的主要主要接口是实时语音转文字的`gRPC`接口,主要使用的类型是双流式。
- 辅助接口有相应会议音频查询与下载的`http`接口。
- 这些接口共用同一个端口。
#### 实时语音转文字`gRPC`接口
- `。proto`文档
```
syntax = "proto3";
option go_package = "./Raisound-protobuf;raisound";
// 初始化语音流的设置
message InitConfig {
string token = 1;
string devid = 2;
string codec = 3;
int32 sample_rate = 4;
int32 sample_bits = 5;
int32 bitrate = 6;
string scene = 7;
string lang = 8;
int32 nbest = 9;
int32 vad_level = 10;
int32 max_speech_duration = 11;
string meeting_id = 12;
string app_name = 13;
string conn_id = 14;
bool oral_flag = 15;
bool sensitive_flag = 16;
bool punc_flag = 17;
bool num_convert_flag = 18;
string user = 19;
string passwd = 20;
int32 inter_outer_flag = 21; //除1外其他表示对内,1表示对外
}
// 初始化语音流的返回
message InitResponse {
int32 code = 1;
string session_id = 2;
string description = 3;
}
// 发送语音流数据的设置
message SendAudioConfig {
string session_id = 1;
bytes audio_data = 2;
int32 end_flag = 3;
int32 data_count = 4;
}
// 发送语音流的返回
message SendAudioResponse {
int32 code = 1;
bool is_result = 2;
bool is_final = 3;
string session_id = 4;
string results = 5;
string description = 6;
string json_data = 7;
int32 speech_duration = 8;
int32 data_index = 9;
}
// 发送心跳的设置
message HeartBeatConfig {
string session_id = 1;
}
// 发送心跳的返回
message HeartBeatResponse {
int32 code = 1;
string session_id = 2;
string description = 3;
}
// 释放语音流的设置
message RelConfig {
string session_id = 1;
}
// 释放语音流的返回
message RelResponse {
int32 code = 1;
string session_id = 2;
string description = 3;
}
service RaisoundSpeech {
rpc InitStreamingASR(InitConfig) returns(InitResponse);
rpc StreamingASR(stream SendAudioConfig) returns(stream SendAudioResponse);
rpc RelStreamingASR(RelConfig) returns(RelResponse);
rpc HeartBeat(HeartBeatConfig) returns(HeartBeatResponse);
}
```
> 1. 调用顺序 `InitStreamingASR` -> `StreamingASR` -> `RelStreamingASR`。
> 2. `HeartBeat` 新版中不需要使用到了。
> 3. `InitStreamingASR`:当`inter_outer_flag`不等于`1`时,`user`默认为`admin`,`passwd`默认为`123456`,只要这三个即可登录获取`session_id`;当`inter_outer_flag`等于`1`时,需要获取`token`,之后也可以登录获取取`session_id`。`meeting_id`即为会议`ID`,可无,若传值时,后端保存音频或发布`redis`时会以这的`ID`为主要标识,否者后端会以接入的当前时刻为会议`ID`.
> 4. `StreamingASR`:请求的`session_id`是在`InitStreamingASR`步骤获取的,之后流式中每一帧数据都要以这个`session_id`为标识。其中`data_count`在新版中已经不再调用。`end_flag`是紧急将临时结果变成最终结果,特殊场景调用。
- 调用过程
- `InitStreamingASR`初始化
- 请求结构`InitConfig`(现用使用到的参数)
| 参数 | 要求 | 说明 |
| :---: | :---: | :--- |
| `token` | 字符串 | 通过接口获取,当`inter_outer_flag`为`1`是必要。 |
| `user`|字符串|当`inter_outer_flag`为`1`是必要,默认`admin`|
|`passwd` |字符串|当`inter_outer_flag`为`1`是必要,默认`123456`|
|`meeting_id`|字符串|没有传这个参数,后端以接入时刻为其值,非必要|
|`inter_outer_flag`|整数|为`1`事和token组成即可获取`session_id`;其他值时和`user`/`passwd`组成也可获取`session_id`。其值默认为`1`|
- 响应结构`InitResponse`
| 参数 | 要求 | 说明 |
| :---: | :---: | :--- |
|`code`| 整数 |`200`成功|
|`session_id`|字符串|整个会议过程都会以这个`session_id`为基础|
|`description`|字符串|说用描述|
- `StreamingASR`识别(双流式)
- 请求结构`SendAudioConfig`
| 参数 | 要求 | 说明 |
| :---: | :---: | :--- |
|`session_id`|字符串|`InitStreamingASR`获取的`session_id`|
|`audio_data`|音频数据|音频采样频率为`16000`,单声道,每一帧大小建议`1600`~`3200`|
|`end_flag`|字符串|紧急将临时结果变成最终结果,特殊场景调用|
|`data_count`|整数|非必要,新版没有调用到|
- 响应结构`SendAudioResponse`
| 参数 | 要求 | 说明 |
| :---: | :---: | :--- |
|`code`|整数|`200`成功|
|`is_result`|布尔类型|`True`未有结果,`False`为没有结果|
|`is_final`|布尔类型|`True`最终结果,`False`为临时结果|
|`session_id`|字符串|`SendAudioConfig`的`session_id`|
|`results`|字符串|音频文字结果|
|`description`|字符串|详情描述|
|`json_data`|字符串|`json`格式的结果,目前没使用这个参数,非必要|
|`speech_duration`|整数|非必要|
|`data_index`|整数|非必要|
- `RelStreamingASR`释放
- 请求结构`RelConfig`
| 参数 | 要求 | 说明 |
| :---: | :---: | :--- |
|`session_id`|字符串|`InitStreamingASR`获取的`session_id`|
- 响应结构`RelResponse`
| 参数 | 要求 | 说明 |
| :---: | :---: | :--- |
|`code`|整数|`200`成功|
|`session_id`|字符串|`RelConfig`的`session_id`|
|`description`|字符串|详情描述|
> 在释放段阶段,客户端要断开`gRPC`的连接
#### 查询会议音频`http`接口(`GET`)
- 基本请求结构
- `header`头结构
```
{"token":{token值}}
```
- 请求链接(假如服务端口是`20065`)
```
http://127.0.0.1:20065/getAudioFile?meeting_id={meeting_id值}
```
- 响应结构
- 成功
```
{
"code":200,
"data":{下载音频链接},
"description":"succeed"
}
```
- 失败
- 没带`header`头
```
{
"code":400,
"description":"token is empty."
}
```
- 没传`meeting_id`值
```
{
"code":400,
"description":"The parameter value is missing."
}
```
- 音频文件不存在
```
{
"code":404,
"description":"The file does not exist."
}
```
- `token`值不正确
```
{
"code":401,
"description":"token error."
}
```
- 其他错误
```
{
"code":404,
"description":{错误描述}
}
```
#### 下载会议音频`http`接口(`GET`)
> 说明:下载接口是根据查询接口正确返回拿到的
- 错误响应说明
- 错误一
```
{
"code":404,
"description":"file path error."
}
```
- 错误二
```
{
"code":403,
"description":"The address is invalid"
}
```
- 错误三
```
{
"code":400,
"description":{错误描述}
}
```
#### 会议结束音频异常后调`http`接口(`GET`)
- 请求链接(假如服务端口是`20065`)
```
http://127.0.0.1:20065/getEnd/{meeting_id值}
```
- 响应结构
- 成功
```
{
"code":200,
"description":"succeed",
"meeting_id":{meetingId的值}
}
```
- 会议不存在
```
{
"code":403,
"description":"can find the meeting"
}
```
- `meeting_id`值为空
```
{
"code":400,
"description":"the meeting_id is empty,please check the value of meeting_id."
}
```
- 其他错误
```
{
"code":404,
"description":{错误描述}
}
```
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment