Files
backend/watchdog/tcpClient.go

281 lines
5.7 KiB
Go

package watchdog
import (
"bufio"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"super-frpc/global"
"super-frpc/sys"
"super-frpc/utils"
"super-frpc/postLog"
"sync"
"time"
)
// Package-level state for the single TCP connection managed by this file.
var (
// tcpConn is the currently open connection; it is nil whenever no connection is open.
tcpConn net.Conn
// tcpConnMutex is held around every write to tcpConn and isConnected in this file.
tcpConnMutex sync.Mutex
// isConnected is set to true by tcpConnect and reset to false when the connection is closed.
isConnected bool
// recvChan carries the non-empty lines read by recvMsg; Init creates it with a buffer of 100.
recvChan chan string
// stopRecvChan is closed by Destroy to ask the recvMsg goroutine to exit.
stopRecvChan chan struct{}
// recvWg counts running recvMsg goroutines so Destroy can wait for them to finish.
recvWg sync.WaitGroup
)
// Init prepares the watchdog TCP client.
//
// When the watchdog is enabled in the current configuration, Init first makes
// sure the watchdog process is running and returns (after logging) any error
// from that step. It then allocates the receive and stop channels.
//
// Init returns nil without doing anything if either channel already exists,
// and returns an error if a connection is open while no channels exist.
func Init() error {
	if global.CurrentConfig.Watchdog.Enabled {
		bootErr := ensureWatchdogProcess()
		if bootErr != nil {
			postLog.Error(fmt.Sprintf("Failed to boot watchdog program: %v", bootErr))
			return bootErr
		}
	}

	tcpConnMutex.Lock()
	defer tcpConnMutex.Unlock()

	switch {
	case recvChan != nil || stopRecvChan != nil:
		// Already initialized: nothing to do.
		return nil
	case tcpConn != nil:
		return fmt.Errorf("TCP client already initialized")
	}

	isConnected = false
	stopRecvChan = make(chan struct{})
	recvChan = make(chan string, 100)
	return nil
}
// ensureWatchdogProcess starts the platform's watchdog binary unless a process
// with that name is already running.
//
// The binary is looked up at ./watchdog/<name>, where <name> comes from
// getWatchdogBinaryName. The child is started with stdin, stdout and stderr
// attached to the null device. It returns an error when the init system is
// unsupported, the binary is missing, the null device cannot be opened, or the
// process fails to start.
func ensureWatchdogProcess() error {
	watchdogName, err := getWatchdogBinaryName()
	if err != nil {
		return err
	}
	if isWatchdogProcessRunning(watchdogName) {
		postLog.Info(fmt.Sprintf("Watchdog process %s is already running, skipping initialize...", watchdogName))
		return nil
	}
	postLog.Info(fmt.Sprintf("Watchdog process %s is not running, starting it...", watchdogName))

	watchdogPath := filepath.Join(".", "watchdog", watchdogName)
	if !utils.IsFileExist(watchdogPath) {
		return fmt.Errorf("watchdog binary not found: %s", watchdogPath)
	}

	cmd := exec.Command(watchdogPath)
	devNull, err := os.OpenFile(os.DevNull, os.O_RDWR, 0)
	if err != nil {
		return fmt.Errorf("failed to open null device: %w", err)
	}
	// The child receives its own copy of the descriptor on Start, so ours can
	// be closed when this function returns.
	defer devNull.Close()
	cmd.Stdin = devNull
	cmd.Stdout = devNull
	cmd.Stderr = devNull

	// configureWatchdogCommand is defined elsewhere in the package; presumably
	// it applies platform-specific process attributes — verify against its definition.
	configureWatchdogCommand(cmd)

	if err := cmd.Start(); err != nil {
		return fmt.Errorf("failed to start watchdog process %s: %w", watchdogPath, err)
	}

	// The watchdog is never waited on, so release the resources associated
	// with the process handle now. A failure here does not affect the
	// already-running child, so it is only logged.
	if err := cmd.Process.Release(); err != nil {
		postLog.Error(fmt.Sprintf("Failed to release watchdog process handle: %v", err))
	}
	return nil
}
// getWatchdogBinaryName maps the init system reported by sys.GetInitSystem to
// the file name of the matching watchdog binary.
//
// On Windows the ".exe" name is returned when that file exists under
// ./watchdog, otherwise the extension-less name is returned. Any other init
// system than "systemd", "init.d" or "windows" yields an error.
func getWatchdogBinaryName() (string, error) {
	initSystem := sys.GetInitSystem()
	if initSystem == "systemd" {
		return "watchdog_systemd", nil
	}
	if initSystem == "init.d" {
		return "watchdog_initd", nil
	}
	if initSystem != "windows" {
		return "", fmt.Errorf("unsupported init system: %s", sys.GetInitSystem())
	}

	exePath := filepath.Join(".", "watchdog", "watchdog_windows.exe")
	if utils.IsFileExist(exePath) {
		return "watchdog_windows.exe", nil
	}
	return "watchdog_windows", nil
}
// isWatchdogProcessRunning reports whether a process named watchdogName is
// currently running, using the process-listing tool of the detected init system.
//
// On Windows it filters tasklist by image name. On systemd / init.d systems it
// scans the command names printed by `ps -A -o comm=`. It returns false when
// the listing command fails or the init system is not recognised.
func isWatchdogProcessRunning(watchdogName string) bool {
	switch sys.GetInitSystem() {
	case "windows":
		cmd := exec.Command("tasklist", "/FI", fmt.Sprintf("IMAGENAME eq %s", watchdogName))
		output, err := cmd.CombinedOutput()
		if err != nil {
			return false
		}
		outputStr := strings.ToLower(string(output))
		// tasklist prints an informational line instead of a table when the
		// filter matches nothing, so both conditions are checked.
		return strings.Contains(outputStr, strings.ToLower(watchdogName)) && !strings.Contains(outputStr, "no tasks are running")
	case "systemd", "init.d":
		// The kernel keeps at most 15 characters of a command name, so a
		// longer binary name (e.g. the 16-character "watchdog_systemd") shows
		// up truncated in the comm column and would never match verbatim.
		const maxCommLen = 15
		truncatedName := watchdogName
		if len(truncatedName) > maxCommLen {
			truncatedName = truncatedName[:maxCommLen]
		}

		output, err := exec.Command("ps", "-A", "-o", "comm=").CombinedOutput()
		if err != nil {
			return false
		}
		for _, line := range strings.Split(string(output), "\n") {
			comm := strings.TrimSpace(line)
			if comm == "" {
				// Blank lines (such as the one after the final newline) are not processes.
				continue
			}
			if comm == watchdogName || comm == truncatedName {
				return true
			}
		}
		return false
	default:
		return false
	}
}
// tcpConnect opens the TCP connection to ipaddr:port and starts the recvMsg
// goroutine that reads from it.
//
// The dial is limited to 3 seconds and runs while tcpConnMutex is held. It
// returns an error if a connection is already open or if dialing fails; the
// dial error is wrapped so callers can inspect it with errors.Is / errors.As.
func tcpConnect(ipaddr string, port int) error {
	tcpConnMutex.Lock()
	defer tcpConnMutex.Unlock()

	if tcpConn != nil {
		return fmt.Errorf("already connected")
	}

	address := net.JoinHostPort(ipaddr, fmt.Sprintf("%d", port))
	conn, err := net.DialTimeout("tcp", address, 3*time.Second)
	if err != nil {
		return fmt.Errorf("failed to connect to %s: %w", address, err)
	}

	tcpConn = conn
	isConnected = true

	// Register the reader before starting it so Destroy's Wait cannot miss it.
	recvWg.Add(1)
	go recvMsg()
	return nil
}
// sendMsg writes message (terminated by a newline) to the open connection and
// waits up to timeout seconds for the next line delivered on recvChan.
//
// It returns the trimmed response, or an error when there is no connection,
// the write fails, the receive channel is closed while waiting, or the
// timeout expires.
func sendMsg(message string, timeout int) (string, error) {
	tcpConnMutex.Lock()
	if tcpConn == nil {
		tcpConnMutex.Unlock()
		return "", fmt.Errorf("not connected")
	}

	// Snapshot the channel under the lock so the select below keeps using the
	// same channel even if the package variable is reassigned meanwhile.
	responses := recvChan

	// recvMsg queues every received line, including unsolicited ones. Anything
	// queued before this request is written cannot be its reply, so discard it
	// rather than returning a stale line as the response.
	drainRecvChan(responses)

	_, err := tcpConn.Write([]byte(message + "\n"))
	tcpConnMutex.Unlock()
	if err != nil {
		return "", fmt.Errorf("failed to send message: %w", err)
	}

	select {
	case response, ok := <-responses:
		if !ok {
			// A closed channel yields "" immediately; report it instead of
			// passing an empty string off as a successful response.
			return "", fmt.Errorf("connection closed while waiting for response")
		}
		// Drop any extra lines that arrived together with the response.
		drainRecvChan(responses)
		return strings.TrimSpace(response), nil
	case <-time.After(time.Duration(timeout) * time.Second):
		return "", fmt.Errorf("timeout waiting for response")
	}
}

