Skip to content

Commit

Permalink
Merge pull request #18 from SolaceDev/dev
Browse files Browse the repository at this point in the history
Release v1.6.0
  • Loading branch information
cjwmorgan-sol authored Apr 29, 2024
2 parents a0760fb + bb4c4fc commit 73d5d85
Show file tree
Hide file tree
Showing 27 changed files with 6,188 additions and 44 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,15 @@ jobs:
check-latest: true
- name: Check Go Version
run: go version
- name: run go tool staticcheck
# use pinned version of staticcheck this need to match with the go version for compatibility
# Compatibility with go version is listed in the release description of https://github.com/dominikh/go-tools/releases
# need at least version v0.4.4 for go version 1.21
run: |
go install honnef.co/go/tools/cmd/staticcheck@v0.4.6
staticcheck -checks=all ./...
- name: Compiles
if: ${{ success() }}
run: go build ./...

- name: Runs go fmt
Expand Down
14 changes: 14 additions & 0 deletions internal/ccsmp/ccsmp_callbacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "solclient/solClient.h"
#include "solclient/solClientMsg.h"
#include "./ccsmp_helper.h"

solClient_rxMsgCallback_returnCode_t
messageReceiveCallback(solClient_opaqueSession_pt opaqueSession_p, solClient_opaqueMsg_pt msg_p, void *user_p)
Expand All @@ -30,6 +31,19 @@ messageReceiveCallback(solClient_opaqueSession_pt opaqueSession_p, solClient_opa
return goMessageReceiveCallback(opaqueSession_p, msg_p, user_p);
}

solClient_rxMsgCallback_returnCode_t
requestResponseReplyMessageReceiveCallback(solClient_opaqueSession_pt opaqueSession_p, solClient_opaqueMsg_pt msg_p, void *user_p) {
solClient_rxMsgCallback_returnCode_t goReplyMessageReceiveCallback(solClient_opaqueSession_pt, solClient_opaqueMsg_pt, void *, char *);
char * correlationId = NULL;
// when receiving message that is not a reply deliver to subscription dispatch
if ( SOLCLIENT_OK != solClientgo_msg_isRequestReponseMsg(msg_p, &correlationId) ) {
// discard any message that is not a reply message
// note any subscription that matches the replyto topic will get an independent dispatch callback
return SOLCLIENT_CALLBACK_OK;
}
return goReplyMessageReceiveCallback(opaqueSession_p, msg_p, user_p, correlationId);
}

