Skip to content

Commit

Permalink
Setup Observe of Dots server #11 and Handle DB trigger when status ch…
Browse files Browse the repository at this point in the history
…anged, implement notify observer #12
  • Loading branch information
tamnguyen-tma committed May 22, 2018
1 parent e729092 commit 5be622d
Show file tree
Hide file tree
Showing 10 changed files with 432 additions and 91 deletions.
18 changes: 18 additions & 0 deletions dots_server/db_models/test_dump.sql
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,24 @@ VALUES
(1,128,'','',12332,1000,'2017-04-13 13:44:34','2017-04-13 13:44:34'),
(2,128,'','',12333,1000,'2017-04-13 13:44:34','2017-04-13 13:44:34');

# mitigation_scope trigger when status change
# ------------------------------------------------------------

DROP FUNCTION IF EXISTS MySQLNotification;
CREATE FUNCTION MySQLNotification RETURNS INTEGER SONAME 'mysql-notification.so';

DELIMITER @@

CREATE TRIGGER status_changed_trigger AFTER UPDATE ON mitigation_scope
FOR EACH ROW
BEGIN
IF NEW.status <> OLD.status THEN
SELECT MySQLNotification(NEW.id, NEW.client_identifier, NEW.mitigation_id, NEW.status) INTO @x;
END IF;
END@@

DELIMITER ;


# signal_session_configuration
# ------------------------------------------------------------
Expand Down
196 changes: 107 additions & 89 deletions dots_server/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,110 +59,128 @@ func createResource(ctx *libcoap.Context, path string, typ reflect.Type, control
}
log.Debugf("listen.go: createResource, path=%+v", path)

