@@ -7,7 +7,6 @@ import { ErrorResponse, OkResponse } from './responses.js'
7
7
import type { PubSub } from '@libp2p/interfaces/pubsub'
8
8
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
9
9
import { pushable } from 'it-pushable'
10
- import { CustomEvent } from '@libp2p/interfaces'
11
10
import { logger } from '@libp2p/logger'
12
11
13
12
const log = logger ( 'libp2p:daemon-server:pubsub' )
@@ -42,10 +41,15 @@ export class PubSubOperations {
42
41
async * subscribe ( topic : string ) {
43
42
try {
44
43
const onMessage = pushable < Uint8Array > ( )
44
+ this . pubsub . subscribe ( topic )
45
45
46
- await this . pubsub . addEventListener ( topic , ( evt ) => {
46
+ await this . pubsub . addEventListener ( 'message' , ( evt ) => {
47
47
const msg = evt . detail
48
48
49
+ if ( msg . topic !== topic ) {
50
+ return
51
+ }
52
+
49
53
onMessage . push ( PSMessage . encode ( {
50
54
from : msg . from . toBytes ( ) ,
51
55
data : msg . data ,
@@ -66,7 +70,7 @@ export class PubSubOperations {
66
70
67
71
async * publish ( topic : string , data : Uint8Array ) {
68
72
try {
69
- this . pubsub . dispatchEvent ( new CustomEvent ( topic , { detail : data } ) )
73
+ this . pubsub . publish ( topic , data )
70
74
yield OkResponse ( )
71
75
} catch ( err : any ) {
72
76
log . error ( err )
0 commit comments