solClient_rxMsgCallback_returnCode_t
defaultMessageReceiveCallback(solClient_opaqueSession_pt opaqueSession_p, solClient_opaqueMsg_pt msg_p, void *user_p)
{
Expand Down
150 changes: 144 additions & 6 deletions internal/ccsmp/ccsmp_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package ccsmp
#include "./ccsmp_helper.h"
solClient_rxMsgCallback_returnCode_t messageReceiveCallback ( solClient_opaqueSession_pt opaqueSession_p, solClient_opaqueMsg_pt msg_p, void *user_p );
solClient_rxMsgCallback_returnCode_t requestResponseReplyMessageReceiveCallback ( solClient_opaqueSession_pt opaqueSession_p, solClient_opaqueMsg_pt msg_p, void *user_p );
solClient_rxMsgCallback_returnCode_t defaultMessageReceiveCallback ( solClient_opaqueSession_pt opaqueSession_p, solClient_opaqueMsg_pt msg_p, void *user_p );
void eventCallback ( solClient_opaqueSession_pt opaqueSession_p, solClient_session_eventCallbackInfo_pt eventInfo_p, void *user_p );
void handleLogCallback(solClient_log_callbackInfo_pt logInfo_p, void *user_p);
Expand Down Expand Up @@ -69,6 +70,9 @@ type SolClientSessionRxMsgDispatchFuncInfo = C.solClient_session_rxMsgDispatchFu
// SolClientVersionInfo is assigned a value
type SolClientVersionInfo = C.solClient_version_info_t

// SolClientCorrelationID is assigned a value
type SolClientCorrelationID = *C.char

// Reexport various CCSMP variables

// SolClientPropEnableVal is assigned a value
Expand All @@ -77,16 +81,26 @@ var SolClientPropEnableVal = C.SOLCLIENT_PROP_ENABLE_VAL
// SolClientPropDisableVal is assigned a value
var SolClientPropDisableVal = C.SOLCLIENT_PROP_DISABLE_VAL

// Reexport solclientgo variables

// SolClientGoPropCorrelationPrefix property value
// var SolClientGoPropCorrelationPrefix = C.GoString(C.SOLCLIENTGO_REPLY_CORRELATION_PREFIX)
var SolClientGoPropCorrelationPrefix = C.SOLCLIENTGO_REPLY_CORRELATION_PREFIX

// Callbacks

// SolClientMessageCallback is assigned a function
type SolClientMessageCallback = func(msgP SolClientMessagePt, userP unsafe.Pointer) bool

// SolClientReplyMessageCallback assigned a function
type SolClientReplyMessageCallback = func(msgP SolClientMessagePt, userP unsafe.Pointer, correlationP string) bool

// SolClientSessionEventCallback is assigned a function
type SolClientSessionEventCallback = func(sessionEvent SolClientSessionEvent, responseCode SolClientResponseCode, info string, correlationP unsafe.Pointer, userP unsafe.Pointer)

// maps to callbacks
var sessionToRXCallbackMap sync.Map
var sessionToReplyRXCallbackMap sync.Map
var sessionToEventCallbackMap sync.Map

//export goMessageReceiveCallback
Expand All @@ -101,6 +115,19 @@ func goMessageReceiveCallback(sessionP SolClientSessionPt, msgP SolClientMessage
return C.SOLCLIENT_CALLBACK_OK
}

//export goReplyMessageReceiveCallback
func goReplyMessageReceiveCallback(sessionP SolClientSessionPt, msgP SolClientMessagePt, userP unsafe.Pointer, correlationIDP SolClientCorrelationID) C.solClient_rxMsgCallback_returnCode_t {
// propagate to request reponse reply message handler
if callback, ok := sessionToReplyRXCallbackMap.Load(sessionP); ok {
if callback.(SolClientReplyMessageCallback)(msgP, userP, C.GoString(correlationIDP)) {
return C.SOLCLIENT_CALLBACK_TAKE_MSG
}
return C.SOLCLIENT_CALLBACK_OK
}
logging.Default.Error("Received reply message from core API without an associated session callback")
return C.SOLCLIENT_CALLBACK_OK
}

//export goDefaultMessageReceiveCallback
func goDefaultMessageReceiveCallback(sessionP SolClientSessionPt, msgP SolClientMessagePt, userP unsafe.Pointer) C.solClient_rxMsgCallback_returnCode_t {
logging.Default.Error("Received message from core API on the default session callback")
Expand Down Expand Up @@ -207,6 +234,19 @@ func (session *SolClientSession) SetMessageCallback(callback SolClientMessageCal
return nil
}

// SetReplyMessageCallback sets the message callback to use
func (session *SolClientSession) SetReplyMessageCallback(callback SolClientReplyMessageCallback) error {
if session == nil || session.pointer == nil {
return fmt.Errorf("could not set message receive callback for nil session")
}
if callback == nil {
sessionToReplyRXCallbackMap.Delete(session.pointer)
} else {
sessionToReplyRXCallbackMap.Store(session.pointer, callback)
}
return nil
}

// SetEventCallback sets the event callback to use
func (session *SolClientSession) SetEventCallback(callback SolClientSessionEventCallback) error {
if session == nil || session.pointer == nil {
Expand Down Expand Up @@ -289,6 +329,7 @@ func (session *SolClientSession) SolClientSessionDestroy() *SolClientErrorInfoWr
// last line of defence to make sure everything is cleaned up
sessionToEventCallbackMap.Delete(session.pointer)
sessionToRXCallbackMap.Delete(session.pointer)
sessionToReplyRXCallbackMap.Delete(session.pointer)
return handleCcsmpError(func() SolClientReturnCode {
return C.solClient_session_destroy(&session.pointer)
})
Expand All @@ -302,26 +343,82 @@ func (session *SolClientSession) SolClientSessionPublish(message SolClientMessag
})
}

// SolClientSessionSubscribe wraps solClient_session_topicSubscribeWithDispatch
func (session *SolClientSession) SolClientSessionSubscribe(topic string, dispatch *SolClientSessionRxMsgDispatchFuncInfo, correlationID uintptr) *SolClientErrorInfoWrapper {
// solClientSessionSubscribeWithFlags wraps solClient_session_topicSubscribeWithDispatch
func (session *SolClientSession) solClientSessionSubscribeWithFlags(topic string, flags C.solClient_subscribeFlags_t, dispatchID uintptr, correlationID uintptr) *SolClientErrorInfoWrapper {
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.solClient_session_topicSubscribeWithDispatch(session.pointer, C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, cString, dispatch, C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
return C.SessionTopicSubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
})
}

// SolClientSessionUnsubscribe wraps solClient_session_topicUnsubscribeWithDispatch
func (session *SolClientSession) SolClientSessionUnsubscribe(topic string, dispatch *SolClientSessionRxMsgDispatchFuncInfo, correlationID uintptr) *SolClientErrorInfoWrapper {
// solClientSessionSubscribeWithFlags wraps solClient_session_topicSubscribeWithDispatch
func (session *SolClientSession) solClientSessionSubscribeReplyTopicWithFlags(topic string, flags C.solClient_subscribeFlags_t, dispatchID uintptr, correlationID uintptr) *SolClientErrorInfoWrapper {
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.SessionReplyTopicSubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
})
}

// solClientSessionUnsubscribeWithFlags wraps solClient_session_topicUnsubscribeWithDispatch
func (session *SolClientSession) solClientSessionUnsubscribeWithFlags(topic string, flags C.solClient_subscribeFlags_t, dispatchID uintptr, correlationID uintptr) *SolClientErrorInfoWrapper {
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.solClient_session_topicUnsubscribeWithDispatch(session.pointer, C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, cString, dispatch, C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
return C.SessionTopicUnsubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
})
}

