请使用golang beego 解释并优化下面代码:var driverMutex1 syncMutex token2driver1 = makemapstringchan- intfunc HandelModelData1frames int32 coefl float32 token string commonInfo string seq int var finish bool = t
这段代码是一个用于处理模型数据的函数,使用了beego框架和golang的并发库sync。
首先,函数定义了一些全局变量,包括一个互斥锁driverMutex1和一个映射token2driver1,用于存储token和信号通道的对应关系。
接下来,函数开始处理模型数据。首先获取了一个互斥锁driverMutex1,然后根据传入的token从token2driver1映射中获取信号通道sig。如果不存在对应的信号通道,则创建一个新的信号通道,并将其存储到token2driver1映射中。如果存在对应的信号通道,则向其发送一个退出信号0。最后释放互斥锁。
然后,函数调用了一个HTTP请求获取流信息的API,根据返回的状态码和流信息进行处理。如果状态码不为200,则打印错误日志并返回。接着,将返回的流信息进行解析,并检查返回的code字段是否为200,如果不是,则打印错误日志并返回。
接下来,函数从流信息中获取IP和端口,并调用getUDPSocket函数创建一个UDP连接。然后调用StartBSDriver函数启动驱动。
然后,函数构造一个start请求并将其发送到UDP连接。然后进入一个循环,循环处理frames中的每个frame。在每次循环中,首先检查是否收到退出信号,如果收到,则打印日志并设置finish为false,跳出循环。否则,将frame转换为BSFrame,并调用sendData函数发送数据到UDP连接。最后,休眠15毫秒。
循环结束后,函数调用StopBSDriver函数停止驱动。
最后,函数返回。
接下来对代码进行优化:
-
由于token2driver1是一个全局变量,可能被多个goroutine并发访问和修改,所以在对其进行操作时需要加锁。但是在实际的逻辑中,对token2driver1的操作是在两个不同的地方进行的,分别是获取sig和删除sig。可以将获取sig的操作和删除sig的操作分别进行加锁,这样可以减小锁的粒度。
-
对于HTTP请求获取流信息的API,可以使用go标准库net/http来发送请求,可以简化代码。
-
在循环处理frames时,可以使用goroutine并发处理每个frame,提高处理速度。
下面是优化后的代码:
var (
driverMutex1 sync.Mutex
token2driver1 = make(map[string]chan<- int)
)
func HandelModelData1(frames [][]int32, coefl float32, token string, commonInfo string, seq int) {
var finish bool = true
var sig chan<- int
driverMutex.Lock()
sig, ok := token2driver[token]
if !ok {
sig = make(chan int, 1)
token2driver[token] = sig
} else {
logs.Info("%s, emmit sig flag, to quit last request", commonInfo)
select {
case sig <- 0: // 退出前面的驱动
default:
}
}
driverMutex.Unlock()
defer func() {
if finish {
driverMutex.Lock()
_, ok := token2driver[token]
if ok {
close(sig)
delete(token2driver, token)
}
driverMutex.Unlock()
logs.Info("%s, finish", commonInfo)
}
}()
api := appconf.StreamInfoApi + "?token=" + token
resp, err := http.Get(api)
if err != nil {
logs.Error("%s, api = %v, err = %v", commonInfo, api, err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logs.Error("%s, api = %v, status = %d", commonInfo, api, resp.StatusCode)
return
}
var streamInfoRes protocol.StreamInfoRes
err = json.NewDecoder(resp.Body).Decode(&streamInfoRes)
if err != nil {
logs.Error("%s, err = %v", commonInfo, err)
return
}
if streamInfoRes.Code != 200 {
logs.Error("%s HandelModelData, err = %v", commonInfo, streamInfoRes)
return
}
sip := streamInfoRes.Data.Ip
sport := 11111
if sip == "" {
logs.Error("%s, invalid ip", commonInfo)
return
}
socket, err := getUDPSocket(sip, sport)
if err != nil {
logs.Error("%s, ip = %s, port = %d, err = %v", commonInfo, sip, sport, err)
return
}
defer socket.Close()
err = StartBSDriver(token, seq)
if err != nil {
logs.Error("%s, StartBSDriver failed, err = %v", commonInfo, err)
return
}
logs.Info("%s, sip = %s, sport = %d", commonInfo, sip, sport)
startReq := make(map[string]interface{})
startReq["userIp"] = sip
startData, _ := json.Marshal(&startReq)
var nsent = 0
for nsent < len(startData) {
n, err := socket.Write(startData[nsent:])
if err != nil {
logs.Error("%s, ip = %s, port = %d, err = %v", commonInfo, sip, sport, err)
return
}
nsent += n
}
logs.Info("%s, ip = %s, port = %d, OnAudioDriver", commonInfo, sip, sport)
var wg sync.WaitGroup
for i := 0; i < len(frames); i++ {
wg.Add(1)
go func(frame []int32) {
defer wg.Done()
framef := make([]float32, 61)
for j := 0; j < 51; j++ {
framef[j] = float32(frame[j]) / coefl
}
select {
case <-sig:
logs.Info("%s, recv quit sig, quit now", commonInfo)
finish = false
return
default:
data := MakeBSFrame(framef)
err := sendData(socket, data)
if err != nil {
logs.Error("%s, sendData failed, err = %v", commonInfo, err)
return
}
time.Sleep(15 * time.Millisecond)
}
}(frames[i])
}
wg.Wait()
err = StopBSDriver(token, seq)
if err != nil {
logs.Error("%s, StopBSDriver failed, err = %v", commonInfo, err)
return
}
return
}
``
原文地址: http://www.cveoy.top/t/topic/h91q 著作权归作者所有。请勿转载和采集!