ECR: batch_get_repository_scanning_configuration() (#6924)

This commit is contained in:
Bert Blommers 2023-10-16 21:44:18 +00:00 committed by GitHub
parent a0434335d2
commit a1f261189e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 203 additions and 6 deletions

View File

@ -2510,12 +2510,12 @@
## ecr
<details>
<summary>63% implemented</summary>
<summary>68% implemented</summary>
- [ ] batch_check_layer_availability
- [X] batch_delete_image
- [X] batch_get_image
- [ ] batch_get_repository_scanning_configuration
- [X] batch_get_repository_scanning_configuration
- [ ] complete_layer_upload
- [ ] create_pull_through_cache_rule
- [X] create_repository
@ -2545,7 +2545,7 @@
- [X] put_image_tag_mutability
- [X] put_lifecycle_policy
- [X] put_registry_policy
- [ ] put_registry_scanning_configuration
- [X] put_registry_scanning_configuration
- [X] put_replication_configuration
- [X] set_repository_policy
- [X] start_image_scan

View File

@ -32,7 +32,7 @@ ecr
The parameter AcceptedMediaTypes has not yet been implemented
- [ ] batch_get_repository_scanning_configuration
- [X] batch_get_repository_scanning_configuration
- [ ] complete_layer_upload
- [ ] create_pull_through_cache_rule
- [X] create_repository
@ -70,7 +70,7 @@ ecr
- [X] put_image_tag_mutability
- [X] put_lifecycle_policy
- [X] put_registry_policy
- [ ] put_registry_scanning_configuration
- [X] put_registry_scanning_configuration
- [X] put_replication_configuration
- [X] set_repository_policy
- [X] start_image_scan

View File

@ -3,7 +3,7 @@ import json
import re
from collections import namedtuple
from datetime import datetime, timezone
from typing import Any, Dict, List, Iterable, Optional
from typing import Any, Dict, List, Iterable, Optional, Tuple
from botocore.exceptions import ParamValidationError
@ -93,6 +93,13 @@ class Repository(BaseObject, CloudFormationModel):
self.policy: Optional[str] = None
self.lifecycle_policy: Optional[str] = None
self.images: List[Image] = []
self.scanning_config = {
"repositoryArn": self.arn,
"repositoryName": self.name,
"scanOnPush": False,
"scanFrequency": "MANUAL",
"appliedScanFilters": [],
}
def _determine_encryption_config(
self, encryption_config: Optional[Dict[str, str]]
@ -789,6 +796,20 @@ class ECRBackend(BaseBackend):
return response
def batch_get_repository_scanning_configuration(
self, names: List[str]
) -> Tuple[List[Dict[str, Any]], List[str]]:
configs = []
failing = []
for name in names:
try:
configs.append(
self._get_repository(name=name, registry_id=None).scanning_config
)
except RepositoryNotFoundException:
failing.append(name)
return configs, failing
def list_tags_for_resource(self, arn: str) -> Dict[str, List[Dict[str, str]]]:
resource = self._parse_resource_arn(arn)
repo = self._get_repository(resource.repo_name, resource.account_id)
@ -1093,6 +1114,17 @@ class ECRBackend(BaseBackend):
return {"replicationConfiguration": replication_config}
def put_registry_scanning_configuration(self, rules: List[Dict[str, Any]]) -> None:
for rule in rules:
for repo_filter in rule["repositoryFilters"]:
for repo in self.repositories.values():
if repo_filter["filter"] == repo.name or re.match(
repo_filter["filter"], repo.name
):
repo.scanning_config["scanFrequency"] = rule["scanFrequency"]
# AWS testing seems to indicate that this is always overwritten
repo.scanning_config["appliedScanFilters"] = [repo_filter]
def describe_registry(self) -> Dict[str, Any]:
return {
"registryId": self.account_id,

View File

@ -108,6 +108,25 @@ class ECRResponse(BaseResponse):
)
return json.dumps(response)
def batch_get_repository_scanning_configuration(self) -> str:
names = self._get_param("repositoryNames")
configs, missing = self.ecr_backend.batch_get_repository_scanning_configuration(
names
)
return json.dumps(
{
"scanningConfigurations": configs,
"failures": [
{
"repositoryName": m,
"failureCode": "REPOSITORY_NOT_FOUND",
"failureReason": "REPOSITORY_NOT_FOUND",
}
for m in missing
],
}
)
def complete_layer_upload(self) -> None:
self.error_on_dryrun()
raise NotImplementedError("ECR.complete_layer_upload is not yet implemented")
@ -303,5 +322,11 @@ class ECRResponse(BaseResponse):
)
)
def put_registry_scanning_configuration(self) -> str:
scan_type = self._get_param("scanType")
rules = self._get_param("rules")
self.ecr_backend.put_registry_scanning_configuration(rules)
return json.dumps({"scanType": scan_type, "rules": rules})
def describe_registry(self) -> str:
return json.dumps(self.ecr_backend.describe_registry())

