ECR: Add get registry scanning configuration operation (#7402)

This commit is contained in:
Daniel Fangl 2024-02-29 11:33:45 +01:00 committed by GitHub
parent 12460e510d
commit ce447bfc2a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 190 additions and 18 deletions

View File

@ -2684,7 +2684,7 @@
- [X] get_lifecycle_policy
- [ ] get_lifecycle_policy_preview
- [X] get_registry_policy
- [ ] get_registry_scanning_configuration
- [X] get_registry_scanning_configuration
- [X] get_repository_policy
- [ ] initiate_layer_upload
- [X] list_images

View File

@ -45,7 +45,7 @@ ecr
- [X] get_lifecycle_policy
- [ ] get_lifecycle_policy_preview
- [X] get_registry_policy
- [ ] get_registry_scanning_configuration
- [X] get_registry_scanning_configuration
- [X] get_repository_policy
- [ ] initiate_layer_upload
- [X] list_images

View File

@ -1,6 +1,7 @@
import hashlib
import json
import re
import threading
from collections import namedtuple
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional, Tuple
@ -390,6 +391,11 @@ class ECRBackend(BaseBackend):
self.registry_policy: Optional[str] = None
self.replication_config: Dict[str, Any] = {"rules": []}
self.repositories: Dict[str, Repository] = {}
self.registry_scanning_configuration: Dict[str, Any] = {
"scanType": "BASIC",
"rules": [],
}
self.registry_scanning_configuration_update_lock = threading.RLock()
self.tagger = TaggingService(tag_name="tags")
@staticmethod
@ -495,6 +501,19 @@ class ECRBackend(BaseBackend):
self.repositories[repository_name] = repository
self.tagger.tag_resource(repository.arn, tags)
# check if any of the registry scanning policies applies to the repository
with self.registry_scanning_configuration_update_lock:
for rule in self.registry_scanning_configuration["rules"]:
for repo_filter in rule["repositoryFilters"]:
if self._match_repository_filter(
repo_filter["filter"], repository_name
):
repository.scanning_config["scanFrequency"] = rule[
"scanFrequency"
]
# AWS testing seems to indicate that this is always overwritten
repository.scanning_config["appliedScanFilters"] = [repo_filter]
return repository
def delete_repository(
@ -1117,16 +1136,41 @@ class ECRBackend(BaseBackend):
return {"replicationConfiguration": replication_config}
def put_registry_scanning_configuration(self, rules: List[Dict[str, Any]]) -> None:
for rule in rules:
for repo_filter in rule["repositoryFilters"]:
for repo in self.repositories.values():
if repo_filter["filter"] == repo.name or re.match(
repo_filter["filter"], repo.name
):
repo.scanning_config["scanFrequency"] = rule["scanFrequency"]
# AWS testing seems to indicate that this is always overwritten
repo.scanning_config["appliedScanFilters"] = [repo_filter]
def _match_repository_filter(self, filter: str, repository_name: str) -> bool:
filter_regex = filter.replace("*", ".*")
return filter in repository_name or bool(
re.match(filter_regex, repository_name)
)
def get_registry_scanning_configuration(self) -> Dict[str, Any]:
return self.registry_scanning_configuration
def put_registry_scanning_configuration(
self, scan_type: str, rules: List[Dict[str, Any]]
) -> None:
# locking here to avoid simultaneous updates which leads to inconsistent state
with self.registry_scanning_configuration_update_lock:
self.registry_scanning_configuration = {
"scanType": scan_type,
"rules": rules,
}
# reset all rules first
for repo in self.repositories.values():
repo.scanning_config["scanFrequency"] = "MANUAL"
repo.scanning_config["appliedScanFilters"] = []
for rule in rules:
for repo_filter in rule["repositoryFilters"]:
for repo in self.repositories.values():
if self._match_repository_filter(
repo_filter["filter"], repo.name
):
repo.scanning_config["scanFrequency"] = rule[
"scanFrequency"
]
# AWS testing seems to indicate that this is always overwritten
repo.scanning_config["appliedScanFilters"] = [repo_filter]
def describe_registry(self) -> Dict[str, Any]:
return {

View File

@ -324,11 +324,24 @@ class ECRResponse(BaseResponse):
)
)
def get_registry_scanning_configuration(self) -> str:
registry_scanning_config = (
self.ecr_backend.get_registry_scanning_configuration()
)
return json.dumps(
{
"registryId": self.current_account,
"scanningConfiguration": registry_scanning_config,
}
)
def put_registry_scanning_configuration(self) -> str:
scan_type = self._get_param("scanType")
rules = self._get_param("rules")
self.ecr_backend.put_registry_scanning_configuration(rules)
return json.dumps({"scanType": scan_type, "rules": rules})
self.ecr_backend.put_registry_scanning_configuration(scan_type, rules)
return json.dumps(
{"registryScanningConfiguration": {"scanType": scan_type, "rules": rules}}
)
def describe_registry(self) -> str:
return json.dumps(self.ecr_backend.describe_registry())

View File

@ -1,6 +1,10 @@
import boto3
from moto import mock_aws
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
ECR_REGION = "us-east-1"
ECR_REPO = "test-repo"
@mock_aws
@ -133,9 +137,120 @@ def test_put_registry_scanning_configuration():
"repositoryArn": repo_arn,
"repositoryName": repo_name,
"scanOnPush": False,
"scanFrequency": "SCAN_ON_PUSH",
"appliedScanFilters": [
{"filter": f"{repo_name[:4]}*", "filterType": "WILDCARD"}
],
"scanFrequency": "MANUAL",
"appliedScanFilters": [],
}
]
@mock_aws
def test_registry_scanning_configuration_lifecycle():
client = boto3.client("ecr", region_name=ECR_REGION)
client.create_repository(repositoryName=ECR_REPO)
get_scanning_config_response = client.get_registry_scanning_configuration()
assert get_scanning_config_response["registryId"] == ACCOUNT_ID
assert get_scanning_config_response["scanningConfiguration"] == {
"rules": [],
"scanType": "BASIC",
}
put_scanning_config_response = client.put_registry_scanning_configuration(
scanType="BASIC",
rules=[
{
"repositoryFilters": [
{
"filter": "test-*",
"filterType": "WILDCARD",
}
],
"scanFrequency": "SCAN_ON_PUSH",
}
],
)
assert put_scanning_config_response["registryScanningConfiguration"] == {
"rules": [
{
"repositoryFilters": [{"filter": "test-*", "filterType": "WILDCARD"}],
"scanFrequency": "SCAN_ON_PUSH",
}
],
"scanType": "BASIC",
}
# check if scanning config is returned in get operation
get_scanning_config_response = client.get_registry_scanning_configuration()
assert get_scanning_config_response["registryId"] == ACCOUNT_ID
assert get_scanning_config_response["scanningConfiguration"] == {
"rules": [
{
"repositoryFilters": [{"filter": "test-*", "filterType": "WILDCARD"}],
"scanFrequency": "SCAN_ON_PUSH",
}
],
"scanType": "BASIC",
}
# check if the scanning config is returned in batch_get_repository_scanning_configuration
repo_scanning_config_result = client.batch_get_repository_scanning_configuration(
repositoryNames=[ECR_REPO]
)
assert repo_scanning_config_result["scanningConfigurations"][0] == {
"appliedScanFilters": [{"filter": "test-*", "filterType": "WILDCARD"}],
"repositoryArn": f"arn:aws:ecr:{ECR_REGION}:{ACCOUNT_ID}:repository/{ECR_REPO}",
"repositoryName": ECR_REPO,
"scanFrequency": "SCAN_ON_PUSH",
"scanOnPush": False,
}
# create new repository and check if scanning config is applied
client.create_repository(repositoryName="test-repo-2")
repo_scanning_config_result = client.batch_get_repository_scanning_configuration(
repositoryNames=["test-repo-2"]
)
assert repo_scanning_config_result["scanningConfigurations"][0] == {
"appliedScanFilters": [{"filter": "test-*", "filterType": "WILDCARD"}],
"repositoryArn": f"arn:aws:ecr:{ECR_REGION}:{ACCOUNT_ID}:repository/test-repo-2",
"repositoryName": "test-repo-2",
"scanFrequency": "SCAN_ON_PUSH",
"scanOnPush": False,
}
# revert scanning config and see if it is properly applied to all repositories
put_scanning_config_response = client.put_registry_scanning_configuration(
scanType="BASIC",
rules=[],
)
assert put_scanning_config_response["registryScanningConfiguration"] == {
"rules": [],
"scanType": "BASIC",
}
get_scanning_config_response = client.get_registry_scanning_configuration()
assert get_scanning_config_response["registryId"] == ACCOUNT_ID
assert get_scanning_config_response["scanningConfiguration"] == {
"rules": [],
"scanType": "BASIC",
}
repo_scanning_config_result = client.batch_get_repository_scanning_configuration(
repositoryNames=[ECR_REPO, "test-repo-2"]
)
assert repo_scanning_config_result["scanningConfigurations"] == [
{
"appliedScanFilters": [],
"repositoryArn": f"arn:aws:ecr:{ECR_REGION}:{ACCOUNT_ID}:repository/{ECR_REPO}",
"repositoryName": ECR_REPO,
"scanFrequency": "MANUAL",
"scanOnPush": False,
},
{
"appliedScanFilters": [],
"repositoryArn": f"arn:aws:ecr:{ECR_REGION}:{ACCOUNT_ID}:repository/test-repo-2",
"repositoryName": "test-repo-2",
"scanFrequency": "MANUAL",
"scanOnPush": False,
},
]