Compare commits

5 Commits

Author SHA1 Message Date
pedro 3175c5c23a add sse handle with std lib 2025-07-22 00:11:49 +02:00
pedro 62d3553e7a change ticker interval 2025-07-22 00:08:40 +02:00
pedro 8670442dbc update broker 2025-07-22 00:04:52 +02:00
pedro 941deaf9df add router switch and pgx changes 2025-05-29 13:06:48 +02:00
pedro 48fe13eecc router improvements 2025-05-29 13:06:38 +02:00
6 changed files with 266 additions and 111 deletions
+30 -15
View File
@@ -64,6 +64,9 @@ type Config struct {
// default map[string]DatabaseConfig{}
Databases map[string]DatabaseConfig
// default false
CreateRouter bool
// default false
CreateSSEBroker bool
@@ -87,9 +90,9 @@ type App struct {
}
type Paseto struct {
AsymmetricKey paseto.V4AsymmetricSecretKey
PublicKey paseto.V4AsymmetricPublicKey
Duration time.Duration
SecretKey paseto.V4AsymmetricSecretKey
PublicKey paseto.V4AsymmetricPublicKey
Duration time.Duration
}
func NewApp(config ...Config) *App {
@@ -101,12 +104,15 @@ func NewApp(config ...Config) *App {
Timezone: "UTC",
Paseto: nil,
Databases: make(map[string]DatabaseConfig),
CreateRouter: false,
CreateSSEBroker: false,
CreateSession: false,
CreateMailer: false,
CreateTemplates: false,
}
slog.Debug("NewApp", "config", cfg)
if len(config) > 0 {
cfg = config[0]
if cfg.LogLevel == slog.LevelDebug {
@@ -162,11 +168,11 @@ func NewApp(config ...Config) *App {
var ak paseto.V4AsymmetricSecretKey
var err error
if os.Getenv("PASETO_ASYMMETRIC_KEY") != "" {
slog.Debug("using paseto asymmetric key from env")
ak, err = paseto.NewV4AsymmetricSecretKeyFromHex(os.Getenv("PASETO_ASYMMETRIC_KEY"))
if os.Getenv("PASETO_SECRET_KEY") != "" {
slog.Debug("using paseto secret key from env")
ak, err = paseto.NewV4AsymmetricSecretKeyFromHex(os.Getenv("PASETO_SECRET_KEY"))
if err != nil {
slog.Error("error creating asymmetric key", "error", err)
slog.Error("error creating secret key", "error", err)
ak = paseto.NewV4AsymmetricSecretKey()
}
} else {
@@ -187,15 +193,24 @@ func NewApp(config ...Config) *App {
}
cfg.Paseto = &Paseto{
AsymmetricKey: ak,
PublicKey: pk,
Duration: duration,
SecretKey: ak,
PublicKey: pk,
Duration: duration,
}
}
app := &App{
config: cfg,
Router: newRouter(),
app := &App{config: cfg}
if cfg.CreateRouter {
app.Router = newRouter()
}
// Create PGX pools automatically if there are entries in Databases with driver 'pgx'
for dbName, dbConfig := range cfg.Databases {
if dbConfig.DriverName == "pgx" {
slog.Debug("creating pgx pool", "database", dbName)
app.newPGXPool(dbName)
}
}
slog.Info(
@@ -215,7 +230,7 @@ func NewApp(config ...Config) *App {
)
if cfg.EnvMode != EnvironmentProduction {
slog.Debug("paseto_assymetric_key", "key", cfg.Paseto.AsymmetricKey.ExportHex())
slog.Debug("paseto_secret_key", "key", cfg.Paseto.SecretKey.ExportHex())
}
if cfg.CreateSSEBroker {
@@ -367,7 +382,7 @@ func newLogger(level slog.Level) {
mw := io.MultiWriter(os.Stdout, f)
logger := slog.New(slog.NewTextHandler(mw, &slog.HandlerOptions{
AddSource: true,
AddSource: os.Getenv("ENV_MODE") == "" || os.Getenv("ENV_MODE") == "development",
Level: level,
}))
+167 -86
View File
@@ -11,128 +11,169 @@ import (
"sync"
"time"
"github.com/gofiber/fiber/v2"
"github.com/google/uuid"
"github.com/valyala/fasthttp"
)
type Message struct {
Event string
Data any
Event string `json:"event"`
Data any `json:"data"`
}
type Client struct {
ID string
Send chan Message
Close chan struct{}
IsActive bool
ID string
Send chan Message
Close chan struct{}
}
type SSEBroker struct {
clients map[string]*Client
mu sync.RWMutex
ticker *time.Ticker
done chan struct{}
clients map[string]*Client
mu sync.RWMutex
register chan *Client
unregister chan *Client
broadcast chan Message
done chan struct{}
}
func newSSEBroker() *SSEBroker {
s := &SSEBroker{
clients: make(map[string]*Client),
ticker: time.NewTicker(time.Minute),
done: make(chan struct{}),
b := &SSEBroker{
clients: make(map[string]*Client),
register: make(chan *Client),
unregister: make(chan *Client),
broadcast: make(chan Message),
done: make(chan struct{}),
}
go s.run()
return s
go b.run()
return b
}
func (s *SSEBroker) run() {
func (b *SSEBroker) run() {
for {
select {
case <-s.ticker.C:
s.Broadcast("ticker")
case <-s.done:
s.ticker.Stop()
case client := <-b.register:
b.mu.Lock()
b.clients[client.ID] = client
b.mu.Unlock()
slog.Info("client registered", "client_id", client.ID)
slog.Debug("clients", "clients", b.clients)
case client := <-b.unregister:
b.mu.Lock()
if _, ok := b.clients[client.ID]; ok {
delete(b.clients, client.ID)
close(client.Send)
close(client.Close)
slog.Info("client unregistered", "client_id", client.ID)
}
b.mu.Unlock()
case message := <-b.broadcast:
b.mu.RLock()
for _, client := range b.clients {
select {
case client.Send <- message:
default:
// Si el canal está lleno, se desconecta el cliente
close(client.Send)
close(client.Close)
b.mu.RUnlock()
b.mu.Lock()
delete(b.clients, client.ID)
b.mu.Unlock()
b.mu.RLock()
slog.Warn("client removed due to full channel", "client_id", client.ID)
}
}
b.mu.RUnlock()
case <-b.done:
return
}
}
}
func (s *SSEBroker) registerClient(id string) *Client {
s.mu.Lock()
defer s.mu.Unlock()
func (b *SSEBroker) RegisterClient(clientID string) *Client {
client := &Client{
ID: id,
Send: make(chan Message, 100),
Close: make(chan struct{}),
IsActive: true,
ID: clientID,
Send: make(chan Message, 10),
Close: make(chan struct{}),
}
s.clients[id] = client
b.register <- client
return client
}
func (s *SSEBroker) unregisterClient(id string) {
s.mu.Lock()
defer s.mu.Unlock()
if client, exists := s.clients[id]; exists {
close(client.Close)
delete(s.clients, id)
func (b *SSEBroker) UnregisterClient(clientID string) {
b.mu.RLock()
if client, ok := b.clients[clientID]; ok {
b.mu.RUnlock()
b.unregister <- client
} else {
b.mu.RUnlock()
}
}
func (s *SSEBroker) Broadcast(event string, data ...any) {
s.mu.RLock()
defer s.mu.RUnlock()
var dataValue any
func (b *SSEBroker) Broadcast(event string, data ...any) {
var payload any
if len(data) > 0 {
dataValue = data[0]
}
message := Message{
Event: event,
Data: dataValue,
}
for _, client := range s.clients {
if client.IsActive {
select {
case client.Send <- message:
default:
slog.Warn("client channel full, skipping message", "client_id", client.ID)
}
}
payload = data[0]
}
b.broadcast <- Message{Event: event, Data: payload}
}
func (s *SSEBroker) HandleSSE(w http.ResponseWriter, r *http.Request) {
slog.Debug("SSE called")
// Interface para abstraer las diferencias entre Fiber y HTTP estándar
type SSEWriter interface {
Write(data []byte) (int, error)
Flush() error
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Transfer-Encoding", "chunked")
// Wrapper para http.ResponseWriter
type httpSSEWriter struct {
w http.ResponseWriter
}
clientID := uuid.New().String()
client := s.registerClient(clientID)
defer s.unregisterClient(clientID)
func (h *httpSSEWriter) Write(data []byte) (int, error) {
return h.w.Write(data)
}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
func (h *httpSSEWriter) Flush() error {
if flusher, ok := h.w.(http.Flusher); ok {
flusher.Flush()
}
return nil
}
// Wrapper para bufio.Writer
type fiberSSEWriter struct {
w *bufio.Writer
}
func (f *fiberSSEWriter) Write(data []byte) (int, error) {
return f.w.Write(data)
}
func (f *fiberSSEWriter) Flush() error {
return f.w.Flush()
}
func (b *SSEBroker) handleSSEConnection(clientID string, client *Client, writer SSEWriter) {
slog.Info("SSE connection established", "client_id", clientID)
fmt.Fprintf(w, "event: connection\ndata: %s\n\n", clientID)
flusher.Flush()
defer func() {
b.UnregisterClient(clientID)
slog.Info("SSE connection closed", "client_id", clientID)
}()
clientGone := r.Context().Done()
fmt.Fprintf(writer, "event: client_id\n")
fmt.Fprintf(writer, "data: %s\n\n", clientID)
writer.Flush()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case message := <-client.Send:
slog.Info("message", "message", message)
var data string
switch v := message.Data.(type) {
case string:
@@ -147,26 +188,66 @@ func (s *SSEBroker) HandleSSE(w http.ResponseWriter, r *http.Request) {
}
data = string(jsonData)
}
fmt.Fprintf(w, "event: %s\n", message.Event)
fmt.Fprintf(writer, "event: %s\n", message.Event)
scanner := bufio.NewScanner(strings.NewReader(data))
for scanner.Scan() {
fmt.Fprintf(w, "data: %s\n", scanner.Text())
fmt.Fprintf(writer, "data: %s\n", scanner.Text())
}
fmt.Fprint(writer, "\n")
if err := writer.Flush(); err != nil {
slog.Warn("flush error, closing client", "client_id", clientID)
return
}
fmt.Fprint(w, "\n")
case <-ticker.C:
fmt.Fprintf(writer, "event: ping\n")
fmt.Fprintf(writer, "data: %s\n\n", "ping")
writer.Flush()
case <-client.Close:
slog.Info("client closed", "client_id", clientID)
return
case <-clientGone:
slog.Info("client gone", "client_id", clientID)
return
}
flusher.Flush()
}
}
func (s *SSEBroker) Shutdown() {
close(s.done)
func (b *SSEBroker) HandleFiberCtxSSE(c *fiber.Ctx) error {
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")
clientID := uuid.New().String()
client := b.RegisterClient(clientID)
c.Status(fiber.StatusOK).Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
writer := &fiberSSEWriter{w: w}
b.handleSSEConnection(clientID, client, writer)
}))
return nil
}
func (b *SSEBroker) HandleHTTPSSE(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Transfer-Encoding", "chunked")
clientID := uuid.New().String()
client := b.RegisterClient(clientID)
writer := &httpSSEWriter{w: w}
go func() {
b.handleSSEConnection(clientID, client, writer)
}()
<-r.Context().Done()
slog.Info("client disconnected", "client_id", clientID)
}
func (b *SSEBroker) Shutdown() {
close(b.done)
}
+10
View File
@@ -5,21 +5,31 @@ go 1.24
require (
github.com/alexedwards/scs/v2 v2.8.0
github.com/go-sql-driver/mysql v1.9.2
github.com/gofiber/fiber/v2 v2.52.9
github.com/golang-migrate/migrate/v4 v4.18.3
github.com/google/uuid v1.6.0
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgx/v5 v5.7.4
github.com/stretchr/testify v1.10.0
github.com/valyala/fasthttp v1.51.0
)
require (
aidanwoods.dev/go-result v0.3.1 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
+23
View File
@@ -10,6 +10,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/alexedwards/scs/v2 v2.8.0 h1:h31yUYoycPuL0zt14c0gd+oqxfRwIj6SOjHdKRZxhEw=
github.com/alexedwards/scs/v2 v2.8.0/go.mod h1:ToaROZxyKukJKT/xLcVQAChi5k6+Pn1Gvmdl7h3RRj8=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -31,6 +33,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-sql-driver/mysql v1.9.2 h1:4cNKDYQ1I84SXslGddlsrMhc8k4LeDVj6Ad6WRjiHuU=
github.com/go-sql-driver/mysql v1.9.2/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw=
github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-migrate/migrate/v4 v4.18.3 h1:EYGkoOsvgHHfm5U/naS1RP/6PL/Xv3S4B/swMiAmDLs=
@@ -61,12 +65,21 @@ github.com/jackc/pgx/v5 v5.7.4 h1:9wKznZrhWa2QiHL+NjTSPP6yjl3451BX3imWDnokYlg=
github.com/jackc/pgx/v5 v5.7.4/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
@@ -81,6 +94,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -89,6 +104,12 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA=
github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8=
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
@@ -103,6 +124,8 @@ golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8=
golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw=
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=
+22 -9
View File
@@ -20,7 +20,10 @@ var (
pgxMutex sync.RWMutex
)
func (a *App) NewPGXPool(name string) *pgxpool.Pool {
func (a *App) newPGXPool(name string) *pgxpool.Pool {
slog.Debug("newPGXPool", "name", name, "datasource", a.Datasource(name))
pgxMutex.Lock()
defer pgxMutex.Unlock()
@@ -44,22 +47,32 @@ func (a *App) NewPGXPool(name string) *pgxpool.Pool {
return dbPool
}
func (a *App) GetPGXPool(name string) (*pgxpool.Pool, bool) {
func (a *App) GetPGXPool(name string) *pgxpool.Pool {
pgxMutex.RLock()
defer pgxMutex.RUnlock()
pool, exists := pgxPools[name]
return pool, exists
if !exists {
slog.Error("database connection not found", "name", name)
return nil
}
return pool
}
func (a *App) ClosePGXPools() {
func (a *App) ClosePGXPool(name string) {
pgxMutex.Lock()
defer pgxMutex.Unlock()
for name, pool := range pgxPools {
pool.Close()
delete(pgxPools, name)
slog.Info("closed database connection", "name", name)
pool, exists := pgxPools[name]
if !exists {
slog.Error("database connection not found", "name", name)
return
}
pool.Close()
delete(pgxPools, name)
slog.Info("closed database connection", "name", name)
}
func NumericToFloat64(n pgtype.Numeric) float64 {
@@ -90,7 +103,7 @@ func NumericToInt64(n pgtype.Numeric) int64 {
func FloatToNumeric(number float64, precision int) (value pgtype.Numeric) {
parse := strconv.FormatFloat(number, 'f', precision, 64)
slog.Info("parse", "parse", parse)
slog.Debug("parse", "parse", parse)
if err := value.Scan(parse); err != nil {
slog.Error("error scanning numeric", "error", err)
+14 -1
View File
@@ -29,12 +29,25 @@ func (r *Router) Use(mw ...Middleware) {
}
}
func (r *Router) Group(fn func(r *Router)) {
func (r *Router) Group(basePath string, fn func(r *Router)) {
sub := &Router{
ServeMux: r.ServeMux,
routeChain: slices.Clone(r.routeChain),
isSub: true,
}
// Añadir middleware para manejar el basePath
sub.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if !strings.HasPrefix(req.URL.Path, basePath) {
http.NotFound(w, req)
return
}
req.URL.Path = strings.TrimPrefix(req.URL.Path, basePath)
next.ServeHTTP(w, req)
})
})
fn(sub)
}