import boto3 import os import sure # noqa import six from botocore.exceptions import ClientError from moto import mock_logs, settings from nose.tools import assert_raises from nose import SkipTest _logs_region = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2" @mock_logs def test_log_group_create(): conn = boto3.client("logs", "us-west-2") log_group_name = "dummy" response = conn.create_log_group(logGroupName=log_group_name) response = conn.describe_log_groups(logGroupNamePrefix=log_group_name) assert len(response["logGroups"]) == 1 # AWS defaults to Never Expire for log group retention assert response["logGroups"][0].get("retentionInDays") == None response = conn.delete_log_group(logGroupName=log_group_name) @mock_logs def test_exceptions(): conn = boto3.client("logs", "us-west-2") log_group_name = "dummy" log_stream_name = "dummp-stream" conn.create_log_group(logGroupName=log_group_name) with assert_raises(ClientError): conn.create_log_group(logGroupName=log_group_name) # descrine_log_groups is not implemented yet conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) with assert_raises(ClientError): conn.create_log_stream( logGroupName=log_group_name, logStreamName=log_stream_name ) conn.put_log_events( logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=[{"timestamp": 0, "message": "line"}], ) with assert_raises(ClientError): conn.put_log_events( logGroupName=log_group_name, logStreamName="invalid-stream", logEvents=[{"timestamp": 0, "message": "line"}], ) @mock_logs def test_put_logs(): conn = boto3.client("logs", "us-west-2") log_group_name = "dummy" log_stream_name = "stream" conn.create_log_group(logGroupName=log_group_name) conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) messages = [ {"timestamp": 0, "message": "hello"}, {"timestamp": 0, "message": "world"}, ] putRes = conn.put_log_events( logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages ) res = conn.get_log_events( logGroupName=log_group_name, logStreamName=log_stream_name ) events = res["events"] nextSequenceToken = putRes["nextSequenceToken"] assert isinstance(nextSequenceToken, six.string_types) == True assert len(nextSequenceToken) == 56 events.should.have.length_of(2) @mock_logs def test_filter_logs_interleaved(): conn = boto3.client("logs", "us-west-2") log_group_name = "dummy" log_stream_name = "stream" conn.create_log_group(logGroupName=log_group_name) conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) messages = [ {"timestamp": 0, "message": "hello"}, {"timestamp": 0, "message": "world"}, ] conn.put_log_events( logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages ) res = conn.filter_log_events( logGroupName=log_group_name, logStreamNames=[log_stream_name], interleaved=True ) events = res["events"] for original_message, resulting_event in zip(messages, events): resulting_event["eventId"].should.equal(str(resulting_event["eventId"])) resulting_event["timestamp"].should.equal(original_message["timestamp"]) resulting_event["message"].should.equal(original_message["message"]) @mock_logs def test_filter_logs_raises_if_filter_pattern(): if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true": raise SkipTest("Does not work in server mode due to error in Workzeug") conn = boto3.client("logs", "us-west-2") log_group_name = "dummy" log_stream_name = "stream" conn.create_log_group(logGroupName=log_group_name) conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) messages = [ {"timestamp": 0, "message": "hello"}, {"timestamp": 0, "message": "world"}, ] conn.put_log_events( logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages ) with assert_raises(NotImplementedError): conn.filter_log_events( logGroupName=log_group_name, logStreamNames=[log_stream_name], filterPattern='{$.message = "hello"}', ) @mock_logs def test_put_retention_policy(): conn = boto3.client("logs", "us-west-2") log_group_name = "dummy" response = conn.create_log_group(logGroupName=log_group_name) response = conn.put_retention_policy(logGroupName=log_group_name, retentionInDays=7) response = conn.describe_log_groups(logGroupNamePrefix=log_group_name) assert len(response["logGroups"]) == 1 assert response["logGroups"][0].get("retentionInDays") == 7 response = conn.delete_log_group(logGroupName=log_group_name) @mock_logs def test_delete_retention_policy(): conn = boto3.client("logs", "us-west-2") log_group_name = "dummy" response = conn.create_log_group(logGroupName=log_group_name) response = conn.put_retention_policy(logGroupName=log_group_name, retentionInDays=7) response = conn.describe_log_groups(logGroupNamePrefix=log_group_name) assert len(response["logGroups"]) == 1 assert response["logGroups"][0].get("retentionInDays") == 7 response = conn.delete_retention_policy(logGroupName=log_group_name) response = conn.describe_log_groups(logGroupNamePrefix=log_group_name) assert len(response["logGroups"]) == 1 assert response["logGroups"][0].get("retentionInDays") == None response = conn.delete_log_group(logGroupName=log_group_name) @mock_logs def test_get_log_events(): conn = boto3.client("logs", "us-west-2") log_group_name = "test" log_stream_name = "stream" conn.create_log_group(logGroupName=log_group_name) conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) events = [{"timestamp": x, "message": str(x)} for x in range(20)] conn.put_log_events( logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=events ) resp = conn.get_log_events( logGroupName=log_group_name, logStreamName=log_stream_name, limit=10 ) resp["events"].should.have.length_of(10) resp.should.have.key("nextForwardToken") resp.should.have.key("nextBackwardToken") resp["nextForwardToken"].should.equal( "f/00000000000000000000000000000000000000000000000000000010" ) resp["nextBackwardToken"].should.equal( "b/00000000000000000000000000000000000000000000000000000000" ) for i in range(10): resp["events"][i]["timestamp"].should.equal(i) resp["events"][i]["message"].should.equal(str(i)) next_token = resp["nextForwardToken"] resp = conn.get_log_events( logGroupName=log_group_name, logStreamName=log_stream_name, nextToken=next_token, limit=10, ) resp["events"].should.have.length_of(10) resp.should.have.key("nextForwardToken") resp.should.have.key("nextBackwardToken") resp["nextForwardToken"].should.equal( "f/00000000000000000000000000000000000000000000000000000020" ) resp["nextBackwardToken"].should.equal( "b/00000000000000000000000000000000000000000000000000000000" ) for i in range(10): resp["events"][i]["timestamp"].should.equal(i + 10) resp["events"][i]["message"].should.equal(str(i + 10)) resp = conn.get_log_events( logGroupName=log_group_name, logStreamName=log_stream_name, nextToken=resp["nextBackwardToken"], limit=10, ) resp["events"].should.have.length_of(10) resp.should.have.key("nextForwardToken") resp.should.have.key("nextBackwardToken") for i in range(10): resp["events"][i]["timestamp"].should.equal(i) resp["events"][i]["message"].should.equal(str(i)) @mock_logs def test_list_tags_log_group(): conn = boto3.client("logs", "us-west-2") log_group_name = "dummy" tags = {"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2"} response = conn.create_log_group(logGroupName=log_group_name) response = conn.list_tags_log_group(logGroupName=log_group_name) assert response["tags"] == {} response = conn.delete_log_group(logGroupName=log_group_name) response = conn.create_log_group(logGroupName=log_group_name, tags=tags) response = conn.list_tags_log_group(logGroupName=log_group_name) assert response["tags"] == tags response = conn.delete_log_group(logGroupName=log_group_name) @mock_logs def test_tag_log_group(): conn = boto3.client("logs", "us-west-2") log_group_name = "dummy" tags = {"tag_key_1": "tag_value_1"} response = conn.create_log_group(logGroupName=log_group_name) response = conn.tag_log_group(logGroupName=log_group_name, tags=tags) response = conn.list_tags_log_group(logGroupName=log_group_name) assert response["tags"] == tags tags_with_added_value = {"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2"} response = conn.tag_log_group( logGroupName=log_group_name, tags={"tag_key_2": "tag_value_2"} ) response = conn.list_tags_log_group(logGroupName=log_group_name) assert response["tags"] == tags_with_added_value tags_with_updated_value = {"tag_key_1": "tag_value_XX", "tag_key_2": "tag_value_2"} response = conn.tag_log_group( logGroupName=log_group_name, tags={"tag_key_1": "tag_value_XX"} ) response = conn.list_tags_log_group(logGroupName=log_group_name) assert response["tags"] == tags_with_updated_value response = conn.delete_log_group(logGroupName=log_group_name) @mock_logs def test_untag_log_group(): conn = boto3.client("logs", "us-west-2") log_group_name = "dummy" response = conn.create_log_group(logGroupName=log_group_name) tags = {"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2"} response = conn.tag_log_group(logGroupName=log_group_name, tags=tags) response = conn.list_tags_log_group(logGroupName=log_group_name) assert response["tags"] == tags tags_to_remove = ["tag_key_1"] remaining_tags = {"tag_key_2": "tag_value_2"} response = conn.untag_log_group(logGroupName=log_group_name, tags=tags_to_remove) response = conn.list_tags_log_group(logGroupName=log_group_name) assert response["tags"] == remaining_tags response = conn.delete_log_group(logGroupName=log_group_name)