Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion add-event.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ func AddEvent(ctx context.Context, relay Relay, evt *nostr.Event) (accepted bool
}
}

notifyListeners(evt)
if srv, ok := getServer(ctx); ok {
srv.notifyListeners(evt)
} else {
BroadcastEvent(evt)
}

return true, ""
}
Expand Down
4 changes: 2 additions & 2 deletions broadcasting.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import (
"github.com/nbd-wtf/go-nostr"
)

func BroadcastEvent(evt *nostr.Event) {
notifyListeners(evt)
func (s *Server) BroadcastEvent(evt *nostr.Event) {
s.notifyListeners(evt)
}
14 changes: 13 additions & 1 deletion extra.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package relayer

import "context"

const AUTH_CONTEXT_KEY = iota
const (
AUTH_CONTEXT_KEY = iota
SERVER_CONTEXT_KEY
)

func GetAuthStatus(ctx context.Context) (pubkey string, ok bool) {
value := ctx.Value(AUTH_CONTEXT_KEY)
Expand All @@ -14,3 +17,12 @@ func GetAuthStatus(ctx context.Context) (pubkey string, ok bool) {
}
return "", false
}

func getServer(ctx context.Context) (*Server, bool) {
value := ctx.Value(SERVER_CONTEXT_KEY)
if value == nil {
return nil, false
}
srv, ok := value.(*Server)
return srv, ok
}
9 changes: 5 additions & 4 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (s *Server) doEvent(ctx context.Context, ws *WebSocket, request []json.RawM
}
}

notifyListeners(&evt)
s.notifyListeners(&evt)
ws.WriteJSON(nostr.OKEnvelope{EventID: evt.ID, OK: true})
return ""
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func (s *Server) doReq(ctx context.Context, ws *WebSocket, request []json.RawMes
}

ws.WriteJSON(nostr.EOSEEnvelope(id))
setListener(id, ws, filters)
s.setListener(id, ws, filters)
return ""
}

Expand All @@ -261,7 +261,7 @@ func (s *Server) doClose(ctx context.Context, ws *WebSocket, request []json.RawM
return "CLOSE has no <id>"
}

removeListenerId(ws, id)
s.removeListenerId(ws, id)
return ""
}

Expand Down Expand Up @@ -304,6 +304,7 @@ func (s *Server) handleMessage(ctx context.Context, ws *WebSocket, message []byt
json.Unmarshal(request[0], &typ)

ctx = context.WithValue(ctx, AUTH_CONTEXT_KEY, ws)
ctx = context.WithValue(ctx, SERVER_CONTEXT_KEY, s)

switch typ {
case "EVENT":
Expand Down Expand Up @@ -405,7 +406,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
if _, ok := s.clients[conn]; ok {
conn.Close()
delete(s.clients, conn)
removeListener(ws)
s.removeListener(ws)
}
s.clientsMu.Unlock()
s.Log.Infof("disconnected from %s", ip)
Expand Down
116 changes: 64 additions & 52 deletions listener.go
Original file line number Diff line number Diff line change
@@ -1,82 +1,85 @@
package relayer

import (
"sync"

"github.com/nbd-wtf/go-nostr"
)
import "github.com/nbd-wtf/go-nostr"

type Listener struct {
filters nostr.Filters
}

var (
listeners = make(map[*WebSocket]map[string]*Listener)
listenersMutex = sync.RWMutex{}
)

func GetListeningFilters() nostr.Filters {
listenersMutex.RLock()
defer listenersMutex.RUnlock()
respfilters := make(nostr.Filters, 0, len(listeners)*2)
serversMutex.RLock()
defer serversMutex.RUnlock()

respfilters := make(nostr.Filters, 0, len(servers)*2)
for srv := range servers {
respfilters = appendDistinctFilters(respfilters, srv.GetListeningFilters())
}

return respfilters
}

func (s *Server) GetListeningFilters() nostr.Filters {
s.listenersMu.RLock()
defer s.listenersMu.RUnlock()
respfilters := make(nostr.Filters, 0, len(s.listeners)*2)

// here we go through all the existing listeners
for _, connlisteners := range listeners {
for _, connlisteners := range s.listeners {
for _, listener := range connlisteners {
for _, listenerfilter := range listener.filters {
for _, respfilter := range respfilters {
// check if this filter specifically is already added to respfilters
if nostr.FilterEqual(listenerfilter, respfilter) {
goto nextconn
}
}

// field not yet present on respfilters, add it
respfilters = append(respfilters, listenerfilter)

// continue to the next filter
nextconn:
continue
}
respfilters = appendDistinctFilters(respfilters, listener.filters)
}
}

// respfilters will be a slice with all the distinct filter we currently have active
return respfilters
}

func setListener(id string, ws *WebSocket, filters nostr.Filters) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
func appendDistinctFilters(dst nostr.Filters, src nostr.Filters) nostr.Filters {
for _, listenerfilter := range src {
duplicate := false
for _, respfilter := range dst {
if nostr.FilterEqual(listenerfilter, respfilter) {
duplicate = true
break
}
}
if !duplicate {
dst = append(dst, listenerfilter)
}
}
return dst
}

subs, ok := listeners[ws]
func (s *Server) setListener(id string, ws *WebSocket, filters nostr.Filters) {
s.listenersMu.Lock()
defer s.listenersMu.Unlock()

subs, ok := s.listeners[ws]
if !ok {
subs = make(map[string]*Listener)
listeners[ws] = subs
s.listeners[ws] = subs
}

subs[id] = &Listener{filters: filters}
}

// Remove a specific subscription id from listeners for a given ws client
func removeListenerId(ws *WebSocket, id string) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
func (s *Server) removeListenerId(ws *WebSocket, id string) {
s.listenersMu.Lock()
defer s.listenersMu.Unlock()

if subs, ok := listeners[ws]; ok {
delete(listeners[ws], id)
if subs, ok := s.listeners[ws]; ok {
delete(s.listeners[ws], id)
if len(subs) == 0 {
delete(listeners, ws)
delete(s.listeners, ws)
}
}
}

// Remove WebSocket conn from listeners
func removeListener(ws *WebSocket) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
clear(listeners[ws])
delete(listeners, ws)
func (s *Server) removeListener(ws *WebSocket) {
s.listenersMu.Lock()
defer s.listenersMu.Unlock()
clear(s.listeners[ws])
delete(s.listeners, ws)
}

type listenerDelivery struct {
Expand All @@ -85,10 +88,10 @@ type listenerDelivery struct {
event nostr.Event
}

func notifyListeners(event *nostr.Event) {
listenersMutex.RLock()
deliveries := make([]listenerDelivery, 0, len(listeners))
for ws, subs := range listeners {
func (s *Server) notifyListeners(event *nostr.Event) {
s.listenersMu.RLock()
deliveries := make([]listenerDelivery, 0, len(s.listeners))
for ws, subs := range s.listeners {
for id, listener := range subs {
if !listener.filters.Match(event) {
continue
Expand All @@ -100,10 +103,19 @@ func notifyListeners(event *nostr.Event) {
})
}
}
listenersMutex.RUnlock()
s.listenersMu.RUnlock()

for _, delivery := range deliveries {
delivery := delivery
delivery.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &delivery.subID, Event: delivery.event})
}
}

func BroadcastEvent(evt *nostr.Event) {
serversMutex.RLock()
defer serversMutex.RUnlock()

for srv := range servers {
srv.notifyListeners(evt)
}
}
Loading
Loading