diff --git a/google/cloud/storage/bucket.py b/google/cloud/storage/bucket.py index c8df6c600..9d8c5e431 100644 --- a/google/cloud/storage/bucket.py +++ b/google/cloud/storage/bucket.py @@ -56,6 +56,7 @@ from google.cloud.storage.constants import REGIONAL_LEGACY_STORAGE_CLASS from google.cloud.storage.constants import REGION_LOCATION_TYPE from google.cloud.storage.constants import STANDARD_STORAGE_CLASS +from google.cloud.storage.ip_filter import IPFilter from google.cloud.storage.notification import BucketNotification from google.cloud.storage.notification import NONE_PAYLOAD_FORMAT from google.cloud.storage.retry import DEFAULT_RETRY @@ -88,6 +89,7 @@ _FROM_STRING_MESSAGE = ( "Bucket.from_string() is deprecated. " "Use Bucket.from_uri() instead." ) +_IP_FILTER_PROPERTY = "ipFilter" def _blobs_page_start(iterator, page, response): @@ -3887,6 +3889,59 @@ def generate_signed_url( query_parameters=query_parameters, ) + @property + def ip_filter(self): + """Retrieve or set the IP Filter configuration for this bucket. + + See https://cloud.google.com/storage/docs/ip-filtering-overview and + https://cloud.google.com/storage/docs/json_api/v1/buckets#ipFilter + + .. note:: + The getter for this property returns an + :class:`~google.cloud.storage.ip_filter.IPFilter` object, which is a + structured representation of the bucket's IP filter configuration. + Modifying the returned object has no effect. To update the bucket's + IP filter, create and assign a new ``IPFilter`` object to this + property and then call + :meth:`~google.cloud.storage.bucket.Bucket.patch`. + + .. code-block:: python + + from google.cloud.storage.ip_filter import ( + IPFilter, + PublicNetworkSource, + ) + + ip_filter = IPFilter() + ip_filter.mode = "Enabled" + ip_filter.public_network_source = PublicNetworkSource( + allowed_ip_cidr_ranges=["203.0.113.5/32"] + ) + bucket.ip_filter = ip_filter + bucket.patch() + + :setter: Set the IP Filter configuration for this bucket. + :getter: Gets the IP Filter configuration for this bucket. + + :rtype: :class:`~google.cloud.storage.ip_filter.IPFilter` or ``NoneType`` + :returns: + An ``IPFilter`` object representing the configuration, or ``None`` + if no filter is configured. + """ + resource = self._properties.get(_IP_FILTER_PROPERTY) + if resource: + return IPFilter._from_api_resource(resource) + return None + + @ip_filter.setter + def ip_filter(self, value): + if value is None: + self._patch_property(_IP_FILTER_PROPERTY, None) + elif isinstance(value, IPFilter): + self._patch_property(_IP_FILTER_PROPERTY, value._to_api_resource()) + else: + self._patch_property(_IP_FILTER_PROPERTY, value) + class SoftDeletePolicy(dict): """Map a bucket's soft delete policy. diff --git a/google/cloud/storage/ip_filter.py b/google/cloud/storage/ip_filter.py new file mode 100644 index 000000000..e5b2318bf --- /dev/null +++ b/google/cloud/storage/ip_filter.py @@ -0,0 +1,143 @@ +# Copyright 2014 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""IP Filter configuration for Google Cloud Storage Buckets.""" + +from typing import Dict, Any, Optional, List + +_MODE = "mode" +_PUBLIC_NETWORK_SOURCE = "publicNetworkSource" +_VPC_NETWORK_SOURCES = "vpcNetworkSources" +_ALLOWED_IP_CIDR_RANGES = "allowedIpCidrRanges" +_NETWORK = "network" +_ALLOW_ALL_SERVICE_AGENT_ACCESS = "allowAllServiceAgentAccess" +_ALLOW_CROSS_ORG_VPCS = "allowCrossOrgVpcs" + + +class PublicNetworkSource: + """Represents a public network source for a GCS Bucket IP Filter. + + :type allowed_ip_cidr_ranges: list(str) or None + :param allowed_ip_cidr_ranges: A list of public IPv4 or IPv6 ranges in + CIDR notation that are allowed to access + the bucket. + """ + + def __init__(self, allowed_ip_cidr_ranges: Optional[List[str]] = None): + self.allowed_ip_cidr_ranges = allowed_ip_cidr_ranges or [] + + def _to_api_resource(self) -> Dict[str, Any]: + """Serializes this object to a dictionary for API requests.""" + return {_ALLOWED_IP_CIDR_RANGES: self.allowed_ip_cidr_ranges} + + +class VpcNetworkSource: + """Represents a VPC network source for a GCS Bucket IP Filter. + + :type network: str + :param network: The resource name of the VPC network. + + :type allowed_ip_cidr_ranges: list(str) or None + :param allowed_ip_cidr_ranges: A list of IPv4 or IPv6 ranges in CIDR + notation allowed to access the bucket + from this VPC. + """ + + def __init__( + self, network: str, allowed_ip_cidr_ranges: Optional[List[str]] = None + ): + self.network = network + self.allowed_ip_cidr_ranges = allowed_ip_cidr_ranges or [] + + def _to_api_resource(self) -> Dict[str, Any]: + """Serializes this object to a dictionary for API requests.""" + return { + _NETWORK: self.network, + _ALLOWED_IP_CIDR_RANGES: self.allowed_ip_cidr_ranges, + } + + +class IPFilter: + """Represents a GCS Bucket IP Filter configuration. + + This class is a helper for constructing the IP Filter dictionary to be + assigned to a bucket's ``ip_filter`` property. + """ + + """ + Attributes: + mode (str): Required. The mode of the IP filter. Can be "Enabled" or "Disabled". + allow_all_service_agent_access (bool): Required. If True, allows Google + Cloud service agents to bypass the IP filter. + public_network_source (PublicNetworkSource): (Optional) The configuration + for requests from the public internet. + vpc_network_sources (list(VpcNetworkSource)): (Optional) A list of + configurations for requests from VPC networks. + allow_cross_org_vpcs (bool): (Optional) If True, allows VPCs from + other organizations to be used in the configuration. + """ + + def __init__(self): + self.mode: Optional[str] = None + self.public_network_source: Optional[PublicNetworkSource] = None + self.vpc_network_sources: List[VpcNetworkSource] = [] + self.allow_all_service_agent_access: Optional[bool] = None + self.allow_cross_org_vpcs: Optional[bool] = None + + @classmethod + def _from_api_resource(cls, resource: Dict[str, Any]) -> "IPFilter": + """Factory: creates an IPFilter instance from a server response.""" + ip_filter = cls() + ip_filter.mode = resource.get(_MODE) + ip_filter.allow_all_service_agent_access = resource.get( + _ALLOW_ALL_SERVICE_AGENT_ACCESS, None + ) + + public_network_source_data = resource.get(_PUBLIC_NETWORK_SOURCE, None) + if public_network_source_data: + ip_filter.public_network_source = PublicNetworkSource( + allowed_ip_cidr_ranges=public_network_source_data.get( + _ALLOWED_IP_CIDR_RANGES, [] + ) + ) + + vns_res_list = resource.get(_VPC_NETWORK_SOURCES, []) + ip_filter.vpc_network_sources = [ + VpcNetworkSource( + network=vns.get(_NETWORK), + allowed_ip_cidr_ranges=vns.get(_ALLOWED_IP_CIDR_RANGES, []), + ) + for vns in vns_res_list + ] + ip_filter.allow_cross_org_vpcs = resource.get(_ALLOW_CROSS_ORG_VPCS, None) + return ip_filter + + def _to_api_resource(self) -> Dict[str, Any]: + """Serializes this object to a dictionary for API requests.""" + resource = { + _MODE: self.mode, + _ALLOW_ALL_SERVICE_AGENT_ACCESS: self.allow_all_service_agent_access, + } + + if self.public_network_source: + resource[ + _PUBLIC_NETWORK_SOURCE + ] = self.public_network_source._to_api_resource() + if self.vpc_network_sources is not None: + resource[_VPC_NETWORK_SOURCES] = [ + vns._to_api_resource() for vns in self.vpc_network_sources + ] + if self.allow_cross_org_vpcs is not None: + resource[_ALLOW_CROSS_ORG_VPCS] = self.allow_cross_org_vpcs + return resource diff --git a/tests/system/test_bucket.py b/tests/system/test_bucket.py index f06de8e8c..3b05e8483 100644 --- a/tests/system/test_bucket.py +++ b/tests/system/test_bucket.py @@ -17,6 +17,11 @@ from google.api_core import exceptions from . import _helpers +from google.cloud.storage.ip_filter import ( + IPFilter, + PublicNetworkSource, + VpcNetworkSource, +) def test_bucket_create_w_alt_storage_class(storage_client, buckets_to_delete): @@ -1299,3 +1304,66 @@ def test_new_bucket_with_hierarchical_namespace( bucket = storage_client.create_bucket(bucket_obj) buckets_to_delete.append(bucket) assert bucket.hierarchical_namespace_enabled is True + + +def test_bucket_ip_filter_patch(storage_client, buckets_to_delete): + """Test setting and clearing IP filter configuration without enabling enforcement.""" + bucket_name = _helpers.unique_name("ip-filter-control") + bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name) + buckets_to_delete.append(bucket) + + ip_filter = IPFilter() + ip_filter.mode = "Disabled" + ip_filter.allow_all_service_agent_access = True + ip_filter.public_network_source = PublicNetworkSource( + allowed_ip_cidr_ranges=["203.0.113.10/32"] + ) + ip_filter.vpc_network_sources.append( + VpcNetworkSource( + network=f"projects/{storage_client.project}/global/networks/default", + allowed_ip_cidr_ranges=["10.0.0.0/8"], + ) + ) + bucket.ip_filter = ip_filter + bucket.patch() + + # Reload and verify the full configuration was set correctly. + bucket.reload() + reloaded_filter = bucket.ip_filter + assert reloaded_filter is not None + assert reloaded_filter.mode == "Disabled" + assert reloaded_filter.allow_all_service_agent_access is True + assert reloaded_filter.public_network_source.allowed_ip_cidr_ranges == [ + "203.0.113.10/32" + ] + assert len(reloaded_filter.vpc_network_sources) == 1 + +def test_list_buckets_with_ip_filter(storage_client, buckets_to_delete): + """Test that listing buckets returns a summarized IP filter.""" + bucket_name = _helpers.unique_name("ip-filter-list") + bucket = _helpers.retry_429_503(storage_client.create_bucket)(bucket_name) + buckets_to_delete.append(bucket) + + ip_filter = IPFilter() + ip_filter.mode = "Disabled" + ip_filter.allow_all_service_agent_access = True + ip_filter.public_network_source = PublicNetworkSource( + allowed_ip_cidr_ranges=["203.0.113.10/32"] + ) + bucket.ip_filter = ip_filter + bucket.patch() + + buckets_list = list(storage_client.list_buckets(prefix=bucket_name)) + found_bucket = next((b for b in buckets_list if b.name == bucket_name), None) + + assert found_bucket is not None + summarized_filter = found_bucket.ip_filter + + assert summarized_filter is not None + assert summarized_filter.mode == "Disabled" + assert summarized_filter.allow_all_service_agent_access is True + + # Check that the summarized filter does not include full details. + assert summarized_filter.public_network_source is None + assert summarized_filter.vpc_network_sources == [] + diff --git a/tests/unit/test_bucket.py b/tests/unit/test_bucket.py index e494cc18a..809b572e0 100644 --- a/tests/unit/test_bucket.py +++ b/tests/unit/test_bucket.py @@ -4612,6 +4612,54 @@ def test_generate_signed_url_v4_w_incompatible_params(self): virtual_hosted_style=True, bucket_bound_hostname="cdn.example.com" ) + def test_ip_filter_getter_unset(self): + """Test that ip_filter is None when not set.""" + bucket = self._make_one() + self.assertIsNone(bucket.ip_filter) + + def test_ip_filter_getter_w_value(self): + """Test getting an existing ip_filter configuration.""" + from google.cloud.storage.ip_filter import IPFilter + + ipf_property = {"mode": "Enabled"} + properties = {"ipFilter": ipf_property} + bucket = self._make_one(properties=properties) + + ip_filter = bucket.ip_filter + self.assertIsInstance(ip_filter, IPFilter) + self.assertEqual(ip_filter.mode, "Enabled") + + def test_ip_filter_setter(self): + """Test setting the ip_filter with a helper class.""" + from google.cloud.storage.ip_filter import IPFilter + from google.cloud.storage.bucket import _IP_FILTER_PROPERTY + + bucket = self._make_one() + ip_filter = IPFilter() + ip_filter.mode = "Enabled" + + bucket.ip_filter = ip_filter + + self.assertIn(_IP_FILTER_PROPERTY, bucket._changes) + self.assertEqual( + bucket._properties[_IP_FILTER_PROPERTY], + { + "mode": "Enabled", + "vpcNetworkSources": [], + "allowAllServiceAgentAccess": None, + }, + ) + + def test_ip_filter_setter_w_none(self): + """Test clearing the ip_filter by setting it to None.""" + from google.cloud.storage.bucket import _IP_FILTER_PROPERTY + + bucket = self._make_one(properties={"ipFilter": {"mode": "Enabled"}}) + bucket.ip_filter = None + + self.assertIn(_IP_FILTER_PROPERTY, bucket._changes) + self.assertIsNone(bucket._properties.get(_IP_FILTER_PROPERTY)) + class Test__item_to_notification(unittest.TestCase): def _call_fut(self, iterator, item): diff --git a/tests/unit/test_ip_filter.py b/tests/unit/test_ip_filter.py new file mode 100644 index 000000000..369462f2f --- /dev/null +++ b/tests/unit/test_ip_filter.py @@ -0,0 +1,106 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + + +class TestIPFilterHelpers(unittest.TestCase): + @staticmethod + def _get_public_network_source_class(): + from google.cloud.storage.ip_filter import PublicNetworkSource + + return PublicNetworkSource + + @staticmethod + def _get_vpc_network_source_class(): + from google.cloud.storage.ip_filter import VpcNetworkSource + + return VpcNetworkSource + + @staticmethod + def _get_ip_filter_class(): + from google.cloud.storage.ip_filter import IPFilter + + return IPFilter + + def test_public_network_source_serialization(self): + pns_class = self._get_public_network_source_class() + pns = pns_class(allowed_ip_cidr_ranges=["1.2.3.4/32"]) + resource = pns._to_api_resource() + self.assertEqual(resource, {"allowedIpCidrRanges": ["1.2.3.4/32"]}) + + def test_vpc_network_source_serialization(self): + vns_class = self._get_vpc_network_source_class() + vns = vns_class( + network="projects/p/global/networks/n", + allowed_ip_cidr_ranges=["10.0.0.0/8"], + ) + resource = vns._to_api_resource() + self.assertEqual( + resource, + { + "network": "projects/p/global/networks/n", + "allowedIpCidrRanges": ["10.0.0.0/8"], + }, + ) + + def test_ip_filter_full_serialization(self): + ip_filter_class = self._get_ip_filter_class() + pns_class = self._get_public_network_source_class() + vns_class = self._get_vpc_network_source_class() + + ip_filter = ip_filter_class() + ip_filter.mode = "Enabled" + ip_filter.public_network_source = pns_class( + allowed_ip_cidr_ranges=["1.2.3.4/32"] + ) + ip_filter.vpc_network_sources.append( + vns_class( + network="projects/p/global/networks/n", + allowed_ip_cidr_ranges=["10.0.0.0/8"], + ) + ) + ip_filter.allow_all_service_agent_access = True + + resource = ip_filter._to_api_resource() + expected = { + "mode": "Enabled", + "publicNetworkSource": {"allowedIpCidrRanges": ["1.2.3.4/32"]}, + "vpcNetworkSources": [ + { + "network": "projects/p/global/networks/n", + "allowedIpCidrRanges": ["10.0.0.0/8"], + } + ], + "allowAllServiceAgentAccess": True, + } + self.assertEqual(resource, expected) + + def test_ip_filter_deserialization(self): + ip_filter_class = self._get_ip_filter_class() + resource = { + "mode": "Enabled", + "publicNetworkSource": {"allowedIpCidrRanges": ["1.2.3.4/32"]}, + "allowAllServiceAgentAccess": False, + } + + ip_filter = ip_filter_class._from_api_resource(resource) + + self.assertEqual(ip_filter.mode, "Enabled") + self.assertIsNotNone(ip_filter.public_network_source) + self.assertEqual( + ip_filter.public_network_source.allowed_ip_cidr_ranges, ["1.2.3.4/32"] + ) + self.assertEqual(ip_filter.vpc_network_sources, []) + self.assertIs(ip_filter.allow_all_service_agent_access, False)