From ae9cd762e034e7b0c20803db4584a9c91db2ec73 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Tue, 24 Sep 2019 11:16:24 +0800 Subject: [PATCH] Support set max cache size for consumer. --- include/CCommon.h | 4 +++- include/CPushConsumer.h | 4 +++- src/extern/CPushConsumer.cpp | 13 +++++++++++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/include/CCommon.h b/include/CCommon.h index a765184c5..391b28946 100644 --- a/include/CCommon.h +++ b/include/CCommon.h @@ -45,7 +45,9 @@ typedef enum _CStatus_ { PULLCONSUMER_ERROR_CODE_START = 30, PULLCONSUMER_START_FAILED = 30, PULLCONSUMER_FETCH_MQ_FAILED = 31, - PULLCONSUMER_FETCH_MESSAGE_FAILED = 32 + PULLCONSUMER_FETCH_MESSAGE_FAILED = 32, + + Not_Support = 500 } CStatus; typedef enum _CLogLevel_ { diff --git a/include/CPushConsumer.h b/include/CPushConsumer.h index 10b55873b..0369a807f 100644 --- a/include/CPushConsumer.h +++ b/include/CPushConsumer.h @@ -18,8 +18,8 @@ #ifndef __C_PUSH_CONSUMER_H__ #define __C_PUSH_CONSUMER_H__ -#include "CMessageExt.h" #include "CCommon.h" +#include "CMessageExt.h" #ifdef __cplusplus extern "C" { @@ -56,6 +56,8 @@ ROCKETMQCLIENT_API int SetPushConsumerLogPath(CPushConsumer* consumer, const cha ROCKETMQCLIENT_API int SetPushConsumerLogFileNumAndSize(CPushConsumer* consumer, int fileNum, long fileSize); ROCKETMQCLIENT_API int SetPushConsumerLogLevel(CPushConsumer* consumer, CLogLevel level); ROCKETMQCLIENT_API int SetPushConsumerMessageModel(CPushConsumer* consumer, CMessageModel messageModel); +ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSize(CPushConsumer* consumer, int maxCacheSize); +ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSizeInMb(CPushConsumer* consumer, int maxCacheSizeInMb); #ifdef __cplusplus }; diff --git a/src/extern/CPushConsumer.cpp b/src/extern/CPushConsumer.cpp index 621eabde7..66ef93757 100644 --- a/src/extern/CPushConsumer.cpp +++ b/src/extern/CPushConsumer.cpp @@ -229,7 +229,20 @@ int SetPushConsumerMessageBatchMaxSize(CPushConsumer* consumer, int batchSize) { ((DefaultMQPushConsumer*)consumer)->setConsumeMessageBatchMaxSize(batchSize); return OK; } +int SetPushConsumerMaxCacheMessageSize(CPushConsumer* consumer, int maxCacheSize) { + if (consumer == NULL || maxCacheSize <= 0) { + return NULL_POINTER; + } + ((DefaultMQPushConsumer*)consumer)->setMaxCacheMsgSizePerQueue(maxCacheSize); + return OK; +} +int SetPushConsumerMaxCacheMessageSizeInMb(CPushConsumer* consumer, int maxCacheSizeInMb) { + if (consumer == NULL || maxCacheSizeInMb <= 0) { + return NULL_POINTER; + } + return Not_Support; +} int SetPushConsumerInstanceName(CPushConsumer* consumer, const char* instanceName) { if (consumer == NULL) { return NULL_POINTER;