Files
backend/frpLogger/frpLogger.go
NanamiAdmin 839bad3c94 feat(frpLogger): add real-time instance log streaming functionality
Implement cross-platform log streaming for frpc instances with support for Windows, systemd, and init.d systems. Includes WebSocket API endpoint for real-time log streaming, token validation, and instance ownership checks. Update README and API documentation to reflect new functionality.

The implementation handles:
- Platform-specific log collection (Windows Event Log, journalctl, log files)
- WebSocket-based real-time streaming
- Token validation and instance access control
- Log level parsing and formatting
- Historical log retrieval since service start
2026-03-26 17:46:50 +08:00

229 lines
4.8 KiB
Go

package frpLogger
import (
"database/sql"
"fmt"
"log"
"net/http"
"strconv"
"sync"
"time"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type LogMessage struct {
Level string `json:"level"`
Content string `json:"content"`
Timestamp string `json:"timestamp"`
}
type InstanceLogStreamer struct {
instanceID int
serviceName string
conn *websocket.Conn
stopChan chan struct{}
doneChan chan struct{}
running bool
runningMutex sync.Mutex
}
func NewInstanceLogStreamer(instanceID int, serviceName string, conn *websocket.Conn) *InstanceLogStreamer {
return &InstanceLogStreamer{
instanceID: instanceID,
serviceName: serviceName,
conn: conn,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
running: false,
}
}
func (s *InstanceLogStreamer) Start() {
s.runningMutex.Lock()
if s.running {
s.runningMutex.Unlock()
return
}
s.running = true
s.runningMutex.Unlock()
go s.streamLogs()
}
func (s *InstanceLogStreamer) Stop() {
s.runningMutex.Lock()
defer s.runningMutex.Unlock()
if !s.running {
return
}
close(s.stopChan)
<-s.doneChan
s.running = false
}
func (s *InstanceLogStreamer) IsRunning() bool {
s.runningMutex.Lock()
defer s.runningMutex.Unlock()
return s.running
}
func (s *InstanceLogStreamer) streamLogs() {
defer close(s.doneChan)
if !IsInstanceRunning(s.instanceID) {
s.sendInfo("Instance is not running, no logs to stream")
return
}
initSystem := GetInitSystem()
switch initSystem {
case "windows":
s.streamWindowsLogs()
case "systemd":
s.streamSystemdLogs()
case "init.d":
s.streamInitDLogs()
default:
s.sendError(fmt.Sprintf("Unsupported init system: %s", initSystem))
}
}
func (s *InstanceLogStreamer) sendLog(level, content string) {
msg := LogMessage{
Level: level,
Content: content,
Timestamp: time.Now().Format("2006-01-02 15:04:05.000"),
}
if err := s.conn.WriteJSON(msg); err != nil {
log.Printf("[InstanceLogStreamer] Failed to send log message: %v", err)
}
}
func (s *InstanceLogStreamer) sendError(content string) {
s.sendLog("ERROR", content)
}
func (s *InstanceLogStreamer) sendInfo(content string) {
s.sendLog("INFO", content)
}
type FrpcInstance struct {
ID int
UserID int
Name string
BootAtStart bool
RunUser string
ConfigPath string
CreatedAt time.Time
}
type User struct {
UserID int
Username string
Passwd string
Type string
CreatedAt string
}
var (
db *sql.DB
frpcDB *sql.DB
dbMutex sync.RWMutex
isDebug bool
debugMux sync.RWMutex
)
func SetDatabase(userDB, instanceDB *sql.DB) {
dbMutex.Lock()
defer dbMutex.Unlock()
db = userDB
frpcDB = instanceDB
}
func SetDebugMode(debug bool) {
debugMux.Lock()
defer debugMux.Unlock()
isDebug = debug
}
func GetDebugMode() bool {
debugMux.RLock()
defer debugMux.RUnlock()
return isDebug
}
func DBQueryFrpcInstanceByID(instanceID int) (FrpcInstance, error) {
dbMutex.RLock()
defer dbMutex.RUnlock()
var instance FrpcInstance
var createdAtStr string
err := frpcDB.QueryRow("SELECT id, userID, name, bootAtStart, runUser, configPath, createdAt FROM frpcInstances WHERE id = ?", instanceID).Scan(
&instance.ID, &instance.UserID, &instance.Name, &instance.BootAtStart, &instance.RunUser, &instance.ConfigPath, &createdAtStr)
if err != nil {
return instance, fmt.Errorf("failed to query frpc instance: %w", err)
}
instance.CreatedAt, _ = time.Parse(time.RFC3339, createdAtStr)
return instance, nil
}
func GetUserByID(userID int) (*User, error) {
dbMutex.RLock()
defer dbMutex.RUnlock()
var user User
err := db.QueryRow("SELECT userID, username, passwd, type, createdAt FROM userLogin WHERE userID = ?", userID).
Scan(&user.UserID, &user.Username, &user.Passwd, &user.Type, &user.CreatedAt)
if err != nil {
return nil, fmt.Errorf("failed to query user: %w", err)
}
return &user, nil
}
func GetServiceNameByInstanceID(instanceID int) (string, error) {
instance, err := DBQueryFrpcInstanceByID(instanceID)
if err != nil {
return "", fmt.Errorf("failed to query frpc instance: %w", err)
}
user, err := GetUserByID(instance.UserID)
if err != nil {
return "", fmt.Errorf("failed to get user info: %w", err)
}
serviceName := fmt.Sprintf("superfrpc_%s_%s", user.Username, instance.Name)
return serviceName, nil
}
func ParseInstanceIDFromQuery(r *http.Request) (int, error) {
instanceIDStr := r.URL.Query().Get("instanceID")
if instanceIDStr == "" {
return 0, fmt.Errorf("instanceID is required")
}
instanceID, err := strconv.Atoi(instanceIDStr)
if err != nil {
return 0, fmt.Errorf("invalid instanceID format")
}
return instanceID, nil
}
func ParseTokenFromQuery(r *http.Request) string {
return r.URL.Query().Get("token")
}