@@ -80,6 +80,13 @@ int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesr
80
80
((DefaultMQPullConsumer *) consumer)->setNamesrvAddr (namesrv);
81
81
return OK;
82
82
}
83
+ int SetPullConsumerNameServerDomain (CPullConsumer *consumer, const char *domain) {
84
+ if (consumer == NULL ) {
85
+ return NULL_POINTER;
86
+ }
87
+ ((DefaultMQPullConsumer *) consumer)->setNamesrvDomain (domain);
88
+ return OK;
89
+ }
83
90
int SetPullConsumerSessionCredentials (CPullConsumer *consumer, const char *accessKey, const char *secretKey,
84
91
const char *channel) {
85
92
if (consumer == NULL ) {
@@ -119,24 +126,26 @@ int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, C
119
126
return NULL_POINTER;
120
127
}
121
128
unsigned int index = 0 ;
129
+ CMessageQueue *temMQ = NULL ;
122
130
std::vector<MQMessageQueue> fullMQ;
123
131
try {
124
132
((DefaultMQPullConsumer *) consumer)->fetchSubscribeMessageQueues (topic, fullMQ);
125
133
*size = fullMQ.size ();
126
134
// Alloc memory to save the pointer to CPP MessageQueue, and the MessageQueues may be changed.
127
135
// Thus, this memory should be released by users using @ReleaseSubscribeMessageQueue every time.
128
- *mqs = (CMessageQueue *) malloc (*size * sizeof (CMessageQueue));
129
- if (*mqs == NULL ) {
136
+ temMQ = (CMessageQueue *) malloc (*size * sizeof (CMessageQueue));
137
+ if (temMQ == NULL ) {
130
138
*size = 0 ;
131
139
*mqs = NULL ;
132
140
return MALLOC_FAILED;
133
141
}
134
142
auto iter = fullMQ.begin ();
135
143
for (index = 0 ; iter != fullMQ.end () && index <= fullMQ.size (); ++iter, index++) {
136
- strncpy (mqs [index]-> topic , iter->getTopic ().c_str (), MAX_TOPIC_LENGTH - 1 );
137
- strncpy (mqs [index]-> brokerName , iter->getBrokerName ().c_str (), MAX_BROKER_NAME_ID_LENGTH - 1 );
138
- mqs [index]-> queueId = iter->getQueueId ();
144
+ strncpy (temMQ [index]. topic , iter->getTopic ().c_str (), MAX_TOPIC_LENGTH - 1 );
145
+ strncpy (temMQ [index]. brokerName , iter->getBrokerName ().c_str (), MAX_BROKER_NAME_ID_LENGTH - 1 );
146
+ temMQ [index]. queueId = iter->getQueueId ();
139
147
}
148
+ *mqs = temMQ;
140
149
} catch (MQException &e) {
141
150
*size = 0 ;
142
151
*mqs = NULL ;
@@ -160,7 +169,7 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression
160
169
PullResult cppPullResult;
161
170
try {
162
171
cppPullResult = ((DefaultMQPullConsumer *) consumer)->pull (messageQueue, subExpression, offset, maxNums);
163
- }catch (exception &e){
172
+ } catch (exception &e) {
164
173
cppPullResult.pullStatus = BROKER_TIMEOUT;
165
174
}
166
175
@@ -171,11 +180,13 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression
171
180
pullResult.minOffset = cppPullResult.minOffset ;
172
181
pullResult.nextBeginOffset = cppPullResult.nextBeginOffset ;
173
182
pullResult.size = cppPullResult.msgFoundList .size ();
183
+ PullResult *tmpPullResult = new PullResult (cppPullResult);
184
+ pullResult.pData = tmpPullResult;
174
185
// Alloc memory to save the pointer to CPP MQMessageExt, which will be release by the CPP SDK core.
175
186
// Thus, this memory should be released by users using @ReleasePullResult
176
187
pullResult.msgFoundList = (CMessageExt **) malloc (pullResult.size * sizeof (CMessageExt *));
177
188
for (size_t i = 0 ; i < cppPullResult.msgFoundList .size (); i++) {
178
- MQMessageExt *msg = const_cast <MQMessageExt *>(&cppPullResult. msgFoundList [i]);
189
+ MQMessageExt *msg = const_cast <MQMessageExt *>(&tmpPullResult-> msgFoundList [i]);
179
190
pullResult.msgFoundList [i] = (CMessageExt *) (msg);
180
191
}
181
192
break ;
@@ -204,9 +215,16 @@ Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression
204
215
return pullResult;
205
216
}
206
217
int ReleasePullResult (CPullResult pullResult) {
207
- if (pullResult.size == 0 || pullResult.msgFoundList == NULL ) {
218
+ if (pullResult.size == 0 || pullResult.msgFoundList == NULL || pullResult. pData == NULL ) {
208
219
return NULL_POINTER;
209
220
}
221
+ if (pullResult.pData != NULL ) {
222
+ try {
223
+ delete ((PullResult *) pullResult.pData );
224
+ } catch (exception &e) {
225
+ return NULL_POINTER;
226
+ }
227
+ }
210
228
free ((void *) pullResult.msgFoundList );
211
229
pullResult.msgFoundList = NULL ;
212
230
return OK;
0 commit comments