var toMethodHandler = func(method controllers.ServiceMethod) libcoap.MethodHandler {
return func(context *libcoap.Context,
resource *libcoap.Resource,
session *libcoap.Session,
request *libcoap.Pdu,
token *[]byte,
query *string,
response *libcoap.Pdu) {

log.WithField("MessageID", request.MessageID).Info("Incoming Request")

response.MessageID = request.MessageID
response.Token = request.Token

cn, err := session.DtlsGetPeerCommonName()
if err != nil {
log.WithError(err).Warn("DtlsGetPeercCommonName() failed")
response.Code = libcoap.ResponseForbidden
return
}
resource.RegisterHandler(libcoap.RequestGet, toMethodHandler(controller.HandleGet, typ, controller, is_unknown))
resource.RegisterHandler(libcoap.RequestPut, toMethodHandler(controller.HandlePut, typ, controller, is_unknown))
resource.RegisterHandler(libcoap.RequestPost, toMethodHandler(controller.HandlePost, typ, controller, is_unknown))
resource.RegisterHandler(libcoap.RequestDelete, toMethodHandler(controller.HandleDelete, typ, controller, is_unknown))
return resource
}

log.Infof("CommonName is %v", cn)
func toMethodHandler(method controllers.ServiceMethod, typ reflect.Type, controller controllers.ControllerInterface, is_unknown bool) libcoap.MethodHandler {
return func(context *libcoap.Context,
resource *libcoap.Resource,
session *libcoap.Session,
request *libcoap.Pdu,
token *[]byte,
query *string,
response *libcoap.Pdu) {

customer, err := models.GetCustomerByCommonName(cn)
if err != nil || customer.Id == 0 {
log.WithError(err).Warn("Customer not found.")
response.Code = libcoap.ResponseForbidden
return
}
log.WithField("MessageID", request.MessageID).Info("Incoming Request")

response.MessageID = request.MessageID
response.Token = request.Token

cn, err := session.DtlsGetPeerCommonName()
if err != nil {
log.WithError(err).Warn("DtlsGetPeercCommonName() failed")
response.Code = libcoap.ResponseForbidden
return
}

log.Debugf("request.Data=\n%s", hex.Dump(request.Data))

log.Debugf("typ=%+v:", typ)
log.Debugf("request.Path(): %+v", request.Path())

var body interface{}

if typ == reflect.TypeOf(messages.SignalChannelRequest{}) {
uri := request.Path()
for i := range uri {
if strings.HasPrefix(uri[i], "mitigate") {
log.Debug("Request path includes 'mitigate'. Cbor decode with type MitigationRequest")
body, err = unmarshalCbor(request, reflect.TypeOf(messages.MitigationRequest{}))
break;

} else if strings.HasPrefix(uri[i], "config") {
log.Debug("Request path includes 'config'. Cbor decode with type SignalConfigRequest")
body, err = unmarshalCbor(request, reflect.TypeOf(messages.SignalConfigRequest{}))
break;
log.Infof("CommonName is %v", cn)

customer, err := models.GetCustomerByCommonName(cn)
if err != nil || customer.Id == 0 {
log.WithError(err).Warn("Customer not found.")
response.Code = libcoap.ResponseForbidden
return
}

log.Debugf("request.Data=\n%s", hex.Dump(request.Data))

log.Debugf("typ=%+v:", typ)
log.Debugf("request.Path(): %+v", request.Path())

var body interface{}

if typ == reflect.TypeOf(messages.SignalChannelRequest{}) {
uri := request.Path()
for i := range uri {
if strings.HasPrefix(uri[i], "mitigate") {
log.Debug("Request path includes 'mitigate'. Cbor decode with type MitigationRequest")
body, err = unmarshalCbor(request, reflect.TypeOf(messages.MitigationRequest{}))

// Create sub resource to handle observation on behalf of Unknown resource in case of mitigation PUT
sfMed := reflect.ValueOf(method)
sfPut := reflect.ValueOf(controller.HandlePut)
sfDel := reflect.ValueOf(controller.HandleDelete)
if is_unknown && sfMed.Pointer() == sfPut.Pointer() {
p := request.PathString()
r := libcoap.ResourceInit(&p, 0)
r.TurnOnResourceObservable()
r.RegisterHandler(libcoap.RequestGet, toMethodHandler(controller.HandleGet, typ, controller, !is_unknown))
r.RegisterHandler(libcoap.RequestPut, toMethodHandler(controller.HandlePut, typ, controller, !is_unknown))
r.RegisterHandler(libcoap.RequestPost, toMethodHandler(controller.HandlePost, typ, controller, !is_unknown))
r.RegisterHandler(libcoap.RequestDelete, toMethodHandler(controller.HandleDelete, typ, controller, !is_unknown))
context.AddResource(r)
log.Debugf("Create sub resource to handle observation later : uri-path=%+v", p)
} else if !is_unknown && sfMed.Pointer() == sfDel.Pointer() {
query := request.PathString()
context.DeleteResourceByQuery(query)
}
}
break;

} else {
body, err = unmarshalCbor(request, typ)
}

if err != nil {
log.WithError(err).Error("unmarshalCbor failed.")
response.Code = libcoap.ResponseInternalServerError
return
} else if strings.HasPrefix(uri[i], "config") {
log.Debug("Request path includes 'config'. Cbor decode with type SignalConfigRequest")
body, err = unmarshalCbor(request, reflect.TypeOf(messages.SignalConfigRequest{}))
break;
}
}

req := controllers.Request {
Code: request.Code,
Type: request.Type,
Uri: request.Path(),
Queries: request.Queries(),
Body: body,
}
log.Debugf("req=%+v", req)
} else {
body, err = unmarshalCbor(request, typ)
}

res, err := method(req, customer)
if err != nil {
log.WithError(err).Error("controller returned error")
response.Code = libcoap.ResponseInternalServerError
return
}
if err != nil {
log.WithError(err).Error("unmarshalCbor failed.")
response.Code = libcoap.ResponseInternalServerError
return
}

log.Debugf("res=%+v", res)
payload, err := marshalCbor(res.Body)
if err != nil {
log.WithError(err).Error("marshalCbor failed.")
response.Code = libcoap.ResponseInternalServerError
return
}
req := controllers.Request {
Code: request.Code,
Type: request.Type,
Uri: request.Path(),
Queries: request.Queries(),
Body: body,
}
log.Debugf("req=%+v", req)

response.Code = libcoap.Code(res.Code)
response.Data = payload
response.Type = CoAPType(res.Type)
log.Debugf("response.Data=\n%s", hex.Dump(payload))
// add content type cbor
response.Options = append(response.Options, libcoap.OptionContentType.Uint16(60))
res, err := method(req, customer)
if err != nil {
log.WithError(err).Error("controller returned error")
response.Code = libcoap.ResponseInternalServerError
return
}

log.Debugf("res=%+v", res)
payload, err := marshalCbor(res.Body)
if err != nil {
log.WithError(err).Error("marshalCbor failed.")
response.Code = libcoap.ResponseInternalServerError
return
}
}

resource.RegisterHandler(libcoap.RequestGet, toMethodHandler(controller.HandleGet))
resource.RegisterHandler(libcoap.RequestPut, toMethodHandler(controller.HandlePut))
resource.RegisterHandler(libcoap.RequestPost, toMethodHandler(controller.HandlePost))
resource.RegisterHandler(libcoap.RequestDelete, toMethodHandler(controller.HandleDelete))
return resource
}
response.Code = libcoap.Code(res.Code)
response.Data = payload
response.Type = CoAPType(res.Type)
log.Debugf("response.Data=\n%s", hex.Dump(payload))
// add content type cbor
response.Options = append(response.Options, libcoap.OptionContentType.Uint16(60))

return
}
}

func CoAPType(t dots_common.Type) (libcoapType libcoap.Type) {
switch t {
Expand Down
98 changes: 98 additions & 0 deletions dots_server/listen_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package main

import (
"bufio"
"io"
"net"
"os"
"strconv"
"strings"

log "github.com/sirupsen/logrus"
"github.com/nttdots/go-dots/dots_server/models"
"github.com/nttdots/go-dots/libcoap"
"github.com/nttdots/go-dots/dots_server/controllers"
"github.com/nttdots/go-dots/dots_common/messages"
)

var port = 9999

/*
* Listen for notification from DB
*/
func listenDB (context *libcoap.Context) {
listen, err := net.Listen("tcp4", ":" + strconv.Itoa(port))
if err != nil {
log.Debugf("[MySQL-Notification]:Socket listening on port %+v failed,%+v", port, err)
os.Exit(1)
}
log.Debugf("[MySQL-Notification]:Begin listening on port: %+v", port)

for {
conn, err := listen.Accept()
if err != nil {
log.Debugf("[MySQL-Notification]:Error : %+v", err)
continue
}
go handler(conn, context)
}

}

/*
* Handle notifcation from DB
*/
func handler(conn net.Conn, context *libcoap.Context) {

defer conn.Close()

var (
buf = make([]byte, 1024)
r = bufio.NewReader(conn)
)

ILOOP:
for {
n, err := r.Read(buf)
data := string(buf[:n])

switch err {
case io.EOF:
break ILOOP
case nil:
log.Debugf("[MySQL-Notification]: Received mitigation status changed notification from DB for :", data)
if isTransportOver(data) {
break ILOOP
}
// Notify status changed to those clients who are observing this mitigation request
log.Debug("[MySQL-Notification]: Send notification if obsevers exists")
uriPath := messages.MessageTypes[messages.MITIGATION_REQUEST].Path
id, cuid, mid, status, query := context.NotifyOnce(data, uriPath)

idValue, iErr := strconv.ParseInt(id, 10, 64)
midValue, mErr := strconv.Atoi(mid)
statusValue, sErr := strconv.Atoi(status)
if iErr != nil || mErr != nil || sErr != nil {
log.Debugf("[MySQL-Notification]:Failed to parse string to integer")
return
}
// If mitigation status was changed to 6: (attack mitigation is now terminated), delete this mitigation after notifying
if statusValue == models.Terminated {
log.Debugf("[MySQL-Notification]: Mitigation was terminated. Delete corresponding sub-resource and mitigation request.", models.Terminated)
context.DeleteResourceByQuery(query)
controllers.DeleteMitigation(0, cuid, midValue, idValue)
}
default:
log.Debugf("[MySQL-Notification]: Failed to receive data:%+v", err)
return
}
}
}

/*
* Check if nofified data has been transported completely
*/
func isTransportOver(data string) (over bool) {
over = strings.HasSuffix(data, "\r\n\r\n")
return
}
5 changes: 4 additions & 1 deletion dots_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func main() {
&config.SecureFile.ServerKeyFile,
}

// Manage Active Mitigation Request
// Thread for monitoring remaining lifetime of mitigation requests
go controllers.ManageExpiredMitigation(config.LifetimeConfiguration.ManageLifetimeInterval)

log.Debug("listen Signal with DTLS param: %# v", dtlsParam)
Expand All @@ -61,6 +61,9 @@ func main() {
}
defer dataCtx.FreeContext()

// Thread for handling status changed notification from DB
go listenDB (signalCtx)

for {
signalCtx.RunOnce(time.Duration(100) * time.Millisecond)
dataCtx.RunOnce(time.Duration(100) * time.Millisecond)
Expand Down
22 changes: 22 additions & 0 deletions libcoap/callback.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,26 @@ int verify_certificate(coap_context_t *ctx, coap_dtls_pki_t * setup_data) {
}
}
return 1;
}

void coap_set_dirty(coap_resource_t *resource, char *key, int length) {
if(*key == '\0' && length == 0){
coap_resource_set_dirty(resource, NULL);
} else {
str *query = coap_new_string(length);
query->s = key;
query->length = length;
coap_resource_set_dirty(resource, query);
}
}

coap_resource_t *coap_get_resource(coap_context_t *context, char *key, int length){
if(*key == '\0' && length == 0){
return NULL;
} else {
str *uriPath = coap_new_string(length);
uriPath->s = key;
uriPath->length = length;
return coap_get_resource_from_uri_path(context, *uriPath);
}
}
5 changes: 4 additions & 1 deletion libcoap/callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,7 @@ typedef struct coap_openssl_context_t {
coap_dtls_context_t dtls;
coap_tls_context_t tls;
int psk_pki_enabled;
} coap_openssl_context_t;
} coap_openssl_context_t;

void coap_set_dirty(coap_resource_t *resource, char *query, int length);
coap_resource_t *coap_get_resource(coap_context_t *context, char *key, int length);
Loading

0 comments on commit 5be622d

Please sign in to comment.