feat(frpLogger): add real-time instance log streaming functionality
Implement cross-platform log streaming for frpc instances with support for Windows, systemd, and init.d systems. Includes WebSocket API endpoint for real-time log streaming, token validation, and instance ownership checks. Update README and API documentation to reflect new functionality. The implementation handles: - Platform-specific log collection (Windows Event Log, journalctl, log files) - WebSocket-based real-time streaming - Token validation and instance access control - Log level parsing and formatting - Historical log retrieval since service start
This commit is contained in:
295
frpLogger/systemd.go
Normal file
295
frpLogger/systemd.go
Normal file
@@ -0,0 +1,295 @@
|
||||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package frpLogger
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (s *InstanceLogStreamer) streamSystemdLogs() {
|
||||
s.sendInfo(fmt.Sprintf("Starting to stream logs for systemd service: %s", s.serviceName))
|
||||
|
||||
instance, err := DBQueryFrpcInstanceByID(s.instanceID)
|
||||
if err != nil {
|
||||
s.sendError(fmt.Sprintf("Failed to get instance info: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
startTime := getSystemdServiceStartTime(s.serviceName)
|
||||
|
||||
logFilePath := s.getLogFilePathFromConfig(instance.ConfigPath)
|
||||
|
||||
if logFilePath != "" {
|
||||
if _, err := os.Stat(logFilePath); err == nil {
|
||||
if !startTime.IsZero() {
|
||||
s.streamLogFileSince(logFilePath, startTime)
|
||||
} else {
|
||||
s.streamLogFile(logFilePath)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
s.streamJournalLogs()
|
||||
}
|
||||
|
||||
func getSystemdServiceStartTime(serviceName string) time.Time {
|
||||
cmd := exec.Command("systemctl", "show", serviceName, "-p", "ExecMainStartTimestamp", "--value")
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
timestampStr := strings.TrimSpace(string(output))
|
||||
if timestampStr == "" || timestampStr == "n/a" {
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
layouts := []string{
|
||||
"Mon 2006-01-02 15:04:05 MST",
|
||||
"Mon 2006-01-02 15:04:05 -0700",
|
||||
"2006-01-02 15:04:05 MST",
|
||||
time.RFC3339,
|
||||
}
|
||||
|
||||
for _, layout := range layouts {
|
||||
t, err := time.Parse(layout, timestampStr)
|
||||
if err == nil {
|
||||
return t
|
||||
}
|
||||
}
|
||||
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
func (s *InstanceLogStreamer) streamJournalLogs() {
|
||||
s.sendInfo(fmt.Sprintf("Streaming journal logs for service: %s", s.serviceName))
|
||||
|
||||
startTime := getSystemdServiceStartTime(s.serviceName)
|
||||
|
||||
var cmd *exec.Cmd
|
||||
if !startTime.IsZero() {
|
||||
sinceStr := startTime.Format("2006-01-02 15:04:05")
|
||||
s.sendInfo(fmt.Sprintf("Service started at: %s, fetching logs since then", sinceStr))
|
||||
cmd = exec.Command("journalctl", "-u", s.serviceName, "-f", "--since", sinceStr, "--no-pager")
|
||||
} else {
|
||||
cmd = exec.Command("journalctl", "-u", s.serviceName, "-f", "-n", "100", "--no-pager")
|
||||
}
|
||||
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
s.sendError(fmt.Sprintf("Failed to create stdout pipe: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
s.sendError(fmt.Sprintf("Failed to start journalctl: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-s.stopChan
|
||||
cmd.Process.Kill()
|
||||
cmd.Wait()
|
||||
}()
|
||||
|
||||
reader := bufio.NewReader(stdout)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.stopChan:
|
||||
s.sendInfo("Journal log streaming stopped")
|
||||
cmd.Process.Kill()
|
||||
cmd.Wait()
|
||||
return
|
||||
default:
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
s.sendError(fmt.Sprintf("Error reading journal: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
line = strings.TrimSpace(line)
|
||||
if line != "" {
|
||||
s.sendLog(s.parseSystemdLogLevel(line), line)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *InstanceLogStreamer) parseSystemdLogLevel(line string) string {
|
||||
if strings.Contains(line, " error") || strings.Contains(line, " ERROR") || strings.Contains(line, " err]") {
|
||||
return "ERROR"
|
||||
}
|
||||
if strings.Contains(line, " warning") || strings.Contains(line, " WARNING") || strings.Contains(line, " warn]") {
|
||||
return "WARN"
|
||||
}
|
||||
if strings.Contains(line, " debug") || strings.Contains(line, " DEBUG") || strings.Contains(line, " debug]") {
|
||||
return "DEBUG"
|
||||
}
|
||||
if strings.Contains(line, " info") || strings.Contains(line, " INFO") || strings.Contains(line, " info]") {
|
||||
return "INFO"
|
||||
}
|
||||
return "INFO"
|
||||
}
|
||||
|
||||
func getSystemdServiceLogs(serviceName string, lines int) ([]string, error) {
|
||||
startTime := getSystemdServiceStartTime(serviceName)
|
||||
|
||||
var cmd *exec.Cmd
|
||||
if !startTime.IsZero() {
|
||||
sinceStr := startTime.Format("2006-01-02 15:04:05")
|
||||
cmd = exec.Command("journalctl", "-u", serviceName, "--since", sinceStr, "-n", fmt.Sprintf("%d", lines), "--no-pager", "-o", "cat")
|
||||
} else {
|
||||
cmd = exec.Command("journalctl", "-u", serviceName, "-n", fmt.Sprintf("%d", lines), "--no-pager", "-o", "cat")
|
||||
}
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get systemd logs: %v, output: %s", err, string(output))
|
||||
}
|
||||
|
||||
var logs []string
|
||||
scanner := bufio.NewScanner(strings.NewReader(string(output)))
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line != "" {
|
||||
logs = append(logs, line)
|
||||
}
|
||||
}
|
||||
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
func (s *InstanceLogStreamer) streamSystemdLogsAlternative() {
|
||||
s.sendInfo(fmt.Sprintf("Starting alternative systemd log streaming for service: %s", s.serviceName))
|
||||
|
||||
startTime := getSystemdServiceStartTime(s.serviceName)
|
||||
|
||||
var cmd *exec.Cmd
|
||||
if !startTime.IsZero() {
|
||||
sinceStr := startTime.Format("2006-01-02 15:04:05")
|
||||
cmd = exec.Command("journalctl", "-u", s.serviceName, "--since", sinceStr, "-n", "50", "--no-pager", "-o", "json")
|
||||
} else {
|
||||
cmd = exec.Command("journalctl", "-u", s.serviceName, "-n", "50", "--no-pager", "-o", "json")
|
||||
}
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
s.sendError(fmt.Sprintf("Failed to get initial logs: %v", err))
|
||||
} else {
|
||||
s.parseAndSendJournalJSON(string(output))
|
||||
}
|
||||
|
||||
s.streamJournalLogs()
|
||||
}
|
||||
|
||||
func (s *InstanceLogStreamer) parseAndSendJournalJSON(jsonOutput string) {
|
||||
lines := strings.Split(jsonOutput, "\n")
|
||||
for _, line := range lines {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" || line == "[" || line == "]" {
|
||||
continue
|
||||
}
|
||||
|
||||
line = strings.TrimSuffix(line, ",")
|
||||
if strings.HasPrefix(line, "{") && strings.HasSuffix(line, "}") {
|
||||
var entry map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(line), &entry); err == nil {
|
||||
if msg, ok := entry["MESSAGE"].(string); ok {
|
||||
level := "INFO"
|
||||
if priority, ok := entry["PRIORITY"].(string); ok {
|
||||
switch priority {
|
||||
case "3":
|
||||
level = "ERROR"
|
||||
case "4":
|
||||
level = "WARN"
|
||||
case "5":
|
||||
level = "INFO"
|
||||
case "6":
|
||||
level = "INFO"
|
||||
case "7":
|
||||
level = "DEBUG"
|
||||
}
|
||||
}
|
||||
s.sendLog(level, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isServiceRunningSystemd(serviceName string) bool {
|
||||
cmd := exec.Command("systemctl", "is-active", serviceName)
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return strings.TrimSpace(string(output)) == "active"
|
||||
}
|
||||
|
||||
func getSystemdServiceStatus(serviceName string) string {
|
||||
cmd := exec.Command("systemctl", "is-active", serviceName)
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return "UNKNOWN"
|
||||
}
|
||||
return strings.TrimSpace(string(output))
|
||||
}
|
||||
|
||||
func (s *InstanceLogStreamer) streamSystemdStatus() {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
lastStatus := ""
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.stopChan:
|
||||
s.sendInfo("Service status monitoring stopped")
|
||||
return
|
||||
case <-ticker.C:
|
||||
status := getSystemdServiceStatus(s.serviceName)
|
||||
if status != lastStatus {
|
||||
s.sendInfo(fmt.Sprintf("Service status: %s", status))
|
||||
lastStatus = status
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *InstanceLogStreamer) checkAndStreamLogFile() bool {
|
||||
instance, err := DBQueryFrpcInstanceByID(s.instanceID)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
logFilePath := s.getLogFilePathFromConfig(instance.ConfigPath)
|
||||
if logFilePath == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
if _, err := os.Stat(logFilePath); os.IsNotExist(err) {
|
||||
return false
|
||||
}
|
||||
|
||||
go s.streamLogFile(logFilePath)
|
||||
return true
|
||||
}
|
||||
|
||||
func init() {
|
||||
log.Printf("[frpLogger] Systemd log streamer initialized")
|
||||
}
|
||||
Reference in New Issue
Block a user