Skip to content

Commit

Permalink
Merge pull request #1541 from memphisdev/RND-142-an-indication-for-cl…
Browse files Browse the repository at this point in the history
…ients-need-to-update-their-sdk-version-their-language-backend-side

RND-142-an-indication-for-clients-need-to-update-their-sdk-version-their-language-backend-side
  • Loading branch information
shohamroditimemphis authored Dec 21, 2023
2 parents fe990d0 + 9243b3f commit f6638a1
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 36 deletions.
6 changes: 4 additions & 2 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2738,7 +2738,9 @@ func GetAllProducersByStationID(stationId int) ([]models.ExtendedProducer, error
s.name,
p.is_active,
COUNT(CASE WHEN p.is_active THEN 1 END) OVER (PARTITION BY p.name) AS connected_producers_count,
COUNT(CASE WHEN NOT p.is_active THEN 1 END) OVER (PARTITION BY p.name) AS disconnected_producers_count
COUNT(CASE WHEN NOT p.is_active THEN 1 END) OVER (PARTITION BY p.name) AS disconnected_producers_count,
p.version,
p.sdk
FROM producers AS p
LEFT JOIN stations AS s ON s.id = p.station_id
WHERE p.station_id = $1 AND p.type = 'application'
Expand Down Expand Up @@ -3100,7 +3102,7 @@ func GetAllConsumersByStation(stationId int) ([]models.ExtendedConsumer, error)
}
defer conn.Release()
query := `SELECT DISTINCT ON (c.name, c.consumers_group) c.id, c.name, c.updated_at, c.is_active, c.consumers_group, c.max_ack_time_ms, c.max_msg_deliveries, s.name, c.partitions,
COUNT (CASE WHEN c.is_active THEN 1 END) OVER (PARTITION BY c.name) AS count
COUNT (CASE WHEN c.is_active THEN 1 END) OVER (PARTITION BY c.name) AS count, c.version, c.sdk
FROM consumers AS c
LEFT JOIN stations AS s ON s.id = c.station_id
WHERE c.station_id = $1 AND c.type = 'application' ORDER BY c.name, c.consumers_group, c.updated_at DESC
Expand Down
43 changes: 31 additions & 12 deletions models/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ type ExtendedConsumer struct {
StationName string `json:"station_name,omitempty"`
PartitionsList []int `json:"partitions_list"`
Count int `json:"count"`
Version int `json:"version"`
Sdk string `json:"sdk"`
}

type ExtendedConsumerResponse struct {
ID int `json:"id"`
Name string `json:"name"`
UpdatedAt time.Time `json:"updated_at"`
IsActive bool `json:"is_active"`
ConsumersGroup string `json:"consumers_group"`
MaxAckTimeMs int64 `json:"max_ack_time_ms"`
MaxMsgDeliveries int `json:"max_msg_deliveries"`
StationName string `json:"station_name,omitempty"`
PartitionsList []int `json:"partitions_list"`
Count int `json:"count"`
SdkLanguage string `json:"sdk_language"`
UpdateAvailable bool `json:"update_available"`
}

type LightConsumer struct {
Expand All @@ -55,18 +72,20 @@ type LightConsumer struct {
}

type Cg struct {
Name string `json:"name"`
UnprocessedMessages int `json:"unprocessed_messages"`
PoisonMessages int `json:"poison_messages"`
IsActive bool `json:"is_active"`
InProcessMessages int `json:"in_process_messages"`
MaxAckTimeMs int64 `json:"max_ack_time_ms"`
MaxMsgDeliveries int `json:"max_msg_deliveries"`
ConnectedConsumers []ExtendedConsumer `json:"connected_consumers"`
DisconnectedConsumers []ExtendedConsumer `json:"disconnected_consumers"`
DeletedConsumers []ExtendedConsumer `json:"deleted_consumers"`
LastStatusChangeDate time.Time `json:"last_status_change_date"`
PartitionsList []int `json:"partitions_list"`
Name string `json:"name"`
UnprocessedMessages int `json:"unprocessed_messages"`
PoisonMessages int `json:"poison_messages"`
IsActive bool `json:"is_active"`
InProcessMessages int `json:"in_process_messages"`
MaxAckTimeMs int64 `json:"max_ack_time_ms"`
MaxMsgDeliveries int `json:"max_msg_deliveries"`
ConnectedConsumers []ExtendedConsumerResponse `json:"connected_consumers"`
DisconnectedConsumers []ExtendedConsumerResponse `json:"disconnected_consumers"`
DeletedConsumers []ExtendedConsumerResponse `json:"deleted_consumers"`
LastStatusChangeDate time.Time `json:"last_status_change_date"`
PartitionsList []int `json:"partitions_list"`
SdkLanguage string `json:"sdk_language"`
UpdateAvailable bool `json:"update_available"`
}

type GetAllConsumersByStationSchema struct {
Expand Down
16 changes: 16 additions & 0 deletions models/producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ type ExtendedProducer struct {
IsActive bool `json:"is_active"`
ConnectedProducersCount int `json:"connected_producers_count"`
DisconnedtedProducersCount int `json:"disconnected_producers_count"`
Version int `json:"version"`
Sdk string `json:"sdk"`
}

type ExtendedProducerResponse struct {
ID int `json:"id"`
Name string `json:"name"`
Type string `json:"type,omitempty"`
ConnectionId string `json:"connection_id,omitempty"`
UpdatedAt time.Time `json:"updated_at"`
StationName string `json:"station_name"`
IsActive bool `json:"is_active"`
ConnectedProducersCount int `json:"connected_producers_count"`
DisconnedtedProducersCount int `json:"disconnected_producers_count"`
SdkLanguage string `json:"sdk_language"`
UpdateAvailable bool `json:"update_available"`
}

type LightProducer struct {
Expand Down
24 changes: 19 additions & 5 deletions server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,11 @@ func (s *Server) createConsumerDirect(c *client, reply string, msg []byte) {
return
}
if err != nil {
s.Errorf("[tenant: %v][user: %v]createConsumerDirect at getSchemaUpdateInitFromStation: Consumer %v at station %v: %v", ccr.TenantName, ccr.Username, ccr.Name, ccr.StationName, err.Error())
if strings.Contains(err.Error(), "not exist") {
s.Warnf("[tenant: %v][user: %v]createConsumerDirect at getSchemaUpdateInitFromStation: Consumer %v at station %v: %v", ccr.TenantName, ccr.Username, ccr.Name, ccr.StationName, err.Error())
} else {
s.Errorf("[tenant: %v][user: %v]createConsumerDirect at getSchemaUpdateInitFromStation: Consumer %v at station %v: %v", ccr.TenantName, ccr.Username, ccr.Name, ccr.StationName, err.Error())
}
respondWithRespErr(s.MemphisGlobalAccountString(), s, reply, err, &resp)
return
}
Expand Down Expand Up @@ -382,12 +386,13 @@ func (ch ConsumersHandler) GetCgsByStation(stationName StationName, station mode
Name: consumer.ConsumersGroup,
MaxAckTimeMs: consumer.MaxAckTimeMs,
MaxMsgDeliveries: consumer.MaxMsgDeliveries,
ConnectedConsumers: []models.ExtendedConsumer{},
DisconnectedConsumers: []models.ExtendedConsumer{},
DeletedConsumers: []models.ExtendedConsumer{},
ConnectedConsumers: []models.ExtendedConsumerResponse{},
DisconnectedConsumers: []models.ExtendedConsumerResponse{},
DeletedConsumers: []models.ExtendedConsumerResponse{},
IsActive: consumer.IsActive,
LastStatusChangeDate: consumer.UpdatedAt,
PartitionsList: consumer.PartitionsList,
SdkLanguage: consumers[0].Sdk,
}
m[consumer.ConsumersGroup] = cg
} else {
Expand All @@ -397,9 +402,16 @@ func (ch ConsumersHandler) GetCgsByStation(stationName StationName, station mode
m[consumer.ConsumersGroup].IsActive = consumer.IsActive
m[consumer.ConsumersGroup].LastStatusChangeDate = consumer.UpdatedAt
cg = m[consumer.ConsumersGroup]
cg.SdkLanguage = consumers[0].Sdk
}

needToUpdateVersion := false
if consumer.Version < lastConsumerCreationReqVersion && consumer.IsActive {
needToUpdateVersion = true
cg.UpdateAvailable = true
}

consumerRes := models.ExtendedConsumer{
consumerRes := models.ExtendedConsumerResponse{
ID: consumer.ID,
Name: consumer.Name,
IsActive: consumer.IsActive,
Expand All @@ -409,6 +421,8 @@ func (ch ConsumersHandler) GetCgsByStation(stationName StationName, station mode
StationName: consumer.StationName,
Count: consumer.Count,
PartitionsList: consumer.PartitionsList,
SdkLanguage: consumer.Sdk,
UpdateAvailable: needToUpdateVersion,
}

if consumer.IsActive {
Expand Down
12 changes: 7 additions & 5 deletions server/memphis_handlers_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ var noMetricsInstalledLog bool
var noMetricsPermissionLog bool

const (
healthyStatus = "healthy"
unhealthyStatus = "unhealthy"
dangerousStatus = "dangerous"
riskyStatus = "risky"
healthyStatus = "healthy"
unhealthyStatus = "unhealthy"
dangerousStatus = "dangerous"
riskyStatus = "risky"
lastProducerCreationReqVersion = 3
lastConsumerCreationReqVersion = 3
)

func clientSetClusterConfig() error {
Expand Down Expand Up @@ -530,7 +532,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
functionsEnabled = false
}

connectedProducers, disconnectedProducers, deletedProducers := make([]models.ExtendedProducer, 0), make([]models.ExtendedProducer, 0), make([]models.ExtendedProducer, 0)
connectedProducers, disconnectedProducers, deletedProducers := make([]models.ExtendedProducerResponse, 0), make([]models.ExtendedProducerResponse, 0), make([]models.ExtendedProducerResponse, 0)
if station.IsNative {
connectedProducers, disconnectedProducers, deletedProducers, err = producersHandler.GetProducersByStation(station)
if err != nil {
Expand Down
26 changes: 16 additions & 10 deletions server/memphis_handlers_producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *Server) createProducerDirectCommon(c *client, pName, pType, pConnection
var created bool
station, created, err = CreateDefaultStation(user.TenantName, s, pStationName, user.ID, user.Username, "", 0)
if err != nil {
if strings.Contains(err.Error(), "already exists") || strings.Contains(err.Error(), "max amount") {
if strings.Contains(err.Error(), "already exists") || strings.Contains(err.Error(), "max amount") {
serv.Warnf("[tenant: %v][user: %v]createProducerDirectCommon at CreateDefaultStation: creating default station error - producer %v at station %v: %v", user.TenantName, user.Username, pName, pStationName.external, err.Error())
} else {
serv.Errorf("[tenant: %v][user: %v]createProducerDirectCommon at CreateDefaultStation: creating default station error - producer %v at station %v: %v", user.TenantName, user.Username, pName, pStationName.external, err.Error())
Expand Down Expand Up @@ -256,30 +256,36 @@ func (s *Server) createProducerDirect(c *client, reply string, msg []byte) {
respondWithResp(s.MemphisGlobalAccountString(), s, reply, &resp)
}

func (ph ProducersHandler) GetProducersByStation(station models.Station) ([]models.ExtendedProducer, []models.ExtendedProducer, []models.ExtendedProducer, error) { // for socket io endpoint
func (ph ProducersHandler) GetProducersByStation(station models.Station) ([]models.ExtendedProducerResponse, []models.ExtendedProducerResponse, []models.ExtendedProducerResponse, error) { // for socket io endpoint
producers, err := db.GetAllProducersByStationID(station.ID)
if err != nil {
return []models.ExtendedProducer{}, []models.ExtendedProducer{}, []models.ExtendedProducer{}, err
return []models.ExtendedProducerResponse{}, []models.ExtendedProducerResponse{}, []models.ExtendedProducerResponse{}, err
}

var connectedProducers []models.ExtendedProducer
var disconnectedProducers []models.ExtendedProducer
var deletedProducers []models.ExtendedProducer
var connectedProducers []models.ExtendedProducerResponse
var disconnectedProducers []models.ExtendedProducerResponse
var deletedProducers []models.ExtendedProducerResponse
producersNames := []string{}

for _, producer := range producers {
if slices.Contains(producersNames, producer.Name) {
continue
}
needToUpdateVersion := false
if producer.Version < lastProducerCreationReqVersion && producer.IsActive {
needToUpdateVersion = true
}

producerExtendedRes := models.ExtendedProducer{
producerExtendedRes := models.ExtendedProducerResponse{
ID: producer.ID,
Name: producer.Name,
StationName: producer.StationName,
UpdatedAt: producer.UpdatedAt,
IsActive: producer.IsActive,
DisconnedtedProducersCount: producer.DisconnedtedProducersCount,
ConnectedProducersCount: producer.ConnectedProducersCount,
SdkLanguage: producer.Sdk,
UpdateAvailable: needToUpdateVersion,
}

producersNames = append(producersNames, producer.Name)
Expand All @@ -291,15 +297,15 @@ func (ph ProducersHandler) GetProducersByStation(station models.Station) ([]mode
}

if len(connectedProducers) == 0 {
connectedProducers = []models.ExtendedProducer{}
connectedProducers = []models.ExtendedProducerResponse{}
}

if len(disconnectedProducers) == 0 {
disconnectedProducers = []models.ExtendedProducer{}
disconnectedProducers = []models.ExtendedProducerResponse{}
}

if len(deletedProducers) == 0 {
deletedProducers = []models.ExtendedProducer{}
deletedProducers = []models.ExtendedProducerResponse{}
}

sort.Slice(connectedProducers, func(i, j int) bool {
Expand Down
2 changes: 1 addition & 1 deletion server/memphis_handlers_schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ func (sh SchemasHandler) CreateNewVersion(c *gin.Context) {
schemaContent := body.SchemaContent
err = validateSchemaContent(schemaContent, schema.Type)
if err != nil {
serv.Errorf("[tenant: %v][user: %v]CreateNewVersion at validateSchemaContent: Schema %v: %v", user.TenantName, user.Username, body.SchemaName, err.Error())
serv.Warnf("[tenant: %v][user: %v]CreateNewVersion at validateSchemaContent: Schema %v: %v", user.TenantName, user.Username, body.SchemaName, err.Error())
c.AbortWithStatusJSON(SCHEMA_VALIDATION_ERROR_STATUS_CODE, gin.H{"message": err.Error()})
return
}
Expand Down
2 changes: 1 addition & 1 deletion server/memphis_handlers_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func memphisWSGetStationOverviewData(s *Server, h *Handlers, stationName string,
functionsEnabled = false
}

connectedProducers, disconnectedProducers, deletedProducers := make([]models.ExtendedProducer, 0), make([]models.ExtendedProducer, 0), make([]models.ExtendedProducer, 0)
connectedProducers, disconnectedProducers, deletedProducers := make([]models.ExtendedProducerResponse, 0), make([]models.ExtendedProducerResponse, 0), make([]models.ExtendedProducerResponse, 0)
if station.IsNative {
connectedProducers, disconnectedProducers, deletedProducers, err = h.Producers.GetProducersByStation(station)
if err != nil {
Expand Down

0 comments on commit f6638a1

Please sign in to comment.