Commit 2b25422f by zhengcheng.wang

Initial commit

parents
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="GoORMHelperCache">
<option name="schemaMapping">
<map>
<entry key="Connection">
<value>
<set>
<option value="file://$PROJECT_DIR$/server/serverHandler.go" />
</set>
</value>
</entry>
<entry key="DMDataConnBase">
<value>
<set>
<option value="file://$PROJECT_DIR$/../../../../../raisound/DM-database/main.go" />
</set>
</value>
</entry>
<entry key="Engine">
<value>
<set>
<option value="file://$PROJECT_DIR$/engineServer/handler.go" />
</set>
</value>
</entry>
<entry key="PostReq">
<value>
<set>
<option value="file://$PROJECT_DIR$/funcHandler/handler.go" />
</set>
</value>
</entry>
<entry key="PostResp">
<value>
<set>
<option value="file://$PROJECT_DIR$/funcHandler/handler.go" />
</set>
</value>
</entry>
<entry key="RdbStruct">
<value>
<set>
<option value="file://$PROJECT_DIR$/funcHandler/rdbHandler.go" />
</set>
</value>
</entry>
<entry key="WavStruct">
<value>
<set>
<option value="file://$PROJECT_DIR$/funcHandler/saveWave.go" />
</set>
</value>
</entry>
<entry key="mysqlConnInit">
<value>
<set>
<option value="file://$PROJECT_DIR$/funcHandler/mysqlHandler.go" />
</set>
</value>
</entry>
<entry key="reqBody">
<value>
<set>
<option value="file://$PROJECT_DIR$/server/serverBase.go" />
</set>
</value>
</entry>
</map>
</option>
<option name="scannedPathMapping">
<map>
<entry key="file://$PROJECT_DIR$/engineServer/handler.go">
<value>
<ScannedPath>
<option name="lastModified" value="1690250107028" />
<option name="schema">
<list>
<option value="Engine" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/funcHandler/handler.go">
<value>
<ScannedPath>
<option name="lastModified" value="1691648945736" />
<option name="schema">
<list>
<option value="PostReq" />
<option value="PostResp" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/funcHandler/mysqlHandler.go">
<value>
<ScannedPath>
<option name="lastModified" value="1700116412812" />
<option name="schema">
<list>
<option value="mysqlConnInit" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/funcHandler/rdbHandler.go">
<value>
<ScannedPath>
<option name="lastModified" value="1700098101357" />
<option name="schema">
<list>
<option value="RdbStruct" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/funcHandler/saveWave.go">
<value>
<ScannedPath>
<option name="lastModified" value="1670232707169" />
<option name="schema">
<list>
<option value="WavStruct" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/initServer/getConfig.go">
<value>
<ScannedPath>
<option name="lastModified" value="1700116413084" />
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/main.go">
<value>
<ScannedPath>
<option name="lastModified" value="1691650195041" />
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/server/serverBase.go">
<value>
<ScannedPath>
<option name="lastModified" value="1700099490553" />
<option name="schema">
<list>
<option value="reqBody" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/server/serverHandler.go">
<value>
<ScannedPath>
<option name="lastModified" value="1700098100771" />
<option name="schema">
<list>
<option value="Connection" />
</list>
</option>
</ScannedPath>
</value>
</entry>
<entry key="file://$PROJECT_DIR$/../../../../../raisound/DM-database/main.go">
<value>
<ScannedPath>
<option name="lastModified" value="1702368880274" />
<option name="schema">
<list>
<option value="DMDataConnBase" />
</list>
</option>
</ScannedPath>
</value>
</entry>
</map>
</option>
<option name="tableStructMapping">
<map>
<entry key="connection" value="Connection" />
<entry key="dm_data_conn_base" value="DMDataConnBase" />
<entry key="engine" value="Engine" />
<entry key="mysql_conn_init" value="mysqlConnInit" />
<entry key="post_req" value="PostReq" />
<entry key="post_resp" value="PostResp" />
<entry key="rdb_struct" value="RdbStruct" />
<entry key="req_body" value="reqBody" />
<entry key="wav_struct" value="WavStruct" />
</map>
</option>
<option name="lastTimeChecked" value="1702365922257" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/RaisoundWebSocketBase2C.iml" filepath="$PROJECT_DIR$/.idea/RaisoundWebSocketBase2C.iml" />
</modules>
</component>
</project>
\ No newline at end of file
[websocketCommon]
port = 19090
language = ALL #ZH-CN为简体中文,EN为英文,ALL为两个识别语种都启动,ZH-HK为粤语
redisChannel = send audio info
redisURL = huisheng_redis:6379
redisPasswd =
logPath = /opt/huisheng/websocket-golang-c/data/log #日志地址
pcmPath = /opt/huisheng/websocket-golang-c/data/audio #音频地址
hotWordURL = http://192.168.0.61/huisheng_api_gm/hotwords/new/personal/model/get
tokenVerityURL = http://192.168.0.61/huisheng_api_gm/token/verify
hotWordTextDir = /opt/huisheng/websocket-golang-c/hotWord
postURL = http://172.16.5.177:8124/encode
enPostURL =
[websocketZH-CN]
websocketArr = ws://huisheng_offline:20086/ #websocket数组,以“,”隔开
defaultHotWordFile = /opt/huisheng/data/hotword/a.txt #默认热词完整地址名称
[websocketZH-HK]
websocketArr = ws://huisheng_offline_cantonese:20086/ #websocket数组,以“,”隔开
defaultHotWordFile = /opt/huisheng/data/hotword/b.txt #默认热词完整地址名称
[websocketEN]
websocketArr = ws://huisheng_offline_english:20076/ #websocket数组,以“,”隔开
defaultHotWordFile = /opt/huisheng/data/hotword/c.txt #默认热词完整地址名称
package engineServer
import (
"RaisoundWebSocketBase2C/initServer"
"encoding/json"
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"log"
"sync"
"time"
)
type Engine struct {
Conn *websocket.Conn
lang string
contextPath string
ipStr string
ConnectNum int
isCloseRead bool
isCloseWrite bool
IsEnd bool
isClose bool
preEndTime float64
preSpeechTime float64
preResult map[string]interface{}
resultChan chan []byte
isSwitch bool
isRestart bool
isRestartLock sync.RWMutex
}
func InitEngine(lang, contextPath string) (engine *Engine, err error) {
logrus.Println("HHHHHHHHHHHHHHHHHHHHH", "START CONNECTION")
var (
conn *websocket.Conn
)
engine = &Engine{
Conn: conn,
lang: lang,
contextPath: contextPath,
preResult: make(map[string]interface{}),
resultChan: make(chan []byte, 64),
}
if conn, err = engine.determineWsConn(lang, contextPath); err != nil {
return
} else {
if conn != nil {
engine.Conn = conn
if lang == "ZH-CN" {
initServer.ZhCnLock.Lock()
initServer.ZhCn[engine.ipStr]["num"] = initServer.ZhCn[engine.ipStr]["num"].(int) + 1
//log.Println(initServer.ZhCn)
initServer.ZhCnLock.Unlock()
}
if lang == "ZH_HK" {
initServer.ZhHkLock.Lock()
initServer.ZhHk[engine.ipStr]["num"] = initServer.ZhHk[engine.ipStr]["num"].(int) + 1
//log.Println(initServer.ZhHk)
initServer.ZhHkLock.Unlock()
}
if lang == "EN" {
initServer.ENLock.Lock()
initServer.EN[engine.ipStr]["num"] = initServer.EN[engine.ipStr]["num"].(int) + 1
//log.Println(initServer.EN)
initServer.ENLock.Unlock()
}
} else {
err = errors.New("connected engine failed")
return
}
}
go engine.readMsgLoop()
return
}
func (Conn *Engine) determineWsConn(lang, contextPath string) (conn *websocket.Conn, err error) {
var (
ipStr string
ConNum int
isFirst bool
//recData []byte
//recParams = make(map[string]interface{})
)
if lang == "ZH-CN" {
initServer.ZhCnLock.Lock()
logrus.Println(initServer.ZhCn)
for ip, info := range initServer.ZhCn {
if info["alive"].(bool) {
if !isFirst {
ipStr = ip
ConNum = info["num"].(int)
isFirst = true
continue
}
if ConNum >= info["num"].(int) {
ipStr = ip
ConNum = info["num"].(int)
}
}
}
initServer.ZhCnLock.Unlock()
}
if lang == "ZH-HK" {
initServer.ZhHkLock.Lock()
logrus.Println(initServer.ZhHk)
for ip, info := range initServer.ZhHk {
if info["alive"].(bool) {
if !isFirst {
ipStr = ip
ConNum = info["num"].(int)
isFirst = true
continue
}
if ConNum >= info["num"].(int) {
ipStr = ip
ConNum = info["num"].(int)
}
}
}
initServer.ZhHkLock.Unlock()
}
if lang == "EN" {
initServer.ENLock.Lock()
logrus.Println(initServer.EN)
for ip, info := range initServer.EN {
if info["alive"].(bool) {
if !isFirst {
ipStr = ip
ConNum = info["num"].(int)
isFirst = true
continue
}
if ConNum >= info["num"].(int) {
ipStr = ip
ConNum = info["num"].(int)
}
}
}
initServer.ENLock.Unlock()
}
logrus.Println("connection address:", ipStr)
if ipStr != "" {
if conn, _, err = websocket.DefaultDialer.Dial(ipStr, nil); err != nil {
if Conn.ConnectNum > 10 {
err = errors.New("connected engine failed")
Conn.ConnectNum = 0
return
}
Conn.ConnectNum++
time.Sleep(1 * time.Second)
return Conn.determineWsConn(lang, contextPath)
}
if conn == nil {
if Conn.ConnectNum > 10 {
err = errors.New("connected engine failed")
Conn.ConnectNum = 0
return
}
Conn.ConnectNum++
time.Sleep(1 * time.Second)
return Conn.determineWsConn(lang, contextPath)
} else {
//contextPath = ""
if contextPath != "" {
loadText := fmt.Sprintf(`{"signal": "start", "continuous_decoding": %v, "nbest": 10, "enable_vad": %v, "chunk_size": 12, "two_pass": %v, "context_update":%v,"context_score": 5.0, "context_path": "%v"}`, true, true, true, true, contextPath)
logrus.Println(loadText)
//err = conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{ "signal": "reload_context", "context_score": 5.0, "context_path": "%v", "update": %v}`, contextPath, true)))
//_ = conn.SetReadDeadline(time.Now().Add(300 * time.Millisecond))
if err = conn.WriteMessage(websocket.TextMessage, []byte(loadText)); err != nil {
logrus.Errorln(err.Error())
time.Sleep(1 * time.Second)
return Conn.determineWsConn(lang, contextPath)
}
//if _, recData, err = conn.ReadMessage(); err != nil {
// logrus.Println(err)
// _ = conn.Close()
// if conn, _, err = websocket.DefaultDialer.Dial(ipStr, nil); err != nil {
// if Conn.ConnectNum > 10 {
// err = errors.New("connected engine failed")
// Conn.ConnectNum = 0
// return
// }
// time.Sleep(1 * time.Second)
// Conn.ConnectNum++
// return Conn.determineWsConn(lang, contextPath)
// }
// if conn != nil {
// _ = conn.WriteMessage(websocket.TextMessage, []byte(`{"signal" : "start","nbest" :1,"continuous_decoding": true}`))
// }
//} else {
// logrus.Println(string(recData))
// if err = json.Unmarshal(recData, &recParams); err != nil {
// _ = conn.WriteMessage(websocket.TextMessage, []byte(`{"signal" : "start","nbest" :1,"continuous_decoding": true}`))
// } else {
// if _, ok := recParams["status"]; ok {
// if recParams["status"].(string) == "ok" && recParams["type"].(string) == "reload_end" {
// if err = conn.WriteMessage(websocket.TextMessage, []byte(`{"signal" : "start","nbest" :1,"continuous_decoding": true}`)); err != nil {
// }
// } else {
// _ = conn.Close()
// if conn, _, err = websocket.DefaultDialer.Dial(ipStr, nil); err != nil {
// logrus.Println(err.Error())
// }
// if conn != nil {
// if err = conn.WriteMessage(websocket.TextMessage, []byte(`{"signal" : "start","nbest" :1,"continuous_decoding": true}`)); err != nil {
// }
// }
// }
// } else {
// if conn, _, err = websocket.DefaultDialer.Dial(ipStr, nil); err != nil {
// logrus.Println(err)
// }
// if conn != nil {
// _ = conn.WriteMessage(websocket.TextMessage, []byte(`{"signal" : "start","nbest" :1,"continuous_decoding": true}`))
// }
// }
// }
//}
} else {
logrus.Println(fmt.Sprintf(`{"signal": "start", "continuous_decoding": %v, "nbest": 10, "enable_vad": %v, "chunk_size": 12, "two_pass": %v, "context_update":%v}`, true, true, true, true))
if conn != nil {
err = conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"signal": "start", "continuous_decoding": %v, "nbest": 10, "enable_vad": %v, "chunk_size": 12, "two_pass": %v}`, true, true, true)))
if err != nil {
logrus.Errorln(err.Error())
time.Sleep(1 * time.Second)
return Conn.determineWsConn(lang, contextPath)
}
}
}
if conn == nil {
if Conn.ConnectNum > 10 {
err = errors.New("connected engine failed")
Conn.ConnectNum = 0
return
}
Conn.ConnectNum++
time.Sleep(1 * time.Second)
return Conn.determineWsConn(lang, contextPath)
}
}
Conn.ipStr = ipStr
Conn.ConnectNum = 0
} else {
logrus.Println("1111111111111111111111111111", ipStr)
if Conn.ConnectNum > 10 {
err = errors.New("all engine is dead")
Conn.ConnectNum = 0
return
}
Conn.ConnectNum++
time.Sleep(1 * time.Second)
return Conn.determineWsConn(lang, contextPath)
}
return
}
func (Conn *Engine) write(data []byte) (err error) {
select {
case Conn.resultChan <- data:
//case <-time.After(1 * time.Second):
// err = errors.New("not message")
}
return
}
func (Conn *Engine) ReadMsg() (recData map[string]interface{}, err error) {
var data []byte
recData = make(map[string]interface{})
select {
case data = <-Conn.resultChan:
if err = json.Unmarshal(data, &recData); err != nil {
return
}
//case <-time.After(5 * time.Second):
// err = errors.New("not message")
}
return
}
func (Conn *Engine) readMsgLoop() {
var (
data = make(map[string]interface{})
recData []byte
err error
)
for {
if data, err = Conn.readMsg(); err != nil {
logrus.Errorln("HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH", err)
goto ERR
}
if _, ok := data["results"]; ok {
if _, ok = data["end"]; ok {
switch data["end"].(type) {
case float64:
if data["results"].(string) == "" && data["end"].(float64) == 0 {
continue
}
case int:
if data["results"].(string) == "" && data["end"].(int) == 0 {
continue
}
}
} else {
continue
}
} else {
continue
}
if recData, err = json.Marshal(data); err != nil {
goto ERR
}
if err = Conn.write(recData); err != nil {
goto ERR
}
}
ERR:
close(Conn.resultChan)
}
func (Conn *Engine) readMsg() (recData map[string]interface{}, err error) {
var (
data []byte
dataParams = make(map[string]interface{})
)
recData = make(map[string]interface{})
if !Conn.isCloseRead {
if Conn.Conn != nil {
if _, data, err = Conn.Conn.ReadMessage(); err != nil {
logrus.Errorln(err)
if !Conn.isCloseWrite {
Conn.isRestartLock.Lock()
if !Conn.isRestart {
Conn.isRestart = true
if Conn.Conn != nil {
_ = Conn.Conn.Close()
}
if Conn.Conn, err = Conn.determineWsConn(Conn.lang, Conn.contextPath); err != nil {
logrus.Errorln(err)
return
}
}
Conn.isRestart = false
Conn.isRestartLock.Unlock()
return Conn.readMsg()
} else {
_ = Conn.write([]byte(`{"is_final":3,"desc":"speech_end"}`))
err = errors.New("the speech end")
}
if Conn.preResult != nil {
if _, ok := Conn.preResult["is_final"]; ok {
if Conn.preResult["is_final"].(int) == 1 {
return Conn.readMsg()
} else {
Conn.preResult["is_final"] = 1
}
recData = Conn.preResult
logrus.Println(recData)
}
}
return
}
//logrus.Println(data)
if len(data) < 10 {
recData["results"] = ""
recData["is_final"] = 0
recData["start"] = 0
recData["end"] = 0
recData["duration"] = 0
return Conn.readMsg()
}
//logrus.Println(string(data))
if err = json.Unmarshal(data, &dataParams); err != nil {
recData = Conn.preResult
err = nil
return
}
if _, okStatus := dataParams["status"]; okStatus {
if dataType, okType := dataParams["type"]; okType {
if dataType.(string) == "wait" || dataType.(string) == "server_ready" || dataType.(string) == "reload_end" {
recData["results"] = ""
recData["is_final"] = 0
recData["start"] = 0
recData["end"] = 0
recData["duration"] = 0
}
if dataType.(string) == "partial_result" {
recData["is_final"] = 0
bestFormat := dataParams["nbest"].([]interface{})
if len(bestFormat) > 0 {
recData["results"] = bestFormat[0].(map[string]interface{})["sentence"].(string)
} else {
recData["results"] = ""
}
recData["start"] = dataParams["timestamp"].(map[string]interface{})["start"].(float64)
recData["end"] = dataParams["timestamp"].(map[string]interface{})["end"].(float64)
recData["duration"] = int(recData["end"].(float64)-recData["start"].(float64)) * 32
Conn.preResult = recData
}
if dataType.(string) == "final_result" {
recData["start"] = dataParams["timestamp"].(map[string]interface{})["start"].(float64)
recData["end"] = dataParams["timestamp"].(map[string]interface{})["end"].(float64)
if recData["start"].(float64) == 0 {
Conn.preSpeechTime = Conn.preEndTime
}
Conn.preEndTime = recData["end"].(float64) + Conn.preSpeechTime
recData["is_final"] = 0
recData["results"] = ""
recData["start"] = 0
recData["end"] = 0
recData["duration"] = 0
return Conn.readMsg()
//bestFormat := dataParams["nbest"].([]interface{})
//if len(bestFormat) > 0 {
// recData["results"] = bestFormat[0].(map[string]interface{})["sentence"].(string)
//} else {
// recData["results"] = ""
//}
//recData["start"] = dataParams["timestamp"].(map[string]interface{})["start"].(float64) + Conn.preSpeechTime
//recData["end"] = dataParams["timestamp"].(map[string]interface{})["end"].(float64) + Conn.preSpeechTime
//if recData["start"].(float64) == 0 {
// Conn.preSpeechTime = Conn.preEndTime
//
//}
//Conn.preEndTime = recData["end"].(float64)
////if Conn.preEndTime != recData["start"].(float64) {
//// recData["start"] = Conn.preEndTime
////}
////if recData["end"].(float64) < Conn.preEndTime {
//// recData["end"] = recData["end"].(float64) + Conn.preSpeechTime
//// Conn.preEndTime = recData["end"].(float64)
////} else {
//// Conn.preEndTime = recData["end"].(float64)
////}
//recData["duration"] = int(recData["end"].(float64)-recData["start"].(float64)) * 32
//if recData["results"] != "" {
// log.Println(recData)
// recData["results"] = "!!!!!!!!!!!!!"
//}
////recData["results"] = ""
//Conn.preResult = recData
////log.Println(recData)
}
if dataType.(string) == "final_result_second_pass" {
recData["is_final"] = 1
bestFormat := dataParams["nbest"].([]interface{})
if len(bestFormat) > 0 {
recData["results"] = bestFormat[0].(map[string]interface{})["sentence"].(string)
} else {
recData["results"] = ""
}
recData["start"] = dataParams["timestamp"].(map[string]interface{})["start"].(float64) + Conn.preSpeechTime
recData["end"] = dataParams["timestamp"].(map[string]interface{})["end"].(float64) + Conn.preSpeechTime
//Conn.preEndTime = recData["end"].(float64)
log.Println("second_pass", recData)
//if recData["results"].(string) == "" {
// recData["results"] = "@@@@"
//}
//
//if recData["start"].(float64) == 0 {
// Conn.preSpeechTime = Conn.preEndTime
//}
//if Conn.preEndTime != recData["start"].(float64) {
// recData["start"] = Conn.preEndTime
// //if recData["end"].(float64) != 0 {
// // recData["start"] = Conn.preEndTime
// //}
//}
//if recData["end"].(float64) < Conn.preEndTime {
// recData["end"] = recData["end"].(float64) + Conn.preSpeechTime
// Conn.preEndTime = recData["end"].(float64)
// //if recData["end"].(float64) != 0 {
// // recData["end"] = recData["end"].(float64) + Conn.preEndTime
// // Conn.preEndTime = recData["end"].(float64)
// //}
//} else {
// Conn.preEndTime = recData["end"].(float64)
//}
recData["duration"] = int(recData["end"].(float64)-recData["start"].(float64)) * 32
Conn.preResult = recData
}
if dataType.(string) == "speech_end" {
time.Sleep(200 * time.Millisecond)
if !Conn.isCloseWrite {
_ = Conn.Conn.Close()
logrus.Println("the engine end speech,try to connect it:", Conn.lang, Conn.contextPath)
Conn.Conn, _ = Conn.determineWsConn(Conn.lang, Conn.contextPath)
Conn.isSwitch = false
} else {
Conn.IsEnd = true
_ = Conn.write([]byte(`{"is_final":3,"desc":"speech_end"}`))
err = errors.New("the speech end")
}
//Conn.IsEnd = true
}
} else {
err = errors.New("data format error")
}
} else {
if Conn.preResult["is_final"].(int) != 1 {
Conn.preResult["is_final"] = 1
}
recData = Conn.preResult
err = errors.New("connection is closed")
return
}
} else {
if Conn.Conn, err = Conn.determineWsConn(Conn.lang, Conn.contextPath); err != nil {
return
}
return Conn.readMsg()
}
} else {
err = errors.New("read engine message closed")
}
return
}
func (Conn *Engine) WriteMsg(writeData []byte, datatype int) (err error) {
if Conn.isRestart {
Conn.isRestartLock.Lock()
Conn.isRestartLock.Unlock()
}
if !Conn.isCloseWrite {
if Conn.Conn != nil {
if Conn.isSwitch {
time.Sleep(100 * time.Millisecond)
return Conn.WriteMsg(writeData, datatype)
}
if datatype == 0 {
//logrus.Println("=============>>>>>> the audio length:", len(writeData))
if err = Conn.Conn.WriteMessage(websocket.BinaryMessage, writeData); err != nil {
logrus.Errorln(err)
if Conn.Conn, err = Conn.determineWsConn(Conn.lang, Conn.contextPath); err != nil {
return
}
return Conn.WriteMsg(writeData, datatype)
}
}
if datatype == 1 {
Conn.isCloseWrite = true
if err = Conn.Conn.WriteMessage(websocket.TextMessage, writeData); err != nil {
logrus.Error(err)
return
}
}
if datatype == 2 {
//logrus.Println("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS", `{"signal" : "end"}`)
if err = Conn.Conn.WriteMessage(websocket.TextMessage, []byte(`{"signal" : "end"}`)); err != nil {
logrus.Error(err)
return
}
}
if datatype == 3 {
//logrus.Println("EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE", string(writeData))
jsonMap := make(map[string]interface{})
if err = json.Unmarshal(writeData, &jsonMap); err != nil {
logrus.Error(err)
return
}
if jsonMap["language"].(string) == "ZH-CN" || jsonMap["language"].(string) == "ZH-HK" || jsonMap["language"].(string) == "EN" {
Conn.lang = jsonMap["language"].(string)
if Conn.lang == "ZH-CN" {
Conn.contextPath = initServer.ConfigMap["hotWord-ZH-CN"].(string)
}
if Conn.lang == "ZH-HK" {
Conn.contextPath = initServer.ConfigMap["hotWord-ZH-HK"].(string)
}
if Conn.lang == "EN" {
Conn.contextPath = initServer.ConfigMap["hotWord-EN"].(string)
}
}
if err = Conn.Conn.WriteMessage(websocket.TextMessage, []byte(`{"signal" : "end"}`)); err != nil {
logrus.Error(err)
return
}
Conn.isSwitch = true
logrus.Println(`{"signal" : "end"}`)
}
} else {
if Conn.Conn, err = Conn.determineWsConn(Conn.lang, Conn.contextPath); err != nil {
return
}
return Conn.WriteMsg(writeData, datatype)
}
} else {
err = errors.New("write engine message closed")
}
return
}
func (Conn *Engine) CloseEngine() (err error) {
if Conn != nil {
Conn.isCloseWrite = true
//if Conn.Conn != nil{
// _ = Conn.Conn.WriteMessage(websocket.TextMessage,[]byte(`{"signal" : "end"}`))
//}
time.Sleep(200 * time.Millisecond)
Conn.isCloseRead = true
if Conn.Conn != nil {
if Conn.IsEnd {
_ = Conn.Conn.Close()
Conn.IsEnd = false
}
_ = Conn.Conn.Close()
}
if !Conn.isClose {
Conn.isClose = true
if Conn.lang == "ZH-CN" {
initServer.ZhCnLock.Lock()
if _, ok := initServer.ZhCn[Conn.ipStr]["num"]; ok {
if initServer.ZhCn[Conn.ipStr]["num"].(int) > 0 {
initServer.ZhCn[Conn.ipStr]["num"] = initServer.ZhCn[Conn.ipStr]["num"].(int) - 1
//logrus.Println(initServer.ZhCn)
}
}
initServer.ZhCnLock.Unlock()
}
if Conn.lang == "ZH-HK" {
initServer.ZhHkLock.Lock()
if _, ok := initServer.ZhHk[Conn.ipStr]["num"]; ok {
if initServer.ZhHk[Conn.ipStr]["num"].(int) > 0 {
initServer.ZhHk[Conn.ipStr]["num"] = initServer.ZhHk[Conn.ipStr]["num"].(int) - 1
//logrus.Println(initServer.ZhHk)
}
}
initServer.ZhHkLock.Unlock()
}
if Conn.lang == "EN" {
initServer.ENLock.Lock()
if _, ok := initServer.EN[Conn.ipStr]["num"]; ok {
if initServer.EN[Conn.ipStr]["num"].(int) > 0 {
initServer.EN[Conn.ipStr]["num"] = initServer.EN[Conn.ipStr]["num"].(int) - 1
//logrus.Println(initServer.EN)
}
}
initServer.ENLock.Unlock()
}
}
}
return
}
package funcHandler
import (
"RaisoundWebSocketBase2C/initServer"
"bytes"
"crypto/hmac"
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"github.com/gofrs/uuid"
"github.com/sirupsen/logrus"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"strings"
"time"
)
// CheckToken TODO:校验token
func VerityToken(token, checkTokenURL string) (isYes bool, err error) {
var (
url string // 请求路由
jsonData []byte // 请求json数据
req *http.Request
resp *http.Response
resultBody []byte // 请求结果
resultMap map[string]interface{} // 请求结果数据转换位map
)
url = checkTokenURL
// 请求json数据格式
jsonMap := make(map[string]string)
jsonMap["token"] = token
if jsonData, err = json.Marshal(jsonMap); err != nil {
err = errors.New("json error")
log.Println(url, err)
return
}
// post请求,获取结果
if req, err = http.NewRequest("POST", url, bytes.NewBuffer(jsonData)); err != nil {
err = errors.New("request error")
log.Println(url, err)
return
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
if resp, err = client.Do(req); err != nil {
err = errors.New("response error")
log.Println(url, err)
return
}
if resultBody, err = ioutil.ReadAll(resp.Body); err != nil {
err = errors.New("read error")
log.Println(url, err)
return
}
log.Println("token response:", string(resultBody))
// 请求结果数据转换位map
if err = json.Unmarshal(resultBody, &resultMap); err != nil {
err = errors.New("[]byte to map error")
log.Println(url, err)
return
}
// 提取结果
if _, ok := resultMap["code"]; ok {
switch resultMap["code"].(type) {
case string:
if resultMap["code"].(string) == "200" {
isYes = true
} else {
err = errors.New("token error")
}
case float64:
if resultMap["code"].(float64) == float64(200) {
isYes = true
} else {
err = errors.New("token error")
}
}
} else {
isYes = false
err = errors.New("checking token error, please check server")
return ok, errors.New("checking token error, please check server")
}
return
}
type PostReq struct {
Id int `json:"id"`
Texts string `json:"texts"`
Mode int `json:"mode"`
Backup map[string]interface{} `json:"backup"`
}
type PostResp struct {
Status int `json:"status"`
Id int `json:"id"`
Result string `json:"result"`
Backup map[string]interface{} `json:"backup"`
}
func PostGet(req PostReq, url string) (resp *PostResp, err error) {
var (
jsonByte []byte
reqHttp *http.Request
respHttp *http.Response
respBody []byte
client = &http.Client{
Timeout: time.Duration(500) * time.Millisecond,
}
)
if jsonByte, err = json.Marshal(req); err != nil {
return
}
//logrus.Println("req:", string(jsonByte))
if reqHttp, err = http.NewRequest("POST", url, bytes.NewBuffer(jsonByte)); err != nil {
return
}
reqHttp.Header.Set("Content-Type", "application/json")
if respHttp, err = client.Do(reqHttp); err != nil {
logrus.Errorln(err.Error())
return
}
defer func() {
if respHttp.Body != nil {
_ = respHttp.Body.Close()
}
}()
if respBody, err = ioutil.ReadAll(respHttp.Body); err != nil {
logrus.Errorln(err.Error())
return
}
//logrus.Println(string(respBody))
if respHttp.StatusCode == 200 {
resp = &PostResp{}
if err = json.Unmarshal(respBody, resp); err != nil {
logrus.Errorln(err.Error())
return
}
} else {
err = errors.New(respHttp.Status)
logrus.Errorln(err.Error())
}
//logrus.Println(err, fmt.Sprintf("status=%v,result=%v,backup=%v,error=%v", resp.Status, resp.Result, resp.Backup, err.Error()))
return
}
// TODO:检验签名
func CheckSignature(secretID, timestamp, signature string) (isOk bool, err error) {
var (
stitchStr string
key string
apiKey []byte
getSignature string
)
stitchStr = fmt.Sprintf("secretID=%v&timestamp=%v", secretID, timestamp)
if MysqlConn != nil {
if key, err = MysqlConn.selectApiKey(secretID); err != nil {
return
}
apiKey = []byte(key)
}
// TODO :MD5加密
m5 := md5.New()
m5.Write([]byte(stitchStr))
stitchStr = hex.EncodeToString(m5.Sum(nil))
// TODO : 生成signature
mac := hmac.New(sha1.New, apiKey)
mac.Write([]byte(stitchStr))
getSignature = hex.EncodeToString(mac.Sum(nil))
if getSignature == signature {
isOk = true
} else {
err = errors.New(fmt.Sprintf("signature error.[getSecretID=%v,getKey=%v,timestamp=%v,getSignature=%v]", secretID, key, timestamp, getSignature))
}
return
}
// TODO: 校验token
func CheckToken(token, userId string) (ok bool, err error) {
var (
Url = initServer.ConfigMap["tokenVerityURL"].(string)
params = url.Values{}
responseData []byte
resp *http.Response
resultMap = make(map[string]interface{})
)
params["user_id"] = []string{userId}
params["token"] = []string{token}
if resp, err = http.PostForm(Url, params); err != nil {
return
}
defer func(response *http.Response) {
if err = response.Body.Close(); err != nil {
return
}
}(resp)
if resp.StatusCode != 200 {
err = errors.New("request error")
return
}
if responseData, err = ioutil.ReadAll(resp.Body); err != nil {
return
}
if err = json.Unmarshal(responseData, &resultMap); err != nil {
return
}
switch resultMap["code"].(type) {
case float64:
if resultMap["code"].(float64) == 200 {
ok = true
} else {
err = errors.New("verifying token error")
}
case int:
if resultMap["code"].(int) == 200 {
ok = true
} else {
err = errors.New("verifying token error")
}
}
return
}
// TODO:生成唯一ID为sessionId
func GetUuid() (getUid string) {
var (
uid uuid.UUID
errMsg error
)
if uid, errMsg = uuid.NewV4(); errMsg != nil {
if uid, errMsg = uuid.NewV1(); errMsg != nil {
if uid, errMsg = uuid.NewV6(); errMsg != nil {
getUid = time.Now().String()
return
}
}
}
getUid = uid.String()
return
}
// TODO:保存音频
func SaveAudioData(meetingId, sessionID string, audioData []byte, dataNum int) (singleFileName string, errMsg error) {
var (
pcmPath string
err error
sFile *os.File
)
pcmPath = initServer.ConfigMap["pcmPath"].(string)
// 确定文件格式
now := time.Now()
dataSubx := fmt.Sprintf("%d-%02d-%02d", now.Year(), now.Month(), now.Day())
fileDir := fmt.Sprintf("%v/%v/%v/%v", pcmPath, meetingId, sessionID, dataSubx)
sInt := fmt.Sprintf("%08d", dataNum)
sName := fmt.Sprintf("%v/%v/%v/%v/%v-%v.wav", pcmPath, meetingId, sessionID, dataSubx, sessionID, sInt)
//fileName = fmt.Sprintf("%v-%v.wav", sessionID, sInt)
singleFileName = sName
//if dataNum >=4 {
// fileInt := fmt.Sprintf("%08d", dataNum-3)
// fileName := fmt.Sprintf("%v/%v/%v/%v/%v-%v.wav", pcmPath, meetingId, sessionID, dataSubx, sessionID, fileInt)
// _ = os.Remove(fileName)
//}
// 创建文件夹
_, err = os.Stat(fileDir)
if os.IsNotExist(err) {
if err = os.MkdirAll(fileDir, os.ModePerm); err != nil {
fmt.Println(err)
errMsg = err
return
}
}
waveHeader := pcmToWavHeader(1, 16000, 16, len(audioData))
// 将数据存入单独文件
if sFile, err = os.OpenFile(sName, os.O_RDONLY|os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0766); err != nil {
errMsg = err
return
}
if _, err = sFile.Write(waveHeader); err != nil {
if err = sFile.Close(); err != nil {
return
}
errMsg = err
return
}
if _, err = sFile.Write(audioData); err != nil {
if err = sFile.Close(); err != nil {
return
}
errMsg = err
return
}
if err = sFile.Close(); err != nil {
errMsg = err
return
}
return
}
// TODO:判断文件是否存在
func ExistsFile(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return true, err
}
// TODO:根据检索获取热词文本地址
func GetHotWordTextPath(userId string) (hotWordPath string, err error) {
hotWordPathStr := fmt.Sprintf("%v/%v.txt", initServer.ConfigMap["hotWordTextDir"].(string), userId)
if ok, _ := ExistsFile(hotWordPathStr); ok {
hotWordPath = hotWordPathStr
} else {
hotWordPathStr = ""
err = errors.New("the file is not existing")
}
return
}
// TODO:获取热词文本地址
func GetHotWordPath(userId string) (hotWordPath string, err error) {
var (
Url = initServer.ConfigMap["hotWordURL"].(string)
params = url.Values{}
responseData []byte
resp *http.Response
resultMap = make(map[string]interface{})
)
params["user_id"] = []string{userId}
if resp, err = http.PostForm(Url, params); err != nil {
return
}
defer func(response *http.Response) {
if response != nil {
_ = response.Body.Close()
}
}(resp)
if resp.StatusCode != 200 {
err = errors.New("network request error")
return
}
if responseData, err = ioutil.ReadAll(resp.Body); err != nil {
return
}
if err = json.Unmarshal(responseData, &resultMap); err != nil {
return
}
switch resultMap["code"].(type) {
case float64:
if resultMap["code"].(float64) == 200 {
hotWordPath = resultMap["data"].(map[string]interface{})["path"].(string)
} else {
err = errors.New("hot word text is not exiting")
}
case int:
if resultMap["code"].(float64) == 200 {
hotWordPath = resultMap["data"].(map[string]interface{})["path"].(string)
} else {
err = errors.New("hot word text is not exiting")
}
}
log.Println("user_id: ", userId)
log.Println("hotWordPath: ", hotWordPath)
return
}
// TODO: POST结构请求处理
func postHandler(URL string, reqJson string) (result map[string]interface{}, err error) {
var (
Url = URL
resp *http.Response
responseData []byte
)
result = make(map[string]interface{})
if resp, err = http.Post(Url, "application/json", strings.NewReader(reqJson)); err != nil {
log.Println(URL, err.Error())
return
}
defer func(response *http.Response) {
if err = response.Body.Close(); err != nil {
log.Println(URL, err.Error())
}
}(resp)
if resp.StatusCode != 200 {
err = errors.New("request error")
log.Println(URL, err.Error())
}
if responseData, err = ioutil.ReadAll(resp.Body); err != nil {
log.Println(URL, err.Error())
}
if err = json.Unmarshal(responseData, &result); err != nil {
log.Println(URL, err.Error())
}
return
}
// TODO: 粤语
func CantoneseHandler(text, preText string, usePunctuation, useSensitive, useDigitConvert bool) (result, preResult string, err error) {
var (
resultMap = make(map[string]interface{})
jsonStr string
)
// 转数字
if useDigitConvert {
jsonStr = fmt.Sprintf(`{"id":"","texts":"%v"}`, text)
if resultMap, err = postHandler(initServer.ConfigMap["digitConvertURL-ZH-HK"].(string), jsonStr); err != nil {
log.Println(err)
} else {
if resultMap != nil {
switch resultMap["status"].(type) {
case float64:
if resultMap["status"].(float64) == float64(200) {
text = resultMap["result"].(string)
}
case int:
if resultMap["status"].(int) == 200 {
text = resultMap["result"].(string)
}
}
}
}
}
// 标点
if usePunctuation {
temporaryResult := preText + text
// 请求json数据格式
jsonMap := make(map[string]interface{})
jsonMap["id"] = ""
if len([]rune(temporaryResult)) <= 512 && preText == "" {
jsonMap["texts"] = temporaryResult
jsonMap["pre_vad_length"] = -1
} else if len([]rune(temporaryResult)) <= 512 && preText != "" {
jsonMap["texts"] = temporaryResult
jsonMap["pre_vad_length"] = len([]rune(preText))
} else if len([]rune(temporaryResult)) > 512 && preText == "" {
jsonMap["texts"] = string([]rune(temporaryResult)[:512])
jsonMap["pre_vad_length"] = -1
} else {
jsonMap["texts"] = string([]rune(temporaryResult)[:512])
jsonMap["pre_vad_length"] = len([]rune(preText))
}
jsonData, _ := json.Marshal(jsonMap)
if resultMap, err = postHandler(initServer.ConfigMap["punctuationURL-ZH-HK"].(string), string(jsonData)); err != nil {
log.Println(err)
preText = ""
} else {
if resultMap != nil {
switch resultMap["status"].(type) {
case float64:
if resultMap["status"].(float64) == float64(200) {
text = resultMap["result"].(string)
preText = resultMap["pre_punc_result"].(string)
} else {
preText = ""
}
case int:
if resultMap["status"].(int) == 200 {
text = resultMap["result"].(string)
preText = resultMap["pre_punc_result"].(string)
} else {
preText = ""
}
}
}
}
}
// 敏感词
if useSensitive {
jsonStr = fmt.Sprintf(`{"id":"","texts":"%v"}`, text)
if resultMap, err = postHandler(initServer.ConfigMap["sensitiveWordURL-ZH-HK"].(string), jsonStr); err != nil {
log.Println(err)
} else {
if resultMap != nil {
switch resultMap["status"].(type) {
case float64:
if resultMap["status"].(float64) == float64(200) {
text = resultMap["result"].(string)
} else {
preText = ""
}
case int:
if resultMap["status"].(int) == 200 {
text = resultMap["result"].(string)
} else {
preText = ""
}
}
}
}
}
result = text
preResult = preText
return
}
// TODO: 中文
func ChineseHandler(text, preText string, usePunctuation, useSensitive, useDigitConvert bool) (result, preResult string, err error) {
var (
resultMap = make(map[string]interface{})
jsonStr string
)
// 转数字
if useDigitConvert {
jsonStr = fmt.Sprintf(`{"id":"","texts":"%v"}`, text)
if resultMap, err = postHandler(initServer.ConfigMap["digitConvertURL-ZH-CN"].(string), jsonStr); err != nil {
log.Println(err)
} else {
if resultMap != nil {
switch resultMap["status"].(type) {
case float64:
if resultMap["status"].(float64) == float64(200) {
text = resultMap["result"].(string)
}
case int:
if resultMap["status"].(int) == 200 {
text = resultMap["result"].(string)
}
}
}
}
}
// 标点
if usePunctuation {
temporaryResult := preText + text
// 请求json数据格式
jsonMap := make(map[string]interface{})
jsonMap["id"] = ""
if len([]rune(temporaryResult)) <= 512 && preText == "" {
jsonMap["texts"] = temporaryResult
jsonMap["pre_vad_length"] = -1
} else if len([]rune(temporaryResult)) <= 512 && preText != "" {
jsonMap["texts"] = temporaryResult
jsonMap["pre_vad_length"] = len([]rune(preText))
} else if len([]rune(temporaryResult)) > 512 && preText == "" {
jsonMap["texts"] = string([]rune(temporaryResult)[:512])
jsonMap["pre_vad_length"] = -1
} else {
jsonMap["texts"] = string([]rune(temporaryResult)[:512])
jsonMap["pre_vad_length"] = len([]rune(preText))
}
jsonData, _ := json.Marshal(jsonMap)
if resultMap, err = postHandler(initServer.ConfigMap["punctuationURL-ZH-CN"].(string), string(jsonData)); err != nil {
preText = ""
log.Println(err)
} else {
if resultMap != nil {
switch resultMap["status"].(type) {
case float64:
if resultMap["status"].(float64) == float64(200) {
text = resultMap["result"].(string)
preText = resultMap["pre_punc_result"].(string)
} else {
preText = ""
}
case int:
if resultMap["status"].(int) == 200 {
text = resultMap["result"].(string)
preText = resultMap["pre_punc_result"].(string)
} else {
preText = ""
}
}
}
}
}
// 敏感词
if useSensitive {
jsonStr = fmt.Sprintf(`{"id":"","texts":"%v"}`, text)
if resultMap, err = postHandler(initServer.ConfigMap["sensitiveWordURL-ZH-CN"].(string), jsonStr); err != nil {
log.Println(err)
} else {
if resultMap != nil {
switch resultMap["status"].(type) {
case float64:
if resultMap["status"].(float64) == float64(200) {
text = resultMap["result"].(string)
} else {
preText = ""
}
case int:
if resultMap["status"].(int) == 200 {
text = resultMap["result"].(string)
} else {
preText = ""
}
}
}
}
}
result = text
preResult = preText
return
}
// TODO: 英文
func EnglishHandler(text, preText string, preVadLen int, usePunctuation, useC2a bool) (result, preResult string, preResultVadLen int, err error) {
var (
resultMap = make(map[string]interface{})
jsonStr string
)
if usePunctuation {
temporaryResult := preText + " " + text
// 请求json数据格式
jsonMap := make(map[string]interface{})
jsonMap["id"] = ""
jsonMap["texts"] = temporaryResult
if preVadLen != 0 {
jsonMap["pre_vad_length"] = preVadLen
} else {
jsonMap["pre_vad_length"] = -1
}
jsonData, _ := json.Marshal(jsonMap)
if resultMap, err = postHandler(initServer.ConfigMap["EnPunctuationURL"].(string), string(jsonData)); err != nil {
preText = ""
log.Println(err)
} else {
if resultMap != nil {
switch resultMap["status"].(type) {
case float64:
if resultMap["status"].(float64) == float64(200) {
text = resultMap["result"].(string)
preText = resultMap["pre_punc_result"].(string)
switch resultMap["pre_vad_length"].(type) {
case float64:
preResultVadLen = int(resultMap["pre_vad_length"].(float64))
case int:
preResultVadLen = resultMap["pre_vad_length"].(int)
}
} else {
preText = ""
preResultVadLen = 0
}
case int:
if resultMap["status"].(int) == 200 {
text = resultMap["result"].(string)
preText = resultMap["pre_punc_result"].(string)
switch resultMap["pre_vad_length"].(type) {
case float64:
preResultVadLen = int(resultMap["pre_vad_length"].(float64))
case int:
preResultVadLen = resultMap["pre_vad_length"].(int)
}
} else {
preText = ""
preResultVadLen = 0
}
}
}
}
}
if useC2a {
jsonStr = fmt.Sprintf(`{"id":"","texts":"%v"}`, text)
if resultMap, err = postHandler(initServer.ConfigMap["e2nURL"].(string), jsonStr); err != nil {
log.Println(err)
} else {
if resultMap != nil {
switch resultMap["status"].(type) {
case float64:
if resultMap["status"].(float64) == float64(200) {
text = resultMap["result"].(string)
} else {
preText = ""
}
case int:
if resultMap["status"].(int) == 200 {
text = resultMap["result"].(string)
} else {
preText = ""
}
}
}
}
}
result = text
preResult = preText
return
}
package funcHandler
import (
"RaisoundWebSocketBase2C/initServer"
"database/sql"
"errors"
"fmt"
_ "github.com/go-sql-driver/mysql"
"log"
"time"
)
type mysqlConnInit struct {
connAddr string
db *sql.DB
table string
}
func conn(connAddr string) (conn *sql.DB, err error) {
if conn, err = sql.Open("mysql", connAddr); err != nil {
return
}
conn.SetMaxOpenConns(20)
conn.SetMaxIdleConns(2)
conn.SetConnMaxLifetime(60 * 24 * 365 * time.Hour)
conn.SetConnMaxIdleTime(60 * 24 * 365 * time.Hour)
if err = conn.Ping(); err != nil {
return
}
return
}
func InitMysqlConn(mysqlMap map[string]interface{}) (mysqlConn *mysqlConnInit, err error) {
var (
mysqlDB *sql.DB
)
connPath := fmt.Sprintf("%s:%s@(%s)/%s?charset=utf8", mysqlMap["user"].(string), mysqlMap["passwd"].(string), mysqlMap["address"].(string), mysqlMap["dbName"].(string))
if mysqlDB, err = conn(connPath); err != nil {
log.Println(err.Error())
}
mysqlConn = &mysqlConnInit{
connAddr: connPath,
db: mysqlDB,
table: mysqlMap["tableName"].(string),
}
go func() {
var e error
for {
if mysqlConn.db != nil {
if e = mysqlConn.db.Ping(); e != nil {
if mysqlDB, err = conn(connPath); err != nil {
log.Println(err.Error())
time.Sleep(5 * time.Second)
continue
}
mysqlConn.db = mysqlDB
}
} else {
if mysqlDB, err = conn(connPath); err != nil {
log.Println(err.Error())
time.Sleep(5 * time.Second)
continue
}
mysqlConn.db = mysqlDB
}
time.Sleep(30 * time.Second)
}
}()
return
}
func (mysqlConn *mysqlConnInit) selectApiKey(secretId string) (getKey string, err error) {
var (
getRow *sql.Rows
key string
)
if err = mysqlConn.db.Ping(); err != nil {
return
} else {
tableName := mysqlConn.table
selectStr := fmt.Sprintf("SELECT `key` FROM %v WHERE `is_active`=1 AND `secret`='%v'", tableName, secretId)
//log.Println(selectStr)
if getRow, err = mysqlConn.db.Query(selectStr); err != nil {
err = errors.New("check error")
return
} else {
if getRow != nil {
//if !getRow.Next() {
// err = errors.New("secretId is invalid or wrong")
// return
//}
for getRow.Next() {
_ = getRow.Scan(&key)
}
getKey = key
if getKey == "" {
err = errors.New("secretId is invalid or wrong")
}
} else {
}
defer func() {
if getRow != nil {
_ = getRow.Close()
}
}()
}
}
return
}
var (
MysqlConn *mysqlConnInit
err error
)
func init() {
go func() {
select {
case <-initServer.StartSignal:
}
for {
if initServer.MysqlMap != nil {
if MysqlConn, err = InitMysqlConn(initServer.MysqlMap); err != nil {
log.Println(err)
}
break
} else {
time.Sleep(1 * time.Second)
}
}
}()
}
package funcHandler
import (
"fmt"
"github.com/garyburd/redigo/redis"
)
type RdbStruct struct {
rDbConn redis.Conn
channel string
server string
}
func InitRdb(channel, server, passwd string, ) (Db *RdbStruct, err error) {
var (
pool *redis.Pool
)
pool = &redis.Pool{ //实例化一个连接池
MaxIdle: 10, //最初的连接数量
// MaxActive:1000000, //最大连接数量
MaxActive: 0, //连接池最大连接数量,不确定可以用0(0表示自动定义),按需分配
IdleTimeout: 300, //连接关闭时间 300秒 (300秒不使用自动关闭)
Dial: func() (conn redis.Conn, e error) {
//此处对应redis ip及端口号
if conn, e = redis.Dial("tcp", server); e != nil {
return
}
//此处1234对应redis密码
if _, e = conn.Do("AUTH", passwd); err != nil {
_ = conn.Close()
return nil, err
}
return conn, err
},
}
Db = &RdbStruct{
rDbConn: pool.Get(),
channel: channel,
server: server,
}
return
}
func (rdb *RdbStruct) PubMsg(msg string) (err error) {
if _, err = rdb.rDbConn.Do("Publish", rdb.channel, msg); err != nil {
var (
rDb redis.Conn
)
if rDb, err = redis.Dial("tcp", rdb.server); err != nil {
return
}
if _, err = rDb.Do("Publish", rdb.channel, msg); err != nil {
return
}
rdb.rDbConn = rDb
}
return
}
func (rdb *RdbStruct) LeftPubMsg(msg string) (err error) {
if _, err = rdb.rDbConn.Do("lpush", "audioQueue", msg); err != nil {
fmt.Println(err)
var (
rDb redis.Conn
)
if rDb, err = redis.Dial("tcp", rdb.server); err != nil {
return
}
if _, err = rdb.rDbConn.Do("lpush", "audioQueue", msg); err != nil {
return
}
rdb.rDbConn = rDb
}
fmt.Println(msg)
return
}
func (rdb *RdbStruct) RedisClose() {
var err error
if err = rdb.rDbConn.Close(); err != nil {
fmt.Println("HHHHHHHHHH", err)
}
}
package funcHandler
import (
"RaisoundWebSocketBase2C/initServer"
"fmt"
"io/ioutil"
"os"
)
type WavStruct struct {
file *os.File
meetingId string
EFile string
audioLength int
isClose bool
}
//InitSaveWave TODO:初始化wave文件
func InitSaveWave(meetingId string)(wav *WavStruct, err error){
var (
file *os.File
audioLength int
)
// 创建文件夹
pcmPath := initServer.ConfigMap["pcmPath"].(string)
// 确定文件格式
fileDir := fmt.Sprintf("%v/%v", pcmPath, meetingId)
eName := fmt.Sprintf("%v/%vf.wav",fileDir,meetingId)
if exist, _ := ExistsFile(eName); exist {
content,_:=ioutil.ReadFile(eName)
audioLength = len(content)
}
_, err = os.Stat(fileDir)
if os.IsNotExist(err) {
if err = os.MkdirAll(fileDir, os.ModePerm); err != nil {
return
}
}
// 将数据存入总文件
if file, err = os.OpenFile(eName, os.O_RDONLY|os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0766); err != nil {
return
}
wav = &WavStruct{
file:file,
meetingId: meetingId,
EFile: eName,
audioLength: audioLength,
}
return
}
func (w *WavStruct) SaveData(data []byte) (filePath string,err error){
if _ , err = w.file.Write(data); err != nil{
if w.file, err = os.OpenFile(w.EFile, os.O_RDONLY|os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0766); err != nil {
return
}
if _ , err = w.file.Write(data); err != nil{
return
}
}
filePath = w.EFile
w.audioLength = w.audioLength+len(data)
return
}
func (w *WavStruct) CloseWav()(err error){
//_ ,_ = w.file.Seek(44,0)
//w.file.WriteAt()
if !w.isClose{
w.isClose = true
if err = w.file.Close(); err != nil{
}
file1, _ := os.OpenFile(w.EFile, os.O_RDWR, os.ModePerm)
l,_:= file1.Seek(0,0)
waveHeader := pcmToWavHeader(1, 16000, 16, w.audioLength)
_,_=file1.WriteAt(waveHeader,l)
_ = file1.Close()
}
return
}
/**
dst:二进制字符串
numChannel:1=单声道,2=多声道
sampleRate:采样率 8000/16000
*/
func pcmToWavHeader(numChannel, sampleRate, bitsPerSample, audioLen int) (wavHeader []byte) {
longSampleRate := sampleRate
byteRate := bitsPerSample * sampleRate * numChannel / 8
totalAudioLen := audioLen
totalDataLen := totalAudioLen + 36
var header = make([]byte, 44)
// RIFF/WAVE header
header[0] = 'R'
header[1] = 'I'
header[2] = 'F'
header[3] = 'F'
header[4] = byte(totalDataLen & 0xff)
header[5] = byte((totalDataLen >> 8) & 0xff)
header[6] = byte((totalDataLen >> 16) & 0xff)
header[7] = byte((totalDataLen >> 24) & 0xff)
//WAVE
header[8] = 'W'
header[9] = 'A'
header[10] = 'V'
header[11] = 'E'
// 'fmt ' chunk
header[12] = 'f'
header[13] = 'm'
header[14] = 't'
header[15] = ' '
// 4 bytes: size of 'fmt ' chunk
header[16] = 16
header[17] = 0
header[18] = 0
header[19] = 0
// format = 1
header[20] = 1
header[21] = 0
header[22] = byte(numChannel)
header[23] = 0
header[24] = byte(longSampleRate & 0xff)
header[25] = byte((longSampleRate >> 8) & 0xff)
header[26] = byte((longSampleRate >> 16) & 0xff)
header[27] = byte((longSampleRate >> 24) & 0xff)
header[28] = byte(byteRate & 0xff)
header[29] = byte((byteRate >> 8) & 0xff)
header[30] = byte((byteRate >> 16) & 0xff)
header[31] = byte((byteRate >> 24) & 0xff)
// block align
header[32] = byte(numChannel * bitsPerSample / 8)
header[33] = 0
// bits per sample
header[34] = byte(bitsPerSample)
header[35] = 0
//data
header[36] = 'd'
header[37] = 'a'
header[38] = 't'
header[39] = 'a'
header[40] = byte(totalAudioLen & 0xff)
header[41] = byte((totalAudioLen >> 8) & 0xff)
header[42] = byte((totalAudioLen >> 16) & 0xff)
header[43] = byte((totalAudioLen >> 24) & 0xff)
wavHeader = header
return
}
package initServer
import (
_ "embed"
"flag"
"github.com/go-ini/ini"
_ "github.com/go-sql-driver/mysql"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
"os"
"runtime"
"strings"
"sync"
"time"
)
////go:embed privateKey.pem
//var privateKey []byte
//go:embed mysqlData.yaml
var mysqlData []byte
// 配置文件数据结构
var (
Version bool
mysqlConfig string
Config = flag.String("config", "./config.ini", "the server config")
ConfigMap = make(map[string]interface{})
MysqlMap = make(map[string]interface{})
//LicenseMap map[string]interface{}
ZhCn = make(map[string]map[string]interface{})
ZhCnLock sync.Mutex
ZhHk = make(map[string]map[string]interface{})
ZhHkLock sync.Mutex
EN = make(map[string]map[string]interface{})
ENLock sync.Mutex
StartSignal = make(chan struct{}, 1)
)
func traceMemStats() {
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
log.Printf("Alloc:%d(bytes) HeapIdle:%d(bytes) HeapReleased:%d(bytes)", ms.Alloc, ms.HeapIdle, ms.HeapReleased)
}
// TODO: 配置文件处理
func GetConfig() {
var (
conf *ini.File
err error
//getLicense string
//rasData []byte
)
/////////////////////////////////////////
traceMemStats()
if conf, err = ini.Load(*Config); err != nil {
panic(err.Error())
}
if conf != nil {
ConfigMap["websocketLog"] = conf.Section("daemon").Key("websocketLog").String()
// websocket 共用配置
ConfigMap["port"] = conf.Section("websocketCommon").Key("port").String()
ConfigMap["language"] = conf.Section("websocketCommon").Key("language").String()
ConfigMap["logPath"] = conf.Section("websocketCommon").Key("logPath").String()
ConfigMap["pcmPath"] = conf.Section("websocketCommon").Key("pcmPath").String()
ConfigMap["redisChannel"] = conf.Section("websocketCommon").Key("redisChannel").String()
ConfigMap["redisURL"] = conf.Section("websocketCommon").Key("redisURL").String()
ConfigMap["redisPasswd"] = conf.Section("websocketCommon").Key("redisPasswd").String()
ConfigMap["tokenVerityURL"] = conf.Section("websocketCommon").Key("tokenVerityURL").String()
ConfigMap["hotWordTextDir"] = conf.Section("websocketCommon").Key("hotWordTextDir").String()
ConfigMap["hotWordURL"] = conf.Section("websocketCommon").Key("hotWordURL").String()
ConfigMap["postURL"] = conf.Section("websocketCommon").Key("postURL").String()
ConfigMap["enPostURL"] = conf.Section("websocketCommon").Key("enPostURL").String()
// 中文引擎
ConfigMap["digitConvertURL-ZH-CN"] = conf.Section("websocketZH-CN").Key("digitConvertURL").String()
ConfigMap["hotWord-ZH-CN"] = conf.Section("websocketZH-CN").Key("defaultHotWordFile").String()
ConfigMap["sensitiveWordURL-ZH-CN"] = conf.Section("websocketZH-CN").Key("sensitiveWordURL").String()
ConfigMap["punctuationURL-ZH-CN"] = conf.Section("websocketZH-CN").Key("punctuationURL").String()
ConfigMap["websocketArrZH-CN"] = strings.Split(conf.Section("websocketZH-CN").Key("websocketArr").String(), ",")
// 粤语迎请
ConfigMap["websocketArrZH-HK"] = strings.Split(conf.Section("websocketZH-HK").Key("websocketArr").String(), ",")
ConfigMap["hotWord-ZH-HK"] = conf.Section("websocketZH-HK").Key("defaultHotWordFile").String()
ConfigMap["digitConvertURL-ZH-HK"] = conf.Section("websocketZH-HK").Key("digitConvertURL").String()
ConfigMap["sensitiveWordURL-ZH-HK"] = conf.Section("websocketZH-HK").Key("sensitiveWordURL").String()
ConfigMap["punctuationURL-ZH-HK"] = conf.Section("websocketZH-HK").Key("punctuationURL").String()
// 英文引擎
ConfigMap["EnWebsocketArr"] = strings.Split(conf.Section("websocketEN").Key("websocketArr").String(), ",")
ConfigMap["hotWord-EN"] = conf.Section("websocketEN").Key("defaultHotWordFile").String()
ConfigMap["e2nURL"] = conf.Section("websocketEN").Key("e2nURL").String()
ConfigMap["EnPunctuationURL"] = conf.Section("websocketEN").Key("punctuationURL").String()
//获取许可证
ConfigMap["license"] = conf.Section("common").Key("license").String()
} else {
panic("config error")
}
if e := yaml.Unmarshal(mysqlData, &MysqlMap); e != nil {
panic(e)
}
go func() {
// 创建文件夹
_, e := os.Stat(ConfigMap["logPath"].(string))
if os.IsNotExist(e) {
if e = os.MkdirAll(ConfigMap["logPath"].(string), os.ModePerm); e != nil {
logrus.Println(e.Error())
}
}
//_, e = os.Stat(ConfigMap["websocketLog"].(string))
//if os.IsNotExist(e) {
// if e = os.MkdirAll(ConfigMap["websocketLog"].(string), os.ModePerm); e != nil {
// log.Println(e.Error())
// }
//}
_, e = os.Stat(ConfigMap["pcmPath"].(string))
if os.IsNotExist(e) {
if e = os.MkdirAll(ConfigMap["pcmPath"].(string), os.ModePerm); e != nil {
logrus.Println(e.Error())
}
}
_, e = os.Stat(ConfigMap["hotWordTextDir"].(string))
if os.IsNotExist(e) {
if e = os.MkdirAll(ConfigMap["hotWordTextDir"].(string), os.ModePerm); e != nil {
logrus.Println(e.Error())
}
}
}()
//if getLicense, err = readLicense(); err != nil {
// panic("get license error: " + err.Error())
//}
//// 解析ras
//if rasData, err = RsaDecrypt(getLicense); err != nil {
// panic("RAS decryption error")
//}
//if err = json.Unmarshal(rasData, &LicenseMap); err != nil {
// panic("[]byte to map error: " + err.Error())
//}
// 中文迎请探针
go func(arr []string) {
for _, ip := range arr {
value := make(map[string]interface{})
value["num"] = 0
value["alive"] = true
ZhCn[ip] = value
}
for {
var group sync.WaitGroup
ZhCnLock.Lock()
for _, ip := range arr {
group.Add(1)
go func(ipStr string) {
if c, _, e := websocket.DefaultDialer.Dial(ipStr, nil); e != nil {
ZhCn[ipStr]["alive"] = false
} else {
if c != nil {
_ = c.Close()
ZhCn[ipStr]["alive"] = true
} else {
ZhCn[ipStr]["alive"] = false
}
}
group.Done()
return
}(ip)
}
group.Wait()
ZhCnLock.Unlock()
time.Sleep(10 * time.Second)
}
}(ConfigMap["websocketArrZH-CN"].([]string))
// 粤语迎请探针
go func(arr []string) {
for _, ip := range arr {
value := make(map[string]interface{})
value["num"] = 0
value["alive"] = true
ZhHk[ip] = value
}
for {
var group sync.WaitGroup
ZhHkLock.Lock()
for _, ip := range arr {
group.Add(1)
go func(ipStr string) {
if c, _, e := websocket.DefaultDialer.Dial(ipStr, nil); e != nil {
ZhHk[ipStr]["alive"] = false
} else {
if c != nil {
_ = c.Close()
ZhHk[ipStr]["alive"] = true
} else {
ZhHk[ipStr]["alive"] = false
}
}
group.Done()
return
}(ip)
}
group.Wait()
ZhHkLock.Unlock()
time.Sleep(10 * time.Second)
}
}(ConfigMap["websocketArrZH-HK"].([]string))
// 英语迎请探针
go func(arr []string) {
for _, ip := range arr {
value := make(map[string]interface{})
value["num"] = 0
value["alive"] = true
EN[ip] = value
}
for {
var group sync.WaitGroup
ENLock.Lock()
for _, ip := range arr {
group.Add(1)
go func(ipStr string) {
if c, _, e := websocket.DefaultDialer.Dial(ipStr, nil); e != nil {
EN[ipStr]["alive"] = false
} else {
if c != nil {
_ = c.Close()
EN[ipStr]["alive"] = true
} else {
EN[ipStr]["alive"] = false
}
}
group.Done()
return
}(ip)
}
group.Wait()
ENLock.Unlock()
time.Sleep(10 * time.Second)
}
}(ConfigMap["EnWebsocketArr"].([]string))
/////////////////////////////////////////
traceMemStats()
}
// TODO:读取license.txt
//func readLicense() (license string, err error) {
// var (
// txt []byte
// )
// if txt, err = ioutil.ReadFile(ConfigMap["license"].(string)); err != nil {
// return
// }
// license = string(txt)
// return
//}
// TODO: 解密
//func RsaDecrypt(ciphertext string) ([]byte, error) {
// //base64解密
// data, _ := base64.URLEncoding.DecodeString(ciphertext)
// //解密
// block, _ := pem.Decode(privateKey)
// if block == nil {
// return nil, errors.New("privateKey error!")
// }
// //解析PKCS1格式的私钥
// priv, err := x509.ParsePKCS1PrivateKey(block.Bytes)
// if err != nil {
// return nil, err
// }
// // 解密
// return rsa.DecryptPKCS1v15(rand.Reader, priv, data)
//}
/* 获取设备唯一标识 */
//// getDeviceID TODO:获取设备ID
//func getDeviceID() (deviceID string, err error) {
// var (
// id string
// cID string
// stdout bytes.Buffer
// cmd *exec.Cmd
// )
// if id, err = machineid.ID(); err != nil {
// return
// }
// command := "hostname"
// cmd = exec.Command("bash", "-c", command)
// cmd.Stdout = &stdout
// if err = cmd.Run(); err != nil {
// err = errors.New("cmd run error")
// cmd = exec.Command("sh", "-c", command)
// cmd.Stdout = &stdout
// if err = cmd.Run(); err != nil {
// return
// }
// }
// cID = strings.Replace(strings.Replace(stdout.String(), "\n", "", -1), " ", "", -1)
// tempId := fmt.Sprintf("%s-%s", id, cID)
// m := md5.New()
// m.Write([]byte(tempId))
// deviceID = hex.EncodeToString(m.Sum(nil))
// return
//}
func init() {
/////////////////////////////////////////
traceMemStats()
flag.BoolVar(&Version, "version", false, "")
mysql := flag.String("mysqlConfig", "", "MySQL connection messages")
flag.Parse()
logrus.SetReportCaller(true)
mysqlConfig = *mysql
if !Version {
var (
//deviceID string
err error
)
if mysqlConfig != "" {
if mysqlData, err = ioutil.ReadFile(mysqlConfig); err != nil {
panic(err.Error())
}
logrus.Println(string(mysqlData))
}
GetConfig()
close(StartSignal)
//if deviceID, err = getDeviceID(); err != nil {
// panic(fmt.Sprintf("get device id error: %v", err.Error()))
//}
//if deviceID != LicenseMap["deviceID"].(string) {
// panic(fmt.Sprintf("the program does not match the server.Please take the deviceID:[%s] to the relevant person to get the license", deviceID))
//}
//go func() {
// for {
// if int(time.Now().Unix())-int(LicenseMap["startTime"].(float64)) > int(LicenseMap["timeOut"].(float64)) {
// log.Println("the license is out of date")
// panic("the license is out of date")
// }
// time.Sleep(5 * time.Second)
//
// }
//}()
}
/////////////////////////////////////////
traceMemStats()
}
address: huisheng_mysql:3306
user: root
passwd: Huisheng@12345
dbName: auth
tableName: auth_info
\ No newline at end of file
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAzkgl6RVRIUcbS+ICwqQo30aDUlegMopOdSqcfmitRFjcDJKg
QVF89acnnhJcHbpFZvv8oil88ezeKiPEAa8KsfZZHPg2H6wqjL5c7kojINbQPgBV
pjHpOwKc4Sbc+A7lO9fFTPJ9XgRZp9NCPejt6UAqjY7sfgwW3p5OPfcfmOfgP+Hq
NR+sVVvSWBx7yigoQFHWinOSdYR3R8OohvinItob7Q0rtUEXqRgO0mgJGvEK0596
G6+YUwZIqYOwallbBrUVQEl1g7Jb7TgwNsCFw1mC7Gw/kJBMxEmlPkESfRozRbwm
pf4ZSMBTQBcf1lrk7ZYvKznXznsWkDLJl7WdtQIDAQABAoIBADsJIvBmDWX8t68r
66ZQ3PGpeIPkWBCezH1zyH3U72GUHMXsYLjQ3XZgdFIk5wALAznJ/C9cuEl0hNCC
k+H3epQb8Lz9vF+sIjFNecMorI0FbWuRJPH/bsRYpxU4zhygBrLtuaWf8IkrrYz+
RQL62rvahVMmCEC6rQDhRXsucniLwGLhSWMPVS4FXB9QsgkAVf6qSpaD9RVv/k1H
Q8MQ9fSdtz/fegvtJeGFXJufy//ZlhObw3PRXb0+9mGSKft65o+XLqkt7+3qyynF
0hYPtNiJMgsmWQs5VM2ZCWg2SassEmkDLJACscNH7QxFI/T9wjsN1/PNThcaq0lJ
zpLu2DECgYEA7X0QrizFuQzfZ91TU5Bx6Hn4grr9zWkyNy3pMJyw/VoGx5nj7/z5
Gkr0aSLGvL8kUPicgOeE9RK+RJvzTdVMLnEc6VTG1qABtq7NIWGnXnCdtaikoWGZ
dwB1dJ9CIdMEd20hbG/9qSkYheInMT9jJVU8/mtDTswAKx7omXT+nYcCgYEA3lxf
ZgMZsA6iYqnIGfPn8e9Ku7H/hULZDRdSEuGI5C426gFI32gJSfvghRQwrBHo7P7b
IZxFCnTK/xPVyFOUN2o4tQ9RJETjWazoI5SYmGNNiZ2jovdoZOAVm/58+ez+5B+A
t9/cDY0Y3aJC6iNrZKCXY4BlcrqfI4NYmmx0WeMCgYAh1McjiM1B50uMCENujA0u
CPIV7X1qXaoBolCt1CSCcui0Vn/aGDeEP1lCAOo+yRvrTP8+fby7DMh+DZlDZTdN
BmAP/Tu2J2UXGR+vubGWCoTs8/E/7B9ojHAvBODu4JiqSqmQQQPprSkJKYZbxaPi
FZpyw+T6vKTpf6Wy9q+u0QKBgQCpeqBfoVdP4CeWqa9RUJzlwqwkWSvXRzcmCjNN
Hsd2m17Rhn8jgCksUaUz6Uqbq97r2+frOy9II/Kg7l5QHJvojLaCEhWXXkYmkGoy
zExuQanUjOkvCgFweXce+Z4J47/24af6/c28KmHcc5Mq4Co3fUtF1JP+GQ24RKEg
ui8IXwKBgEpw4uG0f4+fLlO6v/J8fvhzjsqpP3LL1qee5ULb2uHd6aSpcqmxgGeJ
0iOu/QkBqTzPglVrFZ1saQJWG8vLhrTlfRMK6BJVvdtHQyfe1m4+QcXfb2r9mmEl
Wv3ABuiExTz6XGSZ6vOsV7ICSrjUA/PBhTYbSOT8uYfRtirUtOvY
-----END RSA PRIVATE KEY-----
\ No newline at end of file
package main
import (
"RaisoundWebSocketBase2C/initServer"
"RaisoundWebSocketBase2C/server"
"fmt"
"github.com/gin-gonic/gin"
"log"
"net/http"
_ "net/http/pprof"
"runtime"
)
func traceMemStats() {
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
log.Printf("Alloc:%d(bytes) HeapIdle:%d(bytes) HeapReleased:%d(bytes)", ms.Alloc, ms.HeapIdle, ms.HeapReleased)
}
func main() {
if initServer.Version {
log.Println("This websocket is for regular users and was developed on November 28, 2022.")
}
if !initServer.Version {
///
traceMemStats()
if initServer.ConfigMap["language"].(string) == "ALL" || initServer.ConfigMap["language"].(string) == "all" {
r := gin.Default()
r.GET("/chinese/recognize", server.ChineseServerRecognize)
r.GET("/english/recognize", server.EnglishServerRecognize)
r.GET("/cantonese/recognize", server.CantoneseServerRecognize)
//r.POST("/loadFile", server.LoadFile)
r.GET("/getEnd/:meeting_id", server.GetEnd)
r.GET("/getAudioFile", server.GetAudioFile)
r.POST("/removeMeetingId", server.RemoveFile)
r.GET("/getAudioPath/:token/:aid/:bid", server.LoadFile)
r.GET("/loadAudioPath/:aid/:bid", server.LoadFile1)
r.GET("/download", server.DownLoadAudioFile)
err := http.ListenAndServe(fmt.Sprintf(":%v", initServer.ConfigMap["port"].(string)), r)
//err := r.Run(fmt.Sprintf(":%v", initServer.ConfigMap["port"].(string)))
//println(string(debug.Stack()))
if err != nil {
return
}
}
if initServer.ConfigMap["language"].(string) == "EN" || initServer.ConfigMap["language"].(string) == "en" {
r := gin.Default()
r.GET("/english/recognize", server.EnglishServerRecognize)
//r.POST("/loadFile", server.LoadFile)
r.GET("/getEnd/:meeting_id", server.GetEnd)
r.GET("/getAudioFile", server.GetAudioFile)
r.POST("/removeMeetingId", server.RemoveFile)
r.GET("/getAudioPath/:token/:aid/:bid", server.LoadFile)
r.GET("/loadAudioPath/:aid/:bid", server.LoadFile1)
r.GET("/download", server.DownLoadAudioFile)
err := r.Run(fmt.Sprintf(":%v", initServer.ConfigMap["port"].(string)))
//println(string(debug.Stack()))
if err != nil {
return
}
}
if initServer.ConfigMap["language"].(string) == "ZH-CN" || initServer.ConfigMap["language"].(string) == "zh-cn" {
r := gin.Default()
r.GET("/chinese/recognize", server.ChineseServerRecognize)
//r.POST("/loadFile", server.LoadFile)
r.GET("/getEnd/:meeting_id", server.GetEnd)
r.GET("/getAudioFile", server.GetAudioFile)
r.POST("/removeMeetingId", server.RemoveFile)
r.GET("/getAudioPath/:token/:aid/:bid", server.LoadFile)
r.GET("/loadAudioPath/:aid/:bid", server.LoadFile1)
r.GET("/download", server.DownLoadAudioFile)
err := r.Run(fmt.Sprintf(":%v", initServer.ConfigMap["port"].(string)))
//println(string(debug.Stack()))
if err != nil {
return
}
}
if initServer.ConfigMap["language"].(string) == "ZH-HK" || initServer.ConfigMap["language"].(string) == "zh-hk" {
r := gin.Default()
r.GET("/cantonese/recognize", server.CantoneseServerRecognize)
//r.POST("/loadFile", server.LoadFile)
r.GET("/getEnd/:meeting_id", server.GetEnd)
r.GET("/getAudioFile", server.GetAudioFile)
r.POST("/removeMeetingId", server.RemoveFile)
r.GET("/getAudioPath/:token/:aid/:bid", server.LoadFile)
r.GET("/loadAudioPath/:aid/:bid", server.LoadFile1)
r.GET("/download", server.DownLoadAudioFile)
err := r.Run(fmt.Sprintf(":%v", initServer.ConfigMap["port"].(string)))
//println(string(debug.Stack()))
if err != nil {
return
}
}
///
traceMemStats()
}
}
package server
import (
"RaisoundWebSocketBase2C/funcHandler"
"RaisoundWebSocketBase2C/initServer"
"encoding/json"
"errors"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"path"
"regexp"
"strconv"
"strings"
"time"
)
var (
upgrade = &websocket.Upgrader{
// 允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)
// 校验参数
func checkParameter(c *gin.Context) (isOk bool, err error) {
var (
urlValue url.Values
userId string
token string
secretId string
signature string
timestamp string
timestampInt int
)
urlValue = c.Request.URL.Query()
token = urlValue.Get("user_id")
userId = urlValue.Get("token")
secretId = urlValue.Get("secretID")
signature = urlValue.Get("signature")
timestamp = urlValue.Get("timestamp")
if token != "" {
if userId != "" {
return funcHandler.CheckToken(token, userId)
} else {
err = errors.New("the user_id is missing")
log.Println(err.Error())
return
}
} else {
if secretId != "" {
if signature != "" {
if timestamp != "" {
if timestampInt, err = strconv.Atoi(timestamp); err != nil {
return
}
if timestampInt+600 <= time.Now().Second() {
err = errors.New("the signature has expired and is not valid")
log.Println(err.Error())
return
}
return funcHandler.CheckSignature(secretId, timestamp, signature)
} else {
err = errors.New("the timestamp is missing")
log.Println(err.Error())
return
}
} else {
err = errors.New("the signature is missing")
log.Println(err.Error())
return
}
} else {
err = errors.New("the secretId is missing or token is missing")
log.Println(err.Error())
return
}
}
return
}
// TODO: 中文实时转写服务
func ChineseServerRecognize(c *gin.Context) {
var (
Conn *websocket.Conn
wsConn *Connection
err error
ok bool
)
if Conn, err = upgrade.Upgrade(c.Writer, c.Request, nil); err != nil {
log.Println(err.Error())
return
}
if ok, err = checkParameter(c); ok {
if wsConn, err = InitConnection(Conn, "ZH-CN", fmt.Sprintf("http://%v/download", c.Request.Host)); err != nil {
return
}
for {
//log.Println("77777777777777777")
if wsConn != nil {
if err = wsConn.Client2InChan(); err != nil {
goto ERR
}
} else {
goto ERR
}
}
ERR:
if !wsConn.sendEndEngine {
wsConn.sendEndEngine = true
if e := wsConn.WriteInChan([]byte(`{"signal" : "end"}`)); e != nil {
log.Println("write inChan error:", e.Error())
}
wsConn.isCloseInChan = true
}
time.Sleep(300 * time.Millisecond)
wsConn.CloseConnection()
} else {
log.Println(err)
if Conn != nil {
if e := Conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"code":412,"desc":"%v."}`, err.Error()))); e != nil {
log.Println(e.Error())
return
}
_ = Conn.Close()
}
return
}
}
// TODO: 英语实时转写服务
func EnglishServerRecognize(c *gin.Context) {
var (
Conn *websocket.Conn
wsConn *Connection
err error
ok bool
)
if Conn, err = upgrade.Upgrade(c.Writer, c.Request, nil); err != nil {
log.Println(err.Error())
return
}
if ok, err = checkParameter(c); ok {
if wsConn, err = InitConnection(Conn, "EN", fmt.Sprintf("http://%v/download", c.Request.Host)); err != nil {
return
}
for {
if wsConn != nil {
if wsConn != nil {
if err = wsConn.Client2InChan(); err != nil {
goto ERR
}
} else {
goto ERR
}
} else {
goto ERR
}
}
ERR:
if !wsConn.sendEndEngine {
wsConn.sendEndEngine = true
if e := wsConn.WriteInChan([]byte(`{"signal" : "end"}`)); e != nil {
log.Println("write inChan error:", e.Error())
}
wsConn.isCloseInChan = true
}
time.Sleep(300 * time.Millisecond)
wsConn.CloseConnection()
} else {
if Conn != nil {
if e := Conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"code":412,"desc":"%v."}`, err.Error()))); e != nil {
log.Println(e.Error())
return
}
_ = Conn.Close()
}
return
}
}
// TODO: 粤语实时转写服务
func CantoneseServerRecognize(c *gin.Context) {
var (
Conn *websocket.Conn
wsConn *Connection
err error
ok bool
)
if Conn, err = upgrade.Upgrade(c.Writer, c.Request, nil); err != nil {
log.Println(err.Error())
return
}
if ok, err = checkParameter(c); ok {
if wsConn, err = InitConnection(Conn, "ZH-HK", fmt.Sprintf("http://%v/download", c.Request.Host)); err != nil {
return
}
for {
if err = wsConn.Client2InChan(); err != nil {
goto ERR
}
}
ERR:
if !wsConn.sendEndEngine {
wsConn.sendEndEngine = true
if e := wsConn.WriteInChan([]byte(`{"signal" : "end"}`)); e != nil {
log.Println("write inChan error:", e.Error())
}
wsConn.isCloseInChan = true
}
time.Sleep(300 * time.Millisecond)
wsConn.CloseConnection()
} else {
if Conn != nil {
if e := Conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"code":412,"desc":"%v."}`, err.Error()))); e != nil {
log.Println(e.Error())
return
}
_ = Conn.Close()
}
return
}
}
// TODO:上传文件热词文件cd
//func LoadFile1(c *gin.Context) {
// var (
// file *multipart.FileHeader
// err error
// )
// if file, err = c.FormFile("file"); err != nil {
// c.JSONP(200, gin.H{
// "status": 400,
// "desc": err.Error(),
// })
// return
// }
// if file != nil {
// if float64(file.Size)/float64(1048576) > 0.2 {
// c.JSONP(200, gin.H{
// "status": 400,
// "desc": "the file is too big.",
// })
// return
// }
// }
// fileUid, _ := uuid.NewV4()
// if err = c.SaveUploadedFile(file, fmt.Sprintf("%v/%v.txt", initServer.ConfigMap["hotWordTextDir"].(string), fileUid.String())); err != nil {
// c.JSONP(400, gin.H{
// "status": 400,
// "desc": err.Error(),
// })
// return
// }
// c.JSONP(200, gin.H{
// "status": 200,
// "file_uid": fileUid.String(),
// })
// return
//}
func LoadFile1(ctx *gin.Context) {
fileIdA := ctx.Param("aid")
fileIdB := ctx.Param("bid")
audioFile := fmt.Sprintf("%v/%v/%v", initServer.ConfigMap["pcmPath"].(string), fileIdA, fileIdB)
ok, _ := funcHandler.ExistsFile(audioFile)
if !ok {
ctx.JSONP(404, gin.H{
"code": 404,
"description": "the file is not existing.",
})
return
}
ctx.Header("Content-Type", "application/octet-stream")
ctx.Header("Content-Disposition", "attachment; filename="+audioFile)
ctx.Header("Content-Transfer-Encoding", "binary")
ctx.FileAttachment(audioFile, fileIdB)
}
func DownLoadAudioFile(ctx *gin.Context) {
urlValue := ctx.Request.URL.Query()
file := urlValue.Get("file")
//file := ctx.Param("file")
ctx.Header("Content-Type", "application/octet-stream")
ctx.Header("Content-Disposition", "attachment; filename="+file)
ctx.Header("Content-Transfer-Encoding", "binary")
ctx.FileAttachment(file, path.Base(file))
}
func LoadFile(ctx *gin.Context) {
token := ctx.Param("token")
fileIdA := ctx.Param("aid")
fileIdB := ctx.Param("bid")
if yes, err := funcHandler.VerityToken(token, initServer.ConfigMap["tokenVerityURL"].(string)); yes && err == nil {
audioFile := fmt.Sprintf("%v/%v/%v", initServer.ConfigMap["pcmPath"].(string), fileIdA, fileIdB)
ctx.Header("Content-Type", "application/octet-stream")
ctx.Header("Content-Disposition", "attachment; filename="+audioFile)
ctx.Header("Content-Transfer-Encoding", "binary")
ctx.FileAttachment(audioFile, fileIdB)
} else if !yes && err == nil {
ctx.JSONP(404, gin.H{
"code": 404,
"description": "file path error.",
},
)
} else {
if err != nil {
if err.Error() != "checking token error, please check server" {
ctx.JSONP(403, gin.H{
"code": 403,
"description": "The address is invalid",
},
)
} else {
ctx.JSONP(400, gin.H{
"code": 400,
"description": err.Error(),
},
)
}
}
}
}
type reqBody struct {
MeetingId string `json:"meeting_id"`
}
func RemoveFile(ctx *gin.Context) {
var (
reqData = &reqBody{}
err error
)
if err = ctx.BindJSON(reqData); err != nil {
ctx.JSONP(400, gin.H{
"status": 400,
"desc": err.Error(),
})
return
}
//if len(strings.Split(initServer.ConfigMap["audioPath"].(string),""))
if reqData.MeetingId == "" || initServer.ConfigMap["pcmPath"].(string) == "" {
ctx.JSONP(400, gin.H{
"status": 400,
"desc": "wrong root directory or meeting_id.",
})
return
}
meetingIdName := fmt.Sprintf("%v/%v", initServer.ConfigMap["pcmPath"].(string), reqData.MeetingId)
if err = os.RemoveAll(meetingIdName); err != nil {
ctx.JSONP(400, gin.H{
"status": 400,
"desc": err.Error(),
})
return
}
ctx.JSONP(200, gin.H{
"status": 200,
"desc": "meeting data is removed successfully",
})
return
}
func GetAudioFile(ctx *gin.Context) {
token := ctx.Request.Header.Get("token")
if token == "" {
ctx.JSONP(400, gin.H{
"code": 400,
"description": "token is empty.",
},
)
} else {
if yes, err := funcHandler.VerityToken(token, initServer.ConfigMap["tokenVerityURL"].(string)); yes && err == nil {
meetingID := ctx.Query("meeting_id")
if meetingID == "" {
ctx.JSONP(400, gin.H{
"code": 400,
"description": "The parameter value is missing.",
},
)
} else {
audioFile := fmt.Sprintf("%v/%v/%vf.wav", initServer.ConfigMap["pcmPath"].(string), meetingID, meetingID)
if exit, _ := funcHandler.ExistsFile(audioFile); exit {
ctx.JSONP(200, gin.H{
"code": 200,
"path": fmt.Sprintf("getAudioPath/%v/%v/%vf.wav", token, meetingID, meetingID),
"data": fmt.Sprintf("http://%v/getAudioPath/%v/%v/%vf.wav", ctx.Request.Host, token, meetingID, meetingID),
"description": "succeed",
},
)
} else {
ctx.JSONP(404, gin.H{
"code": 404,
"description": "The file does not exist.",
},
)
}
}
} else if !yes && err == nil {
ctx.JSONP(401, gin.H{
"code": 401,
"description": "token error.",
},
)
} else if err != nil {
ctx.JSONP(404, gin.H{
"code": 404,
"description": err.Error(),
},
)
}
}
}
func GetEnd(ctx *gin.Context) {
meetingId := ctx.Param("meeting_id")
if meetingId == "" {
ctx.JSONP(
400,
gin.H{
"code": 400,
"description": "the meeting_id is empty,please check the value of meeting_id.",
})
} else {
reCompile, _ := regexp.Compile("_\\d{0,3}$")
var (
tempMeetingId string
tempSpeaker string
)
if reCompile.Match([]byte(meetingId)) {
tempMeetingId = reCompile.ReplaceAllString(meetingId, "")
tempSpeaker = strings.Replace(reCompile.FindString(meetingId), "_", "", -1)
} else {
tempMeetingId = meetingId
tempSpeaker = "false"
}
audioFile := fmt.Sprintf("%v/%v/%vf.wav", initServer.ConfigMap["pcmPath"].(string), meetingId, meetingId)
if exit, _ := funcHandler.ExistsFile(audioFile); exit {
var (
publicMsg = make(map[string]interface{})
endMsg = make(map[string]interface{})
wavStruct *funcHandler.WavStruct
rb *funcHandler.RdbStruct
err error
)
content, _ := ioutil.ReadFile(audioFile)
duration := len(content) - 44
if content[0] == 'R' && content[1] == 'I' && content[2] == 'F' && content[3] == 'F' {
} else {
if wavStruct, err = funcHandler.InitSaveWave(meetingId); err != nil {
ctx.JSONP(400, gin.H{
"code": 400,
"description": err.Error(),
},
)
}
if wavStruct != nil {
_ = wavStruct.CloseWav()
}
}
if rb, err = funcHandler.InitRdb(initServer.ConfigMap["redisChannel"].(string), initServer.ConfigMap["redisURL"].(string), initServer.ConfigMap["redisPasswd"].(string)); err != nil {
ctx.JSONP(400, gin.H{
"code": 400,
"description": err.Error(),
},
)
}
if rb != nil {
timestamp, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", float64(time.Now().UnixMilli())/float64(32000)), 64)
durationTime, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", float64(duration)/float64(1000)), 64)
publicMsg["timestamp"] = timestamp
publicMsg["url"] = ""
publicMsg["raw_text"] = ""
publicMsg["end_of_speech"] = 1
publicMsg["pcmtime"] = 0.0
publicMsg["duration"] = durationTime
publicMsg["meeting_id"] = tempMeetingId
publicMsg["session_id"] = ""
publicMsg["entire_file_path"] = audioFile
publicMsg["end"] = 1
publicMsg["speaker"] = fmt.Sprintf("%v", tempSpeaker) //speaker
data, _ := json.Marshal(publicMsg)
_ = rb.PubMsg(string(data))
endMsg["speaker"] = tempSpeaker
endMsg["meeting_id"] = tempMeetingId
endMsg["file_path"] = audioFile
jsonQueueData, _ := json.Marshal(endMsg)
_ = rb.LeftPubMsg(string(jsonQueueData))
rb.RedisClose()
ctx.JSONP(200, gin.H{
"code": 200,
"description": "succeed",
"meeting_id": meetingId,
})
}
} else {
ctx.JSONP(403, gin.H{
"code": 403,
"description": "can find the meeting",
},
)
}
}
}
package server
import (
"RaisoundWebSocketBase2C/engineServer"
"RaisoundWebSocketBase2C/funcHandler"
"RaisoundWebSocketBase2C/initServer"
"bytes"
"encoding/base64"
"errors"
"fmt"
"github.com/goccy/go-json"
"github.com/gofrs/uuid"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"log"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
type Connection struct {
Conn *websocket.Conn
Engine *engineServer.Engine
Wave *funcHandler.WavStruct
rdb *funcHandler.RdbStruct
audioBuf bytes.Buffer
InChan chan []byte
OutChan chan []byte
CloseChan chan []byte
sendEndEngine bool
isClose bool
isCloseInChan bool
isCloseOutChan bool
isCloseWave bool
sessionID string
meetingId string
entireMeetingId string
speaker string
lang string
mutex sync.Mutex
engineErr bool
downloadUrl string
}
func InitConnection(conn *websocket.Conn, lang, downloadUrl string) (Conn *Connection, err error) {
logrus.Println("HHHHHHHHHHHHHHHHHHHH", "START WEBSOCKET CONNECTION")
var (
uid string
rDB *funcHandler.RdbStruct
)
if rDB, err = funcHandler.InitRdb(initServer.ConfigMap["redisChannel"].(string), initServer.ConfigMap["redisURL"].(string), initServer.ConfigMap["redisPasswd"].(string)); err != nil {
logrus.Errorln(err)
err = nil
}
uid = funcHandler.GetUuid()
Conn = &Connection{
Conn: conn,
rdb: rDB,
InChan: make(chan []byte, 64),
OutChan: make(chan []byte, 64),
CloseChan: make(chan []byte, 1),
sessionID: uid,
lang: lang,
downloadUrl: downloadUrl,
}
go Conn.inChan2EngineLoop()
go Conn.engine2OutChanLoop()
go Conn.outChan2ClientLoop()
go func() {
sendStr := `{"code":200,"results":"hello heartbeat","description":"this is a heartbeat signal that indicates that the connection is still maintained"}`
time.Sleep(30 * time.Second)
for {
//logrus.Println(sendStr)
if e := Conn.WriteOutChan([]byte(sendStr)); e != nil {
logrus.Errorln(e)
goto ERR
}
time.Sleep(5 * time.Second)
}
ERR:
Conn.CloseConnection()
}()
logrus.Println("HHHHHHHHHHHHHHHHHHHH", "INIT WEBSOCKET CONNECTION")
return
}
func (Conn *Connection) WriteInChan(data []byte) (err error) {
if !Conn.isCloseInChan {
select {
case Conn.InChan <- data:
case <-time.After(1 * time.Second):
err = errors.New("connection closed")
}
} else {
err = errors.New("inChan is closed")
}
return
}
func (Conn *Connection) ReadInChan() (data []byte, err error) {
select {
case data = <-Conn.InChan:
case <-Conn.CloseChan:
err = errors.New("connection closed")
}
return
}
func (Conn *Connection) WriteOutChan(data []byte) (err error) {
if !Conn.isCloseOutChan {
select {
case Conn.OutChan <- data:
case <-time.After(1 * time.Second):
err = errors.New("connection closed")
}
} else {
err = errors.New("outChan is closed")
}
return
}
func (Conn *Connection) ReadOutChan() (data []byte, err error) {
select {
case data = <-Conn.OutChan:
//case <-time.After(1 * time.Second):
// err = errors.New("connection closed")
}
return
}
func (Conn *Connection) CloseConnection() {
if Conn.Engine != nil {
logrus.Errorln("ERROR")
_ = Conn.Engine.CloseEngine()
} else {
logrus.Errorln("engine connection Initialization error")
}
Conn.engineErr = true
time.Sleep(300 * time.Millisecond)
_ = Conn.Conn.Close()
if !Conn.isClose {
Conn.isClose = true
time.Sleep(300 * time.Millisecond)
close(Conn.CloseChan)
Conn.isCloseInChan = true
close(Conn.InChan)
close(Conn.OutChan)
Conn.isCloseOutChan = true
}
}
func (Conn *Connection) Client2InChan() (err error) {
var (
engine *engineServer.Engine
wave *funcHandler.WavStruct
recData []byte
recParams = make(map[string]interface{})
action interface{}
audioData interface{}
ok bool
)
if err = Conn.Conn.SetReadDeadline(time.Now().Add(30 * time.Minute)); err != nil {
return
}
if _, recData, err = Conn.Conn.ReadMessage(); err != nil {
//logrus.Println("BBBBBBBBBBBB",err)
return
}
if err = json.Unmarshal(recData, &recParams); err != nil {
return
}
if action, ok = recParams["action"]; ok {
if action.(string) == "InitStreamingASR" {
logrus.Println(string(recData))
// wave音频文件保存处理加载
if meetingId, okMeetingId := recParams["meeting_id"]; okMeetingId {
Conn.entireMeetingId = meetingId.(string)
reCompile, _ := regexp.Compile("_\\d{0,3}$")
if reCompile.Match([]byte(meetingId.(string))) {
Conn.meetingId = reCompile.ReplaceAllString(meetingId.(string), "")
Conn.speaker = strings.Replace(reCompile.FindString(meetingId.(string)), "_", "", -1)
} else {
Conn.meetingId = meetingId.(string)
Conn.speaker = "false"
Conn.entireMeetingId = Conn.meetingId
}
} else {
uid, _ := uuid.NewV1()
Conn.meetingId = uid.String()
Conn.speaker = "false"
Conn.entireMeetingId = Conn.meetingId
}
wave, err = funcHandler.InitSaveWave(Conn.entireMeetingId + time.Now().String())
if wave != nil {
Conn.Wave = wave
}
// 模型加载
if userId, okUserId := recParams["user_id"]; okUserId {
logrus.Println("get user_id: ", userId.(string))
if pathStr, _ := funcHandler.GetHotWordTextPath(userId.(string)); pathStr != "" {
logrus.Println(pathStr)
if engine, err = engineServer.InitEngine(Conn.lang, pathStr); err != nil {
sendStr := `{"code":400,"description":"engine connection Initialization error"}`
if err = Conn.WriteOutChan([]byte(sendStr)); err != nil {
return
}
err = errors.New("engine connection Initialization error")
Conn.engineErr = true
return
}
} else {
logrus.Println("get user_id: ", userId.(string))
path, _ := funcHandler.GetHotWordPath(userId.(string))
if path == "" {
if Conn.lang == "ZH-CN" {
path = initServer.ConfigMap["hotWord-ZH-CN"].(string)
}
if Conn.lang == "ZH-HK" {
path = initServer.ConfigMap["hotWord-ZH-HK"].(string)
}
if Conn.lang == "EN" {
path = initServer.ConfigMap["hotWord-EN"].(string)
}
}
if engine, err = engineServer.InitEngine(Conn.lang, path); err != nil {
sendStr := `{"code":400,"description":"engine connection Initialization error"}`
if err = Conn.WriteOutChan([]byte(sendStr)); err != nil {
return
}
err = errors.New("engine connection Initialization error")
Conn.engineErr = true
return
}
}
} else {
pathStr := ""
if Conn.lang == "ZH-CN" {
pathStr = initServer.ConfigMap["hotWord-ZH-CN"].(string)
}
if Conn.lang == "ZH-HK" {
pathStr = initServer.ConfigMap["hotWord-ZH-HK"].(string)
}
if Conn.lang == "EN" {
pathStr = initServer.ConfigMap["hotWord-EN"].(string)
}
if engine, err = engineServer.InitEngine(Conn.lang, pathStr); err != nil {
sendStr := `{"code":400,"description":"engine connection Initialization error"}`
if err = Conn.WriteOutChan([]byte(sendStr)); err != nil {
return
}
err = errors.New("engine connection Initialization error")
logrus.Errorln(err)
Conn.engineErr = true
return
}
}
Conn.Engine = engine
sendStr := fmt.Sprintf(`{"code":200,"session_id":"%v","meeting_id":"%v","description":"speech recognition initialization succeeded"}`, Conn.sessionID, Conn.meetingId)
if err = Conn.WriteOutChan([]byte(sendStr)); err != nil {
return
}
}
if action.(string) == "StreamingASR" {
if audioData, ok = recParams["audio_data"]; ok {
audio, _ := base64.StdEncoding.DecodeString(audioData.(string))
if err = Conn.WriteInChan(audio); err != nil {
return
}
// 音频保存处理
//go func(w *funcHandler.WavStruct, data []byte) {
// if w != nil {
// if _, e := w.SaveData(data); e != nil {
// wave, _ = funcHandler.InitSaveWave(Conn.entireMeetingId)
// if wave != nil {
// Conn.Wave = wave
// }
// _, _ = w.SaveData(data)
// }
// }
// return
//}(Conn.Wave, audio)
if Conn.Wave != nil {
if _, e := Conn.Wave.SaveData(audio); e != nil {
wave, _ = funcHandler.InitSaveWave(Conn.entireMeetingId)
if wave != nil {
Conn.Wave = wave
}
_, _ = Conn.Wave.SaveData(audio)
}
}
//go func(data []byte) {
// _, _ = Conn.audioBuf.Write(data)
// return
//}(audio)
_, _ = Conn.audioBuf.Write(audio)
}
}
if action.(string) == "Switchover" {
if language, languageOk := recParams["language"]; languageOk {
if !Conn.sendEndEngine {
if e := Conn.WriteInChan([]byte(fmt.Sprintf(`{"signal" : "Switchover","language":"%v"}`, language.(string)))); e != nil {
logrus.Errorln("write inChan error:", e.Error())
}
}
if language.(string) == "ZH-CN" || language.(string) == "ZH-HK" || language.(string) == "EN" {
Conn.lang = language.(string)
}
} else {
jsonStr := fmt.Sprintf(`{"code":400,"desc":"switching languages failed because of incorrect parameters","time,":"%v"}`, time.Now().String())
if err = Conn.WriteOutChan([]byte(jsonStr)); err != nil {
}
Conn.CloseConnection()
}
}
if action.(string) == "HeartBeat" {
//if !Conn.sendEndEngine {
// if e := Conn.WriteInChan([]byte(`{"signal" : "stop"}`)); e != nil {
// logrus.Errorln("write inChan error:", e.Error())
// }
//}
time.Sleep(200 * time.Millisecond)
jsonStr := fmt.Sprintf(`{"code":200,"desc":"heartBeat","time":"%v"}`, time.Now().String())
if err = Conn.WriteOutChan([]byte(jsonStr)); err != nil {
return
}
}
if action.(string) == "PausedASR" {
if !Conn.sendEndEngine {
if e := Conn.WriteInChan([]byte(`{"signal" : "stop"}`)); e != nil {
logrus.Errorln("write inChan error:", e.Error())
}
}
}
if action.(string) == "RelStreamingASR" {
if !Conn.sendEndEngine {
Conn.sendEndEngine = true
if e := Conn.WriteInChan([]byte(`{"signal" : "end"}`)); e != nil {
logrus.Errorln("write inChan error:", e.Error())
}
Conn.isCloseInChan = true
}
time.Sleep(500 * time.Millisecond)
jsonStr := fmt.Sprintf(`{"code":200,"desc":"release streamingASR successfully","time":"%v"}`, time.Now().String())
if err = Conn.WriteOutChan([]byte(jsonStr)); err != nil {
}
Conn.CloseConnection()
}
}
return
}
func (Conn *Connection) inChan2EngineLoop() {
var (
inChanData []byte
err error
)
for {
//logrus.Println("2222222222222222222222222")
if inChanData, err = Conn.ReadInChan(); err != nil {
err = errors.New("read channel closed")
goto ERR
}
if Conn.Engine != nil {
if bytes.Contains(inChanData, []byte("end")) && len(inChanData) < 256 {
logrus.Println(string(inChanData))
if err = Conn.Engine.WriteMsg(inChanData, 1); err != nil {
err = errors.New("engine connection closed")
goto ERR
}
} else if bytes.Contains(inChanData, []byte("stop")) && len(inChanData) < 256 {
logrus.Println(string(inChanData))
if err = Conn.Engine.WriteMsg(inChanData, 2); err != nil {
err = errors.New("engine connection closed")
goto ERR
}
} else if bytes.Contains(inChanData, []byte("language")) && len(inChanData) < 256 {
logrus.Println(string(inChanData))
if err = Conn.Engine.WriteMsg(inChanData, 3); err != nil {
err = errors.New("engine connection closed")
goto ERR
}
} else {
if err = Conn.Engine.WriteMsg(inChanData, 0); err != nil {
err = errors.New("engine connection closed")
goto ERR
}
}
}
}
ERR:
//logrus.Println("222222222222",err)
if !Conn.sendEndEngine {
Conn.sendEndEngine = true
if e := Conn.WriteInChan([]byte(`{"signal" : "end"}`)); e != nil {
logrus.Errorln("write inChan error:", e.Error())
}
Conn.isCloseInChan = true
}
for {
//logrus.Println("33333333333333")
var data []byte
select {
case data = <-Conn.InChan:
case <-time.After(1 * time.Second):
goto END
}
if data != nil {
if string(data) == `{"signal" : "end"}` {
//logrus.Println("agagaagag",string(data))
if Conn.Engine != nil {
if err = Conn.Engine.WriteMsg(data, 1); err != nil {
logrus.Errorln("engine error: ", err.Error())
goto END
}
}
goto END
} else {
if Conn.Engine != nil {
if err = Conn.Engine.WriteMsg(data, 0); err != nil {
logrus.Errorln("engine error: ", err.Error())
goto END
}
}
}
} else {
goto END
}
}
END:
//logrus.Println("3333333333",err)
jsonStr := fmt.Sprintf(`{"code":200,"desc":"connection closed","time":"%v"}`, time.Now().String())
if err = Conn.WriteOutChan([]byte(jsonStr)); err != nil {
logrus.Errorln(err)
}
time.Sleep(300 * time.Millisecond)
if Conn.rdb != nil {
//logrus.Println("closing redis connection.")
Conn.rdb.RedisClose()
}
Conn.CloseConnection()
}
func (Conn *Connection) engine2OutChanLoop() {
var (
hzRegexp, _ = regexp.Compile("([\u4e00-\u9fa5]+)")
recData = make(map[string]interface{})
sendData []byte
err error
//preResult string
//preVadLen int
audioNum = 1
duration float64
num int
backup = make(map[string]interface{})
PostUrl = initServer.ConfigMap["postURL"].(string)
enPostURL = initServer.ConfigMap["enPostURL"].(string)
//audioFIleChan = make(chan map[string]interface{}, 256)
killAudiolen int
)
for {
//logrus.Println("44444444444444444444")
if Conn.Engine != nil {
logrus.Println(recData)
if recData, err = Conn.Engine.ReadMsg(); err != nil {
err = errors.New("read engine message error")
goto ERR
}
recData["session_id"] = Conn.sessionID
recData["meeting_id"] = Conn.meetingId
recData["code"] = 200
if isFinal, okIsFinal := recData["is_final"]; okIsFinal {
if isFinal.(float64) == 1 {
if Conn.lang == "ZH-CN" {
if _, ok := recData["results"]; ok {
if recData["results"].(string) != "" {
reqBody := funcHandler.PostReq{}
reqBody.Texts = recData["results"].(string)
reqBody.Mode = 1
reqBody.Backup = backup
if resp, e := funcHandler.PostGet(reqBody, PostUrl); e != nil {
backup = make(map[string]interface{})
} else {
if resp != nil {
if resp.Status == 200 {
backup = resp.Backup
recData["results"] = resp.Result
} else {
backup = make(map[string]interface{})
}
} else {
backup = make(map[string]interface{})
}
}
}
//if recData["results"].(string) != "" {
// if hzRegexp.MatchString(recData["results"].(string)) {
// if recData["results"], preResult, err = funcHandler.ChineseHandler(recData["results"].(string), preResult, true, true, true); err != nil {
// //logrus.Println(err.Error())
// }
// } else {
// if recData["results"], preResult, preVadLen, err = funcHandler.EnglishHandler(recData["results"].(string), preResult, preVadLen, true, true); err != nil {
// //logrus.Println(err.Error())
// }
// recData["results"] = recData["results"].(string) + "."
// preResult = ""
// preVadLen = 0
// }
//}
} else {
continue
}
}
if Conn.lang == "ZH-HK" {
if _, ok := recData["results"]; ok {
if recData["results"].(string) != "" {
reqBody := funcHandler.PostReq{}
reqBody.Texts = recData["results"].(string)
reqBody.Mode = 1
reqBody.Backup = backup
if resp, e := funcHandler.PostGet(reqBody, PostUrl); e != nil {
backup = make(map[string]interface{})
} else {
if resp != nil {
if resp.Status == 200 {
backup = resp.Backup
recData["results"] = resp.Result
} else {
backup = make(map[string]interface{})
}
} else {
backup = make(map[string]interface{})
}
}
}
//if recData["results"].(string) != "" {
// if hzRegexp.MatchString(recData["results"].(string)) {
// if recData["results"], preResult, err = funcHandler.CantoneseHandler(recData["results"].(string), preResult, true, true, true); err != nil {
// //logrus.Println(err.Error())
// }
// } else {
// if recData["results"], preResult, preVadLen, err = funcHandler.EnglishHandler(recData["results"].(string), preResult, preVadLen, true, true); err != nil {
// //logrus.Println(err.Error())
// }
// recData["results"] = recData["results"].(string) + "."
// preResult = ""
// preVadLen = 0
// }
//}
} else {
continue
}
}
if Conn.lang == "EN" {
if _, ok := recData["results"]; ok {
if recData["results"].(string) != "" {
if hzRegexp.MatchString(recData["results"].(string)) {
reqBody := funcHandler.PostReq{}
reqBody.Texts = recData["results"].(string)
reqBody.Mode = 1
reqBody.Backup = backup
if resp, e := funcHandler.PostGet(reqBody, PostUrl); e != nil {
backup = make(map[string]interface{})
} else {
if resp != nil {
if resp.Status == 200 {
backup = make(map[string]interface{})
recData["results"] = resp.Result
} else {
backup = make(map[string]interface{})
}
} else {
backup = make(map[string]interface{})
}
}
recData["results"] = recData["results"].(string) + "。"
backup = make(map[string]interface{})
} else {
//if recData["results"], preResult, preVadLen, err = funcHandler.EnglishHandler(recData["results"].(string), preResult, preVadLen, true, true); err != nil {
// //logrus.Println(err.Error())
//}
reqBody := funcHandler.PostReq{}
reqBody.Texts = recData["results"].(string)
reqBody.Mode = 1
reqBody.Backup = backup
if resp, e := funcHandler.PostGet(reqBody, enPostURL); e != nil {
backup = make(map[string]interface{})
} else {
if resp != nil {
if resp.Status == 200 {
backup = resp.Backup
recData["results"] = resp.Result
} else {
backup = make(map[string]interface{})
}
} else {
backup = make(map[string]interface{})
}
}
}
}
} else {
continue
}
}
logrus.Println(recData)
if recData["results"].(string) == "" {
if Conn.audioBuf.Len() > 32000*120 {
_ = Conn.audioBuf.Next(32000 * 6)
killAudiolen = killAudiolen + 32000*6
log.Println("已经提取的音频长度:", killAudiolen)
}
} else {
emptyLen := 0
duration = 0
var audio []byte
if int(32*recData["start"].(float64)) > killAudiolen {
// 如果当前文本的开始时刻置换的长度比已经提取出来的音频长度大的处理逻辑
emptyLen = int(32*recData["start"].(float64)) - killAudiolen
_ = Conn.audioBuf.Next(emptyLen)
killAudiolen = int(recData["start"].(float64) * 32)
duration = (recData["end"].(float64) - recData["start"].(float64)) * 32
} else {
// 如果当前文本的开始时刻置换的长度比已经提取出来的音频长度小的处理逻辑
duration = 32*recData["end"].(float64) - float64(killAudiolen)
}
log.Println("返回的开始长度:", int(32*recData["start"].(float64)), "返回的结束长度:", int(32*recData["end"].(float64)), "缓存通道剩余长度:", Conn.audioBuf.Len(), "已经提取的音频长度:", killAudiolen, "要提取的多余长度:", emptyLen, "要取的实际长度:", duration)
if duration > 0 {
audio = Conn.audioBuf.Next(int(duration))
}
killAudiolen = int(recData["end"].(float64) * 32)
sFile := ""
if sFile, err = funcHandler.SaveAudioData(Conn.entireMeetingId, Conn.sessionID, audio, audioNum); err != nil {
logrus.Errorln(err.Error())
}
recData["audioFileURL"] = fmt.Sprintf("%v?file=%v", Conn.downloadUrl, sFile)
audioNum++
durationFloat, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", recData["end"].(float64)/float64(1000)), 64)
pcmTime, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", duration/float64(32000)), 64)
if Conn.rdb != nil {
timestamp, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", float64(time.Now().UnixMilli())/float64(1000)), 64)
rdbStr := fmt.Sprintf(`{"timestamp":%v,"duration":%v,"url":"%v","raw_text":"%v","end_of_speech":%v,"pcmtime":%v,"meeting_id":"%v","session_id":"%v","entire_file_path":"%v","end":%v,"speaker":"%v"}`, timestamp, durationFloat, sFile, recData["results"].(string), 1, pcmTime, Conn.meetingId, Conn.sessionID, Conn.Wave.EFile, 0, Conn.speaker)
log.Println(rdbStr)
_ = Conn.rdb.PubMsg(rdbStr)
}
audio = nil
}
////logrus.Println("audio buff:", Conn.audioBuf.Len(), "duration:", audioLen, "startTime:", recData["start"], "endTime", recData["end"])
//buff := Conn.audioBuf.Len()
//if recData["results"].(string) == "" || recData["results"].(string) == "!!!!!!!!!!!!!" {
// // 硬拼保存
// audioLen := int(recData["duration"].(float64))
// var (
// audio []byte
// sFile string
// )
// if buff >= audioLen {
// audio = Conn.audioBuf.Next(audioLen)
// duration = duration + float64(audioLen)
// } else {
// audio = Conn.audioBuf.Bytes()
// duration = duration + float64(len(audio))
// }
// log.Println("HHHHHHHHHHHHHHHHHHHHH", fmt.Sprintf("缓存通道大小:%v,缓存通道剩余大小:%v,当前获取音频大小:%v,前总音频大小:%v,后总音频大小:%v", buff, Conn.audioBuf.Len(), audioLen, duration-float64(audioLen), duration), "startTime:", recData["start"], "endTime", recData["end"], "length:", recData["end"].(float64)*32, recData["results"].(string))
// if recData["results"].(string) == "!!!!!!!!!!!!!" {
// log.Println(recData)
// if sFile, err = funcHandler.SaveAudioData(Conn.entireMeetingId, Conn.sessionID, audio, audioNum); err != nil {
// logrus.Errorln(err.Error())
// }
// audioNum++
// pcmTime := recData["end"].(float64) - recData["start"].(float64)
// pcmTime, _ = strconv.ParseFloat(fmt.Sprintf("%.4f", pcmTime/float64(1000)), 64)
// durationFloat, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", duration/float64(32000)), 64)
// cacheTemp := make(map[string]interface{})
// cacheTemp["file"] = sFile
// cacheTemp["startTime"] = recData["start"].(float64)
// cacheTemp["endTime"] = recData["end"].(float64)
// cacheTemp["timeLength"] = pcmTime
// cacheTemp["duration"] = durationFloat
// audioFIleChan <- cacheTemp
// recData["results"] = ""
// }
//}
////log.Println("audio buff:", buff, "get audio:", len(audio), "audio buff balance:", Conn.audioBuf.Len(), "audioLen:", audioLen, "duration:", fmt.Sprintf("%v", duration), "startTime:", recData["start"], "endTime", recData["end"])
//if (recData["results"].(string) != "" && recData["results"].(string) != "!!!!!!!!!!!!!") || recData["results"].(string) == "@@@@" {
// if len(audioFIleChan) > 0 {
// //log.Println("audio buff:", buff, "get audio:", len(audio), "audio buff balance:", Conn.audioBuf.Len(), "audioLen:", audioLen, "duration:", fmt.Sprintf("%v", duration), "startTime:", recData["start"], "endTime", recData["end"], "file:", sFile, "result:", recData["results"].(string))
// redisSend := <-audioFIleChan
// log.Println(recData, redisSend)
// if recData["results"].(string) != "@@@@" {
// if Conn.rdb != nil {
// timestamp, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", float64(time.Now().UnixMilli())/float64(1000)), 64)
// pcmTime := redisSend["timeLength"]
// durationFloat := redisSend["duration"]
// rdbStr := fmt.Sprintf(`{"timestamp":%v,"duration":%v,"url":"%v","raw_text":"%v","end_of_speech":%v,"pcmtime":%v,"meeting_id":"%v","session_id":"%v","entire_file_path":"%v","end":%v,"speaker":"%v"}`, timestamp, durationFloat, redisSend["file"], recData["results"].(string), 1, pcmTime, Conn.meetingId, Conn.sessionID, Conn.Wave.EFile, 0, Conn.speaker)
// _ = Conn.rdb.PubMsg(rdbStr)
// }
// }
// }
//}
}
if isFinal.(float64) == 3 {
err = errors.New("end engine")
goto ERR
}
if sendData, err = json.Marshal(recData); err != nil {
err = errors.New("parsing error")
continue
}
if err = Conn.WriteOutChan(sendData); err != nil {
err = errors.New("send data to outer Channel error")
goto ERR
}
}
} else {
if num > 30 {
err = errors.New("engine not init")
goto ERR
}
time.Sleep(1 * time.Second)
num++
}
if Conn.engineErr {
time.Sleep(300 * time.Millisecond)
err = errors.New("engine closed")
goto ERR
}
}
ERR:
//logrus.Println("44444444444444",err)
var sFile string
audio := Conn.audioBuf.Bytes()
durationFloat, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", (duration+float64(len(audio)))/float64(32000)), 64)
if sFile, err = funcHandler.SaveAudioData(Conn.entireMeetingId, Conn.sessionID, audio, audioNum); err != nil {
logrus.Errorln(err.Error())
}
if Conn.rdb != nil {
if Conn.Wave != nil {
if !Conn.isCloseWave {
_ = Conn.Wave.CloseWav()
Conn.isCloseWave = true
}
timestamp, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", float64(time.Now().UnixMilli())/float64(1000)), 64)
rdbStr := fmt.Sprintf(`{"timestamp":%v,"duration":%v,"url":"%v","raw_text":"%v","end_of_speech":%v,"pcmtime":%v,"meeting_id":"%v","session_id":"%v","entire_file_path":"%v","end":%v,"speaker":"%v"}`, timestamp, durationFloat, sFile, "", 1, 0, Conn.meetingId, Conn.sessionID, Conn.Wave.EFile, 1, Conn.speaker)
_ = Conn.rdb.PubMsg(rdbStr)
rdbEnd := fmt.Sprintf(`{"meeting_id":"%v","file_path":"%v","speaker":"%v"}`, Conn.meetingId, Conn.Wave.EFile, Conn.speaker)
_ = Conn.rdb.LeftPubMsg(rdbEnd)
} else {
if Conn.meetingId == "" {
uid, _ := uuid.NewV1()
Conn.meetingId = uid.String()
}
Conn.Wave, _ = funcHandler.InitSaveWave(Conn.entireMeetingId)
//logrus.Println(Conn.meetingId)
timestamp, _ := strconv.ParseFloat(fmt.Sprintf("%.4f", float64(time.Now().UnixMilli())/float64(1000)), 64)
rdbStr := fmt.Sprintf(`{"timestamp":%v,"duration":%v,"url":"%v","raw_text":"%v","end_of_speech":%v,"pcmtime":%v,"meeting_id":"%v","session_id":"%v","entire_file_path":"%v","end":%v,"speaker":"%v"}`, timestamp, durationFloat, sFile, "", 1, 0, Conn.meetingId, Conn.sessionID, Conn.Wave.EFile, 1, Conn.speaker)
_ = Conn.rdb.PubMsg(rdbStr)
_ = Conn.Wave.CloseWav()
}
}
Conn.CloseConnection()
}
func (Conn *Connection) outChan2ClientLoop() {
var (
recData []byte
err error
)
for {
//logrus.Println("555555555555555555555555555")
if recData, err = Conn.ReadOutChan(); err != nil {
err = errors.New("read outChan message error")
goto ERR
}
//logrus.Println(string(recData))
if err = Conn.Conn.SetWriteDeadline(time.Now().Add(30 * time.Minute)); err != nil {
err = errors.New("the engine connection closed")
goto ERR
}
if err = Conn.Conn.WriteMessage(websocket.TextMessage, recData); err != nil {
err = errors.New("the connection closed")
goto ERR
}
}
ERR:
//logrus.Println("66666666666666666666",err)
for {
//logrus.Println("6666666666666666666")
var (
recData1 []byte
)
select {
case recData1 = <-Conn.OutChan:
case <-time.Tick(300 * time.Millisecond):
goto End
}
if recData1 != nil {
if err = Conn.Conn.SetWriteDeadline(time.Now().Add(3 * time.Minute)); err != nil {
err = errors.New("the engine connection closed")
goto ERR
}
if err = Conn.Conn.WriteMessage(websocket.TextMessage, recData1); err != nil {
err = errors.New("the connection closed")
goto ERR
}
} else {
goto End
}
}
End:
Conn.CloseConnection()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment