Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions samcli/lib/build/app_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def _get_build_graph(
container_env_vars = self._make_env_vars(layer, file_env_vars, inline_env_vars)

layer_build_details = LayerBuildDefinition(
layer.name,
layer.full_path,
layer.codeuri,
layer.build_method,
layer.compatible_runtimes,
Expand Down Expand Up @@ -278,11 +278,6 @@ def update_template(
template_dict = stack.template_dict
normalized_resources = stack.resources
for logical_id, resource in template_dict.get("Resources", {}).items():

# clone normalized metadata from stack.resources
normalized_metadata = normalized_resources.get(logical_id, {}).get("Metadata")
if normalized_metadata:
resource["Metadata"] = normalized_metadata
resource_iac_id = ResourceMetadataNormalizer.get_resource_id(resource, logical_id)
full_path = get_full_path(stack.stack_path, resource_iac_id)
has_build_artifact = full_path in built_artifacts
Expand All @@ -293,6 +288,11 @@ def update_template(
# So skip it because there is no path/uri to update
continue

# clone normalized metadata from stack.resources only to built resources
normalized_metadata = normalized_resources.get(logical_id, {}).get("Metadata")
if normalized_metadata:
resource["Metadata"] = normalized_metadata

resource_type = resource.get("Type")
properties = resource.setdefault("Properties", {})

Expand Down
10 changes: 5 additions & 5 deletions samcli/lib/build/build_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def _layer_build_definition_to_toml_table(layer_build_definition: "LayerBuildDef
toml table of LayerBuildDefinition
"""
toml_table = tomlkit.table()
toml_table[LAYER_NAME_FIELD] = layer_build_definition.name
toml_table[LAYER_NAME_FIELD] = layer_build_definition.full_path
toml_table[CODE_URI_FIELD] = layer_build_definition.codeuri
toml_table[BUILD_METHOD_FIELD] = layer_build_definition.build_method
toml_table[COMPATIBLE_RUNTIMES_FIELD] = layer_build_definition.compatible_runtimes
Expand Down Expand Up @@ -501,7 +501,7 @@ class LayerBuildDefinition(AbstractBuildDefinition):

def __init__(
self,
name: str,
full_path: str,
codeuri: Optional[str],
build_method: Optional[str],
compatible_runtimes: Optional[List[str]],
Expand All @@ -511,7 +511,7 @@ def __init__(
env_vars: Optional[Dict] = None,
):
super().__init__(source_hash, manifest_hash, env_vars, architecture)
self.name = name
self.full_path = full_path
self.codeuri = codeuri
self.build_method = build_method
self.compatible_runtimes = compatible_runtimes
Expand All @@ -521,7 +521,7 @@ def __init__(

def __str__(self) -> str:
return (
f"LayerBuildDefinition({self.name}, {self.codeuri}, {self.source_hash}, {self.uuid}, "
f"LayerBuildDefinition({self.full_path}, {self.codeuri}, {self.source_hash}, {self.uuid}, "
f"{self.build_method}, {self.compatible_runtimes}, {self.architecture}, {self.env_vars})"
)

Expand All @@ -543,7 +543,7 @@ def __eq__(self, other: Any) -> bool:
return False

return (
self.name == other.name
self.full_path == other.full_path
and self.codeuri == other.codeuri
and self.build_method == other.build_method
and self.compatible_runtimes == other.compatible_runtimes
Expand Down
137 changes: 77 additions & 60 deletions samcli/lib/utils/file_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def _get_image_lambda_function_image_names(function_config: FunctionConfig) -> L
}

self._input_on_change: Callable = on_change
self._watch_lock: Lock = threading.Lock()

def _on_zip_change(self, paths: List[str]) -> None:
"""
Expand Down Expand Up @@ -171,11 +172,12 @@ def _on_change(self, resources: List[str], package_type: str) -> None:
package_type: str
determine if the changed resource is a source code path or an image name
"""
changed_functions: List[FunctionConfig] = []
for resource in resources:
if self._observed_functions[package_type].get(resource, None):
changed_functions += self._observed_functions[package_type][resource]
self._input_on_change(changed_functions)
with self._watch_lock:
changed_functions: List[FunctionConfig] = []
for resource in resources:
if self._observed_functions[package_type].get(resource, None):
changed_functions += self._observed_functions[package_type][resource]
self._input_on_change(changed_functions)

def watch(self, function_config: FunctionConfig) -> None:
"""
Expand All @@ -191,13 +193,14 @@ def watch(self, function_config: FunctionConfig) -> None:
ObserverException:
if not able to observe the input function source path/image
"""
if self.get_resources.get(function_config.packagetype, None):
resources = self.get_resources[function_config.packagetype](function_config)
for resource in resources:
functions = self._observed_functions[function_config.packagetype].get(resource, [])
functions += [function_config]
self._observed_functions[function_config.packagetype][resource] = functions
self._observers[function_config.packagetype].watch(resource)
with self._watch_lock:
if self.get_resources.get(function_config.packagetype, None):
resources = self.get_resources[function_config.packagetype](function_config)
for resource in resources:
functions = self._observed_functions[function_config.packagetype].get(resource, [])
functions += [function_config]
self._observed_functions[function_config.packagetype][resource] = functions
self._observers[function_config.packagetype].watch(resource)

def unwatch(self, function_config: FunctionConfig) -> None:
"""
Expand Down Expand Up @@ -381,6 +384,7 @@ def __init__(self) -> None:

self._code_modification_handler.on_modified = self.on_change
self._code_deletion_handler.on_deleted = self.on_change
self._watch_lock = threading.Lock()
self._lock: Lock = threading.Lock()

def on_change(self, event: FileSystemEvent) -> None:
Expand All @@ -394,37 +398,40 @@ def on_change(self, event: FileSystemEvent) -> None:
event: watchdog.events.FileSystemEvent
Determines that there is a change happened to some file/dir in the observed paths
"""
LOG.debug("a %s change got detected in path %s", event.event_type, event.src_path)
for group, _observed_paths in self._observed_paths_per_group.items():
if event.event_type == "deleted":
observed_paths = [
path
for path in _observed_paths
if path == event.src_path
or path in self._watch_dog_observed_paths.get(f"{event.src_path}_False", [])
]
else:
observed_paths = [path for path in _observed_paths if event.src_path.startswith(path)]

if not observed_paths:
continue

LOG.debug("affected paths of this change %s", observed_paths)
changed_paths = []
for path in observed_paths:
path_obj = Path(path)
# The path got deleted
if not path_obj.exists():
_observed_paths.pop(path, None)
changed_paths += [path]
with self._watch_lock:
LOG.debug("a %s change got detected in path %s", event.event_type, event.src_path)
for group, _observed_paths in self._observed_paths_per_group.items():
if event.event_type == "deleted":
observed_paths = [
path
for path in _observed_paths
if path == event.src_path
or path in self._watch_dog_observed_paths.get(f"{event.src_path}_False", [])
]
else:
new_checksum = calculate_checksum(path)
if new_checksum != _observed_paths.get(path, None):
observed_paths = [path for path in _observed_paths if event.src_path.startswith(path)]

if not observed_paths:
continue

LOG.debug("affected paths of this change %s", observed_paths)
changed_paths = []
for path in observed_paths:
path_obj = Path(path)
# The path got deleted
if not path_obj.exists():
_observed_paths.pop(path, None)
changed_paths += [path]
_observed_paths[path] = new_checksum
else:
new_checksum = calculate_checksum(path)
if new_checksum and new_checksum != _observed_paths.get(path, None):
changed_paths += [path]
_observed_paths[path] = new_checksum
else:
LOG.debug("the path %s content does not change", path)

if changed_paths:
self._observed_groups_handlers[group](changed_paths)
if changed_paths:
self._observed_groups_handlers[group](changed_paths)

def add_group(self, group: str, on_change: Callable) -> None:
"""
Expand Down Expand Up @@ -464,22 +471,26 @@ def watch(self, resource: str, group: str) -> None:
FileObserverException:
if the input path is not exist
"""
path_obj = Path(resource)
if not path_obj.exists():
raise FileObserverException("Can not observe non exist path")
with self._watch_lock:
path_obj = Path(resource)
if not path_obj.exists():
raise FileObserverException("Can not observe non exist path")

_observed_paths = self._observed_paths_per_group[group]
_observed_paths[resource] = calculate_checksum(resource)
_observed_paths = self._observed_paths_per_group[group]
_check_sum = calculate_checksum(resource)
if not _check_sum:
raise Exception(f"Failed to calculate the hash of resource {resource}")
_observed_paths[resource] = _check_sum

LOG.debug("watch resource %s", resource)
# recursively watch the input path, and all child path for any modification
self._watch_path(resource, resource, self._code_modification_handler, True)
LOG.debug("watch resource %s", resource)
# recursively watch the input path, and all child path for any modification
self._watch_path(resource, resource, self._code_modification_handler, True)

LOG.debug("watch resource %s's parent %s", resource, str(path_obj.parent))
# watch only the direct parent path child directories for any deletion
# Parent directory watching is needed, as if the input path got deleted,
# watchdog will not send an event for it
self._watch_path(str(path_obj.parent), resource, self._code_deletion_handler, False)
LOG.debug("watch resource %s's parent %s", resource, str(path_obj.parent))
# watch only the direct parent path child directories for any deletion
# Parent directory watching is needed, as if the input path got deleted,
# watchdog will not send an event for it
self._watch_path(str(path_obj.parent), resource, self._code_deletion_handler, False)

def _watch_path(
self, watch_dog_path: str, original_path: str, watcher_handler: FileSystemEventHandler, recursive: bool
Expand Down Expand Up @@ -511,6 +522,7 @@ def _watch_path(
child_paths += [original_path]
self._watch_dog_observed_paths[watch_dog_path] = child_paths
if first_time:
LOG.debug("Create Observer for resource %s with recursive %s", original_watch_dog_path, recursive)
self._observed_watches[watch_dog_path] = self._observer.schedule(
watcher_handler, original_watch_dog_path, recursive=recursive
)
Expand Down Expand Up @@ -556,6 +568,7 @@ def _unwatch_path(self, watch_dog_path: str, original_path: str, group: str, rec

# Allow watching the same path in 2 Modes recursivly, and non-recusrsivly.
# here, we need to only stop watching the input path in a specific recursive mode
original_watch_dog_path = watch_dog_path
watch_dog_path = f"{watch_dog_path}_{recursive}"
_observed_paths = self._observed_paths_per_group[group]
child_paths = self._watch_dog_observed_paths.get(watch_dog_path, [])
Expand All @@ -565,6 +578,7 @@ def _unwatch_path(self, watch_dog_path: str, original_path: str, group: str, rec
if not child_paths:
self._watch_dog_observed_paths.pop(watch_dog_path, None)
if self._observed_watches.get(watch_dog_path, None):
LOG.debug("Unschedule Observer for resource %s with recursive %s", original_watch_dog_path, recursive)
self._observer.unschedule(self._observed_watches[watch_dog_path])
self._observed_watches.pop(watch_dog_path, None)

Expand All @@ -585,10 +599,13 @@ def stop(self):
self._observer.stop()


def calculate_checksum(path: str) -> str:
path_obj = Path(path)
if path_obj.is_file():
checksum = file_checksum(path)
else:
checksum = dir_checksum(path)
return checksum
def calculate_checksum(path: str) -> Optional[str]:
try:
path_obj = Path(path)
if path_obj.is_file():
checksum = file_checksum(path)
else:
checksum = dir_checksum(path)
return checksum
except Exception:
return None
8 changes: 4 additions & 4 deletions samcli/local/lambdafn/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,13 @@ def create(self, function_config, debug_context=None, container_host=None, conta
)
debug_context = None

self._observer.watch(function_config)
self._observer.start()

container = super().create(function_config, debug_context, container_host, container_host_interface)
self._function_configs[function_config.full_path] = function_config
self._containers[function_config.full_path] = container

self._observer.watch(function_config)
self._observer.start()

return container

def _on_invoke_done(self, container):
Expand Down Expand Up @@ -486,12 +486,12 @@ def _on_code_change(self, functions):
function_full_path,
resource,
)
self._observer.unwatch(function_config)
self._function_configs.pop(function_full_path, None)
container = self._containers.get(function_full_path, None)
if container:
self._container_manager.stop(container)
self._containers.pop(function_full_path, None)
self._observer.unwatch(function_config)


def _unzip_file(filepath):
Expand Down
Loading