Commit 387a7370 by zhengcheng.wang

Initial commit

parents
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/gRPC-voice-assistants-second-pass.iml" filepath="$PROJECT_DIR$/.idea/gRPC-voice-assistants-second-pass.iml" />
</modules>
</component>
</project>
\ No newline at end of file
File added
package main
import (
"context"
"fmt"
streamRecognize "gRPC-voice-assistants-second-pass/protobufHandler"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"time"
)
func main() {
client, err := grpc.Dial("localhost:20065", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalln(err)
}
defer client.Close()
c := streamRecognize.NewPipelineClient(client)
p, _ := c.Inference(context.Background())
p.Send(&streamRecognize.DataRequest{Desc: "configs", Data: []byte(fmt.Sprintf(`{"meeting_id":"%v","mode":1,"is_not_vad":%v}`, time.Now().Format("2006-01-02T15-04-05"), true))})
data := make([]byte, 3200)
for i := 0; i < 100; i++ {
p.Send(&streamRecognize.DataRequest{Desc: "audio_internal", Data: data})
}
time.Sleep(3 * time.Second)
defer p.CloseSend()
}
#gRPC协议接口启动监听的接口。
#如若用docker跑动这个协议接口,切记端口映射,容器端口即为此监听的端口。
port: 20065
#这个是redis的基本配置。除了ip、port、password和db外,还包括连接池(openConn)的大小和空闲连接数(idleConn)的大小。
#openConn建议用100,idleConn建议用10。这部分主要是提高redis连接质量。
#这部分目前是预留功能,暂时没有应用到实际上,在配置中暂时跳过。
redisConfig:
ip: huisheng_redis
port: 6379
password: ""
db: 0
openConn: 100
idleConn: 10
#这部分主要配置转写引擎,目前仅支持中文的引擎配置。
#mode配负载均衡的策略,预定有四种,分别是一致性哈希算法(hash)、轮询(polling)、随机(random)、权重轮询(weight_polling)、最小连接数(min_connecting),目前暂不支持配置,属于预留功能。
#urlArr是引擎配置集群,目前仅支持一个,后续可以以同样的格式进行配置。
#切记ip的配置,如果用docker容器之间的网络连接,记得配置docker的net以及ip的替换。
engineConfig:
mode: hash
urlArr:
- ws://172.16.5.177:20086/
hotWord:
getHotWordFileUrl: 'http://'
# 默认热词文本完整地址
defaultWordFile:
#后处理配置,目前只有一个。
postProcessing:
postUrl: http://172.16.5.177:8141/encode
#音频文件保存地址等基本配置。
#在docker中注意数据卷的配置。
#audioDir为音频文件保存地址目录。
fileConfig:
audioDir: .\ #/opt/huisheng/data/wav/audio/20065
\ No newline at end of file
module gRPC-voice-assistants-second-pass
go 1.20
require (
github.com/bytedance/sonic v1.8.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/dlclark/regexp2 v1.9.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.9.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.11.2 // indirect
github.com/goccy/go-json v0.10.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.9 // indirect
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.8.0 h1:ea0Xadu+sHlu7x5O3gKhRpQ1IKiMrSiHttPF0ybECuA=
github.com/bytedance/sonic v1.8.0/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dlclark/regexp2 v1.9.0 h1:pTK/l/3qYIKaRXuHnEnIf7Y5NxfRPfpb7dis6/gdlVI=
github.com/dlclark/regexp2 v1.9.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.9.0 h1:OjyFBKICoexlu99ctXNR2gg+c5pKrKMuyjgARg9qeY8=
github.com/gin-gonic/gin v1.9.0/go.mod h1:W1Me9+hsUSyj3CePGrd1/QrKJMSJ1Tu/0hFEH89961k=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.11.2 h1:q3SHpufmypg+erIExEKUmsgmhDTyhcJ38oeKGACXohU=
github.com/go-playground/validator/v10 v10.11.2/go.mod h1:NieE624vt4SCTJtD87arVLvdmjPAeV8BQlHtMnw9D7s=
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU=
github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.9 h1:rmenucSohSTiyL09Y+l2OCk+FrMxGMzho2+tjr5ticU=
github.com/ugorji/go/codec v1.2.9/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd h1:sLpv7bNL1AsX3fdnWh9WVh7ejIzXdOc1RRHGeAmeStU=
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
package initServerConfig
import (
"flag"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
"io/ioutil"
)
type configBase struct {
Port string `yaml:"port"`
RedisConfig redis `yaml:"redisConfig"`
EngineConfig engine `yaml:"engineConfig"`
HotWord struct {
GetHotWordFileUrl string `yaml:"getHotWordFileUrl"`
DefaultWordFile string `yaml:"defaultWordFile"`
} `yaml:"hotWord"`
PostProcessing post `yaml:"postProcessing"`
FileConfig file `yaml:"fileConfig"`
}
type redis struct {
Ip string `yaml:"ip"`
Port int `yaml:"port"`
Password string `yaml:"password"`
Db int `yaml:"db"`
OpenConn int `yaml:"openConn"`
IdleConn int `yaml:"idleConn"`
}
type engine struct {
Mode string `yaml:"mode"`
UrlArr []string `yaml:"urlArr"`
}
type post struct {
PostUrl string `yaml:"postUrl"`
}
type file struct {
AudioDir string `yaml:"audioDir"`
}
func getConfig(yamlPath string) (configStruct *configBase, err error) {
var (
fileData []byte
)
configStruct = &configBase{}
if fileData, err = ioutil.ReadFile(yamlPath); err != nil {
return
}
if err = yaml.Unmarshal(fileData, configStruct); err != nil {
return
}
return
}
var (
StartCh = make(chan []byte, 1)
config string
Config *configBase
err error
)
func init() {
logrus.SetReportCaller(true)
logrus.SetFormatter(&logrus.TextFormatter{
//以下设置只是为了使输出更美观
ForceColors: true,
ForceQuote: true,
DisableColors: true,
DisableQuote: true,
TimestampFormat: "2006-01-02 15:04:05.000",
})
flag.StringVar(&config, "config", "./config.yaml", "The full path to the program's configuration file")
flag.Parse()
if Config, err = getConfig(config); err != nil {
logrus.Errorln("get configuration error: ", err)
}
logrus.Println(Config)
close(StartCh)
}
package loadBalancingHandler
import (
"gRPC-voice-assistants-second-pass/initServerConfig"
"github.com/sirupsen/logrus"
)
var (
Load *HashConsistency
)
func init() {
var (
err error
)
select {
case <-initServerConfig.StartCh:
}
if Load, err = NewHaseConsistency(200, initServerConfig.Config.EngineConfig.UrlArr...); err != nil {
logrus.Fatalln(err)
}
}
package loadBalancingHandler
import (
"errors"
"hash/crc32"
"log"
"net"
"net/url"
"sort"
"strconv"
"sync"
"time"
)
type Uint32Slice []uint32
func (s Uint32Slice) Len() int {
return len(s)
}
func (s Uint32Slice) Less(i, j int) bool {
return s[i] < s[j]
}
func (s Uint32Slice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
type HashConsistency struct {
replicas int
addrArr []string
keySlice Uint32Slice
sliceLock sync.Mutex
hashMap sync.Map
}
func NewHaseConsistency(replicas int, urls ...string) (hash *HashConsistency, err error) {
hash = &HashConsistency{
replicas: replicas,
}
for _, v := range urls {
addr := func(URL string) string {
U, e := url.Parse(URL)
if e != nil {
return URL
}
return U.Host
}(v)
hash.addrArr = append(hash.addrArr, addr)
hash.hashMap.Store(addr, v)
}
go func() {
_ = hash.AddNode(hash.addrArr...)
}()
go hash.keepalive()
return
}
func (hash *HashConsistency) keepalive() {
for {
var (
groupWait sync.WaitGroup
)
for _, v := range hash.addrArr {
addr := v
groupWait.Add(1)
go func(str string) {
if conn, err := net.DialTimeout("tcp", str, 1*time.Second); err != nil {
go func() {
_ = hash.DeleteNode(str)
}()
} else {
if conn == nil {
go func() {
_ = hash.DeleteNode(str)
}()
} else {
go func() {
_ = hash.AddNode(str)
}()
}
}
groupWait.Done()
}(addr)
}
groupWait.Wait()
time.Sleep(10 * time.Second)
}
}
func (hash *HashConsistency) hashFn(addr string) (hashValue uint32) {
hashValue = crc32.ChecksumIEEE([]byte(addr))
return
}
func (hash *HashConsistency) getIdx(key uint32) (idx int, err error) {
idx = sort.Search(len(hash.keySlice), func(i int) bool {
return hash.keySlice[i] >= key
})
if idx == len(hash.keySlice) {
err = errors.New("can not find the value")
}
return
}
func (hash *HashConsistency) AddNode(args ...string) (err error) {
hash.sliceLock.Lock()
defer hash.sliceLock.Unlock()
if len(args) <= 0 {
err = errors.New("the parameter is empty")
return
}
for _, addr := range args {
getAddr := func(URL string) string {
U, e := url.Parse(URL)
if e != nil {
return URL
}
return U.Host
}(addr)
if getAddr != "" {
for i := 0; i <= hash.replicas; i++ {
if i != 0 {
getAddr = getAddr + strconv.Itoa(i)
}
hashValue := hash.hashFn(getAddr)
hash.keySlice = append(hash.keySlice, hashValue)
hash.hashMap.Store(hashValue, addr)
}
}
}
sort.Sort(hash.keySlice)
return
}
func (hash *HashConsistency) DeleteNode(args ...string) (err error) {
hash.sliceLock.Lock()
defer hash.sliceLock.Unlock()
if len(args) <= 0 {
err = errors.New("the parameter is empty")
return
}
for _, addr := range args {
getAddr := func(URL string) string {
U, e := url.Parse(URL)
if e != nil {
return URL
}
return U.Host
}(addr)
if getAddr != "" {
for i := 0; i <= hash.replicas; i++ {
if i != 0 {
getAddr = getAddr + strconv.Itoa(i)
}
addrHashValue := hash.hashFn(getAddr)
if idx, e := hash.getIdx(addrHashValue); e != nil {
continue
} else {
hash.hashMap.Delete(addrHashValue)
hash.keySlice = append(hash.keySlice[:idx], hash.keySlice[idx+1:]...)
}
}
}
}
sort.Sort(hash.keySlice)
return
}
func (hash *HashConsistency) GetNode(key string) (addr string, err error) {
hash.sliceLock.Lock()
defer hash.sliceLock.Unlock()
var (
ok bool
getAddr any
ipPort any
)
if len(hash.keySlice) == 0 {
err = errors.New("the node is empty")
return
}
keyHashValue := hash.hashFn(key)
idx := sort.Search(len(hash.keySlice), func(i int) bool {
return hash.keySlice[i] >= keyHashValue
})
if idx == len(hash.keySlice) {
idx = 0
}
if ipPort, ok = hash.hashMap.Load(hash.keySlice[idx]); !ok {
err = errors.New("the node is empty")
log.Println(err, key, hash.keySlice[idx])
return
}
if getAddr, ok = hash.hashMap.Load(ipPort.(string)); !ok {
err = errors.New("the node is empty")
log.Println(err, key, hash.keySlice[idx])
return
}
addr = getAddr.(string)
return
}
package loadBalancingHandler
import (
"fmt"
"log"
"testing"
)
func TestHashConsistency_AddNode(t *testing.T) {
load, _ := NewHaseConsistency(10, "http://192.168.0.61:1000/get", "ws://192.168.0.61:1001/get/qqq", "ws://192.168.0.29:1002", "ws://192.168.0.61:1003", "ws://192.168.0.29:20086/", "192.168.0.61:20000", "http://192.168.0.61:1234/getEncript")
//_ = load.AddNode("192.168.0.61:1000", "192.168.0.61:1001", "192.168.0.61:1002", "192.168.0.61:1003")
//go func() {
// time.Sleep(10 * time.Millisecond)
// _ = load.DeleteNode("192.168.0.61:1000")
// time.Sleep(10 * time.Millisecond)
// _ = load.DeleteNode("192.168.0.61:1001")
// time.Sleep(10 * time.Millisecond)
// _ = load.DeleteNode("192.168.0.61:1002")
// time.Sleep(10 * time.Millisecond)
// _ = load.DeleteNode("192.168.0.61:1004")
// for i := 4; i < 1500; i++ {
// _ = load.AddNode(fmt.Sprintf("192.168.%v.61:10%v", i, i))
// }
//}()
a0 := 0
a1 := 0
a2 := 0
a3 := 0
for i := 0; i < 200; i++ {
addr := fmt.Sprintf("192.168.0.61:%v", i)
str, _ := load.GetNode(addr)
log.Println(addr, str)
//time.Sleep(3 * time.Second)
if str == "192.168.0.61:1000" {
a0++
}
if str == "192.168.0.61:1001" {
a1++
}
if str == "192.168.0.61:1002" {
a2++
}
if str == "192.168.0.61:1003" {
a3++
}
}
log.Println(a0, a1, a2, a3)
}
package loadBalancingHandler
import (
"hash/crc32"
"regexp"
"sync"
)
type Random struct {
lock sync.Mutex
addrSlice []string
keySlice Uint32Slice
addrMap sync.Map
reg *regexp.Regexp
}
func NewRandom(addresses ...string) (random *Random, err error) {
var (
regC *regexp.Regexp
)
if regC, err = regexp.Compile("(\\d{1,2}|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d{1,2}|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d{1,2}|1\\d\\d|2[0-4]\\d|25[0-5])\\.(\\d{1,2}|1\\d\\d|2[0-4]\\d|25[0-5])\\:(\\d+)"); err != nil {
return
}
random = &Random{
reg: regC,
}
for _, v := range addresses {
addr := regC.FindString(v)
random.addrSlice = append(random.addrSlice, addr)
random.addrMap.Store(addr, v)
}
return
}
func (random *Random) hashFn(addr string) (hashValue uint32) {
hashValue = crc32.ChecksumIEEE([]byte(addr))
return
}
package main
import (
"fmt"
"gRPC-voice-assistants-second-pass/initServerConfig"
_ "gRPC-voice-assistants-second-pass/initServerConfig"
"gRPC-voice-assistants-second-pass/server"
"github.com/gin-gonic/gin"
"github.com/soheilhy/cmux"
"log"
"net"
"time"
)
func main() {
var (
err error
lis net.Listener
)
//server.ConfigMap["empty"] = b
if lis, err = net.Listen("tcp", fmt.Sprintf(":%v", initServerConfig.Config.Port)); err != nil {
log.Fatalf("failed to listen: %v", err)
}
log.Println("listen: ", fmt.Sprintf(":%v ,time: %v", fmt.Sprintf(":%v", initServerConfig.Config.Port), time.Now().String()))
cm := cmux.New(lis)
grpcL := cm.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
grpcServer := server.GrpcServer()
go func() {
if e := grpcServer.Serve(grpcL); e != nil {
log.Fatalf("failed to serve: %v", e)
}
}()
// http
router := gin.Default()
go func() {
httpL := cm.Match(cmux.HTTP1Fast())
router.GET("/loadAudioFile/:appId/:meetingId", server.LoadAudioFile)
router.GET("/checkServer", server.CheckServer)
_ = router.RunListener(httpL)
}()
_ = cm.Serve()
}
package otherHandler
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/sirupsen/logrus"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"regexp"
"time"
)
type PostReq struct {
Id int `json:"id"`
Texts string `json:"texts"`
Mode int `json:"mode"`
Backup map[string]interface{} `json:"backup"`
}
type PostResp struct {
Status int `json:"status"`
Id int `json:"id"`
Result string `json:"result"`
Raw string `json:"raw"`
Backup map[string]interface{} `json:"backup"`
}
func PostGet(req PostReq, url string) (resp *PostResp, err error) {
var (
jsonByte []byte
reqHttp *http.Request
respHttp *http.Response
respBody []byte
client = &http.Client{
//Timeout: 5*time.Second,
}
)
if jsonByte, err = json.Marshal(req); err != nil {
return
}
logrus.Println("req:", string(jsonByte))
if reqHttp, err = http.NewRequest("POST", url, bytes.NewBuffer(jsonByte)); err != nil {
return
}
reqHttp.Header.Set("Content-Type", "application/json")
if respHttp, err = client.Do(reqHttp); err != nil {
return
}
defer func() {
if respHttp.Body != nil {
_ = respHttp.Body.Close()
}
}()
if respBody, err = ioutil.ReadAll(respHttp.Body); err != nil {
return
}
logrus.Println(string(respBody))
if respHttp.StatusCode == 200 {
resp = &PostResp{}
if err = json.Unmarshal(respBody, resp); err != nil {
return
}
} else {
err = errors.New(respHttp.Status)
}
//logrus.Println(err, fmt.Sprintf("status=%v,result=%v,backup=%v,error=%v", resp.Status, resp.Result, resp.Backup, err.Error()))
return
}
func Find() {
rd, _ := ioutil.ReadDir("D:\\editSoftware\\golang\\goProject\\src")
local, _ := time.LoadLocation("Asia/Shanghai")
layout := "2006-01-02 15:04:05"
startStr := "2022-08-25 10:35:35"
endStr := "2023-03-01"
reg, _ := regexp.Compile("^\\d{4}-\\d{2}-\\d{2}$")
if reg.MatchString(startStr) {
startStr = startStr + " 00:00:00"
}
if reg.MatchString(endStr) {
endStr = endStr + " 23:59:59"
}
start, err := time.ParseInLocation(layout, startStr, local)
if err != nil {
log.Fatalln(err)
}
end, err := time.ParseInLocation(layout, endStr, local)
if err != nil {
log.Fatalln(err)
}
for _, i := range rd {
if start.Before(i.ModTime()) && end.After(i.ModTime()) {
log.Println("name=", i.Name(), ",modTime=", i.ModTime(), ",mode=", i.Mode(), ",size=", i.Size(), ",isDir=", i.IsDir(), ",sys=", i.Sys())
}
}
}
// TODO:保存音频
func SaveAudioData(audioDir, appId, meetingId string, audioData []byte, dataNum int) (singleFileName string, errMsg error) {
var (
err error
sFile *os.File
)
// 确定文件格式
//now := time.Now()
//dataSubx := fmt.Sprintf("%d-%02d-%02d", now.Year(), now.Month(), now.Day())
fileDir := fmt.Sprintf("%v/%v/%v", audioDir, appId, meetingId)
sInt := fmt.Sprintf("%05d", dataNum)
sName := fmt.Sprintf("%v/%v-%v.wav", fileDir, meetingId, sInt)
singleFileName = sName
// 创建文件夹
_, err = os.Stat(fileDir)
if os.IsNotExist(err) {
if err = os.MkdirAll(fileDir, os.ModePerm); err != nil {
fmt.Println(err)
errMsg = err
return
}
}
waveHeader := pcmToWavHeader(1, 16000, 16, len(audioData))
// 将数据存入单独文件
if sFile, err = os.OpenFile(sName, os.O_RDONLY|os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0766); err != nil {
errMsg = err
return
}
if _, err = sFile.Write(waveHeader); err != nil {
if err = sFile.Close(); err != nil {
return
}
errMsg = err
return
}
if _, err = sFile.Write(audioData); err != nil {
if err = sFile.Close(); err != nil {
return
}
errMsg = err
return
}
if err = sFile.Close(); err != nil {
errMsg = err
return
}
return
}
// TODO:判断文件是否存在
func ExistsFile(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return true, err
}
/*
*
dst:二进制字符串
numChannel:1=单声道,2=多声道
sampleRate:采样率 8000/16000
*/
//func pcmToWavHeader(numChannel, sampleRate, bitsPerSample, audioLen int) (wavHeader []byte) {
// longSampleRate := sampleRate
// byteRate := bitsPerSample * sampleRate * numChannel / 8
// totalAudioLen := audioLen
// totalDataLen := totalAudioLen + 36
// var header = make([]byte, 44)
// // RIFF/WAVE header
// header[0] = 'R'
// header[1] = 'I'
// header[2] = 'F'
// header[3] = 'F'
// header[4] = byte(totalDataLen & 0xff)
// header[5] = byte((totalDataLen >> 8) & 0xff)
// header[6] = byte((totalDataLen >> 16) & 0xff)
// header[7] = byte((totalDataLen >> 24) & 0xff)
// //WAVE
// header[8] = 'W'
// header[9] = 'A'
// header[10] = 'V'
// header[11] = 'E'
// // 'fmt ' chunk
// header[12] = 'f'
// header[13] = 'm'
// header[14] = 't'
// header[15] = ' '
// // 4 bytes: size of 'fmt ' chunk
// header[16] = 16
// header[17] = 0
// header[18] = 0
// header[19] = 0
// // format = 1
// header[20] = 1
// header[21] = 0
// header[22] = byte(numChannel)
// header[23] = 0
// header[24] = byte(longSampleRate & 0xff)
// header[25] = byte((longSampleRate >> 8) & 0xff)
// header[26] = byte((longSampleRate >> 16) & 0xff)
// header[27] = byte((longSampleRate >> 24) & 0xff)
// header[28] = byte(byteRate & 0xff)
// header[29] = byte((byteRate >> 8) & 0xff)
// header[30] = byte((byteRate >> 16) & 0xff)
// header[31] = byte((byteRate >> 24) & 0xff)
// // block align
// header[32] = byte(numChannel * bitsPerSample / 8)
// header[33] = 0
// // bits per sample
// header[34] = byte(bitsPerSample)
// header[35] = 0
// //data
// header[36] = 'd'
// header[37] = 'a'
// header[38] = 't'
// header[39] = 'a'
// header[40] = byte(totalAudioLen & 0xff)
// header[41] = byte((totalAudioLen >> 8) & 0xff)
// header[42] = byte((totalAudioLen >> 16) & 0xff)
// header[43] = byte((totalAudioLen >> 24) & 0xff)
//
// wavHeader = header
// return
//}
// TODO:获取热词文本地址
func GetHotWordPath(URL, userId string) (hotWordPath string, err error) {
var (
Url = URL
params = url.Values{}
responseData []byte
resp *http.Response
resultMap = make(map[string]interface{})
)
params["user_id"] = []string{userId}
if resp, err = http.PostForm(Url, params); err != nil {
return
}
defer func(response *http.Response) {
if response != nil {
_ = response.Body.Close()
}
}(resp)
if resp.StatusCode != 200 {
err = errors.New("network request error")
return
}
if responseData, err = ioutil.ReadAll(resp.Body); err != nil {
return
}
if err = json.Unmarshal(responseData, &resultMap); err != nil {
return
}
switch resultMap["code"].(type) {
case float64:
if resultMap["code"].(float64) == 200 {
hotWordPath = resultMap["data"].(map[string]interface{})["path"].(string)
} else {
err = errors.New("hot word text is not exiting")
}
case int:
if resultMap["code"].(float64) == 200 {
hotWordPath = resultMap["data"].(map[string]interface{})["path"].(string)
} else {
err = errors.New("hot word text is not exiting")
}
}
log.Println("user_id: ", userId)
log.Println("hotWordPath: ", hotWordPath)
return
}
package otherHandler
import (
"github.com/sirupsen/logrus"
"log"
"net/url"
"testing"
)
func TestPostGet(t *testing.T) {
req := PostReq{
Texts: "三十七点八度分号血压二百到二百二十一六十至七十八毫米汞柱遵医嘱予丙泊酚调节冒号六点零毫升每小时",
Mode: 1,
Backup: make(map[string]any),
}
resp, err := PostGet(req, "http://172.16.5.177:8141/encode")
log.Println(resp, err)
}
func TestFind(t *testing.T) {
host := func(URL string) string {
U, _ := url.Parse(URL)
return U.Host
}("http://huisheng_mysql:8141/encode")
logrus.Println(host)
}
package otherHandler
import (
"fmt"
"io/ioutil"
"os"
)
type WavStruct struct {
file *os.File
meetingId string
EFile string
audioLength int
isClose bool
}
// InitSaveWave TODO:初始化wave文件
func InitSaveWave(audioDir, meetingId string) (wav *WavStruct, err error) {
var (
file *os.File
audioLength int
)
// 确定文件格式
fileDir := fmt.Sprintf("%v", audioDir)
eName := fmt.Sprintf("%v/%v.wav", fileDir, meetingId)
if exist, _ := ExistsFile(eName); exist {
content, _ := ioutil.ReadFile(eName)
audioLength = len(content)
}
_, err = os.Stat(fileDir)
if os.IsNotExist(err) {
if err = os.MkdirAll(fileDir, os.ModePerm); err != nil {
return
}
}
// 将数据存入总文件
if file, err = os.OpenFile(eName, os.O_RDONLY|os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0766); err != nil {
return
}
wav = &WavStruct{
file: file,
meetingId: meetingId,
EFile: eName,
audioLength: audioLength,
}
return
}
func (w *WavStruct) SaveData(data []byte) (filePath string, err error) {
if _, err = w.file.Write(data); err != nil {
if w.file, err = os.OpenFile(w.EFile, os.O_RDONLY|os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0766); err != nil {
return
}
if _, err = w.file.Write(data); err != nil {
return
}
}
filePath = w.EFile
w.audioLength = w.audioLength + len(data)
return
}
func (w *WavStruct) CloseWav() (err error) {
//_ ,_ = w.file.Seek(44,0)
//w.file.WriteAt()
if !w.isClose {
w.isClose = true
if err = w.file.Close(); err != nil {
}
file1, _ := os.OpenFile(w.EFile, os.O_RDWR, os.ModePerm)
l, _ := file1.Seek(0, 0)
waveHeader := pcmToWavHeader(1, 16000, 16, w.audioLength)
_, _ = file1.WriteAt(waveHeader, l)
_ = file1.Close()
}
return
}
/*
*
dst:二进制字符串
numChannel:1=单声道,2=多声道
sampleRate:采样率 8000/16000
*/
func pcmToWavHeader(numChannel, sampleRate, bitsPerSample, audioLen int) (wavHeader []byte) {
longSampleRate := sampleRate
byteRate := bitsPerSample * sampleRate * numChannel / 8
totalAudioLen := audioLen
totalDataLen := totalAudioLen + 36
var header = make([]byte, 44)
// RIFF/WAVE header
header[0] = 'R'
header[1] = 'I'
header[2] = 'F'
header[3] = 'F'
header[4] = byte(totalDataLen & 0xff)
header[5] = byte((totalDataLen >> 8) & 0xff)
header[6] = byte((totalDataLen >> 16) & 0xff)
header[7] = byte((totalDataLen >> 24) & 0xff)
//WAVE
header[8] = 'W'
header[9] = 'A'
header[10] = 'V'
header[11] = 'E'
// 'fmt ' chunk
header[12] = 'f'
header[13] = 'm'
header[14] = 't'
header[15] = ' '
// 4 bytes: size of 'fmt ' chunk
header[16] = 16
header[17] = 0
header[18] = 0
header[19] = 0
// format = 1
header[20] = 1
header[21] = 0
header[22] = byte(numChannel)
header[23] = 0
header[24] = byte(longSampleRate & 0xff)
header[25] = byte((longSampleRate >> 8) & 0xff)
header[26] = byte((longSampleRate >> 16) & 0xff)
header[27] = byte((longSampleRate >> 24) & 0xff)
header[28] = byte(byteRate & 0xff)
header[29] = byte((byteRate >> 8) & 0xff)
header[30] = byte((byteRate >> 16) & 0xff)
header[31] = byte((byteRate >> 24) & 0xff)
// block align
header[32] = byte(numChannel * bitsPerSample / 8)
header[33] = 0
// bits per sample
header[34] = byte(bitsPerSample)
header[35] = 0
//data
header[36] = 'd'
header[37] = 'a'
header[38] = 't'
header[39] = 'a'
header[40] = byte(totalAudioLen & 0xff)
header[41] = byte((totalAudioLen >> 8) & 0xff)
header[42] = byte((totalAudioLen >> 16) & 0xff)
header[43] = byte((totalAudioLen >> 24) & 0xff)
wavHeader = header
return
}
syntax = "proto3";
option go_package = "./protobufHandler;streamRecognize";
//package streamRecognize;
service Pipeline {
rpc Inference(stream DataRequest) returns (stream ResultResponse);
}
/* desc : 1. configs 2. audio_internal 3. audio_final*/
/* 第一帧,暂时没有结果响应,继续后续操作即可:
第一:desc=configs
第二:data为json{"meeting_id":"",mode:1,"app_id":""}转bytes,其中:
meeting_id为为每次连接赋予的唯一码,可以用timestamp精确到毫秒级表示
mode是标点的模式,目前有:
- mode = 1:默认值,智能标点+语音标点
- mode = 0:仅智能标点,禁用语音标点
- mode = 2: 仅语音标点
- mode = 3: 省略句末标点+语音标点
- mode = 4: 空格替代标点,禁用语音标点
app_id为软件打开时为此次软件打开的唯一码,可以用timestamp精确到毫秒级表示,如若没有后台会以日期划分。
*/
/*
语音识别:
第一:desc=audio_internal
第二:data为收集的音频数据
*/
/*
语音识别即可出最终结果,相当于is_final:
第一:desc=audio_final
第二:data为空即可。
*/
message DataRequest {
string desc = 1;
bytes data = 2;
}
message ResultResponse {
string message = 1;
bool is_final = 2;
string desc = 3;
}
### 北科瑞声语音助理核心接口接入文档(`gRPC`)
#### 一、说明
- 本接口采取GRPC协议以保证数据传输的安全性和稳健性,同时保证达到`边说边转文字`的效果。
- 本接口数据传输的过程完全以双全工通讯的方式,严格以`流对流`的方式进行通讯。
- 所有的数据传输格式必须严格按照接口规定的方式,否则会导致连接中断或无法达到预期效果。
#### 二、`protobuf`配置
···
syntax = "proto3";
option go_package = "./protobufHandler;streamRecognize";
//package streamRecognize;
service Pipeline {
rpc Inference(stream DataRequest) returns (stream ResultResponse);
}
message DataRequest {
string desc = 1;
bytes data = 2;
}
message ResultResponse {
string message = 1;
bool is_final = 2;
string desc = 3;
}
···
#### 三、步骤
- 本接口的通讯规则严格按照三个步骤进行,不过按照同一套数据格式的细微差别进行区分。
- 切记三个步骤的顺序不能颠倒或打乱,强要求固定顺序。
- 强烈建议整个通讯的数据传输方式使用异步的处理逻辑,以保证数据的完整性和实时性,同时可以充分体现出双全工通讯的优势。
###### 第一步(初始化)
第一帧,暂时没有结果响应,继续后续操作即可:
第一:desc=configs
第二:data为json{"meeting_id":"",mode:1,"app_id":"","is_not_vad":false,user_id:""}转bytes,其中:
meeting_id为为每次连接赋予的唯一码,可以用timestamp精确到毫秒级表示
mode是标点的模式,目前有:
- mode = 1:默认值,智能标点+语音标点
- mode = 0:仅智能标点,禁用语音标点
- mode = 2: 仅语音标点
- mode = 3: 省略句末标点+语音标点
- mode = 4: 空格替代标点,禁用语音标点
app_id为软件打开时为此次软件打开的唯一码,可以用timestamp精确到毫秒级表示,如若没有后台会以日期划分。
is_not_vad为不启动vad,建议默认false
###### 第二部(转写步骤)
- 音频数据pcm格式要求:采样频率`16000HZ`,声道数`1`,采样位数`16bit`
- 此过程是实时采菊音频实时传输的过程,即以`流`的方式传输数据,建议每一帧的长度`3200`
- 语音识别:
- 第一:desc=audio_internal
- 第二:data为收集的音频数据
- 结果
- message -> 结果数据
- is_final -> 结果标识:false临时结果,true正是结果
- desc -> 结果描述
###### 第三步(结束)
语音识别即可出最终结果,相当于is_final:
第一:desc=audio_final
第二:data为空即可。
#### 四、补充接口
- 音频下载
- 请求方式:`GET`
- 请求路径:`http:{host}/loadAudioFile/{app_id}/{meeting_id}`
- `host`服务所在的网络地址;`IP:PORT`,和gRPC接口一致
- `app_id`初始化确定的`app_id`
- `meeting_id`初始化确定的`meeting_id`
- 服务检测
- 请求方式:`GET`
- 请求路径:`http:{host}//checkServer`
- `host`服务所在的网络地址;`IP:PORT`,和gRPC接口一致
package server
import (
"fmt"
"gRPC-voice-assistants-second-pass/initServerConfig"
"gRPC-voice-assistants-second-pass/otherHandler"
streamRecognize "gRPC-voice-assistants-second-pass/protobufHandler"
"gRPC-voice-assistants-second-pass/serverHandler"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"net"
"net/url"
"time"
)
func GrpcServer() (grpcServer *grpc.Server) {
var keep = keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
//grpc.KeepaliveEnforcementPolicy(kaep),
var kasp = keepalive.ServerParameters{
//MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
//MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
}
grpcServer = grpc.NewServer(grpc.KeepaliveEnforcementPolicy(keep), grpc.KeepaliveParams(kasp))
server := &serverHandler.Service{}
streamRecognize.RegisterPipelineServer(grpcServer, server)
return
}
func LoadAudioFile(ctx *gin.Context) {
appId := ctx.Param("appId")
meetingId := ctx.Param("meetingId")
audioFile := fmt.Sprintf("%v/%v/%v/%v.wav", initServerConfig.Config.FileConfig.AudioDir, appId, meetingId, meetingId)
if ok, _ := otherHandler.ExistsFile(audioFile); !ok {
ctx.JSONP(400, gin.H{
"status": 400,
"desc": "the audio file is not existing.",
})
return
}
ctx.Header("Content-Type", "application/octet-stream")
ctx.Header("Content-Disposition", "attachment; filename="+audioFile)
ctx.Header("Content-Transfer-Encoding", "binary")
ctx.FileAttachment(audioFile, fmt.Sprintf("%v.wav", meetingId))
return
}
func CheckServer(ctx *gin.Context) {
engine := func(URL string) string {
U, err := url.Parse(URL)
if err != nil {
logrus.Errorln("the engine address is error")
return ""
}
return U.Host
}(initServerConfig.Config.EngineConfig.UrlArr[0])
post := func(URL string) string {
U, err := url.Parse(URL)
if err != nil {
logrus.Errorln("the post server address is error")
return ""
}
return U.Host
}(initServerConfig.Config.PostProcessing.PostUrl)
serverInfo := make(map[string]bool)
serverInfo["engine_is_alive"] = true
serverInfo["post_server_is_alive"] = true
conn1, err1 := net.DialTimeout("tcp", engine, 100*time.Millisecond)
if err1 != nil {
serverInfo["engine_is_alive"] = false
}
if conn1 == nil {
serverInfo["engine_is_alive"] = false
} else {
_ = conn1.Close()
}
conn2, err2 := net.DialTimeout("tcp", post, 100*time.Millisecond)
if err2 != nil {
serverInfo["post_server_is_alive"] = false
}
if conn2 == nil {
serverInfo["post_server_is_alive"] = false
} else {
_ = conn2.Close()
}
ctx.JSONP(200, gin.H{
"status": 200,
"info": serverInfo,
})
}
package serverHandler
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"gRPC-voice-assistants-second-pass/engineServer"
"gRPC-voice-assistants-second-pass/otherHandler"
serverPb "gRPC-voice-assistants-second-pass/protobufHandler"
"log"
"time"
)
type Stream struct {
engine *engineServer.Engine
wave *otherHandler.WavStruct
audioBuff bytes.Buffer
inChan chan []byte
isCloseInChan bool
outChan chan []byte
isCloseOutChan bool
closeChan chan []byte
isCloseCloseChan bool
dataIndex int32
isEnd bool
isNotVad bool
engineAddr string
meetingId string
mode int
PostUrl string
AudioDir string
AppId string
}
type StreamConfig struct {
IsNotVad bool //启用vad与否
EngineAddr string
AppId string
MeetingId string
AudioDir string
Mode int
PostUrl string
HotWordFile string
/*
- mode = 1:默认值,智能标点+语音标点
- mode = 0:仅智能标点,禁用语音标点
- mode = 2: 仅语音标点
- mode = 3: 省略句末标点+语音标点
- mode = 4: 空格替代标点,禁用语音标点
*/
}
func NewStream(connConfig StreamConfig) (stream *Stream, err error) {
var (
wav *otherHandler.WavStruct
conn *engineServer.Engine
engineAddr = connConfig.EngineAddr
meetingId = connConfig.MeetingId
isNotVad = connConfig.IsNotVad
hotWordFile = connConfig.HotWordFile
)
if wav, err = otherHandler.InitSaveWave(fmt.Sprintf("%v/%v/%v", connConfig.AudioDir, connConfig.AppId, connConfig.MeetingId), meetingId); err != nil {
return
}
if conn, err = engineServer.InitEngine(engineAddr, isNotVad, hotWordFile); err != nil {
return
}
stream = &Stream{
engine: conn,
wave: wav,
inChan: make(chan []byte, 1024),
outChan: make(chan []byte, 1024),
closeChan: make(chan []byte, 1),
meetingId: meetingId,
dataIndex: 1,
engineAddr: engineAddr,
isNotVad: isNotVad,
mode: connConfig.Mode,
PostUrl: connConfig.PostUrl,
AudioDir: connConfig.AudioDir,
AppId: connConfig.AppId,
}
go stream.inChan2EngineLoop()
go stream.Engine2OutChanLoop()
return
}
func (stream *Stream) CloseChan() {
if stream.isEnd {
time.Sleep(500 * time.Millisecond)
if stream.engine != nil {
_ = stream.engine.CloseEngine()
}
} else {
time.Sleep(500 * time.Millisecond)
_ = stream.engine.CloseEngine()
}
if !stream.isCloseInChan {
stream.isCloseInChan = true
close(stream.inChan)
}
time.Sleep(500 * time.Millisecond)
if !stream.isCloseOutChan {
stream.isCloseOutChan = true
}
if !stream.isCloseCloseChan {
stream.isCloseCloseChan = true
close(stream.closeChan)
}
if stream.wave != nil {
_ = stream.wave.CloseWav()
}
}
func (stream *Stream) WriteMsg(data []byte) (err error) {
if !stream.isCloseInChan {
select {
case stream.inChan <- data:
}
} else {
err = errors.New("inChan is closed")
}
go func() {
if _, e := stream.wave.SaveData(data); e != nil {
_, _ = stream.wave.SaveData(data)
}
if _, e1 := stream.audioBuff.Write(data); e1 != nil {
_, _ = stream.audioBuff.Write(data)
}
}()
return
}
func (stream *Stream) readInChanMsg() (data []byte, err error) {
select {
case data = <-stream.inChan:
case <-stream.closeChan:
err = errors.New("connection by inChan is closed")
}
return
}
func (stream *Stream) writeOutChan(dataMap map[string]interface{}) (err error) {
var (
data []byte
)
if data, err = json.Marshal(dataMap); err != nil {
return
}
select {
case stream.outChan <- data:
}
return
}
func (stream *Stream) ReadMsg() (resp *serverPb.ResultResponse, err error) {
if !stream.isCloseOutChan {
var (
data []byte
dataMap map[string]interface{}
)
select {
case data = <-stream.outChan:
case <-stream.closeChan:
}
if len(data) > 0 {
if err = json.Unmarshal(data, &dataMap); err != nil {
return
}
if dataMap["is_final"].(float64) == 1 {
resp = &serverPb.ResultResponse{
IsFinal: true,
Message: fmt.Sprintf(`{"result":"%v","raw":"%v"}`, dataMap["results"].(string), dataMap["raw"].(string)),
Desc: "final_results",
}
}
if dataMap["is_final"].(float64) == 0 {
resp = &serverPb.ResultResponse{
IsFinal: false,
Message: fmt.Sprintf(`{"result":"%v","raw":"%v"}`, dataMap["results"].(string), dataMap["raw"].(string)),
Desc: "partial_results",
}
}
if dataMap["is_final"].(float64) == 3 {
resp = &serverPb.ResultResponse{
IsFinal: true,
Message: "",
Desc: "speech end",
}
err = errors.New(dataMap["desc"].(string))
}
stream.dataIndex++
} else {
err = errors.New("outChan is closed")
}
} else {
err = errors.New("outChan is closed")
}
return
}
func (stream *Stream) inChan2EngineLoop() {
var (
recData []byte
err error
)
for {
if recData, err = stream.readInChanMsg(); err != nil {
goto ERR
}
if string(recData) == `{"signal" : "end"}` {
stream.isEnd = true
if err = stream.engine.WriteMsg(recData, 1); err != nil {
goto ERR
}
} else if string(recData) == `{"signal" : "stop"}` {
if err = stream.engine.WriteMsg(recData, 2); err != nil {
goto ERR
}
} else {
if err = stream.engine.WriteMsg(recData, 0); err != nil {
goto ERR
}
//go func(data []byte) {
// stream.audioBuff.Write(data)
//}(recData)
//go func(data []byte) {
// _, _ = stream.wave.SaveData(data)
//}(recData)
}
}
ERR:
}
func (stream *Stream) Engine2OutChanLoop() {
var (
recData map[string]interface{}
err error
backup = make(map[string]interface{})
audioNum = 1
killAudioLen int
duration float64
)
for {
if recData, err = stream.engine.ReadMsg(); err != nil {
goto ERR
}
if _, ok := recData["results"]; ok {
recData["raw"] = recData["results"].(string)
}
if recData["is_final"].(float64) == 1 {
reqBody := otherHandler.PostReq{}
reqBody.Texts = recData["results"].(string)
reqBody.Mode = stream.mode
reqBody.Backup = backup
if resp, e := otherHandler.PostGet(reqBody, stream.PostUrl); e != nil {
backup = make(map[string]interface{})
} else {
if resp != nil {
if resp.Status == 200 {
backup = resp.Backup
recData["results"] = resp.Result
recData["raw"] = resp.Raw
} else {
backup = make(map[string]interface{})
}
} else {
backup = make(map[string]interface{})
}
}
///////////////////////////////////////////////////
if recData["results"].(string) == "" {
// Conn.audioBuf 音频缓存通道
// killAudiolen 已提取出来的音频长度
// 这个步骤是根据设定的音频缓存通道最大阈值进行假定空语音从语音通道中剔除
if stream.audioBuff.Len() > 32000*60 {
_ = stream.audioBuff.Next(32000 * 6)
killAudioLen = killAudioLen + 32000*6
log.Println("已经提取的音频长度:", killAudioLen)
}
} else {
log.Println(recData)
// emptyLen 当前一句话之前要提前再去掉的多余语音(不属于这句文本的语音)
// duration 当前文本的音频长度
emptyLen := 0
duration = 0
if int(32*recData["start"].(float64)) > killAudioLen {
// 如果当前文本的开始时刻置换的长度比已经提取出来的音频长度大的处理逻辑
emptyLen = int(32*recData["start"].(float64)) - killAudioLen
_ = stream.audioBuff.Next(emptyLen)
killAudioLen = int(recData["start"].(float64) * 32)
duration = (recData["end"].(float64) - recData["start"].(float64)) * 32
} else {
// 如果当前文本的开始时刻置换的长度比已经提取出来的音频长度小的处理逻辑
duration = 32*recData["end"].(float64) - float64(killAudioLen)
}
log.Println("返回的开始长度:", int(32*recData["start"].(float64)), "返回的结束长度:", int(32*recData["end"].(float64)), "缓存通道剩余长度:", stream.audioBuff.Len(), "已经提取的音频长度:", killAudioLen, "要提取的多余长度:", emptyLen, "要取的实际长度:", duration)
audio := stream.audioBuff.Next(int(duration))
killAudioLen = int(recData["end"].(float64) * 32)
go func(num int) {
if _, e := otherHandler.SaveAudioData(stream.AudioDir, stream.AppId, stream.meetingId, audio, num); e != nil {
_, _ = otherHandler.SaveAudioData(stream.AudioDir, stream.AppId, stream.meetingId, audio, num)
}
}(audioNum)
audioNum++
}
}
if recData["is_final"].(float64) == 3 {
if len(recData) > 0 {
if err = stream.writeOutChan(recData); err != nil {
goto ERR
}
}
goto ERR
}
if len(recData) > 0 {
if err = stream.writeOutChan(recData); err != nil {
goto ERR
}
}
}
ERR:
stream.CloseChan()
}
package serverHandler
import (
"encoding/json"
"errors"
"gRPC-voice-assistants-second-pass/initServerConfig"
"gRPC-voice-assistants-second-pass/loadBalancingHandler"
"gRPC-voice-assistants-second-pass/otherHandler"
streamRecognize "gRPC-voice-assistants-second-pass/protobufHandler"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/peer"
"log"
"regexp"
"time"
)
type Service struct {
streamRecognize.UnimplementedPipelineServer
}
type config struct {
AppId string `json:"app_id"`
MeetingId string `json:"meeting_id"`
Mode int `json:"mode"`
IsNotVad bool `json:"is_not_vad"`
UserId string `json:"user_id"`
}
func (server *Service) Inference(data streamRecognize.Pipeline_InferenceServer) (err error) {
var (
Peer *peer.Peer
peerOK bool
clientIP string
engineAddr string
stream *Stream
startStream = make(chan []byte, 1)
isClose = make(chan []byte, 1)
configOK bool
hotwordFile string
streamConfig = StreamConfig{
AudioDir: initServerConfig.Config.FileConfig.AudioDir,
PostUrl: initServerConfig.Config.PostProcessing.PostUrl,
}
)
if Peer, peerOK = peer.FromContext(data.Context()); !peerOK {
clientIP = time.Now().Format("2006-01-02#15:04")
} else {
log.Println(Peer.Addr.String())
reg, _ := regexp.Compile("^.*:")
clientIP = reg.FindString(Peer.Addr.String())[:len(reg.FindString(Peer.Addr.String()))-1]
}
log.Println(clientIP)
if engineAddr, err = loadBalancingHandler.Load.GetNode(clientIP); err != nil {
err = errors.New("all engine is dawn")
logrus.Errorln(err)
return
}
streamConfig.EngineAddr = engineAddr
go func(req streamRecognize.Pipeline_InferenceServer) {
var (
rcvErr error
)
for {
var (
getData *streamRecognize.DataRequest
)
if getData, rcvErr = req.Recv(); rcvErr != nil {
logrus.Errorln(rcvErr)
goto ERR
}
if !configOK {
if getData.GetDesc() == "configs" {
configOK = true
getConfig := &config{}
if rcvErr = json.Unmarshal(getData.GetData(), getConfig); rcvErr != nil {
logrus.Errorln(rcvErr)
goto ERR
}
if getConfig.AppId == "" {
getConfig.AppId = time.Now().Format("2006-01-02")
}
if hotwordFile, err = otherHandler.GetHotWordPath(initServerConfig.Config.HotWord.GetHotWordFileUrl, getConfig.UserId); err != nil {
logrus.Errorln(err)
hotwordFile = initServerConfig.Config.HotWord.DefaultWordFile
} else {
if hotwordFile == "" {
hotwordFile = initServerConfig.Config.HotWord.DefaultWordFile
}
}
streamConfig.HotWordFile = hotwordFile
streamConfig.Mode = getConfig.Mode
streamConfig.IsNotVad = !getConfig.IsNotVad
streamConfig.MeetingId = getConfig.MeetingId
streamConfig.AppId = getConfig.AppId
if stream, rcvErr = NewStream(streamConfig); rcvErr != nil {
logrus.Errorln(rcvErr)
goto ERR
}
close(startStream)
}
}
if getData.GetDesc() == "audio_internal" {
//logrus.Println(getData.GetDesc())
if configOK {
if stream != nil {
if rcvErr = stream.WriteMsg(getData.GetData()); rcvErr != nil {
logrus.Errorln(rcvErr)
goto ERR
}
} else {
err = errors.New("cannot connect to the engine")
rcvErr = err
logrus.Errorln(rcvErr)
goto ERR
}
} else {
err = errors.New("please invoke the configuration request first")
rcvErr = err
logrus.Errorln(rcvErr)
goto ERR
}
}
if getData.GetDesc() == "audio_final" {
if configOK {
if stream != nil {
if rcvErr = stream.WriteMsg([]byte(`{"signal" : "stop"}`)); rcvErr != nil {
logrus.Errorln(rcvErr)
goto ERR
}
} else {
err = errors.New("cannot connect to the engine")
rcvErr = err
logrus.Errorln(rcvErr)
goto ERR
}
} else {
err = errors.New("please invoke the configuration request first")
rcvErr = err
logrus.Errorln(rcvErr)
goto ERR
}
}
}
ERR:
if stream != nil {
if rcvErr = stream.WriteMsg([]byte(`{"signal" : "end"}`)); rcvErr != nil {
logrus.Errorln(rcvErr)
}
stream.CloseChan()
}
close(isClose)
}(data)
{
var (
sendErr error
)
for {
select {
case <-startStream:
case <-isClose:
}
var (
sendData *streamRecognize.ResultResponse
)
if stream != nil {
if sendData, sendErr = stream.ReadMsg(); sendErr != nil {
if sendErr.Error() != errors.New("outChan is closed").Error() {
logrus.Errorln(sendErr)
}
goto ERROR
}
log.Println("HHHHHHHHHH", sendData)
if sendErr = data.Send(sendData); sendErr != nil {
logrus.Errorln(sendErr)
goto ERROR
}
} else {
err = errors.New("cannot connect to the engine")
sendErr = err
logrus.Errorln(sendErr)
goto ERROR
}
}
ERROR:
if stream != nil {
stream.CloseChan()
}
return
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment