diff --git a/pulsar/table_view.go b/pulsar/table_view.go index 58a664aed7..edd3a0dde5 100644 --- a/pulsar/table_view.go +++ b/pulsar/table_view.go @@ -56,12 +56,18 @@ type TableView interface { // ContainsKey returns true if this TableView contains a mapping for the specified key. ContainsKey(key string) bool - // Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key. + // Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key or the Message cannot be encoded to the SchemaValueType Get(key string) interface{} - // Entries returns a map view of the mappings contained in this TableView. + // Message returns the Message to which the specified key is mapped, or nil if this map contains no mapping for the key. + Message(key string) Message + + // Entries returns a map view of the mappings contained in this TableView, with values encoded into SchemaValueType. Entries() map[string]interface{} + // Messages returns a map view of the Message mappings contained in this TableView. + Messages() map[string]Message + // Keys returns a slice of the keys contained in this TableView. Keys() []string diff --git a/pulsar/table_view_impl.go b/pulsar/table_view_impl.go index 17e0b90f3b..c577ff43cf 100644 --- a/pulsar/table_view_impl.go +++ b/pulsar/table_view_impl.go @@ -39,7 +39,7 @@ type TableViewImpl struct { options TableViewOptions dataMu sync.Mutex - data map[string]interface{} + data map[string]Message readersMu sync.Mutex cancelRaders map[string]cancelReader @@ -75,7 +75,7 @@ func newTableView(client *client, options TableViewOptions) (TableView, error) { tv := TableViewImpl{ client: client, options: options, - data: make(map[string]interface{}), + data: make(map[string]Message), cancelRaders: make(map[string]cancelReader), logger: logger, closedCh: make(chan struct{}), @@ -178,6 +178,23 @@ func (tv *TableViewImpl) ContainsKey(key string) bool { } func (tv *TableViewImpl) Get(key string) interface{} { + tv.dataMu.Lock() + defer tv.dataMu.Unlock() + msg, ok := tv.data[key] + if !ok { + return nil + } + + v, err := tv.schemaValueFromMessage(msg) + if err != nil { + tv.logger.Errorf("getting value for message, %w; msg is %v", err, msg) + return nil + } + + return v +} + +func (tv *TableViewImpl) Message(key string) Message { tv.dataMu.Lock() defer tv.dataMu.Unlock() return tv.data[key] @@ -186,10 +203,23 @@ func (tv *TableViewImpl) Get(key string) interface{} { func (tv *TableViewImpl) Entries() map[string]interface{} { tv.dataMu.Lock() defer tv.dataMu.Unlock() + data := make(map[string]interface{}, len(tv.data)) - for k, v := range tv.data { + for k, msg := range tv.data { + v, err := tv.schemaValueFromMessage(msg) + if err != nil { + tv.logger.Errorf("getting value for message, %w; msg is %v", len(tv.listeners), err, msg) + continue + } data[k] = v } + + return data +} + +func (tv *TableViewImpl) Messages() map[string]Message { + tv.dataMu.Lock() + defer tv.dataMu.Unlock() return tv.data } @@ -245,23 +275,32 @@ func (tv *TableViewImpl) handleMessage(msg Message) { tv.dataMu.Lock() defer tv.dataMu.Unlock() - payload := reflect.New(tv.options.SchemaValueType) if len(msg.Payload()) == 0 { delete(tv.data, msg.Key()) } else { - if err := msg.GetSchemaValue(payload.Interface()); err != nil { - tv.logger.Errorf("msg.GetSchemaValue() failed with %v; msg is %v", err, msg) - } - tv.data[msg.Key()] = reflect.Indirect(payload).Interface() + tv.data[msg.Key()] = msg } + v, err := tv.schemaValueFromMessage(msg) + if err != nil { + tv.logger.Errorf("will not action %d listeners, getting value for message, %w; msg is %v", len(tv.listeners), err, msg) + return + } for _, listener := range tv.listeners { - if err := listener(msg.Key(), reflect.Indirect(payload).Interface()); err != nil { + if err := listener(msg.Key(), v); err != nil { tv.logger.Errorf("table view listener failed for %v: %w", msg, err) } } } +func (tv *TableViewImpl) schemaValueFromMessage(msg Message) (interface{}, error) { + payload := reflect.New(tv.options.SchemaValueType) + if err := msg.GetSchemaValue(payload.Interface()); err != nil { + return nil, fmt.Errorf("msg.GetSchemaValue() failed: %w", err) + } + return reflect.Indirect(payload).Interface(), nil +} + func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader Reader) { for { msg, err := reader.Next(ctx) diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go index 45b9441169..cf069d1fa9 100644 --- a/pulsar/table_view_test.go +++ b/pulsar/table_view_test.go @@ -81,6 +81,72 @@ func TestTableView(t *testing.T) { } } +func TestTableView_Message(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.NoError(t, err) + defer client.Close() + + topic := newTopicName() + schema := NewStringSchema(nil) + + // Create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Schema: schema, + }) + assert.NoError(t, err) + defer producer.Close() + + numMsg := 10 + valuePrefix := "hello table view: " + publicationTimeForKey := map[string]time.Time{} + keys := make([]string, 0, numMsg) + + for i := 0; i < numMsg; i++ { + key := fmt.Sprintf("%d", i) + keys = append(keys, key) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + _, err = producer.Send(ctx, &ProducerMessage{ + Key: key, + Value: fmt.Sprintf(valuePrefix + key), + }) + assert.NoError(t, err) + + publicationTimeForKey[key] = time.Now() + } + + // Create table view + v := "" + tv, err := client.CreateTableView(TableViewOptions{ + Topic: topic, + Schema: schema, + SchemaValueType: reflect.TypeOf(&v), + }) + assert.NoError(t, err) + defer tv.Close() + + // Wait until table view receives all messages + for tv.Size() < numMsg { + time.Sleep(time.Second * 500) + t.Logf("TableView number of elements: %d", tv.Size()) + } + + for _, k := range keys { + msg := tv.Message(k) + + // Check that the payload can be accessed as bytes + assert.Equal(t, []byte(fmt.Sprintf("%s%s", valuePrefix, k)), msg.Payload()) + + // Check publication times can be accessed and are close to the recorded times above + assert.WithinDuration(t, publicationTimeForKey[k], msg.PublishTime(), time.Millisecond*10) + } +} + func TestTableViewSchemas(t *testing.T) { var tests = []struct { name string