Enhancements to CloudWatch Export Tasks Testing (#7058)

This commit is contained in:
Gadelkareem 2023-11-27 19:56:37 +01:00 committed by GitHub
parent e409e17960
commit 3c10fb12f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 182 additions and 9 deletions

View File

@ -1,4 +1,5 @@
from datetime import timedelta
import json
from datetime import timedelta, datetime
from typing import Any, Dict, Iterable, List, Tuple, Optional
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.core import CloudFormationModel
@ -719,6 +720,39 @@ class LogResourcePolicy(CloudFormationModel):
logs_backends[account_id][region_name].delete_resource_policy(resource_name)
class ExportTask(BaseModel):
def __init__(
self,
task_id: str,
task_name: str,
log_group_name: str,
destination: str,
destination_prefix: str,
from_time: int,
to: int,
):
self.task_id = task_id
self.task_name = task_name
self.log_group_name = log_group_name
self.destination = destination
self.destination_prefix = destination_prefix
self.from_time = from_time
self.to = to
self.status = {"code": "active", "message": "Task is active"}
def to_json(self) -> Dict[str, Any]:
return {
"taskId": self.task_id,
"taskName": self.task_name,
"logGroupName": self.log_group_name,
"destination": self.destination,
"destinationPrefix": self.destination_prefix,
"from": self.from_time,
"to": self.to,
"status": self.status,
}
class LogsBackend(BaseBackend):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
@ -728,6 +762,7 @@ class LogsBackend(BaseBackend):
self.resource_policies: Dict[str, LogResourcePolicy] = dict()
self.destinations: Dict[str, Destination] = dict()
self.tagger = TaggingService()
self.export_tasks: Dict[str, ExportTask] = dict()
@staticmethod
def default_vpc_endpoint_service(
@ -1204,12 +1239,45 @@ class LogsBackend(BaseBackend):
return self.queries[query_id]
def create_export_task(
self, log_group_name: str, destination: Dict[str, Any]
self,
taskName: str,
logGroupName: str,
destination: str,
destinationPrefix: str,
fromTime: int,
to: int,
) -> str:
s3_backends[self.account_id]["global"].get_bucket(destination)
if log_group_name not in self.groups:
if logGroupName not in self.groups:
raise ResourceNotFoundException()
return str(mock_random.uuid4())
task_id = str(mock_random.uuid4())
self.export_tasks[task_id] = ExportTask(
task_id,
taskName,
logGroupName,
destination,
destinationPrefix,
fromTime,
to,
)
if fromTime <= datetime.now().timestamp() <= to:
logs = self.filter_log_events(
logGroupName, [], fromTime, to, None, None, "", False
)
s3_backends[self.account_id]["global"].put_object(
destination, destinationPrefix, json.dumps(logs)
)
self.export_tasks[task_id].status["code"] = "completed"
self.export_tasks[task_id].status["message"] = "Task is completed"
return task_id
def describe_export_tasks(self, taskId: str = "") -> Tuple[List[ExportTask], str]:
if taskId:
if taskId not in self.export_tasks:
raise ResourceNotFoundException()
return [self.export_tasks[taskId]], ""
else:
return list(self.export_tasks.values()), ""
def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
return self.tagger.get_tag_dict_for_resource(resource_arn)

View File

@ -433,13 +433,24 @@ class LogsResponse(BaseResponse):
return json.dumps(query.to_result_json())
def create_export_task(self) -> str:
log_group_name = self._get_param("logGroupName")
destination = self._get_param("destination")
task_id = self.logs_backend.create_export_task(
log_group_name=log_group_name, destination=destination
logGroupName=self._get_param("logGroupName"),
fromTime=self._get_int_param("from"),
to=self._get_int_param("to"),
destination=self._get_param("destination"),
destinationPrefix=self._get_param("destinationPrefix"),
taskName=self._get_param("taskName"),
)
return json.dumps(dict(taskId=str(task_id)))
def describe_export_tasks(self) -> str:
task_id = self._get_param("taskId")
tasks, next_token = self.logs_backend.describe_export_tasks(taskId=task_id)
return json.dumps(
{"exportTasks": [task.to_json() for task in tasks], "nextToken": next_token}
)
def list_tags_for_resource(self) -> str:
resource_arn = self._get_param("resourceArn")
tags = self.logs_backend.list_tags_for_resource(resource_arn)

View File

@ -1,5 +1,5 @@
import json
from datetime import timedelta
from datetime import timedelta, datetime
from uuid import UUID
import boto3
@ -1384,7 +1384,10 @@ def test_create_export_task_happy_path():
logs.create_log_group(logGroupName=log_group_name)
s3.create_bucket(Bucket=destination)
resp = logs.create_export_task(
logGroupName=log_group_name, fromTime=fromTime, to=to, destination=destination
logGroupName=log_group_name,
fromTime=fromTime,
to=to,
destination=destination,
)
# taskId resembles a valid UUID (i.e. a string of 32 hexadecimal digits)
assert UUID(resp["taskId"])
@ -1425,3 +1428,94 @@ def test_create_export_raises_ResourceNotFoundException_log_group_not_found():
to=to,
destination=destination,
)
@mock_s3
@mock_logs
def test_create_export_executes_export_task():
log_group_name = "/aws/codebuild/blah1"
destination = "mybucket"
fromTime = int(datetime.now().replace(hour=0, minute=0, second=0).timestamp())
to = int(datetime.now().replace(hour=23, minute=59, second=59).timestamp())
logs = boto3.client("logs", region_name="ap-southeast-1")
s3 = boto3.client("s3", "us-east-1")
logs.create_log_group(logGroupName=log_group_name)
s3.create_bucket(Bucket=destination)
resp = logs.create_export_task(
logGroupName=log_group_name,
fromTime=fromTime,
to=to,
destination=destination,
)
taskId = resp["taskId"]
resp = logs.describe_export_tasks(taskId=taskId)
assert len(resp["exportTasks"]) == 1
assert resp["exportTasks"][0]["logGroupName"] == log_group_name
assert resp["exportTasks"][0]["destination"] == destination
assert resp["exportTasks"][0]["from"] == fromTime
assert resp["exportTasks"][0]["to"] == to
assert resp["exportTasks"][0]["status"]["code"] == "completed"
assert resp["exportTasks"][0]["status"]["message"] == "Task is completed"
@mock_s3
@mock_logs
def test_describe_export_tasks_happy_path():
log_group_name = "/aws/codebuild/blah1"
destination = "mybucket"
fromTime = 1611316574
to = 1642852574
logs = boto3.client("logs", region_name="ap-southeast-1")
s3 = boto3.client("s3", "us-east-1")
logs.create_log_group(logGroupName=log_group_name)
s3.create_bucket(Bucket=destination)
logs.create_export_task(
logGroupName=log_group_name,
fromTime=fromTime,
to=to,
destination=destination,
)
resp = logs.describe_export_tasks()
assert len(resp["exportTasks"]) == 1
assert resp["exportTasks"][0]["logGroupName"] == log_group_name
assert resp["exportTasks"][0]["destination"] == destination
assert resp["exportTasks"][0]["from"] == fromTime
assert resp["exportTasks"][0]["to"] == to
assert resp["exportTasks"][0]["status"]["code"] == "active"
assert resp["exportTasks"][0]["status"]["message"] == "Task is active"
@mock_s3
@mock_logs
def test_describe_export_tasks_task_id():
log_group_name = "/aws/codebuild/blah1"
destination = "mybucket"
fromTime = 1611316574
to = 1642852574
logs = boto3.client("logs", region_name="ap-southeast-1")
s3 = boto3.client("s3", "us-east-1")
logs.create_log_group(logGroupName=log_group_name)
s3.create_bucket(Bucket=destination)
resp = logs.create_export_task(
logGroupName=log_group_name,
fromTime=fromTime,
to=to,
destination=destination,
)
taskId = resp["taskId"]
resp = logs.describe_export_tasks(taskId=taskId)
assert len(resp["exportTasks"]) == 1
assert resp["exportTasks"][0]["logGroupName"] == log_group_name
assert resp["exportTasks"][0]["destination"] == destination
assert resp["exportTasks"][0]["from"] == fromTime
assert resp["exportTasks"][0]["to"] == to
assert resp["exportTasks"][0]["status"]["code"] == "active"
assert resp["exportTasks"][0]["status"]["message"] == "Task is active"
@mock_s3
@mock_logs
def test_describe_export_tasks_raises_ResourceNotFoundException_task_id_not_found():
logs = boto3.client("logs", region_name="ap-southeast-1")
with pytest.raises(logs.exceptions.ResourceNotFoundException):
logs.describe_export_tasks(taskId="368a7022dea3dd621")