View File

@ -0,0 +1,140 @@
import boto3
from moto import mock_ecr
@mock_ecr
def test_batch_get_repository_scanning_configuration():
client = boto3.client("ecr", region_name="us-east-1")
repo_name = "test-repo"
repo_arn = client.create_repository(repositoryName=repo_name)["repository"][
"repositoryArn"
]
# Default ScanningConfig is returned
resp = client.batch_get_repository_scanning_configuration(
repositoryNames=[repo_name]
)
assert resp["scanningConfigurations"] == [
{
"repositoryArn": repo_arn,
"repositoryName": repo_name,
"scanOnPush": False,
"scanFrequency": "MANUAL",
"appliedScanFilters": [],
}
]
assert resp["failures"] == []
# Non-existing repositories are returned as failures
resp = client.batch_get_repository_scanning_configuration(
repositoryNames=["unknown"]
)
assert resp["scanningConfigurations"] == []
assert resp["failures"] == [
{
"repositoryName": "unknown",
"failureCode": "REPOSITORY_NOT_FOUND",
"failureReason": "REPOSITORY_NOT_FOUND",
}
]
@mock_ecr
def test_put_registry_scanning_configuration():
client = boto3.client("ecr", region_name="us-east-1")
repo_name = "test-repo"
repo_arn = client.create_repository(repositoryName=repo_name)["repository"][
"repositoryArn"
]
#####
# Creating a ScanningConfig where
# filter == repo_name
####
client.put_registry_scanning_configuration(
scanType="ENHANCED",
rules=[
{
"scanFrequency": "CONTINUOUS_SCAN",
"repositoryFilters": [{"filter": repo_name, "filterType": "WILDCARD"}],
}
],
)
resp = client.batch_get_repository_scanning_configuration(
repositoryNames=[repo_name]
)
assert resp["scanningConfigurations"] == [
{
"repositoryArn": repo_arn,
"repositoryName": repo_name,
"scanOnPush": False,
"scanFrequency": "CONTINUOUS_SCAN",
"appliedScanFilters": [{"filter": repo_name, "filterType": "WILDCARD"}],
}
]
#####
# Creating a ScanningConfig where
# filter == repo*
####
client.put_registry_scanning_configuration(
scanType="ENHANCED",
rules=[
{
"scanFrequency": "SCAN_ON_PUSH",
"repositoryFilters": [
{"filter": f"{repo_name[:4]}*", "filterType": "WILDCARD"}
],
}
],
)
resp = client.batch_get_repository_scanning_configuration(
repositoryNames=[repo_name]
)
assert resp["scanningConfigurations"] == [
{
"repositoryArn": repo_arn,
"repositoryName": repo_name,
"scanOnPush": False,
"scanFrequency": "SCAN_ON_PUSH",
"appliedScanFilters": [
{"filter": f"{repo_name[:4]}*", "filterType": "WILDCARD"}
],
}
]
#####
# Creating a ScanningConfig where
# filter == unknown_repo
####
client.put_registry_scanning_configuration(
scanType="ENHANCED",
rules=[
{
"scanFrequency": "MANUAL",
"repositoryFilters": [{"filter": "unknown", "filterType": "WILDCARD"}],
}
],
)
resp = client.batch_get_repository_scanning_configuration(
repositoryNames=[repo_name]
)
assert resp["scanningConfigurations"] == [
{
"repositoryArn": repo_arn,
"repositoryName": repo_name,
"scanOnPush": False,
"scanFrequency": "SCAN_ON_PUSH",
"appliedScanFilters": [
{"filter": f"{repo_name[:4]}*", "filterType": "WILDCARD"}
],
}
]