// drainRecvChan discards everything currently buffered in ch without blocking.
// It returns immediately for a nil or closed channel.
func drainRecvChan(ch chan string) {
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return
			}
		default:
			return
		}
	}
}
// recvMsg is the reader goroutine started by tcpConnect. It reads
// newline-terminated lines from the connection until the stop channel is
// closed or the connection fails.
//
// Every non-empty line is offered to recvChan (dropped if the buffer is full);
// lines that are not plain responses are additionally passed to parseCommand.
// On a non-timeout read error the connection is closed and cleared.
func recvMsg() {
	defer recvWg.Done()

	tcpConnMutex.Lock()
	if tcpConn == nil {
		tcpConnMutex.Unlock()
		return
	}
	conn := tcpConn
	// Snapshot the stop channel: Destroy may set the package variable to nil
	// after closing it, and receiving from a nil channel never succeeds, which
	// would make the stop signal invisible to this loop.
	stop := stopRecvChan
	tcpConnMutex.Unlock()

	reader := bufio.NewReader(conn)
	// pending holds the bytes of a line that has not been terminated yet.
	var pending []byte

	for {
		select {
		case <-stop:
			return
		default:
		}

		// A short deadline keeps the loop responsive to the stop signal. The
		// error is ignored on purpose: if the deadline cannot be set the
		// connection is unusable and the read below reports the failure.
		_ = conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
		data, err := reader.ReadBytes('\n')
		pending = append(pending, data...)

		timedOut := false
		if err != nil {
			netErr, ok := err.(net.Error)
			timedOut = ok && netErr.Timeout()
		}

		// ReadBytes returns whatever it has read when the deadline expires, so
		// on a timeout the fragment is kept and completed by a later read
		// instead of being dispatched as if it were a whole message.
		if !timedOut && len(pending) > 0 {
			line := strings.TrimSpace(string(pending))
			pending = pending[:0]
			if len(line) > 0 {
				select {
				case recvChan <- line:
				default:
					// Buffer full: drop the message.
				}
				if !isResponseMessage(line) {
					parseCommand(line)
				}
			}
		}

		if err == nil || timedOut {
			continue
		}

		// Any other error means the connection is no longer usable.
		tcpConnMutex.Lock()
		if tcpConn != nil {
			tcpConn.Close()
			tcpConn = nil
			isConnected = false
		}
		tcpConnMutex.Unlock()
		return
	}
}
// isResponseMessage reports whether msg is exactly one of the two reply
// strings, "success" or "failed". The comparison is case-sensitive and does
// not trim whitespace.
func isResponseMessage(msg string) bool {
	switch msg {
	case "success", "failed":
		return true
	default:
		return false
	}
}
// Destroy shuts the TCP client down: it signals the reader goroutine to stop,
// waits for it to exit, closes the connection and releases both channels.
//
// All cleanup steps are always performed; if closing the connection fails,
// that error is returned after the rest of the state has been reset.
func Destroy() error {
	tcpConnMutex.Lock()
	stop := stopRecvChan
	if stop != nil {
		// Nothing is ever sent on the stop channel, so a successful receive
		// means it has already been closed; closing twice would panic.
		select {
		case <-stop:
		default:
			close(stop)
		}
	}
	tcpConnMutex.Unlock()

	// The stop channel stays published until the reader has exited. Clearing
	// the variable earlier would leave the reader selecting on a nil channel,
	// which never becomes ready, and this Wait would then block for as long
	// as the connection stays healthy.
	recvWg.Wait()

	tcpConnMutex.Lock()
	defer tcpConnMutex.Unlock()

	if stopRecvChan == stop {
		stopRecvChan = nil
	}

	var closeErr error
	if tcpConn != nil {
		if err := tcpConn.Close(); err != nil {
			closeErr = fmt.Errorf("failed to close connection: %w", err)
		}
		tcpConn = nil
		isConnected = false
	}

	// Release the receive channel even when Close failed, so a later Init
	// starts from a clean state instead of seeing a half-torn-down client.
	if recvChan != nil {
		close(recvChan)
		recvChan = nil
	}
	return closeErr
}
// IsConnected reports whether a connection is currently open, i.e. the
// connected flag is set and a connection object is present. The check is made
// while holding tcpConnMutex.
func IsConnected() bool {
	tcpConnMutex.Lock()
	connected := tcpConn != nil && isConnected
	tcpConnMutex.Unlock()
	return connected
}