Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 276 additions & 0 deletions bridgev2/matrix/accountdatasync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
// Copyright (c) 2024 Tulir Asokan
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package matrix

import (
"bytes"
"context"
"encoding/json"
"errors"
"maps"
"sync"
"time"

"github.com/rs/zerolog"

"maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)

// accountDataSyncer runs a /sync loop as a double-puppeted user to receive
// room account data changes (m.marked_unread, m.tag, com.beeper.mute).
// This is necessary because Synapse does not forward account data events
// to appservices via the transaction API.
type accountDataSyncer struct {
br *Connector
userID id.UserID
client *mautrix.Client
log zerolog.Logger

cancelSync context.CancelFunc
syncDone sync.WaitGroup

// recentlySet tracks account data content recently set by the bridge itself
// to avoid infinite loops. Key is "roomID:eventType", value is the last
// JSON content that was set. When a sync event arrives with matching content,
// it is recognized as a bridge echo and skipped.
recentlySet map[string]*sentAccountData
recentlySetLock sync.Mutex
}

type sentAccountData struct {
content json.RawMessage
sentAt time.Time
}

var accountDataTypes = []event.Type{
event.AccountDataMarkedUnread,
event.AccountDataRoomTags,
event.AccountDataBeeperMute,
}

func (br *Connector) StartAccountDataSync(userID id.UserID, source *mautrix.Client) {
br.stopAccountDataSync(userID)

// Create a separate client for the sync loop so we don't interfere
// with the double puppet intent's client.
syncClient, err := mautrix.NewClient(source.HomeserverURL.String(), userID, source.AccessToken)
if err != nil {
br.Bridge.Log.Err(err).Stringer("user_id", userID).Msg("Failed to create account data sync client")
return
}
syncClient.SetAppServiceUserID = source.SetAppServiceUserID

syncer := &accountDataSyncer{
br: br,
userID: userID,
client: syncClient,
log: br.Bridge.Log.With().Str("component", "account_data_sync").Stringer("user_id", userID).Logger(),
recentlySet: make(map[string]*sentAccountData),
}

br.accountDataSyncersLock.Lock()
br.accountDataSyncers[userID] = syncer
br.accountDataSyncersLock.Unlock()

syncer.Start()
}

func (br *Connector) stopAccountDataSync(userID id.UserID) {
br.accountDataSyncersLock.Lock()
existing, ok := br.accountDataSyncers[userID]
if ok {
delete(br.accountDataSyncers, userID)
}
br.accountDataSyncersLock.Unlock()
if ok {
existing.Stop()
}
}

func (br *Connector) StopAccountDataSync(userID id.UserID) {
br.stopAccountDataSync(userID)
}

func (br *Connector) StopAllAccountDataSyncers() {
br.accountDataSyncersLock.Lock()
syncers := make(map[id.UserID]*accountDataSyncer, len(br.accountDataSyncers))
maps.Copy(syncers, br.accountDataSyncers)
br.accountDataSyncers = make(map[id.UserID]*accountDataSyncer)
br.accountDataSyncersLock.Unlock()
for _, syncer := range syncers {
syncer.Stop()
}
}

// MarkAccountDataSent records that the bridge set account data for a room,
// so the sync loop can skip the resulting event to avoid loops.
// The content parameter is the JSON-serialized content that was set.
func (br *Connector) MarkAccountDataSent(userID id.UserID, roomID id.RoomID, evtType string, content json.RawMessage) {
br.accountDataSyncersLock.Lock()
syncer, ok := br.accountDataSyncers[userID]
br.accountDataSyncersLock.Unlock()
if ok {
syncer.markSent(roomID, evtType, content)
}
}

func (s *accountDataSyncer) markSent(roomID id.RoomID, evtType string, content json.RawMessage) {
key := string(roomID) + ":" + evtType
s.recentlySetLock.Lock()
s.recentlySet[key] = &sentAccountData{content: content, sentAt: time.Now()}
s.recentlySetLock.Unlock()
}

func (s *accountDataSyncer) wasRecentlySent(roomID id.RoomID, evtType string, incomingContent json.RawMessage) bool {
key := string(roomID) + ":" + evtType
s.recentlySetLock.Lock()
defer s.recentlySetLock.Unlock()
stored, ok := s.recentlySet[key]
if !ok {
return false
}
// Clean up entries older than 10 minutes to prevent memory leaks.
if time.Since(stored.sentAt) >= 10*time.Minute {
delete(s.recentlySet, key)
return false
}
// If content was provided when marking, compare it with the incoming event.
// This correctly handles delayed sync deliveries regardless of timing.
if stored.content != nil {
return bytes.Equal(stored.content, incomingContent)
}
// Fallback for event types where content isn't tracked (e.g. tags):
// use time-based dedup within a short window.
return time.Since(stored.sentAt) < 30*time.Second
}

func (s *accountDataSyncer) Start() {
s.syncDone.Add(1)
go s.syncLoop()
}

func (s *accountDataSyncer) Stop() {
if s.cancelSync != nil {
s.cancelSync()
}
s.syncDone.Wait()
}

func (s *accountDataSyncer) syncLoop() {
defer s.syncDone.Done()
s.log.Debug().Msg("Starting account data sync loop")
var ctx context.Context
ctx, s.cancelSync = context.WithCancel(context.Background())

syncer := &accountDataSyncHandler{
parent: s,
isFirstSync: true,
}
s.client.Syncer = syncer
s.client.SyncPresence = "offline"

err := s.client.SyncWithContext(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
s.log.Err(err).Msg("Account data sync loop exited with error")
} else {
s.log.Debug().Msg("Account data sync loop stopped")
}
}

type accountDataSyncHandler struct {
parent *accountDataSyncer
isFirstSync bool
}

func (h *accountDataSyncHandler) ProcessResponse(ctx context.Context, resp *mautrix.RespSync, since string) error {
if since == "" {
// Skip the initial sync — it contains the full current state,
// not changes we need to react to.
h.isFirstSync = false
return nil
}
if h.isFirstSync {
h.isFirstSync = false
return nil
}

for roomID, roomData := range resp.Rooms.Join {
for _, evt := range roomData.AccountData.Events {
h.handleAccountDataEvent(ctx, roomID, evt)
}
}
return nil
}

func (h *accountDataSyncHandler) handleAccountDataEvent(ctx context.Context, roomID id.RoomID, evt *event.Event) {
// Only handle known account data types
isKnown := false
for _, t := range accountDataTypes {
if evt.Type.Type == t.Type {
isKnown = true
break
}
}
if !isKnown {
return
}

// Skip events that the bridge itself set (loop avoidance).
// Compare raw content bytes to identify bridge echoes regardless of timing.
if h.parent.wasRecentlySent(roomID, evt.Type.Type, evt.Content.VeryRaw) {
h.parent.log.Debug().
Stringer("room_id", roomID).
Str("event_type", evt.Type.Type).
Msg("Skipping account data event that was recently set by bridge")
return
}

evt.RoomID = roomID
evt.Sender = h.parent.userID
evt.Type.Class = event.AccountDataEventType

err := evt.Content.ParseRaw(evt.Type)
if err != nil {
h.parent.log.Warn().Err(err).
Stringer("room_id", roomID).
Str("event_type", evt.Type.Type).
Msg("Failed to parse account data event content")
return
}

h.parent.log.Debug().
Stringer("room_id", roomID).
Str("event_type", evt.Type.Type).
Msg("Dispatching account data event from double puppet sync")
h.parent.br.Bridge.QueueMatrixEvent(ctx, evt)
}

func (h *accountDataSyncHandler) OnFailedSync(_ *mautrix.RespSync, err error) (time.Duration, error) {
if errors.Is(err, mautrix.MUnknownToken) {
h.parent.log.Warn().Msg("Account data sync got M_UNKNOWN_TOKEN, stopping")
return 0, err
}
h.parent.log.Err(err).Msg("Account data sync failed, retrying in 30 seconds")
return 30 * time.Second, nil
}

func (h *accountDataSyncHandler) GetFilterJSON(_ id.UserID) *mautrix.Filter {
everything := []event.Type{{Type: "*"}}
return &mautrix.Filter{
Presence: &mautrix.FilterPart{NotTypes: everything},
AccountData: &mautrix.FilterPart{NotTypes: everything},
Room: &mautrix.RoomFilter{
IncludeLeave: false,
Ephemeral: &mautrix.FilterPart{NotTypes: everything},
AccountData: &mautrix.FilterPart{Types: accountDataTypes},
State: &mautrix.FilterPart{NotTypes: everything},
Timeline: &mautrix.FilterPart{NotTypes: everything},
},
}
}
9 changes: 9 additions & 0 deletions bridgev2/matrix/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type Connector struct {

doublePuppetIntents *exsync.Map[id.UserID, *appservice.IntentAPI]

accountDataSyncers map[id.UserID]*accountDataSyncer
accountDataSyncersLock sync.Mutex

deterministicEventIDServer string

MediaConfig mautrix.RespMediaConfig
Expand Down Expand Up @@ -157,6 +160,10 @@ func (br *Connector) Init(bridge *bridgev2.Bridge) {
br.EventProcessor.On(event.BeeperAcceptMessageRequest, br.handleRoomEvent)
br.EventProcessor.On(event.EphemeralEventReceipt, br.handleEphemeralEvent)
br.EventProcessor.On(event.EphemeralEventTyping, br.handleEphemeralEvent)
br.EventProcessor.On(event.AccountDataMarkedUnread, br.handleRoomEvent)
br.EventProcessor.On(event.AccountDataRoomTags, br.handleRoomEvent)
br.EventProcessor.On(event.AccountDataBeeperMute, br.handleRoomEvent)
br.accountDataSyncers = make(map[id.UserID]*accountDataSyncer)
br.Bot = br.AS.BotIntent()
br.Crypto = NewCryptoHelper(br)
br.Bridge.Commands.(*commands.Processor).AddHandlers(
Expand Down Expand Up @@ -324,6 +331,7 @@ func (br *Connector) PreStop() {
}

func (br *Connector) Stop() {
br.StopAllAccountDataSyncers()
br.EventProcessor.Stop()
if br.Crypto != nil {
br.Crypto.Stop()
Expand Down Expand Up @@ -621,6 +629,7 @@ func (br *Connector) NewUserIntent(ctx context.Context, userID id.UserID, access
return nil, accessToken, err
}
br.doublePuppetIntents.Set(userID, intent)
br.StartAccountDataSync(userID, intent.Client)
return &ASIntent{Connector: br, Matrix: intent}, newToken, nil
}

Expand Down
15 changes: 15 additions & 0 deletions bridgev2/matrix/intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func (as *ASIntent) MarkRead(ctx context.Context, roomID id.RoomID, eventID id.E
req.FullyRead = eventID
req.BeeperFullyReadExtra = extraData
}
if as.Matrix.IsCustomPuppet {
markedUnreadContent, _ := json.Marshal(&event.MarkedUnreadEventContent{Unread: false})
as.Connector.MarkAccountDataSent(as.Matrix.UserID, roomID, event.AccountDataMarkedUnread.Type, markedUnreadContent)
}
if as.Matrix.IsCustomPuppet && as.Connector.SpecVersions.Supports(mautrix.BeeperFeatureInboxState) && as.Connector.Config.Homeserver.Software != bridgeconfig.SoftwareHungry {
err = as.Matrix.SetBeeperInboxState(ctx, roomID, &mautrix.ReqSetBeeperInboxState{
//MarkedUnread: ptr.Ptr(false),
Expand All @@ -168,6 +172,10 @@ func (as *ASIntent) MarkUnread(ctx context.Context, roomID id.RoomID, unread boo
if as.Connector.Config.Homeserver.Software == bridgeconfig.SoftwareHungry {
return nil
}
if as.Matrix.IsCustomPuppet {
markedUnreadContent, _ := json.Marshal(&event.MarkedUnreadEventContent{Unread: unread})
as.Connector.MarkAccountDataSent(as.Matrix.UserID, roomID, event.AccountDataMarkedUnread.Type, markedUnreadContent)
}
if as.Matrix.IsCustomPuppet && as.Connector.SpecVersions.Supports(mautrix.BeeperFeatureInboxState) {
return as.Matrix.SetBeeperInboxState(ctx, roomID, &mautrix.ReqSetBeeperInboxState{
MarkedUnread: ptr.Ptr(unread),
Expand Down Expand Up @@ -709,6 +717,9 @@ func (as *ASIntent) DeleteRoom(ctx context.Context, roomID id.RoomID, puppetsOnl
}

func (as *ASIntent) TagRoom(ctx context.Context, roomID id.RoomID, tag event.RoomTag, isTagged bool) error {
if as.Matrix.IsCustomPuppet {
as.Connector.MarkAccountDataSent(as.Matrix.UserID, roomID, event.AccountDataRoomTags.Type, nil)
}
tags, err := as.Matrix.GetTags(ctx, roomID)
if err != nil {
return fmt.Errorf("failed to get room tags: %w", err)
Expand Down Expand Up @@ -745,6 +756,10 @@ func (as *ASIntent) MuteRoom(ctx context.Context, roomID id.RoomID, until time.T
} else {
mutedUntil = until.UnixMilli()
}
if as.Matrix.IsCustomPuppet {
muteContent, _ := json.Marshal(&event.BeeperMuteEventContent{MutedUntil: mutedUntil})
as.Connector.MarkAccountDataSent(as.Matrix.UserID, roomID, event.AccountDataBeeperMute.Type, muteContent)
}
if as.Connector.SpecVersions.Supports(mautrix.BeeperFeatureAccountDataMute) {
return as.Matrix.SetRoomAccountData(ctx, roomID, event.AccountDataBeeperMute.Type, &event.BeeperMuteEventContent{
MutedUntil: mutedUntil,
Expand Down
9 changes: 9 additions & 0 deletions bridgev2/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func (br *Bridge) GetExistingUserByMXID(ctx context.Context, userID id.UserID) (
return br.unlockedGetUserByMXID(ctx, userID, true)
}

// AccountDataSyncStopper is an optional interface that MatrixConnector implementations
// can implement to stop account data sync loops when double puppeting is disabled.
type AccountDataSyncStopper interface {
StopAccountDataSync(userID id.UserID)
}

func (user *User) LogoutDoublePuppet(ctx context.Context) {
user.doublePuppetLock.Lock()
defer user.doublePuppetLock.Unlock()
Expand All @@ -109,6 +115,9 @@ func (user *User) LogoutDoublePuppet(ctx context.Context) {
}
user.doublePuppetIntent = nil
user.doublePuppetInitialized = false
if stopper, ok := user.Bridge.Matrix.(AccountDataSyncStopper); ok {
stopper.StopAccountDataSync(user.MXID)
}
}

func (user *User) LoginDoublePuppet(ctx context.Context, token string) error {
Expand Down