Added --config-version to the config-update command (#145)
* Added --config-version to the config-update command

Signed-off-by: Chris Helma <chelma+github@amazon.com>

* Added config-update to integ tests

Signed-off-by: Chris Helma <chelma+github@amazon.com>

* Removed config-update from integ tests

* The command requires a local configuration directory
  which doesn't exist in the GitHub Actions workspace

Signed-off-by: Chris Helma <chelma+github@amazon.com>

---------

Signed-off-by: Chris Helma <chelma+github@amazon.com>
chelma authored Dec 22, 2023
1 parent 0fcbe07 commit 8f61a41
Showing 6 changed files with 319 additions and 85 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/manage_arkime.yml
@@ -54,6 +54,3 @@ jobs:
./manage_arkime.py --region us-east-1 config-pull --cluster-name test --capture --previous
./manage_arkime.py --region us-east-1 config-pull --cluster-name test --viewer
./manage_arkime.py --region us-east-1 config-pull --cluster-name test --viewer --previous


32 changes: 22 additions & 10 deletions manage_arkime.py
@@ -200,24 +200,36 @@ def vpc_remove(ctx, cluster_name, vpc_id):
cli.add_command(vpc_remove)

@click.command(help="Updates specified Arkime Cluster's Capture/Viewer configuration")
@click.option("--cluster-name", help="The name of the Arkime Cluster to update", required=True)
@click.option("--force-bounce-capture",
help="Forces a bounce of the Capture Nodes, regardless of whether there is new config.",
@click.option("--cluster-name", help="The name of the Arkime Cluster to operate on", required=True)
@click.option("--capture",
help="Performs the operation for the Capture Nodes' configuration",
is_flag=True,
default=False
)
@click.option("--viewer",
help="Performs the operation for the Viewer Nodes' configuration",
is_flag=True,
show_default=True,
default=False
)
@click.option("--force-bounce-viewer",
help="Forces a bounce of the Viewer Nodes, regardless of whether there is new config.",
@click.option("--force-bounce",
help=("Forces a bounce of the Arkime Nodes' compute, regardless of whether there is new config."
+ " Can be used to re-start/re-load the Nodes' configuration and Docker containers. Useful as"
+ " an ops fallback if things get wonky."),
is_flag=True,
show_default=True,
default=False
)
@click.option("--config-version",
help=("Deploys the specified version of the config (if it exists) to either the Capture or Viewer Nodes (but not"
+ " both). You must specify which of the two components to deploy to."),
type=click.INT,
default=None,
)
@click.pass_context
def config_update(ctx, cluster_name, force_bounce_capture, force_bounce_viewer):
def config_update(ctx, cluster_name, capture, viewer, force_bounce, config_version):
profile = ctx.obj.get("profile")
region = ctx.obj.get("region")
cmd_config_update(profile, region, cluster_name, force_bounce_capture, force_bounce_viewer)
cmd_config_update(profile, region, cluster_name, capture, viewer, force_bounce, config_version)
cli.add_command(config_update)
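For illustration, a few hypothetical invocations of the reworked command (the cluster name and version number are made up), in the same style as the workflow calls above:

./manage_arkime.py --region us-east-1 config-update --cluster-name MyCluster
./manage_arkime.py --region us-east-1 config-update --cluster-name MyCluster --capture --config-version 3
./manage_arkime.py --region us-east-1 config-update --cluster-name MyCluster --viewer --force-bounce

The first call updates both components if the local config has changed; the second deploys an existing archive version to the Capture Nodes only, since --config-version requires picking one component; the third forces a Viewer bounce even when nothing changed.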

@click.command(help="Registers a VPC in another AWS Account so it can be captured by the Cluster. Not needed for VPCs"
@@ -275,7 +287,7 @@ def vpc_deregister_cluster(ctx, cluster_name, vpc_id):
cli.add_command(vpc_deregister_cluster)

@click.command(help="Retrieves metadata about the Arkime Cluster's config deployed to the Capture or Viewer Nodes")
@click.option("--cluster-name", help="The name of the Arkime Cluster to update", required=True)
@click.option("--cluster-name", help="The name of the Arkime Cluster to operate on", required=True)
@click.option("--capture",
help="Performs the operation for the Capture Nodes' configuration",
is_flag=True,
@@ -300,7 +312,7 @@ def config_list(ctx, cluster_name, capture, viewer, deployed):

@click.command(help=("Retrieves the config deployed to the Arkime Cluster's Capture or Viewer Nodes to your machine."
+ " Pulls the currently config deployed by default."))
@click.option("--cluster-name", help="The name of the Arkime Cluster to update", required=True)
@click.option("--cluster-name", help="The name of the Arkime Cluster to operate on", required=True)
@click.option("--capture",
help="Performs the operation for the Capture Nodes' configuration",
is_flag=True,
12 changes: 8 additions & 4 deletions manage_arkime/aws_interactions/s3_interactions.py
@@ -195,10 +195,13 @@ def get_object_user_metadata(bucket_name: str, s3_key: str, aws_provider: AwsCli
"""
s3_client = aws_provider.get_s3()

response = s3_client.head_object(
Bucket=bucket_name,
Key=s3_key,
)
try:
response = s3_client.head_object(Bucket=bucket_name, Key=s3_key)
except ClientError as ex:
if ex.response['Error']['Code'] == '404':
raise S3ObjectDoesntExist(bucket_name, s3_key)
raise ex

object_metadata = response.get("Metadata", None)
return object_metadata

@@ -220,6 +223,7 @@ def get_object(bucket_name: str, s3_key: str, local_path: str, aws_provider: Aws
except ClientError as ex:
if ex.response['Error']['Code'] == 'NoSuchKey':
raise S3ObjectDoesntExist(bucket_name, s3_key)
raise ex

try:
with open(local_path, 'wb') as file:
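One subtlety behind the two handlers above: S3's HeadObject has no response body, so boto3 surfaces a missing key as the bare HTTP status '404', while GetObject reports the service-level error code 'NoSuchKey'. A minimal standalone sketch (bucket and key names are made up, and credentials with list access to the bucket are assumed):

import boto3
from botocore.exceptions import ClientError

s3_client = boto3.client("s3")

# HeadObject: a missing key surfaces as the raw HTTP status code.
try:
    s3_client.head_object(Bucket="example-bucket", Key="no-such-key")
except ClientError as ex:
    print(ex.response["Error"]["Code"])  # '404'

# GetObject: the same missing key surfaces as a service error code.
try:
    s3_client.get_object(Bucket="example-bucket", Key="no-such-key")
except ClientError as ex:
    print(ex.response["Error"]["Code"])  # 'NoSuchKey'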
176 changes: 115 additions & 61 deletions manage_arkime/commands/config_update.py
@@ -15,67 +15,96 @@

logger = logging.getLogger(__name__)

def cmd_config_update(profile: str, region: str, cluster_name: str, force_bounce_capture: bool, force_bounce_viewer: bool):
def cmd_config_update(profile: str, region: str, cluster_name: str, capture: bool, viewer: bool, force_bounce: bool,
config_version: int):
logger.debug(f"Invoking config-update with profile '{profile}' and region '{region}'")

one_component_specified = (capture or viewer) and not (capture and viewer) # XOR
no_component_specified = not (capture or viewer)
if config_version and (not one_component_specified):
logger.error("If you specify a specific config version to deploy, you must indicate whether to deploy it to"
+ " either the Capture or Viewer nodes. Aborting...")
exit(1)

# Update Capture/Viewer config in the cloud, if there's a new version locally. Bounce the associated ECS Tasks
# if we updated the configuration so that they pick it up.
aws_provider = AwsClientProvider(aws_profile=profile, aws_region=region)
aws_env = aws_provider.get_aws_env()
bucket_name = constants.get_config_bucket_name(aws_env.aws_account, aws_env.aws_region, cluster_name)

logger.info("Updating Arkime config for Capture Nodes, if necessary...")
should_bounce_capture_nodes = _update_config_if_necessary(
cluster_name,
bucket_name,
constants.get_capture_config_s3_key,
constants.get_capture_config_details_ssm_param_name(cluster_name),
config_wrangling.get_capture_config_archive,
aws_provider
)
if should_bounce_capture_nodes or force_bounce_capture:
raw_capture_details = ssm_ops.get_ssm_param_value(
constants.get_capture_details_ssm_param_name(cluster_name),
aws_provider
)
capture_details = config_wrangling.CaptureDetails(**json.loads(raw_capture_details))
_bounce_ecs_service(
capture_details.ecsCluster,
capture_details.ecsService,
if capture or no_component_specified:
should_bounce_capture_nodes = _update_config_if_necessary(
cluster_name,
bucket_name,
constants.get_capture_config_s3_key,
constants.get_capture_config_details_ssm_param_name(cluster_name),
config_wrangling.get_capture_config_archive,
config_version,
aws_provider
)

if should_bounce_capture_nodes or force_bounce:
raw_capture_details = ssm_ops.get_ssm_param_value(
constants.get_capture_details_ssm_param_name(cluster_name),
aws_provider
)
capture_details = config_wrangling.CaptureDetails(**json.loads(raw_capture_details))
_bounce_ecs_service(
capture_details.ecsCluster,
capture_details.ecsService,
constants.get_capture_config_details_ssm_param_name(cluster_name),
aws_provider
)
else:
logger.info("Skipping Capture Nodes due to user parameters supplied")

logger.info("Updating Arkime config for Viewer Nodes, if necessary...")
should_bounce_viewer_nodes = _update_config_if_necessary(
cluster_name,
bucket_name,
constants.get_viewer_config_s3_key,
constants.get_viewer_config_details_ssm_param_name(cluster_name),
config_wrangling.get_viewer_config_archive,
aws_provider
)
if should_bounce_viewer_nodes or force_bounce_viewer:
raw_viewer_details = ssm_ops.get_ssm_param_value(
constants.get_viewer_details_ssm_param_name(cluster_name),
aws_provider
)
viewer_details = config_wrangling.ViewerDetails(**json.loads(raw_viewer_details))
_bounce_ecs_service(
viewer_details.ecsCluster,
viewer_details.ecsService,
if viewer or no_component_specified:
should_bounce_viewer_nodes = _update_config_if_necessary(
cluster_name,
bucket_name,
constants.get_viewer_config_s3_key,
constants.get_viewer_config_details_ssm_param_name(cluster_name),
config_wrangling.get_viewer_config_archive,
config_version,
aws_provider
)

if should_bounce_viewer_nodes or force_bounce:
raw_viewer_details = ssm_ops.get_ssm_param_value(
constants.get_viewer_details_ssm_param_name(cluster_name),
aws_provider
)
viewer_details = config_wrangling.ViewerDetails(**json.loads(raw_viewer_details))
_bounce_ecs_service(
viewer_details.ecsCluster,
viewer_details.ecsService,
constants.get_viewer_config_details_ssm_param_name(cluster_name),
aws_provider
)
else:
logger.info("Skipping Viewer Nodes due to user parameters supplied")

def _update_config_if_necessary(cluster_name: str, bucket_name: str, s3_key_provider: Callable[[str], str], ssm_param: str,
archive_provider: Callable[[str], LocalFile], aws_provider: AwsClientProvider) -> bool:
archive_provider: Callable[[str], LocalFile], switch_to_version: int,
aws_provider: AwsClientProvider) -> bool:
# Create the local config archive and its metadata
aws_env = aws_provider.get_aws_env()
archive = archive_provider(cluster_name, aws_env)
archive_md5 = get_version_info(archive).md5_version

# See if we need to update the configuration
# Confirm the requested version exists, if specified
if switch_to_version:
try:
raw_metadata = s3.get_object_user_metadata(bucket_name, s3_key_provider(switch_to_version), aws_provider)
switch_version_info = config_wrangling.VersionInfo(**raw_metadata)
except s3.S3ObjectDoesntExist:
logger.warning(f"The requested config version ({switch_to_version}) does not exist; aborting...")
return False

# Pull the currently deployed config details from the cloud so we can see if we need to update the configuration.
# If there isn't a current version in the cloud, we know we should perform the update.
try:
logger.info(f"Pulling existing configuration details from Param Store at: {ssm_param}")
raw_param_val = ssm_ops.get_ssm_param_value(ssm_param, aws_provider)
@@ -84,40 +113,65 @@ def _update_config_if_necessary(cluster_name: str, bucket_name: str, s3_key_prov
logger.debug(f"No existing configuration details at: {ssm_param}")
cloud_config_details = None

if cloud_config_details and cloud_config_details.version.md5_version == archive_md5:
logger.info(f"Local config is the same as what's currently deployed; skipping")
return False

# Create the config details for the local archive. The ConfigDetails contains a reference to the previous,
# which means it's a recursive data structure. For now, we limit ourselves to only tracking the current and
# previous versions of the configuration to avoid running into storage limits. We can explore maintaining a
# deeper structure later if there's a need to track the full deployed version history.
next_config_version = str(int(cloud_config_details.version.config_version) + 1)
if cloud_config_details:
same_md5 = (cloud_config_details.version.md5_version == archive_md5)
same_config_version = (cloud_config_details.version.config_version == str(switch_to_version)) # config_version is stored as a string

if switch_to_version and same_config_version: # Comparing to the previous S3 object's version
logger.info(f"The previous version you specified is the same as what's currently deployed; skipping swap")
return False
elif same_md5 and not switch_to_version: # Comparing to the local config archive's version
logger.info(f"The local config is the same as what's currently deployed; skipping upload")
return False

# Assemble the config details for the next config version. This can be either the config on the local file system
# or a previously deployed version already in the cloud.
#
# The ConfigDetails contains a reference to the previous, which means it's a recursive data structure. For now,
# we limit ourselves to only tracking the current and previous versions of the configuration to avoid running into
# storage limits. We can explore maintaining a deeper structure later if there's a need to track the full deployed
# version history.
next_config_version = (
str(switch_to_version)
if switch_to_version
else str(int(cloud_config_details.version.config_version) + 1)
)

if cloud_config_details:
cloud_config_details.previous = None

local_config_details = config_wrangling.ConfigDetails(
s3=config_wrangling.S3Details(bucket_name, s3_key_provider(next_config_version)),
version=get_version_info(archive, config_version=next_config_version),
previous=cloud_config_details
)
next_config_details = (
config_wrangling.ConfigDetails(
s3=config_wrangling.S3Details(bucket_name, s3_key_provider(next_config_version)),
version=switch_version_info,
previous=cloud_config_details
)
if switch_to_version
else config_wrangling.ConfigDetails(
s3=config_wrangling.S3Details(bucket_name, s3_key_provider(next_config_version)),
version=get_version_info(archive, config_version=next_config_version),
previous=cloud_config_details
)
)
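To make the pruning above concrete, here is a toy stand-in (a hypothetical class, not the real config_wrangling.ConfigDetails) showing how the tracked history stays exactly two versions deep:

from dataclasses import dataclass
from typing import Optional

@dataclass
class Details:  # toy stand-in for config_wrangling.ConfigDetails
    config_version: str
    previous: Optional["Details"] = None

deployed = Details("3", previous=Details("2"))
deployed.previous = None               # mirrors `cloud_config_details.previous = None`
nxt = Details("4", previous=deployed)  # mirrors the next_config_details assembly
assert nxt.previous.config_version == "3"
assert nxt.previous.previous is None   # version 2 is no longer referenced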

# Upload the archive to S3. Do this first so that if this operation succeeds, but the update of Parameter Store
# fails afterwards, then another run of the CLI command should fix things.
logger.info(f"Uploading config archive to S3 bucket: {bucket_name}")
s3.put_file_to_bucket(
S3File(archive, metadata=local_config_details.version.to_dict()),
bucket_name,
s3_key_provider(next_config_version),
aws_provider
)
# fails afterwards, then another run of the CLI command should fix things. Unnecessary if we're switching to a
# previous version (it's already in S3).
if not switch_to_version:
logger.info(f"Uploading config archive to S3 bucket: {bucket_name}")
s3.put_file_to_bucket(
S3File(archive, metadata=next_config_details.version.to_dict()),
bucket_name,
s3_key_provider(next_config_version),
aws_provider
)

# Update Parameter Store
# Update Parameter Store. This switches the pointer to the version the containers should pull from S3 when they
# start up. The containers then unpack the config bundle as part of their bootstrapping process.
logger.info(f"Updating config details in Param Store at: {ssm_param}")
ssm_ops.put_ssm_param(
ssm_param,
json.dumps(local_config_details.to_dict()),
json.dumps(next_config_details.to_dict()),
aws_provider,
description="The currently deployed configuration details",
overwrite=True
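For orientation, a hypothetical sketch of the kind of JSON document this writes to Parameter Store (field names are inferred from the S3Details/VersionInfo usage above; the real serialized keys may differ):

import json

# Illustrative values only; real bucket names, keys, and hashes come from the cluster.
example_details = {
    "s3": {"bucket": "example-config-bucket", "key": "capture/4/archive.zip"},
    "version": {"config_version": "4", "md5_version": "0123456789abcdef"},
    "previous": {
        "s3": {"bucket": "example-config-bucket", "key": "capture/3/archive.zip"},
        "version": {"config_version": "3", "md5_version": "fedcba9876543210"},
        "previous": None,
    },
}
print(json.dumps(example_details, indent=2))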
11 changes: 11 additions & 0 deletions test_manage_arkime/aws_interactions/test_s3_interactions.py
@@ -292,6 +292,17 @@ def test_WHEN_get_object_user_metadata_called_THEN_as_expected():

assert None == result

@mock.patch("aws_interactions.s3_interactions.AwsClientProvider")
def test_WHEN_get_object_user_metadata_called_AND_s3_obj_doesnt_exist_THEN_raises(mock_aws_provider):
# Set up our mock
mock_s3_client = mock.Mock()
mock_s3_client.head_object.side_effect = ClientError(error_response={"Error": {"Code": "404", "Message": "Not found"}}, operation_name="")
mock_aws_provider.get_s3.return_value = mock_s3_client

# Run our test
with pytest.raises(s3.S3ObjectDoesntExist):
s3.get_object_user_metadata("my-bucket", "key", mock_aws_provider)

@mock.patch("aws_interactions.s3_interactions.os.path.exists")
@mock.patch("aws_interactions.s3_interactions.AwsClientProvider")
def test_WHEN_get_object_called_AND_file_exists_THEN_raises(mock_aws_provider, mock_exists):
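The new test can be run on its own with pytest (a hypothetical invocation, assuming tests are run from the repository root):

pytest test_manage_arkime/aws_interactions/test_s3_interactions.py -k s3_obj_doesnt_exist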