fix(watchdog): now the client could successfully send more msgs to watchdog instead of the only connection msg

- Add debug logging for watchdog operations
- Standardize message format for instance operations
- Improve TCP message handling with proper line termination
- Enhance error handling in watchdog operations
- Add proper cleanup of recvChan in sendMsg
- Update response handling in instance handlers
This commit is contained in:
2026-04-07 23:29:07 +08:00
parent 37c10f1c07
commit 2426933f49
3 changed files with 102 additions and 63 deletions

View File

@@ -627,7 +627,11 @@ func StartInstanceHandler(w http.ResponseWriter, r *http.Request) {
}
if is.watchdogConnected {
watchdog.AddInstance(serviceName)
if !watchdog.AddInstance(serviceName) {
postLog.Warning(fmt.Sprintf("[StartInstanceHandler] Failed to add watchdog instance %s", serviceName))
SendSuccessResponse(w, "Instance started successfully but watchdog instance add failed", nil)
return
}
}
SendSuccessResponse(w, "Instance started successfully", nil)
@@ -702,7 +706,7 @@ func StopInstanceHandler(w http.ResponseWriter, r *http.Request) {
return
}
postLog.Info(fmt.Sprintf("[StopInstanceHandler] Windows service %s stopped successfully", serviceName))
SendSuccessResponse(w, "Instance stopped successfully", nil)
// SendSuccessResponse(w, "Instance stopped successfully", nil)
case "systemd":
if err := StopSystemdService(serviceName); err != nil {
@@ -711,7 +715,7 @@ func StopInstanceHandler(w http.ResponseWriter, r *http.Request) {
return
}
postLog.Info(fmt.Sprintf("[StopInstanceHandler] Systemd service %s stopped successfully", serviceName))
SendSuccessResponse(w, "Instance stopped successfully", nil)
// SendSuccessResponse(w, "Instance stopped successfully", nil)
case "init.d":
if err := StopInitDService(serviceName); err != nil {
@@ -720,13 +724,22 @@ func StopInstanceHandler(w http.ResponseWriter, r *http.Request) {
return
}
postLog.Info(fmt.Sprintf("[StopInstanceHandler] Init.d service %s stopped successfully", serviceName))
SendSuccessResponse(w, "Instance stopped successfully", nil)
// SendSuccessResponse(w, "Instance stopped successfully", nil)
default:
postLog.Error(fmt.Sprintf("[StopInstanceHandler] Unsupported init system: %s", initType))
SendErrorResponse(w, http.StatusInternalServerError, fmt.Sprintf("Unsupported init system: %s", initType))
return
}
if is.watchdogConnected {
if !watchdog.RemoveInstance(serviceName) {
postLog.Warning(fmt.Sprintf("[StopInstanceHandler] Failed to remove watchdog instance %s", serviceName))
SendSuccessResponse(w, "Instance stopped successfully but watchdog instance remove failed", nil)
} else {
SendSuccessResponse(w, "Instance stopped successfully", nil)
}
}
}
func RestartInstanceHandler(w http.ResponseWriter, r *http.Request) {

View File

@@ -2,23 +2,45 @@ package watchdog
import (
"fmt"
"super-frpc/postLog"
)
func AddInstance(serviceName string) bool {
postLog.Debug(fmt.Sprintf("[watchdog] Add service monitor: %s", serviceName))
if !IsConnected() {
return false
}
message := fmt.Sprintf("[instance.add] <serviceName>%s</serviceName>", serviceName)
message := fmt.Sprintf("[monitor.add] <serviceName>%s</serviceName>", serviceName)
response, err := sendMsg(message, 3)
if err != nil {
return false
} else if response == "success" {
return true
}
return response == "success"
return false
}
func RemoveInstance(serviceName string) bool {
postLog.Debug(fmt.Sprintf("[watchdog] Remove service monitor: %s", serviceName))
if !IsConnected() {
return false
}
message := fmt.Sprintf("[monitor.remove] <serviceName>%s</serviceName>", serviceName)
response, err := sendMsg(message, 3)
if err != nil {
return false
} else if response == "success" {
return true
}
return false
}
func CloseInstance(serviceName string) bool {
postLog.Debug(fmt.Sprintf("[watchdog] Remove service monitor: %s", serviceName))
if !IsConnected() {
return false
}
@@ -27,9 +49,11 @@ func RemoveInstance(serviceName string) bool {
response, err := sendMsg(message, 3)
if err != nil {
return false
} else if response == "success" {
return true
}
return response == "success"
return false
}
func Close() bool {
@@ -41,7 +65,9 @@ func Close() bool {
response, err := sendMsg(message, 3)
if err != nil {
return false
} else if response == "success" {
return true
}
return response == "success"
return false
}

View File

@@ -4,10 +4,10 @@ import (
"bufio"
"fmt"
"net"
"strings"
"super-frpc/postLog"
"sync"
"time"
"strings"
)
var (
@@ -57,7 +57,7 @@ func tcpConnect(ipaddr string, port int) error {
return nil
}
func sendMsg(message string, target int) (string, error) {
func sendMsg(message string, timeout int) (string, error) {
tcpConnMutex.Lock()
if tcpConn == nil {
@@ -65,7 +65,7 @@ func sendMsg(message string, target int) (string, error) {
return "", fmt.Errorf("not connected")
}
_, err := tcpConn.Write([]byte(message))
_, err := tcpConn.Write([]byte(message + "\n"))
if err != nil {
tcpConnMutex.Unlock()
return "", fmt.Errorf("failed to send message: %v", err)
@@ -75,68 +75,68 @@ func sendMsg(message string, target int) (string, error) {
select {
case response := <-recvChan:
return response, nil
case <-time.After(time.Duration(target) * time.Second):
for len(recvChan) > 0 {
<-recvChan
}
return strings.TrimSpace(response), nil
case <-time.After(time.Duration(timeout) * time.Second):
return "", fmt.Errorf("timeout waiting for response")
}
}
func recvMsg() {
defer recvWg.Done()
defer recvWg.Done()
tcpConnMutex.Lock()
if tcpConn == nil {
tcpConnMutex.Unlock()
return
}
conn := tcpConn
tcpConnMutex.Unlock()
tcpConnMutex.Lock()
if tcpConn == nil {
tcpConnMutex.Unlock()
return
}
conn := tcpConn
tcpConnMutex.Unlock()
reader := bufio.NewReader(conn)
reader := bufio.NewReader(conn)
for {
select {
case <-stopRecvChan:
return
default:
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
// ⭐ 改为读取到换行符 \n 为止
data, err := reader.ReadBytes('\n')
if len(data) > 0 {
// ⭐ 用 strings.TrimSpace 去掉末尾的 \n
line := strings.TrimSpace(string(data))
if len(line) > 0 {
select {
case recvChan <- line:
default:
}
for {
select {
case <-stopRecvChan:
return
default:
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
if !isResponseMessage(line) {
postLog.Debug(fmt.Sprintf("[Watchdog] TCP Socket received message: %s", line))
}
}
}
data, err := reader.ReadBytes('\n')
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
continue
}
// 处理非超时的严重错误(如服务端断开)
tcpConnMutex.Lock()
if tcpConn != nil {
tcpConn.Close()
tcpConn = nil
isConnected = false
}
tcpConnMutex.Unlock()
return
}
}
}
if len(data) > 0 {
line := strings.TrimSpace(string(data))
if len(line) > 0 {
select {
case recvChan <- line:
default:
}
if !isResponseMessage(line) {
postLog.Debug(fmt.Sprintf("[Watchdog] TCP Socket received message: %s", line))
}
}
}
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
continue
}
tcpConnMutex.Lock()
if tcpConn != nil {
tcpConn.Close()
tcpConn = nil
isConnected = false
}
tcpConnMutex.Unlock()
return
}
}
}
}
func isResponseMessage(msg string) bool {