Skip to content

Commit 94d97a0

Browse files
authored
Export send batch messages api in c style
2 parents 8b6cb07 + d403cd4 commit 94d97a0

File tree

5 files changed

+197
-2
lines changed

5 files changed

+197
-2
lines changed

example/CBatchProducer.c

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include <stdio.h>
19+
#include "CBatchMessage.h"
20+
#include "CCommon.h"
21+
#include "CMessage.h"
22+
#include "CProducer.h"
23+
#include "CSendResult.h"
24+
25+
void StartSendMessage(CProducer* producer) {
26+
int i = 0;
27+
int ret_code = 0;
28+
char body[128];
29+
CBatchMessage* batchMessage = CreateBatchMessage("T_TestTopic");
30+
31+
for (i = 0; i < 10; i++) {
32+
CMessage* msg = CreateMessage("T_TestTopic");
33+
SetMessageTags(msg, "Test_Tag");
34+
SetMessageKeys(msg, "Test_Keys");
35+
memset(body, 0, sizeof(body));
36+
snprintf(body, sizeof(body), "new message body, index %d", i);
37+
SetMessageBody(msg, body);
38+
AddMessage(batchMessage, msg);
39+
}
40+
CSendResult result;
41+
int ok = SendBatchMessage(producer, batchMessage, &result);
42+
printf("SendBatchMessage is %s .....\n", ok == 0 ? "Success" : ok == 11 ? "FAILED" : " It is null value");
43+
DestroyBatchMessage(batchMessage);
44+
}
45+
46+
void CreateProducerAndStartSendMessage() {
47+
printf("Producer Initializing.....\n");
48+
CProducer* producer = CreateProducer("Group_producer");
49+
SetProducerNameServerAddress(producer, "127.0.0.1:9876");
50+
StartProducer(producer);
51+
printf("Producer start.....\n");
52+
StartSendMessage(producer);
53+
ShutdownProducer(producer);
54+
DestroyProducer(producer);
55+
printf("Producer Shutdown!\n");
56+
}
57+
58+
int main(int argc, char* argv[]) {
59+
printf("Send Batch.....\n");
60+
CreateProducerAndStartSendMessage();
61+
return 0;
62+
}

include/CBatchMessage.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#ifndef __C_BATCHMESSAGE_H__
19+
#define __C_BATCHMESSAGE_H__
20+
#include "CCommon.h"
21+
#include "CMessage.h"
22+
23+
#ifdef __cplusplus
24+
extern "C" {
25+
#endif
26+
27+
typedef struct CBatchMessage CBatchMessage;
28+
29+
ROCKETMQCLIENT_API CBatchMessage* CreateBatchMessage();
30+
ROCKETMQCLIENT_API int AddMessage(CBatchMessage* batchMsg, CMessage* msg);
31+
ROCKETMQCLIENT_API int DestroyBatchMessage(CBatchMessage* batchMsg);
32+
33+
#ifdef __cplusplus
34+
};
35+
#endif
36+
#endif //__C_BATCHMESSAGE_H__

include/CProducer.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#ifndef __C_PRODUCER_H__
1919
#define __C_PRODUCER_H__
2020

21+
#include "CBatchMessage.h"
2122
#include "CMessage.h"
2223
#include "CSendResult.h"
2324
#include "CMQException.h"
@@ -53,6 +54,7 @@ ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer* producer, int level);
5354
ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer* producer, int size);
5455

5556
ROCKETMQCLIENT_API int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result);
57+
ROCKETMQCLIENT_API int SendBatchMessage(CProducer* producer, CBatchMessage* msg, CSendResult* result);
5658
ROCKETMQCLIENT_API int SendMessageAsync(CProducer* producer,
5759
CMessage* msg,
5860
CSendSuccessCallback cSendSuccessCallback,
@@ -79,4 +81,4 @@ ROCKETMQCLIENT_API int SendMessageOrderlyAsync(CProducer* producer,
7981
#ifdef __cplusplus
8082
};
8183
#endif
82-
#endif //__C_PRODUCER_H__
84+
#endif //__C_PRODUCER_H__

src/extern/CBatchMessage.cpp

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include <vector>
19+
20+
#include "CBatchMessage.h"
21+
#include "CCommon.h"
22+
#include "CMessage.h"
23+
#include "MQMessage.h"
24+
25+
using std::vector;
26+
27+
#ifdef __cplusplus
28+
extern "C" {
29+
#endif
30+
31+
using namespace rocketmq;
32+
33+
CBatchMessage* CreateBatchMessage() {
34+
vector<MQMessage>* msgs = new vector<MQMessage>();
35+
return (CBatchMessage*)msgs;
36+
}
37+
38+
int AddMessage(CBatchMessage* batchMsg, CMessage* msg) {
39+
if (msg == NULL) {
40+
return NULL_POINTER;
41+
}
42+
if (batchMsg == NULL) {
43+
return NULL_POINTER;
44+
}
45+
MQMessage* message = (MQMessage*)msg;
46+
((vector<MQMessage>*)batchMsg)->push_back(*message);
47+
return OK;
48+
}
49+
int DestroyBatchMessage(CBatchMessage* batchMsg) {
50+
if (batchMsg == NULL) {
51+
return NULL_POINTER;
52+
}
53+
delete (vector<MQMessage>*)batchMsg;
54+
return OK;
55+
}
56+
57+
#ifdef __cplusplus
58+
};
59+
#endif

src/extern/CProducer.cpp

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "DefaultMQProducer.h"
1919
#include "AsyncCallback.h"
20+
#include "CBatchMessage.h"
2021
#include "CProducer.h"
2122
#include "CCommon.h"
2223
#include "CSendResult.h"
@@ -156,6 +157,41 @@ int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result) {
156157
return OK;
157158
}
158159

160+
int SendBatchMessage(CProducer* producer, CBatchMessage* batcMsg, CSendResult* result) {
161+
// CSendResult sendResult;
162+
if (producer == NULL || batcMsg == NULL || result == NULL) {
163+
return NULL_POINTER;
164+
}
165+
try {
166+
DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer;
167+
vector<MQMessage>* message = (vector<MQMessage>*)batcMsg;
168+
SendResult sendResult = defaultMQProducer->send(*message);
169+
switch (sendResult.getSendStatus()) {
170+
case SEND_OK:
171+
result->sendStatus = E_SEND_OK;
172+
break;
173+
case SEND_FLUSH_DISK_TIMEOUT:
174+
result->sendStatus = E_SEND_FLUSH_DISK_TIMEOUT;
175+
break;
176+
case SEND_FLUSH_SLAVE_TIMEOUT:
177+
result->sendStatus = E_SEND_FLUSH_SLAVE_TIMEOUT;
178+
break;
179+
case SEND_SLAVE_NOT_AVAILABLE:
180+
result->sendStatus = E_SEND_SLAVE_NOT_AVAILABLE;
181+
break;
182+
default:
183+
result->sendStatus = E_SEND_OK;
184+
break;
185+
}
186+
result->offset = sendResult.getQueueOffset();
187+
strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
188+
result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
189+
} catch (exception& e) {
190+
return PRODUCER_SEND_SYNC_FAILED;
191+
}
192+
return OK;
193+
}
194+
159195
int SendMessageAsync(CProducer* producer,
160196
CMessage* msg,
161197
CSendSuccessCallback cSendSuccessCallback,
@@ -346,4 +382,4 @@ int SetProducerMaxMessageSize(CProducer* producer, int size) {
346382
}
347383
#ifdef __cplusplus
348384
};
349-
#endif
385+
#endif

0 commit comments

Comments
 (0)