update broker
This commit is contained in:
@@ -6,167 +6,194 @@ import (
|
||||
"fmt"
|
||||
"html/template"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/google/uuid"
|
||||
"github.com/valyala/fasthttp"
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
Event string
|
||||
Data any
|
||||
Event string `json:"event"`
|
||||
Data any `json:"data"`
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
ID string
|
||||
Send chan Message
|
||||
Close chan struct{}
|
||||
IsActive bool
|
||||
ID string
|
||||
Send chan Message
|
||||
Close chan struct{}
|
||||
}
|
||||
|
||||
type SSEBroker struct {
|
||||
clients map[string]*Client
|
||||
mu sync.RWMutex
|
||||
ticker *time.Ticker
|
||||
done chan struct{}
|
||||
clients map[string]*Client
|
||||
mu sync.RWMutex
|
||||
register chan *Client
|
||||
unregister chan *Client
|
||||
broadcast chan Message
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func newSSEBroker() *SSEBroker {
|
||||
s := &SSEBroker{
|
||||
clients: make(map[string]*Client),
|
||||
ticker: time.NewTicker(time.Minute),
|
||||
done: make(chan struct{}),
|
||||
b := &SSEBroker{
|
||||
clients: make(map[string]*Client),
|
||||
register: make(chan *Client),
|
||||
unregister: make(chan *Client),
|
||||
broadcast: make(chan Message),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
go s.run()
|
||||
return s
|
||||
go b.run()
|
||||
return b
|
||||
}
|
||||
|
||||
func (s *SSEBroker) run() {
|
||||
func (b *SSEBroker) run() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ticker.C:
|
||||
s.Broadcast("ticker")
|
||||
case <-s.done:
|
||||
s.ticker.Stop()
|
||||
case client := <-b.register:
|
||||
b.mu.Lock()
|
||||
b.clients[client.ID] = client
|
||||
b.mu.Unlock()
|
||||
slog.Info("client registered", "client_id", client.ID)
|
||||
slog.Debug("clients", "clients", b.clients)
|
||||
|
||||
case client := <-b.unregister:
|
||||
b.mu.Lock()
|
||||
if _, ok := b.clients[client.ID]; ok {
|
||||
delete(b.clients, client.ID)
|
||||
close(client.Send)
|
||||
close(client.Close)
|
||||
slog.Info("client unregistered", "client_id", client.ID)
|
||||
}
|
||||
b.mu.Unlock()
|
||||
|
||||
case message := <-b.broadcast:
|
||||
b.mu.RLock()
|
||||
for _, client := range b.clients {
|
||||
select {
|
||||
case client.Send <- message:
|
||||
default:
|
||||
// Si el canal está lleno, se desconecta el cliente
|
||||
close(client.Send)
|
||||
close(client.Close)
|
||||
b.mu.RUnlock()
|
||||
b.mu.Lock()
|
||||
delete(b.clients, client.ID)
|
||||
b.mu.Unlock()
|
||||
b.mu.RLock()
|
||||
slog.Warn("client removed due to full channel", "client_id", client.ID)
|
||||
}
|
||||
}
|
||||
b.mu.RUnlock()
|
||||
|
||||
case <-b.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SSEBroker) registerClient(id string) *Client {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
func (b *SSEBroker) RegisterClient(clientID string) *Client {
|
||||
client := &Client{
|
||||
ID: id,
|
||||
Send: make(chan Message, 100),
|
||||
Close: make(chan struct{}),
|
||||
IsActive: true,
|
||||
ID: clientID,
|
||||
Send: make(chan Message, 10),
|
||||
Close: make(chan struct{}),
|
||||
}
|
||||
s.clients[id] = client
|
||||
b.register <- client
|
||||
return client
|
||||
}
|
||||
|
||||
func (s *SSEBroker) unregisterClient(id string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if client, exists := s.clients[id]; exists {
|
||||
close(client.Close)
|
||||
delete(s.clients, id)
|
||||
func (b *SSEBroker) UnregisterClient(clientID string) {
|
||||
b.mu.RLock()
|
||||
if client, ok := b.clients[clientID]; ok {
|
||||
b.mu.RUnlock()
|
||||
b.unregister <- client
|
||||
} else {
|
||||
b.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SSEBroker) Broadcast(event string, data ...any) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
var dataValue any
|
||||
func (b *SSEBroker) Broadcast(event string, data ...any) {
|
||||
var payload any
|
||||
if len(data) > 0 {
|
||||
dataValue = data[0]
|
||||
}
|
||||
|
||||
message := Message{
|
||||
Event: event,
|
||||
Data: dataValue,
|
||||
}
|
||||
|
||||
for _, client := range s.clients {
|
||||
if client.IsActive {
|
||||
select {
|
||||
case client.Send <- message:
|
||||
default:
|
||||
slog.Warn("client channel full, skipping message", "client_id", client.ID)
|
||||
}
|
||||
}
|
||||
payload = data[0]
|
||||
}
|
||||
b.broadcast <- Message{Event: event, Data: payload}
|
||||
}
|
||||
|
||||
func (s *SSEBroker) HandleSSE(w http.ResponseWriter, r *http.Request) {
|
||||
slog.Debug("SSE called")
|
||||
func (b *SSEBroker) HandleFiberCtxSSE(c *fiber.Ctx) error {
|
||||
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("Transfer-Encoding", "chunked")
|
||||
c.Set("Content-Type", "text/event-stream")
|
||||
c.Set("Cache-Control", "no-cache")
|
||||
c.Set("Connection", "keep-alive")
|
||||
c.Set("Transfer-Encoding", "chunked")
|
||||
|
||||
clientID := uuid.New().String()
|
||||
client := s.registerClient(clientID)
|
||||
defer s.unregisterClient(clientID)
|
||||
client := b.RegisterClient(clientID)
|
||||
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
|
||||
slog.Info("SSE connection established", "client_id", clientID)
|
||||
|
||||
slog.Info("SSE connection established", "client_id", clientID)
|
||||
defer func() {
|
||||
b.UnregisterClient(clientID)
|
||||
slog.Info("SSE connection closed", "client_id", clientID)
|
||||
}()
|
||||
|
||||
fmt.Fprintf(w, "event: connection\ndata: %s\n\n", clientID)
|
||||
flusher.Flush()
|
||||
fmt.Fprintf(w, "event: client_id\n")
|
||||
fmt.Fprintf(w, "data: %s\n\n", clientID)
|
||||
w.Flush()
|
||||
|
||||
clientGone := r.Context().Done()
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case message := <-client.Send:
|
||||
slog.Info("message", "message", message)
|
||||
var data string
|
||||
switch v := message.Data.(type) {
|
||||
case string:
|
||||
data = v
|
||||
case template.HTML:
|
||||
data = string(v)
|
||||
default:
|
||||
jsonData, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
slog.Error("error marshaling message", "error", err)
|
||||
continue
|
||||
for {
|
||||
select {
|
||||
case message := <-client.Send:
|
||||
var data string
|
||||
switch v := message.Data.(type) {
|
||||
case string:
|
||||
data = v
|
||||
case template.HTML:
|
||||
data = string(v)
|
||||
default:
|
||||
jsonData, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
slog.Error("error marshaling message", "error", err)
|
||||
continue
|
||||
}
|
||||
data = string(jsonData)
|
||||
}
|
||||
data = string(jsonData)
|
||||
}
|
||||
fmt.Fprintf(w, "event: %s\n", message.Event)
|
||||
|
||||
scanner := bufio.NewScanner(strings.NewReader(data))
|
||||
for scanner.Scan() {
|
||||
fmt.Fprintf(w, "data: %s\n", scanner.Text())
|
||||
}
|
||||
fmt.Fprintf(w, "event: %s\n", message.Event)
|
||||
scanner := bufio.NewScanner(strings.NewReader(data))
|
||||
for scanner.Scan() {
|
||||
fmt.Fprintf(w, "data: %s\n", scanner.Text())
|
||||
}
|
||||
fmt.Fprint(w, "\n")
|
||||
|
||||
fmt.Fprint(w, "\n")
|
||||
case <-client.Close:
|
||||
slog.Info("client closed", "client_id", clientID)
|
||||
return
|
||||
case <-clientGone:
|
||||
slog.Info("client gone", "client_id", clientID)
|
||||
return
|
||||
if err := w.Flush(); err != nil {
|
||||
slog.Warn("flush error, closing client", "client_id", clientID)
|
||||
return
|
||||
}
|
||||
case <-ticker.C:
|
||||
fmt.Fprintf(w, "event: ping\n")
|
||||
fmt.Fprintf(w, "data: %s\n\n", "ping")
|
||||
|
||||
case <-client.Close:
|
||||
return
|
||||
|
||||
default:
|
||||
err := w.Flush()
|
||||
if err != nil {
|
||||
slog.Warn("error while flushing", "error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}))
|
||||
|
||||
flusher.Flush()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SSEBroker) Shutdown() {
|
||||
close(s.done)
|
||||
func (b *SSEBroker) Shutdown() {
|
||||
close(b.done)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user