// solClientSessionUnsubscribeReplyTopicWithFlags wraps solClient_session_topicUnsubscribeWithDispatch
func (session *SolClientSession) solClientSessionUnsubscribeReplyTopicWithFlags(topic string, flags C.solClient_subscribeFlags_t, dispatchID uintptr, correlationID uintptr) *SolClientErrorInfoWrapper {
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.SessionReplyTopicUnsubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
})
}

// SolClientSessionSubscribeReplyTopic wraps solClient_session_topicSubscribeWithDispatch
func (session *SolClientSession) SolClientSessionSubscribeReplyTopic(topic string, dispatchID uintptr, correlationID uintptr) *SolClientErrorInfoWrapper {
return session.solClientSessionSubscribeReplyTopicWithFlags(topic, C.SOLCLIENT_SUBSCRIBE_FLAGS_LOCAL_DISPATCH_ONLY, dispatchID, correlationID)
}

// SolClientSessionUnsubscribeReplyTopic wraps solClient_session_topicUnsubscribeWithDispatch
func (session *SolClientSession) SolClientSessionUnsubscribeReplyTopic(topic string, dispatchID uintptr, correlationID uintptr) *SolClientErrorInfoWrapper {
return session.solClientSessionUnsubscribeReplyTopicWithFlags(topic, C.SOLCLIENT_SUBSCRIBE_FLAGS_LOCAL_DISPATCH_ONLY, dispatchID, correlationID)
}

// SolClientSessionSubscribe wraps solClient_session_topicSubscribeWithDispatch
func (session *SolClientSession) SolClientSessionSubscribe(topic string, dispatchID uintptr, correlationID uintptr) *SolClientErrorInfoWrapper {
return session.solClientSessionSubscribeWithFlags(topic, C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, dispatchID, correlationID)
}

// SolClientSessionUnsubscribe wraps solClient_session_topicUnsubscribeWithDispatch
func (session *SolClientSession) SolClientSessionUnsubscribe(topic string, dispatchID uintptr, correlationID uintptr) *SolClientErrorInfoWrapper {
return session.solClientSessionUnsubscribeWithFlags(topic, C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, dispatchID, correlationID)
}

// SolClientEndpointProvision wraps solClient_session_endpointProvision
func (session *SolClientSession) SolClientEndpointProvision(properties []string) *SolClientErrorInfoWrapper {
return handleCcsmpError(func() SolClientReturnCode {
Expand Down Expand Up @@ -396,6 +493,39 @@ func (session *SolClientSession) SolClientSessionGetClientName() (string, *SolCl
return string(clientName[:endIndex]), nil
}

// SolClientSessionGetP2PTopicPrefix wraps solClient_session_getProperty
func (session *SolClientSession) SolClientSessionGetP2PTopicPrefix() (string, *SolClientErrorInfoWrapper) {
const maxTopicSize = 251 // max topic size including the nul terminal
p2pTopicInUseKey := C.CString(SolClientSessionPropP2pinboxInUse)
defer C.free(unsafe.Pointer(p2pTopicInUseKey))
p2pTopicInUse := make([]byte, maxTopicSize)
// Get the P2P topic for this session/transport.
// It is used together with inbox request/reply MEP using
// native CCSMP inbox
// Example CCSMP session
// P2PINBOX_IN_USE: '#P2P/v:mybroker/mPuoLl8m/myhost/5221/00000001/oWxIwBFz28/#'
// This only works if the session is connected
errorInfo := handleCcsmpError(func() SolClientReturnCode {
return C.solClient_session_getProperty(session.pointer, p2pTopicInUseKey, (*C.char)(unsafe.Pointer(&p2pTopicInUse[0])), maxTopicSize)
})
if errorInfo != nil {
return "", errorInfo
}
endIndex := maxTopicSize
for i := 0; i < maxTopicSize; i++ {
if p2pTopicInUse[i] == 0 {
endIndex = i
break
}
}
// truncate last character '#'
if endIndex > 0 {
endIndex = endIndex - 1
p2pTopicInUse[endIndex] = 0
}
return string(p2pTopicInUse[:endIndex]), nil
}

// SolClientVersionGet wraps solClient_version_get
func SolClientVersionGet() (err *SolClientErrorInfoWrapper, version, dateTime, variant string) {
var versionInfo *SolClientVersionInfo
Expand Down Expand Up @@ -445,6 +575,14 @@ func NewSessionDispatch(id uint64) (*SolClientSessionRxMsgDispatchFuncInfo, uint
}, ptr
}

// NewSessionReplyDispatch function
func NewSessionReplyDispatch(id uint64) uintptr {
// This is not a misuse of unsafe.Pointer as we are not storing a pointer.
// CGO defines void* as unsafe.Pointer, however it is just arbitrary data.
// We want to store a number at void*
return uintptr(id)
}

// GetLastErrorInfo should NOT be called in most cases as it is dependent on the thread.
// Unless you know that the goroutine running the code will not be interrupted, do NOT
// call this function!
Expand Down
Loading

0 comments on commit 73d5d85

Please sign in to comment.