diff --git a/README.md b/README.md index a3e7d87..b9988e1 100644 --- a/README.md +++ b/README.md @@ -63,12 +63,13 @@ For detailed API documentation, please see [docs/api.md](docs/api.md) - [x] Add session list API - [x] Add session management API - [ ] Add user config modify API -- [ ] Add frpc instance running status management API -- [ ] Add frpc instance log display API -- [ ] Fix random database lock when processing logs +- [x] Add frpc instance running status management API +- [x] Add frpc instance log display API +- [x] Fix random database lock when processing logs - [ ] Add frpc createdBy storage and display - [x] Fix backend can still start frpc instance when it is already running - [ ] Develop an agent software to handle windows service management +- [ ] Refactor all log output level to be more clear ## License diff --git a/docs/api.md b/docs/api.md index 5ed8a09..309e71a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1250,7 +1250,7 @@ instanceID=1 --- -## Real-time Log Streaming (WebSocket) +## Real-time System Log Streaming (WebSocket) **Endpoint:** `/system/getLogs` **Protocol:** WebSocket @@ -1334,6 +1334,95 @@ socket.onerror = (error) => { ``` --- +## Real-time frpc Instance Log Streaming (WebSocket) + +**Endpoint:** `/frpcAct/instanceMgr/logs` +**Method:** GET +**Protocol:** WebSocket +**Auth Required:** Yes (token) +**Permission Level:** Visitor + +Stream real-time logs from a frpc instance via WebSocket connection. This endpoint upgrades the HTTP connection to WebSocket and streams log messages in real-time. + +**Request:** +``` +ws://host:port/frpcAct/instanceMgr/logs?instanceID=1&token=your_token +``` + +**Query Parameters:** + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| instanceID | int | Yes | ID of the instance to stream logs from | +| token | string | Yes | Authentication token | + +**WebSocket Message Format:** + +Log messages are sent as JSON objects: + +```json +{ + "level": "INFO", + "content": "[I] [service.go:XXX] start frpc success", + "timestamp": "2024-01-01 12:00:00.000" +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| level | string | Log level: "DEBUG", "INFO", "WARN", "ERROR" | +| content | string | Log message content | +| timestamp | string | Timestamp in "YYYY-MM-DD HH:MM:SS.mmm" format | + +**Error Response (before WebSocket upgrade):** + +```json +{ + "success": false, + "message": "Instance not found" +} +``` + +**Common Error Messages:** + +| HTTP Status | Message | Description | +|-------------|---------|-------------| +| 400 | "instanceID is required" | Missing instanceID parameter | +| 400 | "invalid instanceID format" | instanceID is not a valid integer | +| 401 | "Token is required" | Missing token parameter | +| 401 | "Invalid token: ..." | Token validation failed | +| 401 | "User not found" | User associated with token does not exist | +| 403 | "You don't have access to this instance" | Instance belongs to another user | +| 404 | "Instance not found" | Instance with given ID does not exist | +| 500 | "Failed to get service name" | Internal server error | + +**Platform-Specific Behavior:** + +| Platform | Init System | Log Source | +|----------|-------------|------------| +| Windows | sc | Config file (if `log_file` is set) or Windows Event Log | +| Linux | systemd | `journalctl -u -f` | +| Linux | init.d | Log files in `/var/log/` or service status monitoring | + +**Connection Lifecycle:** + +1. Client initiates WebSocket connection with `instanceID` and `token` +2. Server validates token and instance ownership +3. Server upgrades connection to WebSocket +4. Server starts streaming logs +5. Connection remains open until client disconnects or error occurs +6. Server sends log messages as they become available + +**Notes:** + +- Only the instance owner can access the logs (user must own the instance) +- The connection will be closed if the token expires during the session +- On Windows, if no log file is configured, the service status is monitored instead +- On Linux init.d systems, log files are searched in common locations (`/var/log/`, etc.) +- Log streaming is real-time; historical logs may be sent initially depending on the platform + +--- + ## User Permissions | Permission | superuser | admin | visitor | @@ -1400,4 +1489,4 @@ The API implements rate limiting to prevent abuse: - Login attempts: Maximum 5 attempts per minute per IP - All other endpoints: 100 requests per minute per token -Exceeding rate limits will result in temporary IP or token blocking. +Exceeding rate limits will result in temporary IP or token blocking. \ No newline at end of file diff --git a/frpLogger/frpLogger.go b/frpLogger/frpLogger.go new file mode 100644 index 0000000..d4a2e58 --- /dev/null +++ b/frpLogger/frpLogger.go @@ -0,0 +1,228 @@ +package frpLogger + +import ( + "database/sql" + "fmt" + "log" + "net/http" + "strconv" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +type LogMessage struct { + Level string `json:"level"` + Content string `json:"content"` + Timestamp string `json:"timestamp"` +} + +type InstanceLogStreamer struct { + instanceID int + serviceName string + conn *websocket.Conn + stopChan chan struct{} + doneChan chan struct{} + running bool + runningMutex sync.Mutex +} + +func NewInstanceLogStreamer(instanceID int, serviceName string, conn *websocket.Conn) *InstanceLogStreamer { + return &InstanceLogStreamer{ + instanceID: instanceID, + serviceName: serviceName, + conn: conn, + stopChan: make(chan struct{}), + doneChan: make(chan struct{}), + running: false, + } +} + +func (s *InstanceLogStreamer) Start() { + s.runningMutex.Lock() + if s.running { + s.runningMutex.Unlock() + return + } + s.running = true + s.runningMutex.Unlock() + + go s.streamLogs() +} + +func (s *InstanceLogStreamer) Stop() { + s.runningMutex.Lock() + defer s.runningMutex.Unlock() + + if !s.running { + return + } + + close(s.stopChan) + <-s.doneChan + s.running = false +} + +func (s *InstanceLogStreamer) IsRunning() bool { + s.runningMutex.Lock() + defer s.runningMutex.Unlock() + return s.running +} + +func (s *InstanceLogStreamer) streamLogs() { + defer close(s.doneChan) + + if !IsInstanceRunning(s.instanceID) { + s.sendInfo("Instance is not running, no logs to stream") + return + } + + initSystem := GetInitSystem() + + switch initSystem { + case "windows": + s.streamWindowsLogs() + case "systemd": + s.streamSystemdLogs() + case "init.d": + s.streamInitDLogs() + default: + s.sendError(fmt.Sprintf("Unsupported init system: %s", initSystem)) + } +} + +func (s *InstanceLogStreamer) sendLog(level, content string) { + msg := LogMessage{ + Level: level, + Content: content, + Timestamp: time.Now().Format("2006-01-02 15:04:05.000"), + } + + if err := s.conn.WriteJSON(msg); err != nil { + log.Printf("[InstanceLogStreamer] Failed to send log message: %v", err) + } +} + +func (s *InstanceLogStreamer) sendError(content string) { + s.sendLog("ERROR", content) +} + +func (s *InstanceLogStreamer) sendInfo(content string) { + s.sendLog("INFO", content) +} + +type FrpcInstance struct { + ID int + UserID int + Name string + BootAtStart bool + RunUser string + ConfigPath string + CreatedAt time.Time +} + +type User struct { + UserID int + Username string + Passwd string + Type string + CreatedAt string +} + +var ( + db *sql.DB + frpcDB *sql.DB + dbMutex sync.RWMutex + isDebug bool + debugMux sync.RWMutex +) + +func SetDatabase(userDB, instanceDB *sql.DB) { + dbMutex.Lock() + defer dbMutex.Unlock() + db = userDB + frpcDB = instanceDB +} + +func SetDebugMode(debug bool) { + debugMux.Lock() + defer debugMux.Unlock() + isDebug = debug +} + +func GetDebugMode() bool { + debugMux.RLock() + defer debugMux.RUnlock() + return isDebug +} + +func DBQueryFrpcInstanceByID(instanceID int) (FrpcInstance, error) { + dbMutex.RLock() + defer dbMutex.RUnlock() + + var instance FrpcInstance + var createdAtStr string + err := frpcDB.QueryRow("SELECT id, userID, name, bootAtStart, runUser, configPath, createdAt FROM frpcInstances WHERE id = ?", instanceID).Scan( + &instance.ID, &instance.UserID, &instance.Name, &instance.BootAtStart, &instance.RunUser, &instance.ConfigPath, &createdAtStr) + if err != nil { + return instance, fmt.Errorf("failed to query frpc instance: %w", err) + } + instance.CreatedAt, _ = time.Parse(time.RFC3339, createdAtStr) + return instance, nil +} + +func GetUserByID(userID int) (*User, error) { + dbMutex.RLock() + defer dbMutex.RUnlock() + + var user User + err := db.QueryRow("SELECT userID, username, passwd, type, createdAt FROM userLogin WHERE userID = ?", userID). + Scan(&user.UserID, &user.Username, &user.Passwd, &user.Type, &user.CreatedAt) + if err != nil { + return nil, fmt.Errorf("failed to query user: %w", err) + } + + return &user, nil +} + +func GetServiceNameByInstanceID(instanceID int) (string, error) { + instance, err := DBQueryFrpcInstanceByID(instanceID) + if err != nil { + return "", fmt.Errorf("failed to query frpc instance: %w", err) + } + + user, err := GetUserByID(instance.UserID) + if err != nil { + return "", fmt.Errorf("failed to get user info: %w", err) + } + + serviceName := fmt.Sprintf("superfrpc_%s_%s", user.Username, instance.Name) + return serviceName, nil +} + +func ParseInstanceIDFromQuery(r *http.Request) (int, error) { + instanceIDStr := r.URL.Query().Get("instanceID") + if instanceIDStr == "" { + return 0, fmt.Errorf("instanceID is required") + } + + instanceID, err := strconv.Atoi(instanceIDStr) + if err != nil { + return 0, fmt.Errorf("invalid instanceID format") + } + + return instanceID, nil +} + +func ParseTokenFromQuery(r *http.Request) string { + return r.URL.Query().Get("token") +} diff --git a/frpLogger/handler.go b/frpLogger/handler.go new file mode 100644 index 0000000..46405a3 --- /dev/null +++ b/frpLogger/handler.go @@ -0,0 +1,129 @@ +package frpLogger + +import ( + "encoding/json" + "fmt" + "log" + "net/http" +) + +type TokenValidatorFunc func(token string) (int, error) + +type LogSocketHandler struct { + validateToken TokenValidatorFunc +} + +func NewLogSocketHandler(validator TokenValidatorFunc) *LogSocketHandler { + return &LogSocketHandler{ + validateToken: validator, + } +} + +type ErrorResponse struct { + Success bool `json:"success"` + Message string `json:"message"` +} + +func sendWSError(conn interface { + WriteJSON(v interface{}) error +}, message string) { + resp := ErrorResponse{ + Success: false, + Message: message, + } + conn.WriteJSON(resp) +} + +func (h *LogSocketHandler) Handle(w http.ResponseWriter, r *http.Request) { + instanceID, err := ParseInstanceIDFromQuery(r) + if err != nil { + SendHTTPError(w, http.StatusBadRequest, err.Error()) + return + } + + token := ParseTokenFromQuery(r) + if token == "" { + SendHTTPError(w, http.StatusUnauthorized, "Token is required") + return + } + + userID, err := h.validateToken(token) + if err != nil { + SendHTTPError(w, http.StatusUnauthorized, fmt.Sprintf("Invalid token: %v", err)) + return + } + + _, err = GetUserByID(userID) + if err != nil { + SendHTTPError(w, http.StatusUnauthorized, "User not found") + return + } + + instance, err := DBQueryFrpcInstanceByID(instanceID) + if err != nil { + SendHTTPError(w, http.StatusNotFound, "Instance not found") + return + } + + if instance.UserID != userID { + SendHTTPError(w, http.StatusForbidden, "You don't have access to this instance") + return + } + + serviceName, err := GetServiceNameByInstanceID(instanceID) + if err != nil { + SendHTTPError(w, http.StatusInternalServerError, "Failed to get service name") + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("[InstanceLogSocketHandler] Failed to upgrade connection: %v", err) + return + } + defer conn.Close() + + streamer := NewInstanceLogStreamer(instanceID, serviceName, conn) + streamer.Start() + + done := make(chan struct{}) + + go func() { + defer close(done) + for { + _, _, err := conn.ReadMessage() + if err != nil { + return + } + } + }() + + <-done + streamer.Stop() +} + +func SendHTTPError(w http.ResponseWriter, statusCode int, message string) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + resp := ErrorResponse{ + Success: false, + Message: message, + } + jsonResp, _ := json.Marshal(resp) + w.Write(jsonResp) +} + +type InstanceLogHandler struct { + validateToken TokenValidatorFunc +} + +func NewInstanceLogHandler(validator TokenValidatorFunc) *InstanceLogHandler { + return &InstanceLogHandler{ + validateToken: validator, + } +} + +func (h *InstanceLogHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + handler := NewLogSocketHandler(h.validateToken) + handler.Handle(w, r) +} diff --git a/frpLogger/initd.go b/frpLogger/initd.go new file mode 100644 index 0000000..211b566 --- /dev/null +++ b/frpLogger/initd.go @@ -0,0 +1,414 @@ +//go:build linux +// +build linux + +package frpLogger + +import ( + "bufio" + "fmt" + "io" + "log" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +func (s *InstanceLogStreamer) streamInitDLogs() { + s.sendInfo(fmt.Sprintf("Starting to stream logs for init.d service: %s", s.serviceName)) + + instance, err := DBQueryFrpcInstanceByID(s.instanceID) + if err != nil { + s.sendError(fmt.Sprintf("Failed to get instance info: %v", err)) + return + } + + startTime := getInitDServiceStartTime(s.serviceName) + + logFilePath := s.getLogFilePathFromConfig(instance.ConfigPath) + + if logFilePath != "" { + if _, err := os.Stat(logFilePath); err == nil { + if !startTime.IsZero() { + s.streamLogFileSince(logFilePath, startTime) + } else { + s.streamLogFile(logFilePath) + } + return + } + } + + possibleLogPaths := s.getPossibleInitDLogPaths() + for _, logPath := range possibleLogPaths { + if _, err := os.Stat(logPath); err == nil { + s.sendInfo(fmt.Sprintf("Found log file: %s", logPath)) + if !startTime.IsZero() { + s.streamLogFileSince(logPath, startTime) + } else { + s.streamLogFile(logPath) + } + return + } + } + + s.sendInfo("No log file found, streaming service status instead...") + s.streamInitDStatus() +} + +func (s *InstanceLogStreamer) getPossibleInitDLogPaths() []string { + var paths []string + + paths = append(paths, fmt.Sprintf("/var/log/%s.log", s.serviceName)) + paths = append(paths, fmt.Sprintf("/var/log/superfrpc/%s.log", s.serviceName)) + paths = append(paths, "/var/log/frpc.log") + paths = append(paths, "/var/log/superfrpc/frpc.log") + + instance, err := DBQueryFrpcInstanceByID(s.instanceID) + if err == nil { + configDir := filepath.Dir(instance.ConfigPath) + paths = append(paths, filepath.Join(configDir, s.serviceName+".log")) + paths = append(paths, filepath.Join(configDir, "frpc.log")) + } + + pidFile := fmt.Sprintf("/var/run/%s.pid", s.serviceName) + if pid, err := os.ReadFile(pidFile); err == nil { + pidStr := strings.TrimSpace(string(pid)) + paths = append(paths, fmt.Sprintf("/proc/%s/fd/1", pidStr)) + paths = append(paths, fmt.Sprintf("/proc/%s/fd/2", pidStr)) + } + + return paths +} + +func (s *InstanceLogStreamer) streamInitDStatus() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + lastStatus := "" + + for { + select { + case <-s.stopChan: + s.sendInfo("Service status monitoring stopped") + return + case <-ticker.C: + status := s.getInitDServiceStatus() + if status != lastStatus { + s.sendInfo(fmt.Sprintf("Service status: %s", status)) + lastStatus = status + } + } + } +} + +func (s *InstanceLogStreamer) getInitDServiceStatus() string { + servicePath := fmt.Sprintf("/etc/init.d/%s", s.serviceName) + + if _, err := os.Stat(servicePath); os.IsNotExist(err) { + return "NOT_INSTALLED" + } + + cmd := exec.Command(servicePath, "status") + output, err := cmd.CombinedOutput() + outputStr := strings.TrimSpace(string(output)) + + if err != nil { + if strings.Contains(outputStr, "is running") { + return "RUNNING" + } + return "STOPPED" + } + + if strings.Contains(outputStr, "is running") { + return "RUNNING" + } + + return "STOPPED" +} + +func getInitDServiceLogs(serviceName string, lines int) ([]string, error) { + startTime := getInitDServiceStartTime(serviceName) + + possibleLogPaths := []string{ + fmt.Sprintf("/var/log/%s.log", serviceName), + fmt.Sprintf("/var/log/superfrpc/%s.log", serviceName), + "/var/log/frpc.log", + } + + for _, logPath := range possibleLogPaths { + if _, err := os.Stat(logPath); err == nil { + if !startTime.IsZero() { + return readLinesSinceTime(logPath, lines, startTime) + } + return readLastLines(logPath, lines) + } + } + + return nil, fmt.Errorf("no log file found for service %s", serviceName) +} + +func readLinesSinceTime(filePath string, maxLines int, sinceTime time.Time) ([]string, error) { + file, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("failed to open log file: %w", err) + } + defer file.Close() + + var result []string + scanner := bufio.NewScanner(file) + + allLines := make([]string, 0) + for scanner.Scan() { + line := scanner.Text() + allLines = append(allLines, line) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading log file: %w", err) + } + + for i := len(allLines) - 1; i >= 0 && len(result) < maxLines; i-- { + line := allLines[i] + lineTime := parseLogLineTime(line) + if lineTime.IsZero() || lineTime.After(sinceTime) || lineTime.Equal(sinceTime) { + result = append([]string{line}, result...) + } + } + + return result, nil +} + +func parseLogLineTime(line string) time.Time { + layouts := []string{ + "2006/01/02 15:04:05", + "2006-01-02 15:04:05", + "Jan 2 15:04:05", + "Jan 02 15:04:05", + "02/Jan/2006:15:04:05", + time.RFC3339, + } + + for _, layout := range layouts { + if len(line) >= len(layout) { + t, err := time.Parse(layout, strings.TrimSpace(line[:min(len(line), len(layout))])) + if err == nil { + return t + } + } + } + + return time.Time{} +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func readLastLines(filePath string, lines int) ([]string, error) { + file, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("failed to open log file: %w", err) + } + defer file.Close() + + var result []string + scanner := bufio.NewScanner(file) + + allLines := make([]string, 0) + for scanner.Scan() { + allLines = append(allLines, scanner.Text()) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading log file: %w", err) + } + + start := 0 + if len(allLines) > lines { + start = len(allLines) - lines + } + + result = allLines[start:] + return result, nil +} + +func isServiceRunningInitD(serviceName string) bool { + servicePath := fmt.Sprintf("/etc/init.d/%s", serviceName) + + if _, err := os.Stat(servicePath); os.IsNotExist(err) { + return false + } + + cmd := exec.Command(servicePath, "status") + output, err := cmd.CombinedOutput() + if err != nil { + return false + } + + return strings.Contains(string(output), "is running") +} + +func getInitDServicePid(serviceName string) int { + pidFile := fmt.Sprintf("/var/run/%s.pid", serviceName) + if content, err := os.ReadFile(pidFile); err == nil { + pidStr := strings.TrimSpace(string(content)) + var pid int + if _, err := fmt.Sscanf(pidStr, "%d", &pid); err == nil { + return pid + } + } + return 0 +} + +func getProcessStartTime(pid int) time.Time { + if pid <= 0 { + return time.Time{} + } + + statPath := fmt.Sprintf("/proc/%d/stat", pid) + content, err := os.ReadFile(statPath) + if err != nil { + return time.Time{} + } + + fields := strings.Fields(string(content)) + if len(fields) < 22 { + return time.Time{} + } + + starttimeTicks, err := fmt.Sscanf(fields[21], "%d", new(uint64)) + if err != nil || starttimeTicks != 1 { + return time.Time{} + } + + var ticks uint64 + fmt.Sscanf(fields[21], "%d", &ticks) + + clkTck := uint64(100) + uptimeBytes, err := os.ReadFile("/proc/uptime") + if err != nil { + return time.Time{} + } + uptimeParts := strings.Fields(string(uptimeBytes)) + if len(uptimeParts) < 1 { + return time.Time{} + } + var uptimeSeconds float64 + fmt.Sscanf(uptimeParts[0], "%f", &uptimeSeconds) + + secondsSinceBoot := float64(ticks) / float64(clkTck) + processStartTime := time.Now().Add(-time.Duration(uptimeSeconds-secondsSinceBoot) * time.Second) + + return processStartTime +} + +func getInitDServiceStartTime(serviceName string) time.Time { + pid := getInitDServicePid(serviceName) + if pid <= 0 { + return time.Time{} + } + return getProcessStartTime(pid) +} + +func (s *InstanceLogStreamer) streamInitDLogsFromProc() { + pid := getInitDServicePid(s.serviceName) + if pid == 0 { + s.sendError("Failed to get service PID") + return + } + + stdoutPath := fmt.Sprintf("/proc/%d/fd/1", pid) + stderrPath := fmt.Sprintf("/proc/%d/fd/2", pid) + + s.sendInfo(fmt.Sprintf("Attempting to stream from /proc/%d", pid)) + + go s.streamProcFile(stdoutPath) + go s.streamProcFile(stderrPath) +} + +func (s *InstanceLogStreamer) streamProcFile(path string) { + file, err := os.Open(path) + if err != nil { + s.sendError(fmt.Sprintf("Failed to open %s: %v", path, err)) + return + } + defer file.Close() + + reader := bufio.NewReader(file) + + for { + select { + case <-s.stopChan: + return + default: + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + time.Sleep(100 * time.Millisecond) + continue + } + return + } + + line = strings.TrimSpace(line) + if line != "" { + s.sendLog(s.parseLogLevel(line), line) + } + } + } +} + +func (s *InstanceLogStreamer) streamInitDLogsViaSyslog() { + s.sendInfo("Attempting to stream logs via syslog...") + + cmd := exec.Command("tail", "-f", "/var/log/syslog") + stdout, err := cmd.StdoutPipe() + if err != nil { + s.sendError(fmt.Sprintf("Failed to create stdout pipe: %v", err)) + return + } + + if err := cmd.Start(); err != nil { + s.sendError(fmt.Sprintf("Failed to start tail: %v", err)) + return + } + + go func() { + <-s.stopChan + cmd.Process.Kill() + cmd.Wait() + }() + + reader := bufio.NewReader(stdout) + + for { + select { + case <-s.stopChan: + cmd.Process.Kill() + cmd.Wait() + return + default: + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + time.Sleep(100 * time.Millisecond) + continue + } + return + } + + line = strings.TrimSpace(line) + if line != "" && strings.Contains(line, s.serviceName) { + s.sendLog(s.parseLogLevel(line), line) + } + } + } +} + +func init() { + log.Printf("[frpLogger] Init.d log streamer initialized") +} diff --git a/frpLogger/os.go b/frpLogger/os.go new file mode 100644 index 0000000..79526a0 --- /dev/null +++ b/frpLogger/os.go @@ -0,0 +1,65 @@ +package frpLogger + +import ( + "fmt" + "os" + "os/exec" + "runtime" + "strings" +) + +func GetInitSystem() string { + if runtime.GOOS == "windows" { + return "windows" + } + if runtime.GOOS == "linux" { + if _, err := os.Stat("/run/systemd/system"); err == nil { + return "systemd" + } + if _, err := os.Stat("/etc/init.d"); err == nil { + return "init.d" + } + } + return "unknown" +} + +func IsInstanceRunning(instanceID int) bool { + initType := GetInitSystem() + serviceName, err := GetServiceNameByInstanceID(instanceID) + if err != nil { + return false + } + + switch initType { + case "windows": + cmd := exec.Command("sc", "query", serviceName) + output, err := cmd.CombinedOutput() + if err != nil { + return false + } + return strings.Contains(string(output), "RUNNING") + + case "systemd": + cmd := exec.Command("systemctl", "is-active", serviceName) + output, err := cmd.CombinedOutput() + if err != nil { + return false + } + return strings.TrimSpace(string(output)) == "active" + + case "init.d": + servicePath := fmt.Sprintf("/etc/init.d/%s", serviceName) + if _, err := os.Stat(servicePath); err != nil { + return false + } + cmd := exec.Command(servicePath, "status") + output, err := cmd.CombinedOutput() + if err != nil { + return false + } + return strings.Contains(string(output), "is running") + + default: + return false + } +} diff --git a/frpLogger/systemd.go b/frpLogger/systemd.go new file mode 100644 index 0000000..3ea9a8e --- /dev/null +++ b/frpLogger/systemd.go @@ -0,0 +1,295 @@ +//go:build linux +// +build linux + +package frpLogger + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "log" + "os" + "os/exec" + "strings" + "time" +) + +func (s *InstanceLogStreamer) streamSystemdLogs() { + s.sendInfo(fmt.Sprintf("Starting to stream logs for systemd service: %s", s.serviceName)) + + instance, err := DBQueryFrpcInstanceByID(s.instanceID) + if err != nil { + s.sendError(fmt.Sprintf("Failed to get instance info: %v", err)) + return + } + + startTime := getSystemdServiceStartTime(s.serviceName) + + logFilePath := s.getLogFilePathFromConfig(instance.ConfigPath) + + if logFilePath != "" { + if _, err := os.Stat(logFilePath); err == nil { + if !startTime.IsZero() { + s.streamLogFileSince(logFilePath, startTime) + } else { + s.streamLogFile(logFilePath) + } + return + } + } + + s.streamJournalLogs() +} + +func getSystemdServiceStartTime(serviceName string) time.Time { + cmd := exec.Command("systemctl", "show", serviceName, "-p", "ExecMainStartTimestamp", "--value") + output, err := cmd.CombinedOutput() + if err != nil { + return time.Time{} + } + + timestampStr := strings.TrimSpace(string(output)) + if timestampStr == "" || timestampStr == "n/a" { + return time.Time{} + } + + layouts := []string{ + "Mon 2006-01-02 15:04:05 MST", + "Mon 2006-01-02 15:04:05 -0700", + "2006-01-02 15:04:05 MST", + time.RFC3339, + } + + for _, layout := range layouts { + t, err := time.Parse(layout, timestampStr) + if err == nil { + return t + } + } + + return time.Time{} +} + +func (s *InstanceLogStreamer) streamJournalLogs() { + s.sendInfo(fmt.Sprintf("Streaming journal logs for service: %s", s.serviceName)) + + startTime := getSystemdServiceStartTime(s.serviceName) + + var cmd *exec.Cmd + if !startTime.IsZero() { + sinceStr := startTime.Format("2006-01-02 15:04:05") + s.sendInfo(fmt.Sprintf("Service started at: %s, fetching logs since then", sinceStr)) + cmd = exec.Command("journalctl", "-u", s.serviceName, "-f", "--since", sinceStr, "--no-pager") + } else { + cmd = exec.Command("journalctl", "-u", s.serviceName, "-f", "-n", "100", "--no-pager") + } + + stdout, err := cmd.StdoutPipe() + if err != nil { + s.sendError(fmt.Sprintf("Failed to create stdout pipe: %v", err)) + return + } + + if err := cmd.Start(); err != nil { + s.sendError(fmt.Sprintf("Failed to start journalctl: %v", err)) + return + } + + go func() { + <-s.stopChan + cmd.Process.Kill() + cmd.Wait() + }() + + reader := bufio.NewReader(stdout) + + for { + select { + case <-s.stopChan: + s.sendInfo("Journal log streaming stopped") + cmd.Process.Kill() + cmd.Wait() + return + default: + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + time.Sleep(100 * time.Millisecond) + continue + } + s.sendError(fmt.Sprintf("Error reading journal: %v", err)) + return + } + + line = strings.TrimSpace(line) + if line != "" { + s.sendLog(s.parseSystemdLogLevel(line), line) + } + } + } +} + +func (s *InstanceLogStreamer) parseSystemdLogLevel(line string) string { + if strings.Contains(line, " error") || strings.Contains(line, " ERROR") || strings.Contains(line, " err]") { + return "ERROR" + } + if strings.Contains(line, " warning") || strings.Contains(line, " WARNING") || strings.Contains(line, " warn]") { + return "WARN" + } + if strings.Contains(line, " debug") || strings.Contains(line, " DEBUG") || strings.Contains(line, " debug]") { + return "DEBUG" + } + if strings.Contains(line, " info") || strings.Contains(line, " INFO") || strings.Contains(line, " info]") { + return "INFO" + } + return "INFO" +} + +func getSystemdServiceLogs(serviceName string, lines int) ([]string, error) { + startTime := getSystemdServiceStartTime(serviceName) + + var cmd *exec.Cmd + if !startTime.IsZero() { + sinceStr := startTime.Format("2006-01-02 15:04:05") + cmd = exec.Command("journalctl", "-u", serviceName, "--since", sinceStr, "-n", fmt.Sprintf("%d", lines), "--no-pager", "-o", "cat") + } else { + cmd = exec.Command("journalctl", "-u", serviceName, "-n", fmt.Sprintf("%d", lines), "--no-pager", "-o", "cat") + } + + output, err := cmd.CombinedOutput() + if err != nil { + return nil, fmt.Errorf("failed to get systemd logs: %v, output: %s", err, string(output)) + } + + var logs []string + scanner := bufio.NewScanner(strings.NewReader(string(output))) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line != "" { + logs = append(logs, line) + } + } + + return logs, nil +} + +func (s *InstanceLogStreamer) streamSystemdLogsAlternative() { + s.sendInfo(fmt.Sprintf("Starting alternative systemd log streaming for service: %s", s.serviceName)) + + startTime := getSystemdServiceStartTime(s.serviceName) + + var cmd *exec.Cmd + if !startTime.IsZero() { + sinceStr := startTime.Format("2006-01-02 15:04:05") + cmd = exec.Command("journalctl", "-u", s.serviceName, "--since", sinceStr, "-n", "50", "--no-pager", "-o", "json") + } else { + cmd = exec.Command("journalctl", "-u", s.serviceName, "-n", "50", "--no-pager", "-o", "json") + } + + output, err := cmd.CombinedOutput() + if err != nil { + s.sendError(fmt.Sprintf("Failed to get initial logs: %v", err)) + } else { + s.parseAndSendJournalJSON(string(output)) + } + + s.streamJournalLogs() +} + +func (s *InstanceLogStreamer) parseAndSendJournalJSON(jsonOutput string) { + lines := strings.Split(jsonOutput, "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" || line == "[" || line == "]" { + continue + } + + line = strings.TrimSuffix(line, ",") + if strings.HasPrefix(line, "{") && strings.HasSuffix(line, "}") { + var entry map[string]interface{} + if err := json.Unmarshal([]byte(line), &entry); err == nil { + if msg, ok := entry["MESSAGE"].(string); ok { + level := "INFO" + if priority, ok := entry["PRIORITY"].(string); ok { + switch priority { + case "3": + level = "ERROR" + case "4": + level = "WARN" + case "5": + level = "INFO" + case "6": + level = "INFO" + case "7": + level = "DEBUG" + } + } + s.sendLog(level, msg) + } + } + } + } +} + +func isServiceRunningSystemd(serviceName string) bool { + cmd := exec.Command("systemctl", "is-active", serviceName) + output, err := cmd.CombinedOutput() + if err != nil { + return false + } + return strings.TrimSpace(string(output)) == "active" +} + +func getSystemdServiceStatus(serviceName string) string { + cmd := exec.Command("systemctl", "is-active", serviceName) + output, err := cmd.CombinedOutput() + if err != nil { + return "UNKNOWN" + } + return strings.TrimSpace(string(output)) +} + +func (s *InstanceLogStreamer) streamSystemdStatus() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + lastStatus := "" + + for { + select { + case <-s.stopChan: + s.sendInfo("Service status monitoring stopped") + return + case <-ticker.C: + status := getSystemdServiceStatus(s.serviceName) + if status != lastStatus { + s.sendInfo(fmt.Sprintf("Service status: %s", status)) + lastStatus = status + } + } + } +} + +func (s *InstanceLogStreamer) checkAndStreamLogFile() bool { + instance, err := DBQueryFrpcInstanceByID(s.instanceID) + if err != nil { + return false + } + + logFilePath := s.getLogFilePathFromConfig(instance.ConfigPath) + if logFilePath == "" { + return false + } + + if _, err := os.Stat(logFilePath); os.IsNotExist(err) { + return false + } + + go s.streamLogFile(logFilePath) + return true +} + +func init() { + log.Printf("[frpLogger] Systemd log streamer initialized") +} diff --git a/frpLogger/windows.go b/frpLogger/windows.go new file mode 100644 index 0000000..00184d0 --- /dev/null +++ b/frpLogger/windows.go @@ -0,0 +1,409 @@ +package frpLogger + +import ( + "bufio" + "bytes" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +func (s *InstanceLogStreamer) streamWindowsLogs() { + s.sendInfo(fmt.Sprintf("Starting to stream logs for Windows service: %s", s.serviceName)) + + instance, err := DBQueryFrpcInstanceByID(s.instanceID) + if err != nil { + s.sendError(fmt.Sprintf("Failed to get instance info: %v", err)) + return + } + + startTime := getWindowsServiceStartTime(s.serviceName) + + logFilePath := s.getLogFilePathFromConfig(instance.ConfigPath) + + if logFilePath != "" { + if !startTime.IsZero() { + s.streamLogFileSince(logFilePath, startTime) + } else { + s.streamLogFile(logFilePath) + } + return + } + + s.streamWindowsEventLogs() +} + +func getWindowsServiceStartTime(serviceName string) time.Time { + cmd := exec.Command("wmic", "service", "where", fmt.Sprintf("name='%s'", serviceName), "get", "ProcessId", "/value") + output, err := cmd.CombinedOutput() + if err != nil { + return time.Time{} + } + + outputStr := string(output) + var pid int + lines := strings.Split(outputStr, "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "ProcessId=") { + pidStr := strings.TrimPrefix(line, "ProcessId=") + pid, _ = fmt.Sscanf(pidStr, "%d", &pid) + break + } + } + + if pid <= 0 { + return time.Time{} + } + + cmd = exec.Command("wmic", "process", "where", fmt.Sprintf("ProcessId=%d", pid), "get", "CreationDate", "/value") + output, err = cmd.CombinedOutput() + if err != nil { + return time.Time{} + } + + outputStr = string(output) + lines = strings.Split(outputStr, "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "CreationDate=") { + dateStr := strings.TrimPrefix(line, "CreationDate=") + t, err := time.Parse("20060102150405.999999-0700", dateStr) + if err == nil { + return t + } + } + } + + return time.Time{} +} + +func (s *InstanceLogStreamer) getLogFilePathFromConfig(configPath string) string { + file, err := os.Open(configPath) + if err != nil { + return "" + } + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if strings.HasPrefix(line, "log_file") || strings.HasPrefix(line, "logFile") || strings.HasPrefix(line, "log.to") { + parts := strings.SplitN(line, "=", 2) + if len(parts) == 2 { + logPath := strings.TrimSpace(parts[1]) + logPath = strings.Trim(logPath, "\"'") + if logPath != "" && logPath != "console" { + return logPath + } + } + } + } + return "" +} + +func (s *InstanceLogStreamer) streamLogFile(logFilePath string) { + s.streamLogFileSince(logFilePath, time.Time{}) +} + +func (s *InstanceLogStreamer) streamLogFileSince(logFilePath string, sinceTime time.Time) { + s.sendInfo(fmt.Sprintf("Streaming log file: %s", logFilePath)) + + if _, err := os.Stat(logFilePath); os.IsNotExist(err) { + s.sendError(fmt.Sprintf("Log file not found: %s", logFilePath)) + return + } + + file, err := os.Open(logFilePath) + if err != nil { + s.sendError(fmt.Sprintf("Failed to open log file: %v", err)) + return + } + defer file.Close() + + if !sinceTime.IsZero() { + s.sendInfo(fmt.Sprintf("Filtering logs since: %s", sinceTime.Format("2006-01-02 15:04:05"))) + s.streamLogFileFromBeginning(file, sinceTime) + } else { + file.Seek(0, io.SeekEnd) + s.streamLogFileFromEnd(file) + } +} + +func (s *InstanceLogStreamer) streamLogFileFromBeginning(file *os.File, sinceTime time.Time) { + reader := bufio.NewReader(file) + + for { + select { + case <-s.stopChan: + s.sendInfo("Log streaming stopped") + return + default: + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + time.Sleep(100 * time.Millisecond) + continue + } + s.sendError(fmt.Sprintf("Error reading log file: %v", err)) + return + } + + line = strings.TrimSpace(line) + if line != "" { + lineTime := s.parseLogLineTime(line) + if lineTime.IsZero() || lineTime.After(sinceTime) || lineTime.Equal(sinceTime) { + s.sendLog(s.parseLogLevel(line), line) + } + } + } + } +} + +func (s *InstanceLogStreamer) streamLogFileFromEnd(file *os.File) { + reader := bufio.NewReader(file) + + for { + select { + case <-s.stopChan: + s.sendInfo("Log streaming stopped") + return + default: + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + time.Sleep(100 * time.Millisecond) + continue + } + s.sendError(fmt.Sprintf("Error reading log file: %v", err)) + return + } + + line = strings.TrimSpace(line) + if line != "" { + s.sendLog(s.parseLogLevel(line), line) + } + } + } +} + +func (s *InstanceLogStreamer) parseLogLineTime(line string) time.Time { + layouts := []string{ + "2006/01/02 15:04:05", + "2006-01-02 15:04:05", + "2006-01-02T15:04:05", + time.RFC3339, + time.RFC3339Nano, + } + + for _, layout := range layouts { + minLen := len(layout) + if len(line) >= minLen { + t, err := time.Parse(layout, strings.TrimSpace(line[:minLen])) + if err == nil { + return t + } + } + } + + return time.Time{} +} + +func (s *InstanceLogStreamer) parseLogLevel(line string) string { + lowerLine := strings.ToLower(line) + if strings.Contains(lowerLine, "error") || strings.Contains(lowerLine, "err") { + return "ERROR" + } + if strings.Contains(lowerLine, "warn") || strings.Contains(lowerLine, "warning") { + return "WARN" + } + if strings.Contains(lowerLine, "debug") { + return "DEBUG" + } + if strings.Contains(lowerLine, "info") { + return "INFO" + } + return "INFO" +} + +func (s *InstanceLogStreamer) streamWindowsEventLogs() { + s.sendInfo(fmt.Sprintf("Streaming Windows Event Logs for service: %s", s.serviceName)) + + s.sendInfo("Attempting to read from Windows Event Log...") + + cmd := exec.Command("wevtutil", "qe", "Application", "/q:*[System[Provider[@Name='"+s.serviceName+"']]]", "/c:50", "/rd:true", "/f:text") + output, err := cmd.CombinedOutput() + if err != nil { + s.sendInfo("No events found in Application log, trying System log...") + } + + if len(output) > 0 { + s.parseAndSendEventLogOutput(string(output)) + } + + cmd = exec.Command("wevtutil", "qe", "System", "/q:*[System[Provider[@Name='"+s.serviceName+"']]]", "/c:50", "/rd:true", "/f:text") + output, err = cmd.CombinedOutput() + if err != nil { + s.sendInfo("No events found in System log either") + } + + if len(output) > 0 { + s.parseAndSendEventLogOutput(string(output)) + } + + s.sendInfo("Streaming real-time service status...") + + s.streamServiceStatus() +} + +func (s *InstanceLogStreamer) parseAndSendEventLogOutput(output string) { + events := strings.Split(output, "Event[") + for _, event := range events { + if strings.TrimSpace(event) == "" { + continue + } + + var level, msg string + lines := strings.Split(event, "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "Level:") { + level = strings.TrimSpace(strings.TrimPrefix(line, "Level:")) + } + if strings.HasPrefix(line, "Message:") { + msg = strings.TrimSpace(strings.TrimPrefix(line, "Message:")) + } + } + + if msg != "" { + if level == "" { + level = "INFO" + } + s.sendLog(level, msg) + } + } +} + +func (s *InstanceLogStreamer) streamServiceStatus() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + lastStatus := "" + + for { + select { + case <-s.stopChan: + s.sendInfo("Service status monitoring stopped") + return + case <-ticker.C: + status := s.getWindowsServiceStatus() + if status != lastStatus { + s.sendInfo(fmt.Sprintf("Service status: %s", status)) + lastStatus = status + } + } + } +} + +func (s *InstanceLogStreamer) getWindowsServiceStatus() string { + cmd := exec.Command("sc", "query", s.serviceName) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Sprintf("Error querying service: %v", err) + } + + outputStr := string(output) + if strings.Contains(outputStr, "RUNNING") { + return "RUNNING" + } else if strings.Contains(outputStr, "STOPPED") { + return "STOPPED" + } else if strings.Contains(outputStr, "PAUSED") { + return "PAUSED" + } else if strings.Contains(outputStr, "START_PENDING") { + return "START_PENDING" + } else if strings.Contains(outputStr, "STOP_PENDING") { + return "STOP_PENDING" + } + + return "UNKNOWN" +} + +func (s *InstanceLogStreamer) streamWindowsLogsAlternative() { + s.sendInfo(fmt.Sprintf("Starting alternative log streaming for service: %s", s.serviceName)) + + instance, err := DBQueryFrpcInstanceByID(s.instanceID) + if err != nil { + s.sendError(fmt.Sprintf("Failed to get instance info: %v", err)) + return + } + + startTime := getWindowsServiceStartTime(s.serviceName) + + configDir := filepath.Dir(instance.ConfigPath) + possibleLogPaths := []string{ + filepath.Join(configDir, s.serviceName+".log"), + filepath.Join(configDir, "frpc.log"), + filepath.Join("C:\\ProgramData\\superfrpc", s.serviceName+".log"), + filepath.Join(os.TempDir(), s.serviceName+".log"), + } + + for _, logPath := range possibleLogPaths { + if _, err := os.Stat(logPath); err == nil { + if !startTime.IsZero() { + s.streamLogFileSince(logPath, startTime) + } else { + s.streamLogFile(logPath) + } + return + } + } + + s.sendInfo("No log file found, streaming service status instead...") + s.streamServiceStatus() +} + +func getWindowsServiceLogs(serviceName string, lines int) ([]string, error) { + cmd := exec.Command("wevtutil", "qe", "Application", + fmt.Sprintf("/q:*[System[Provider[@Name='%s']]]", serviceName), + fmt.Sprintf("/c:%d", lines), + "/rd:true", + "/f:text") + + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err := cmd.Run() + if err != nil { + return nil, fmt.Errorf("failed to query event log: %v, stderr: %s", err, stderr.String()) + } + + var logs []string + output := stdout.String() + events := strings.Split(output, "Event[") + + for _, event := range events { + if strings.TrimSpace(event) == "" { + continue + } + + var msg string + lines := strings.Split(event, "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "Message:") { + msg = strings.TrimSpace(strings.TrimPrefix(line, "Message:")) + } + } + + if msg != "" { + logs = append(logs, msg) + } + } + + return logs, nil +} diff --git a/main.go b/main.go index a7e9299..30b8474 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "os/signal" + "super-frpc/frpLogger" "super-frpc/postLog" "syscall" "time" @@ -77,6 +78,9 @@ func main() { postLog.Warning(fmt.Sprintf("Failed to initialize frpc database: %v", err)) } + frpLogger.SetDatabase(db, frpcDB) + frpLogger.SetDebugMode(config.Debug) + _, err = GetConfig() if err != nil { postLog.Fatal(fmt.Sprintf("Failed to get config: %v", err)) diff --git a/router.go b/router.go index 0d5b354..b6b4e8a 100644 --- a/router.go +++ b/router.go @@ -3,6 +3,7 @@ package main import ( "fmt" "net/http" + "super-frpc/frpLogger" "super-frpc/postLog" ) @@ -10,8 +11,8 @@ func setupRoutes() { postLog.Info("Setting up routes...") http.HandleFunc("/system/getStatus", GetStatusHandler) http.HandleFunc("/system/getSoftwareInfo", GetSoftwareInfoHandler) - logHandler := postLog.NewLogSocketHandler(postLog.GetLogBroadcaster()) - http.HandleFunc("/system/getLogs", logHandler.Handle) + systemLogHandler := postLog.NewLogSocketHandler(postLog.GetLogBroadcaster()) + http.HandleFunc("/system/getLogs", systemLogHandler.Handle) http.HandleFunc("/register", RegisterHandler) http.HandleFunc("/login", LoginHandler) @@ -34,6 +35,7 @@ func setupRoutes() { http.HandleFunc("/frpcAct/instanceMgr/restart", RestartInstanceHandler) http.HandleFunc("/frpcAct/instanceMgr/status", GetInstanceStatusHandler) http.HandleFunc("/frpcAct/instanceMgr/getInfo", GetInstanceInfoHandler) + http.HandleFunc("/frpcAct/instanceMgr/logs", frpLogger.NewInstanceLogHandler(ValidateTokenFromMap).ServeHTTP) http.HandleFunc("/frpcAct/proxyMgr/create", CreateProxyHandler) http.HandleFunc("/frpcAct/proxyMgr/delete", DeleteProxyHandler) http.HandleFunc("/frpcAct/proxyMgr/list", ListProxiesHandler) diff --git a/session.go b/session.go index c07d749..88d3818 100644 --- a/session.go +++ b/session.go @@ -331,3 +331,18 @@ func ListActiveSessions() []*Session { } return activeSessions } + +func ValidateTokenFromMap(token string) (int, error) { + tokenMux.RLock() + defer tokenMux.RUnlock() + + for userID, tokenInfo := range tokenMap { + if tokenInfo.Token == token { + if time.Since(tokenInfo.CreatedAt) > tokenTTL { + return 0, fmt.Errorf("token expired") + } + return userID, nil + } + } + return 0, fmt.Errorf("invalid token") +}