Implemented anchor peer configuration #661

Merged
merged 4 commits on Dec 13, 2024
53 changes: 32 additions & 21 deletions src/api-engine/api/lib/configtxlator/configtxlator.py
@@ -4,6 +4,8 @@
from subprocess import call, run
from api.config import FABRIC_TOOL, FABRIC_VERSION

import logging
LOG = logging.getLogger(__name__)

class ConfigTxLator:
"""
@@ -24,17 +26,21 @@ def proto_encode(self, input, type, output):
output: A file to write the output to.
"""
try:
call([self.configtxlator,
"proto_encode",
"--input={}".format(input),
"--type={}".format(type),
"--output={}".format(output),
])
command = [self.configtxlator,
"proto_encode",
"--input={}".format(input),
"--type={}".format(type),
"--output={}".format(output),
]

LOG.info(" ".join(command))

call(command)
except Exception as e:
err_msg = "configtxlator proto decode fail! "
raise Exception(err_msg + str(e))

def proto_decode(self, input, type):
def proto_decode(self, input, type, output):
"""
Converts a proto message to JSON.

@@ -45,16 +51,17 @@ def proto_decode(self, input, type):
config
"""
try:
res = run([self.configtxlator,
command = [self.configtxlator,
"proto_decode",
"--type={}".format(type),
"--input={}".format(input),
],
capture_output=True)
if res.returncode == 0 :
return res.stdout
else:
return res.stderr
"--output={}".format(output),
]

LOG.info(" ".join(command))

call(command)

except Exception as e:
err_msg = "configtxlator proto decode fail! "
raise Exception(err_msg + str(e))
@@ -71,13 +78,17 @@ def compute_update(self, original, updated, channel_id, output):
output: A file to write the JSON document to.
"""
try:
call([self.configtxlator,
"compute_update",
"--original={}".format(original),
"--updated={}".format(updated),
"--channel_id={}".format(channel_id),
"--output={}".format(output),
])
command = [self.configtxlator,
"compute_update",
"--original={}".format(original),
"--updated={}".format(updated),
"--channel_id={}".format(channel_id),
"--output={}".format(output),
]

LOG.info(" ".join(command))

call(command)
except Exception as e:
err_msg = "configtxlator compute update fail! "
raise Exception(err_msg + str(e))
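The wrapper above shells out to the configtxlator binary for each step of a channel config update. A minimal usage sketch of the decode, re-encode and compute_update round-trip, assuming the working directory already holds the fetched block and the intermediate file names used later in views.py (the channel name "mychannel" is illustrative):

from api.lib.configtxlator.configtxlator import ConfigTxLator

tx = ConfigTxLator()
# Decode the fetched block so its config section can be edited as JSON
tx.proto_decode(input="config_block.pb", type="common.Block", output="config_block.json")
# Re-encode the original and the modified config...
tx.proto_encode(input="config.json", type="common.Config", output="config.pb")
tx.proto_encode(input="modified_config.json", type="common.Config", output="modified_config.pb")
# ...and let configtxlator compute the delta that will be submitted
tx.compute_update(original="config.pb", updated="modified_config.pb", channel_id="mychannel", output="config_update.pb")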
19 changes: 16 additions & 3 deletions src/api-engine/api/lib/peer/channel.py
@@ -87,12 +87,25 @@ def update(self, channel, channel_tx, orderer_url):
orderer_url: Ordering service endpoint.
"""
try:
res = os.system("{} channel update -c {} -f {} -o {}"
.format(self.peer, channel, channel_tx, orderer_url))
ORDERER_CA = os.getenv("ORDERER_CA")

command = [
self.peer,
"channel", "update",
"-f", channel_tx,
"-c", channel,
"-o", orderer_url,
"--ordererTLSHostnameOverride", orderer_url.split(":")[0],
"--tls",
"--cafile", ORDERER_CA
]
LOG.info(" ".join(command))

res = subprocess.run(command, check=True)

except Exception as e:
err_msg = "update channel failed for {}!".format(e)
raise Exception(err_msg)
res = res >> 8
return res

def fetch(self, block_path, channel, orderer_general_url, max_retries=5, retry_interval=1):
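The rewritten update() now drives the TLS variant of the peer CLI: it reads the --cafile path from the ORDERER_CA environment variable and derives --ordererTLSHostnameOverride from the orderer URL. It also calls subprocess.run, so channel.py needs an import subprocess statement if the module previously relied only on os.system. A minimal sketch of the environment precondition, with a purely hypothetical certificate path:

import os

# Hypothetical location; ORDERER_CA must point at the orderer organization's TLS CA certificate
os.environ.setdefault(
    "ORDERER_CA",
    "/opt/cello/ordererOrganizations/example.com/tlsca/tlsca.example.com-cert.pem",
)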
118 changes: 109 additions & 9 deletions src/api-engine/api/routes/channel/views.py
@@ -18,7 +18,7 @@

from api.config import CELLO_HOME
from api.common.serializers import PageQuerySerializer
from api.utils.common import with_common_response, parse_block_file, to_dict
from api.utils.common import with_common_response, parse_block_file, to_dict, json_filter, json_add_anchor_peer, json_create_envelope
from api.lib.configtxgen import ConfigTX, ConfigTxGen
from api.lib.peer.channel import Channel as PeerChannel
from api.lib.configtxlator.configtxlator import ConfigTxLator
@@ -147,7 +147,8 @@ def create(self, request):
peer_channel_join(name, peers, org)

# set anchor peer
set_anchor_peer(name, org, peers, ordering_node)
anchor_peer = Node.objects.get(id=peers[0])
set_anchor_peer(name, org, anchor_peer, ordering_node)

# save channel to db
channel = Channel(
@@ -425,31 +426,130 @@ def peer_channel_join(name, peers, org):
CELLO_HOME, org.network.name, name)
)

def set_anchor_peer(name, org, peers, ordering_node):
def set_anchor_peer(name, org, anchor_peer, ordering_node):
"""
Set anchor peer for the channel.
:param org: Organization object.
:param peers: list of Node objects
:param anchor_peer: Anchor peer node
:param ordering_node: Orderer node
:return: none
"""
peer_channel_fetch(name, org, peers, ordering_node)
org_msp = '{}'.format(org.name.split(".", 1)[0].capitalize())
channel_artifacts_path = "{}/{}".format(CELLO_HOME, org.network.name)

# Fetch the channel block from the orderer
peer_channel_fetch(name, org, anchor_peer, ordering_node)

# Decode block to JSON
ConfigTxLator().proto_decode(
input="{}/config_block.pb".format(channel_artifacts_path),
type="common.Block",
output="{}/config_block.json".format(channel_artifacts_path),
)

# Get the config data from the block
json_filter(
input="{}/config_block.json".format(channel_artifacts_path),
output="{}/config.json".format(channel_artifacts_path),
expression=".data.data[0].payload.data.config"
)

# add anchor peer config
anchor_peer_config = {
"AnchorPeers": {
"mod_policy": "Admins",
"value": {
"anchor_peers": [
{
"host": anchor_peer.name + "." + org.name,
"port": 7051
}
]
},
"version": 0
}
}

json_add_anchor_peer(
input="{}/config.json".format(channel_artifacts_path),
output="{}/modified_config.json".format(channel_artifacts_path),
anchor_peer_config=anchor_peer_config,
org_msp=org_msp
)

ConfigTxLator().proto_encode(
input="{}/config.json".format(channel_artifacts_path),
type="common.Config",
output="{}/config.pb".format(channel_artifacts_path),
)

ConfigTxLator().proto_encode(
input="{}/modified_config.json".format(channel_artifacts_path),
type="common.Config",
output="{}/modified_config.pb".format(channel_artifacts_path),
)

ConfigTxLator().compute_update(
original="{}/config.pb".format(channel_artifacts_path),
updated="{}/modified_config.pb".format(channel_artifacts_path),
channel_id=name,
output="{}/config_update.pb".format(channel_artifacts_path),
)

ConfigTxLator().proto_decode(
input="{}/config_update.pb".format(channel_artifacts_path),
type="common.ConfigUpdate",
output="{}/config_update.json".format(channel_artifacts_path),
)

def peer_channel_fetch(name, org, peers, ordering_node):
# Create config update envelope
json_create_envelope(
input="{}/config_update.json".format(channel_artifacts_path),
output="{}/config_update_in_envelope.json".format(channel_artifacts_path),
channel=name
)

ConfigTxLator().proto_encode(
input="{}/config_update_in_envelope.json".format(channel_artifacts_path),
type="common.Envelope",
output="{}/config_update_in_envelope.pb".format(channel_artifacts_path),
)

# Update the channel of anchor peer
peer_channel_update(name, org, anchor_peer, ordering_node, channel_artifacts_path)


def peer_channel_fetch(name, org, anchor_peer, ordering_node):
"""
Fetch the channel block from the orderer.
:param peers: list of Node objects
:param anchor_peer: Anchor peer node
:param org: Organization object.
:param channel_name: Name of the channel.
:return: none
"""
peer_node = Node.objects.get(id=peers[0])
envs = init_env_vars(peer_node, org)
envs = init_env_vars(anchor_peer, org)
peer_channel_cli = PeerChannel(**envs)
peer_channel_cli.fetch(block_path="{}/{}/config_block.pb".format(CELLO_HOME, org.network.name),
channel=name, orderer_general_url="{}.{}:{}".format(
ordering_node.name, org.name.split(".", 1)[1], str(7050)))

def peer_channel_update(name, org, anchor_peer, ordering_node, channel_artifacts_path):
"""
Update the channel.
:param anchor_peer: Anchor peer node
:param org: Organization object.
:param channel_name: Name of the channel.
:return: none
"""
envs = init_env_vars(anchor_peer, org)
peer_channel_cli = PeerChannel(**envs)
peer_channel_cli.update(
channel=name,
channel_tx="{}/config_update_in_envelope.pb".format(channel_artifacts_path),
orderer_url="{}.{}:{}".format(
ordering_node.name, org.name.split(".", 1)[1], str(7050)),
)
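
# Worked example of the orderer endpoint built above (all names hypothetical):
# with ordering_node.name == "orderer" and org.name == "org1.cello.com",
# the generated URL is "orderer.cello.com:7050"; PeerChannel.update() then
# derives --ordererTLSHostnameOverride from it by splitting on ":".
orderer_url_example = "{}.{}:{}".format("orderer", "org1.cello.com".split(".", 1)[1], str(7050))
assert orderer_url_example == "orderer.cello.com:7050"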


def init_env_vars(node, org):
"""
109 changes: 109 additions & 0 deletions src/api-engine/api/utils/common.py
@@ -13,7 +13,10 @@
import uuid
from zipfile import ZipFile
from json import loads
import json
import logging

LOG = logging.getLogger(__name__)

def make_uuid():
return str(uuid.uuid4())
@@ -153,3 +156,109 @@ def parse_block_file(data):

def to_dict(data):
return loads(data)


def json_filter(input, output, expression):
"""
Process JSON data using path expression similar to jq

Args:
input (str): JSON data or file path to JSON
output (str): Path of the file to write the filtered JSON to
expression (str): Path expression like ".data.data[0].payload.data.config"

Returns:
none: the filtered result is written to the output file
"""
# if input is a file path, read the file
if isinstance(input, str):
with open(input, 'r', encoding='utf-8') as f:
data = json.load(f)
else:
data = input

# parse the path expression
path_parts = expression.strip('.').split('.')
result = data

for part in path_parts:
# handle array index, like data[0]
if '[' in part and ']' in part:
array_name = part.split('[')[0]
index = int(part.split('[')[1].split(']')[0])
result = result[array_name][index]
else:
result = result[part]

with open(output, 'w', encoding='utf-8') as f:
json.dump(result, f, sort_keys=False, indent=4)

LOG.info("jq {} {} -> {}".format(expression, input, output))

def json_add_anchor_peer(input, output, anchor_peer_config, org_msp):
"""
Add anchor peer to the organization

Args:
input (str): JSON data or file path to JSON
output (str): Path of the file to write the modified JSON to
anchor_peer_config (dict): AnchorPeers entry to merge into the org's values section
org_msp (str): MSP ID of the organization to update
"""
# if input is a file path, read the file
if isinstance(input, str):
with open(input, 'r', encoding='utf-8') as f:
data = json.load(f)
else:
data = input

if "groups" not in data["channel_group"]:
data["channel_group"]["groups"] = {}
if "Application" not in data["channel_group"]["groups"]:
data["channel_group"]["groups"]["Application"] = {"groups": {}}
if org_msp not in data["channel_group"]["groups"]["Application"]["groups"]:
data["channel_group"]["groups"]["Application"]["groups"][org_msp] = {"values": {}}

data["channel_group"]["groups"]["Application"]["groups"][org_msp]["values"].update(anchor_peer_config)

with open(output, 'w', encoding='utf-8') as f:
json.dump(data, f, sort_keys=False, indent=4)

LOG.info("jq '.channel_group.groups.Application.groups.Org1MSP.values += ... ' {} -> {}".format(input, output))

def json_create_envelope(input, output, channel):
"""
Create a config update envelope structure

Args:
input (str): Path to the config update JSON file
output (str): Path to save the envelope JSON
channel (str): Name of the channel
"""
try:
# Read the config update file
with open(input, 'r', encoding='utf-8') as f:
config_update = json.load(f)

# Create the envelope structure
envelope = {
"payload": {
"header": {
"channel_header": {
"channel_id": channel,
"type": 2
}
},
"data": {
"config_update": config_update
}
}
}

# Write the envelope to output file
with open(output, 'w', encoding='utf-8') as f:
json.dump(envelope, f, sort_keys=False, indent=4)

LOG.info("echo 'payload ... ' | jq . > {}".format(output))

except Exception as e:
LOG.error("Failed to create config update envelope: {}".format(str(e)))
raise
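
# Illustrative use of json_create_envelope as called from views.py: wrap the
# computed config update in an Envelope-shaped JSON document for the channel
# (the channel name is a placeholder).
json_create_envelope(
    input="config_update.json",
    output="config_update_in_envelope.json",
    channel="mychannel",
)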