author    | CoprDistGit <infra@openeuler.org> | 2023-05-31 03:22:43 +0000
committer | CoprDistGit <infra@openeuler.org> | 2023-05-31 03:22:43 +0000
commit    | eeeef4573090f7fd028539e5a5d7b65b649a9fda (patch)
tree      | 35ea60041c697efc087359e1e0c11feebeffff03
parent    | b38069106cd36ba0350bd20193b9bc001eefdcdb (diff)
automatic import of python-aws-cdk-aws-kinesisfirehose-alpha
-rw-r--r-- | .gitignore                                    |    1
-rw-r--r-- | python-aws-cdk-aws-kinesisfirehose-alpha.spec | 1487
-rw-r--r-- | sources                                       |    1
3 files changed, 1489 insertions, 0 deletions
@@ -0,0 +1 @@ +/aws-cdk.aws-kinesisfirehose-alpha-2.81.0a0.tar.gz diff --git a/python-aws-cdk-aws-kinesisfirehose-alpha.spec b/python-aws-cdk-aws-kinesisfirehose-alpha.spec new file mode 100644 index 0000000..926ed15 --- /dev/null +++ b/python-aws-cdk-aws-kinesisfirehose-alpha.spec @@ -0,0 +1,1487 @@ +%global _empty_manifest_terminate_build 0 +Name: python-aws-cdk.aws-kinesisfirehose-alpha +Version: 2.81.0a0 +Release: 1 +Summary: The CDK Construct Library for AWS::KinesisFirehose +License: Apache-2.0 +URL: https://github.com/aws/aws-cdk +Source0: https://mirrors.nju.edu.cn/pypi/web/packages/63/57/c3bde2ed3905f2ef984f7316b784b724b74f136096a595da927f6cbd5124/aws-cdk.aws-kinesisfirehose-alpha-2.81.0a0.tar.gz +BuildArch: noarch + +Requires: python3-aws-cdk-lib +Requires: python3-constructs +Requires: python3-jsii +Requires: python3-publication +Requires: python3-typeguard + +%description +<!--END STABILITY BANNER--> +[Amazon Kinesis Data Firehose](https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html) +is a service for fully-managed delivery of real-time streaming data to storage services +such as Amazon S3, Amazon Redshift, Amazon Elasticsearch, Splunk, or any custom HTTP +endpoint or third-party services such as Datadog, Dynatrace, LogicMonitor, MongoDB, New +Relic, and Sumo Logic. +Kinesis Data Firehose delivery streams are distinguished from Kinesis data streams in +their models of consumption. Whereas consumers read from a data stream by actively pulling +data from the stream, a delivery stream pushes data to its destination on a regular +cadence. This means that data streams are intended to have consumers that do on-demand +processing, like AWS Lambda or Amazon EC2. On the other hand, delivery streams are +intended to have destinations that are sources for offline processing and analytics, such +as Amazon S3 and Amazon Redshift. +This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aws-cdk) +project. It allows you to define Kinesis Data Firehose delivery streams. +## Defining a Delivery Stream +In order to define a Delivery Stream, you must specify a destination. An S3 bucket can be +used as a destination. More supported destinations are covered [below](#destinations). +```python +bucket = s3.Bucket(self, "Bucket") +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destinations.S3Bucket(bucket)] +) +``` +The above example defines the following resources: +* An S3 bucket +* A Kinesis Data Firehose delivery stream with Direct PUT as the source and CloudWatch + error logging turned on. +* An IAM role which gives the delivery stream permission to write to the S3 bucket. +## Sources +There are two main methods of sourcing input data: Kinesis Data Streams and via a "direct +put". +See: [Sending Data to a Delivery Stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-write.html) +in the *Kinesis Data Firehose Developer Guide*. +### Kinesis Data Stream +A delivery stream can read directly from a Kinesis data stream as a consumer of the data +stream. Configure this behaviour by providing a data stream in the `sourceStream` +property when constructing a delivery stream: +```python +# destination: firehose.IDestination +source_stream = kinesis.Stream(self, "Source Stream") +firehose.DeliveryStream(self, "Delivery Stream", + source_stream=source_stream, + destinations=[destination] +) +``` +### Direct Put +Data must be provided via "direct put", ie., by using a `PutRecord` or +`PutRecordBatch` API call. 
There are a number of ways of doing so, such as: +* Kinesis Agent: a standalone Java application that monitors and delivers files while + handling file rotation, checkpointing, and retries. See: [Writing to Kinesis Data Firehose Using Kinesis Agent](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-agents.html) + in the *Kinesis Data Firehose Developer Guide*. +* AWS SDK: a general purpose solution that allows you to deliver data to a delivery stream + from anywhere using Java, .NET, Node.js, Python, or Ruby. See: [Writing to Kinesis Data Firehose Using the AWS SDK](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-sdk.html) + in the *Kinesis Data Firehose Developer Guide*. +* CloudWatch Logs: subscribe to a log group and receive filtered log events directly into + a delivery stream. See: [logs-destinations](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-logs-destinations-readme.html). +* Eventbridge: add an event rule target to send events to a delivery stream based on the + rule filtering. See: [events-targets](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-events-targets-readme.html). +* SNS: add a subscription to send all notifications from the topic to a delivery + stream. See: [sns-subscriptions](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-sns-subscriptions-readme.html). +* IoT: add an action to an IoT rule to send various IoT information to a delivery stream +## Destinations +The following destinations are supported. See [kinesisfirehose-destinations](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-kinesisfirehose-destinations-readme.html) +for the implementations of these destinations. +### S3 +Defining a delivery stream with an S3 bucket destination: +```python +# bucket: s3.Bucket +s3_destination = destinations.S3Bucket(bucket) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[s3_destination] +) +``` +The S3 destination also supports custom dynamic prefixes. `dataOutputPrefix` +will be used for files successfully delivered to S3. `errorOutputPrefix` will be added to +failed records before writing them to S3. +```python +# bucket: s3.Bucket +s3_destination = destinations.S3Bucket(bucket, + data_output_prefix="myFirehose/DeliveredYear=!{timestamp:yyyy}/anyMonth/rand=!{firehose:random-string}", + error_output_prefix="myFirehoseFailures/!{firehose:error-output-type}/!{timestamp:yyyy}/anyMonth/!{timestamp:dd}" +) +``` +See: [Custom S3 Prefixes](https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html) +in the *Kinesis Data Firehose Developer Guide*. +## Server-side Encryption +Enabling server-side encryption (SSE) requires Kinesis Data Firehose to encrypt all data +sent to delivery stream when it is stored at rest. This means that data is encrypted +before being written to the service's internal storage layer and decrypted after it is +received from the internal storage layer. The service manages keys and cryptographic +operations so that sources and destinations do not need to, as the data is encrypted and +decrypted at the boundaries of the service (i.e., before the data is delivered to a +destination). By default, delivery streams do not have SSE enabled. +The Key Management Service keys (KMS keys) used for SSE can either be AWS-owned or +customer-managed. AWS-owned KMS keys are created, owned and managed by AWS for use in +multiple AWS accounts. As a customer, you cannot view, use, track, or manage these keys, +and you are not charged for their use. 
On the other hand, customer-managed KMS keys are +created and owned within your account and managed entirely by you. As a customer, you are +responsible for managing access, rotation, aliases, and deletion for these keys, and you +are changed for their use. +See: [AWS KMS keys](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#kms_keys) +in the *KMS Developer Guide*. +```python +# destination: firehose.IDestination +# SSE with an customer-managed key that is explicitly specified +# key: kms.Key +# SSE with an AWS-owned key +firehose.DeliveryStream(self, "Delivery Stream AWS Owned", + encryption=firehose.StreamEncryption.AWS_OWNED, + destinations=[destination] +) +# SSE with an customer-managed key that is created automatically by the CDK +firehose.DeliveryStream(self, "Delivery Stream Implicit Customer Managed", + encryption=firehose.StreamEncryption.CUSTOMER_MANAGED, + destinations=[destination] +) +firehose.DeliveryStream(self, "Delivery Stream Explicit Customer Managed", + encryption_key=key, + destinations=[destination] +) +``` +See: [Data Protection](https://docs.aws.amazon.com/firehose/latest/dev/encryption.html) +in the *Kinesis Data Firehose Developer Guide*. +## Monitoring +Kinesis Data Firehose is integrated with CloudWatch, so you can monitor the performance of +your delivery streams via logs and metrics. +### Logs +Kinesis Data Firehose will send logs to CloudWatch when data transformation or data +delivery fails. The CDK will enable logging by default and create a CloudWatch LogGroup +and LogStream for your Delivery Stream. +When you create a destination, you can specify a log group. In this log group, The CDK +will create log streams where log events will be sent: +```python +import aws_cdk.aws_logs as logs +# bucket: s3.Bucket +log_group = logs.LogGroup(self, "Log Group") +destination = destinations.S3Bucket(bucket, + log_group=log_group +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination] +) +``` +Logging can also be disabled: +```python +# bucket: s3.Bucket +destination = destinations.S3Bucket(bucket, + logging=False +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination] +) +``` +See: [Monitoring using CloudWatch Logs](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-logs.html) +in the *Kinesis Data Firehose Developer Guide*. +### Metrics +Kinesis Data Firehose sends metrics to CloudWatch so that you can collect and analyze the +performance of the delivery stream, including data delivery, data ingestion, data +transformation, format conversion, API usage, encryption, and resource usage. You can then +use CloudWatch alarms to alert you, for example, when data freshness (the age of the +oldest record in the delivery stream) exceeds the buffering limit (indicating that data is +not being delivered to your destination), or when the rate of incoming records exceeds the +limit of records per second (indicating data is flowing into your delivery stream faster +than it is configured to process). +CDK provides methods for accessing delivery stream metrics with default configuration, +such as `metricIncomingBytes`, and `metricIncomingRecords` (see [`IDeliveryStream`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-kinesisfirehose.IDeliveryStream.html) +for a full list). 
CDK also provides a generic `metric` method that can be used to produce +metric configurations for any metric provided by Kinesis Data Firehose; the configurations +are pre-populated with the correct dimensions for the delivery stream. +```python +import aws_cdk.aws_cloudwatch as cloudwatch +# delivery_stream: firehose.DeliveryStream +# Alarm that triggers when the per-second average of incoming bytes exceeds 90% of the current service limit +incoming_bytes_percent_of_limit = cloudwatch.MathExpression( + expression="incomingBytes / 300 / bytePerSecLimit", + using_metrics={ + "incoming_bytes": delivery_stream.metric_incoming_bytes(statistic=cloudwatch.Statistic.SUM), + "byte_per_sec_limit": delivery_stream.metric("BytesPerSecondLimit") + } +) +cloudwatch.Alarm(self, "Alarm", + metric=incoming_bytes_percent_of_limit, + threshold=0.9, + evaluation_periods=3 +) +``` +See: [Monitoring Using CloudWatch Metrics](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-metrics.html) +in the *Kinesis Data Firehose Developer Guide*. +## Compression +Your data can automatically be compressed when it is delivered to S3 as either a final or +an intermediary/backup destination. Supported compression formats are: gzip, Snappy, +Hadoop-compatible Snappy, and ZIP, except for Redshift destinations, where Snappy +(regardless of Hadoop-compatibility) and ZIP are not supported. By default, data is +delivered to S3 without compression. +```python +# Compress data delivered to S3 using Snappy +# bucket: s3.Bucket +s3_destination = destinations.S3Bucket(bucket, + compression=destinations.Compression.SNAPPY +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[s3_destination] +) +``` +## Buffering +Incoming data is buffered before it is delivered to the specified destination. The +delivery stream will wait until the amount of incoming data has exceeded some threshold +(the "buffer size") or until the time since the last data delivery occurred exceeds some +threshold (the "buffer interval"), whichever happens first. You can configure these +thresholds based on the capabilities of the destination and your use-case. By default, the +buffer size is 5 MiB and the buffer interval is 5 minutes. +```python +# Increase the buffer interval and size to 10 minutes and 8 MiB, respectively +# bucket: s3.Bucket +destination = destinations.S3Bucket(bucket, + buffering_interval=Duration.minutes(10), + buffering_size=Size.mebibytes(8) +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination] +) +``` +See: [Data Delivery Frequency](https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#frequency) +in the *Kinesis Data Firehose Developer Guide*. +## Destination Encryption +Your data can be automatically encrypted when it is delivered to S3 as a final or an +intermediary/backup destination. Kinesis Data Firehose supports Amazon S3 server-side +encryption with AWS Key Management Service (AWS KMS) for encrypting delivered data in +Amazon S3. You can choose to not encrypt the data or to encrypt with a key from the list +of AWS KMS keys that you own. For more information, +see [Protecting Data Using Server-Side Encryption with AWS KMS–Managed Keys (SSE-KMS)](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html). +Data is not encrypted by default. 
+```python +# bucket: s3.Bucket +# key: kms.Key +destination = destinations.S3Bucket(bucket, + encryption_key=key +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination] +) +``` +## Backup +A delivery stream can be configured to back up data to S3 that it attempted to deliver to +the configured destination. Backed up data can be all the data that the delivery stream +attempted to deliver or just data that it failed to deliver (Redshift and S3 destinations +can only back up all data). CDK can create a new S3 bucket where it will back up data, or +you can provide a bucket where data will be backed up. You can also provide a prefix under +which your backed-up data will be placed within the bucket. By default, source data is not +backed up to S3. +```python +# Enable backup of all source records (to an S3 bucket created by CDK). +# bucket: s3.Bucket +# Explicitly provide an S3 bucket to which all source records will be backed up. +# backup_bucket: s3.Bucket +firehose.DeliveryStream(self, "Delivery Stream Backup All", + destinations=[ + destinations.S3Bucket(bucket, + s3_backup=destinations.DestinationS3BackupProps( + mode=destinations.BackupMode.ALL + ) + ) + ] +) +firehose.DeliveryStream(self, "Delivery Stream Backup All Explicit Bucket", + destinations=[ + destinations.S3Bucket(bucket, + s3_backup=destinations.DestinationS3BackupProps( + bucket=backup_bucket + ) + ) + ] +) +# Explicitly provide an S3 prefix under which all source records will be backed up. +firehose.DeliveryStream(self, "Delivery Stream Backup All Explicit Prefix", + destinations=[ + destinations.S3Bucket(bucket, + s3_backup=destinations.DestinationS3BackupProps( + mode=destinations.BackupMode.ALL, + data_output_prefix="mybackup" + ) + ) + ] +) +``` +If any Data Processing or Transformation is configured on your Delivery Stream, the source +records will be backed up in their original format. +## Data Processing/Transformation +Data can be transformed before being delivered to destinations. There are two types of +data processing for delivery streams: record transformation with AWS Lambda, and record +format conversion using a schema stored in an AWS Glue table. If both types of data +processing are configured, then the Lambda transformation is performed first. By default, +no data processing occurs. This construct library currently only supports data +transformation with AWS Lambda. See [#15501](https://github.com/aws/aws-cdk/issues/15501) +to track the status of adding support for record format conversion. +### Data transformation with AWS Lambda +To transform the data, Kinesis Data Firehose will call a Lambda function that you provide +and deliver the data returned in place of the source record. The function must return a +result that contains records in a specific format, including the following fields: +* `recordId` -- the ID of the input record that corresponds the results. +* `result` -- the status of the transformation of the record: "Ok" (success), "Dropped" + (not processed intentionally), or "ProcessingFailed" (not processed due to an error). +* `data` -- the transformed data, Base64-encoded. +The data is buffered up to 1 minute and up to 3 MiB by default before being sent to the +function, but can be configured using `bufferInterval` and `bufferSize` +in the processor configuration (see: [Buffering](#buffering)). 
If the function invocation +fails due to a network timeout or because of hitting an invocation limit, the invocation +is retried 3 times by default, but can be configured using `retries` in the processor +configuration. +```python +# bucket: s3.Bucket +# Provide a Lambda function that will transform records before delivery, with custom +# buffering and retry configuration +lambda_function = lambda_.Function(self, "Processor", + runtime=lambda_.Runtime.NODEJS_14_X, + handler="index.handler", + code=lambda_.Code.from_asset(path.join(__dirname, "process-records")) +) +lambda_processor = firehose.LambdaFunctionProcessor(lambda_function, + buffer_interval=Duration.minutes(5), + buffer_size=Size.mebibytes(5), + retries=5 +) +s3_destination = destinations.S3Bucket(bucket, + processor=lambda_processor +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[s3_destination] +) +``` +```python +import path as path +import aws_cdk.aws_kinesisfirehose_alpha as firehose +import aws_cdk.aws_kms as kms +import aws_cdk.aws_lambda_nodejs as lambdanodejs +import aws_cdk.aws_logs as logs +import aws_cdk.aws_s3 as s3 +import aws_cdk as cdk +import aws_cdk.aws_kinesisfirehose_destinations_alpha as destinations +app = cdk.App() +stack = cdk.Stack(app, "aws-cdk-firehose-delivery-stream-s3-all-properties") +bucket = s3.Bucket(stack, "Bucket", + removal_policy=cdk.RemovalPolicy.DESTROY, + auto_delete_objects=True +) +backup_bucket = s3.Bucket(stack, "BackupBucket", + removal_policy=cdk.RemovalPolicy.DESTROY, + auto_delete_objects=True +) +log_group = logs.LogGroup(stack, "LogGroup", + removal_policy=cdk.RemovalPolicy.DESTROY +) +data_processor_function = lambdanodejs.NodejsFunction(stack, "DataProcessorFunction", + entry=path.join(__dirname, "lambda-data-processor.js"), + timeout=cdk.Duration.minutes(1) +) +processor = firehose.LambdaFunctionProcessor(data_processor_function, + buffer_interval=cdk.Duration.seconds(60), + buffer_size=cdk.Size.mebibytes(1), + retries=1 +) +key = kms.Key(stack, "Key", + removal_policy=cdk.RemovalPolicy.DESTROY +) +backup_key = kms.Key(stack, "BackupKey", + removal_policy=cdk.RemovalPolicy.DESTROY +) +firehose.DeliveryStream(stack, "Delivery Stream", + destinations=[destinations.S3Bucket(bucket, + logging=True, + log_group=log_group, + processor=processor, + compression=destinations.Compression.GZIP, + data_output_prefix="regularPrefix", + error_output_prefix="errorPrefix", + buffering_interval=cdk.Duration.seconds(60), + buffering_size=cdk.Size.mebibytes(1), + encryption_key=key, + s3_backup=destinations.DestinationS3BackupProps( + mode=destinations.BackupMode.ALL, + bucket=backup_bucket, + compression=destinations.Compression.ZIP, + data_output_prefix="backupPrefix", + error_output_prefix="backupErrorPrefix", + buffering_interval=cdk.Duration.seconds(60), + buffering_size=cdk.Size.mebibytes(1), + encryption_key=backup_key + ) + )] +) +app.synth() +``` +See: [Data Transformation](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html) +in the *Kinesis Data Firehose Developer Guide*. +## Specifying an IAM role +The DeliveryStream class automatically creates IAM service roles with all the minimum +necessary permissions for Kinesis Data Firehose to access the resources referenced by your +delivery stream. One service role is created for the delivery stream that allows Kinesis +Data Firehose to read from a Kinesis data stream (if one is configured as the delivery +stream source) and for server-side encryption. 
Another service role is created for each +destination, which gives Kinesis Data Firehose write access to the destination resource, +as well as the ability to invoke data transformers and read schemas for record format +conversion. If you wish, you may specify your own IAM role for either the delivery stream +or the destination service role, or both. It must have the correct trust policy (it must +allow Kinesis Data Firehose to assume it) or delivery stream creation or data delivery +will fail. Other required permissions to destination resources, encryption keys, etc., +will be provided automatically. +```python +# Specify the roles created above when defining the destination and delivery stream. +# bucket: s3.Bucket +# Create service roles for the delivery stream and destination. +# These can be used for other purposes and granted access to different resources. +# They must include the Kinesis Data Firehose service principal in their trust policies. +# Two separate roles are shown below, but the same role can be used for both purposes. +delivery_stream_role = iam.Role(self, "Delivery Stream Role", + assumed_by=iam.ServicePrincipal("firehose.amazonaws.com") +) +destination_role = iam.Role(self, "Destination Role", + assumed_by=iam.ServicePrincipal("firehose.amazonaws.com") +) +destination = destinations.S3Bucket(bucket, role=destination_role) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination], + role=delivery_stream_role +) +``` +See [Controlling Access](https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html) +in the *Kinesis Data Firehose Developer Guide*. +## Granting application access to a delivery stream +IAM roles, users or groups which need to be able to work with delivery streams should be +granted IAM permissions. +Any object that implements the `IGrantable` interface (i.e., has an associated principal) +can be granted permissions to a delivery stream by calling: +* `grantPutRecords(principal)` - grants the principal the ability to put records onto the + delivery stream +* `grant(principal, ...actions)` - grants the principal permission to a custom set of + actions +```python +# Give the role permissions to write data to the delivery stream +# delivery_stream: firehose.DeliveryStream +lambda_role = iam.Role(self, "Role", + assumed_by=iam.ServicePrincipal("lambda.amazonaws.com") +) +delivery_stream.grant_put_records(lambda_role) +``` +The following write permissions are provided to a service principal by the +`grantPutRecords()` method: +* `firehose:PutRecord` +* `firehose:PutRecordBatch` +## Granting a delivery stream access to a resource +Conversely to the above, Kinesis Data Firehose requires permissions in order for delivery +streams to interact with resources that you own. For example, if an S3 bucket is specified +as a destination of a delivery stream, the delivery stream must be granted permissions to +put and get objects from the bucket. When using the built-in AWS service destinations +found in the `@aws-cdk/aws-kinesisfirehose-destinations` module, the CDK grants the +permissions automatically. However, custom or third-party destinations may require custom +permissions. 
In this case, use the delivery stream as an `IGrantable`, as follows: +```python +# delivery_stream: firehose.DeliveryStream +fn = lambda_.Function(self, "Function", + code=lambda_.Code.from_inline("exports.handler = (event) => {}"), + runtime=lambda_.Runtime.NODEJS_14_X, + handler="index.handler" +) +fn.grant_invoke(delivery_stream) +``` +## Multiple destinations +Though the delivery stream allows specifying an array of destinations, only one +destination per delivery stream is currently allowed. This limitation is enforced at CDK +synthesis time and will throw an error. + +%package -n python3-aws-cdk.aws-kinesisfirehose-alpha +Summary: The CDK Construct Library for AWS::KinesisFirehose +Provides: python-aws-cdk.aws-kinesisfirehose-alpha +BuildRequires: python3-devel +BuildRequires: python3-setuptools +BuildRequires: python3-pip +%description -n python3-aws-cdk.aws-kinesisfirehose-alpha +<!--END STABILITY BANNER--> +[Amazon Kinesis Data Firehose](https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html) +is a service for fully-managed delivery of real-time streaming data to storage services +such as Amazon S3, Amazon Redshift, Amazon Elasticsearch, Splunk, or any custom HTTP +endpoint or third-party services such as Datadog, Dynatrace, LogicMonitor, MongoDB, New +Relic, and Sumo Logic. +Kinesis Data Firehose delivery streams are distinguished from Kinesis data streams in +their models of consumption. Whereas consumers read from a data stream by actively pulling +data from the stream, a delivery stream pushes data to its destination on a regular +cadence. This means that data streams are intended to have consumers that do on-demand +processing, like AWS Lambda or Amazon EC2. On the other hand, delivery streams are +intended to have destinations that are sources for offline processing and analytics, such +as Amazon S3 and Amazon Redshift. +This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aws-cdk) +project. It allows you to define Kinesis Data Firehose delivery streams. +## Defining a Delivery Stream +In order to define a Delivery Stream, you must specify a destination. An S3 bucket can be +used as a destination. More supported destinations are covered [below](#destinations). +```python +bucket = s3.Bucket(self, "Bucket") +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destinations.S3Bucket(bucket)] +) +``` +The above example defines the following resources: +* An S3 bucket +* A Kinesis Data Firehose delivery stream with Direct PUT as the source and CloudWatch + error logging turned on. +* An IAM role which gives the delivery stream permission to write to the S3 bucket. +## Sources +There are two main methods of sourcing input data: Kinesis Data Streams and via a "direct +put". +See: [Sending Data to a Delivery Stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-write.html) +in the *Kinesis Data Firehose Developer Guide*. +### Kinesis Data Stream +A delivery stream can read directly from a Kinesis data stream as a consumer of the data +stream. Configure this behaviour by providing a data stream in the `sourceStream` +property when constructing a delivery stream: +```python +# destination: firehose.IDestination +source_stream = kinesis.Stream(self, "Source Stream") +firehose.DeliveryStream(self, "Delivery Stream", + source_stream=source_stream, + destinations=[destination] +) +``` +### Direct Put +Data must be provided via "direct put", ie., by using a `PutRecord` or +`PutRecordBatch` API call. 
There are a number of ways of doing so, such as: +* Kinesis Agent: a standalone Java application that monitors and delivers files while + handling file rotation, checkpointing, and retries. See: [Writing to Kinesis Data Firehose Using Kinesis Agent](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-agents.html) + in the *Kinesis Data Firehose Developer Guide*. +* AWS SDK: a general purpose solution that allows you to deliver data to a delivery stream + from anywhere using Java, .NET, Node.js, Python, or Ruby. See: [Writing to Kinesis Data Firehose Using the AWS SDK](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-sdk.html) + in the *Kinesis Data Firehose Developer Guide*. +* CloudWatch Logs: subscribe to a log group and receive filtered log events directly into + a delivery stream. See: [logs-destinations](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-logs-destinations-readme.html). +* Eventbridge: add an event rule target to send events to a delivery stream based on the + rule filtering. See: [events-targets](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-events-targets-readme.html). +* SNS: add a subscription to send all notifications from the topic to a delivery + stream. See: [sns-subscriptions](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-sns-subscriptions-readme.html). +* IoT: add an action to an IoT rule to send various IoT information to a delivery stream +## Destinations +The following destinations are supported. See [kinesisfirehose-destinations](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-kinesisfirehose-destinations-readme.html) +for the implementations of these destinations. +### S3 +Defining a delivery stream with an S3 bucket destination: +```python +# bucket: s3.Bucket +s3_destination = destinations.S3Bucket(bucket) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[s3_destination] +) +``` +The S3 destination also supports custom dynamic prefixes. `dataOutputPrefix` +will be used for files successfully delivered to S3. `errorOutputPrefix` will be added to +failed records before writing them to S3. +```python +# bucket: s3.Bucket +s3_destination = destinations.S3Bucket(bucket, + data_output_prefix="myFirehose/DeliveredYear=!{timestamp:yyyy}/anyMonth/rand=!{firehose:random-string}", + error_output_prefix="myFirehoseFailures/!{firehose:error-output-type}/!{timestamp:yyyy}/anyMonth/!{timestamp:dd}" +) +``` +See: [Custom S3 Prefixes](https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html) +in the *Kinesis Data Firehose Developer Guide*. +## Server-side Encryption +Enabling server-side encryption (SSE) requires Kinesis Data Firehose to encrypt all data +sent to delivery stream when it is stored at rest. This means that data is encrypted +before being written to the service's internal storage layer and decrypted after it is +received from the internal storage layer. The service manages keys and cryptographic +operations so that sources and destinations do not need to, as the data is encrypted and +decrypted at the boundaries of the service (i.e., before the data is delivered to a +destination). By default, delivery streams do not have SSE enabled. +The Key Management Service keys (KMS keys) used for SSE can either be AWS-owned or +customer-managed. AWS-owned KMS keys are created, owned and managed by AWS for use in +multiple AWS accounts. As a customer, you cannot view, use, track, or manage these keys, +and you are not charged for their use. 
On the other hand, customer-managed KMS keys are +created and owned within your account and managed entirely by you. As a customer, you are +responsible for managing access, rotation, aliases, and deletion for these keys, and you +are changed for their use. +See: [AWS KMS keys](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#kms_keys) +in the *KMS Developer Guide*. +```python +# destination: firehose.IDestination +# SSE with an customer-managed key that is explicitly specified +# key: kms.Key +# SSE with an AWS-owned key +firehose.DeliveryStream(self, "Delivery Stream AWS Owned", + encryption=firehose.StreamEncryption.AWS_OWNED, + destinations=[destination] +) +# SSE with an customer-managed key that is created automatically by the CDK +firehose.DeliveryStream(self, "Delivery Stream Implicit Customer Managed", + encryption=firehose.StreamEncryption.CUSTOMER_MANAGED, + destinations=[destination] +) +firehose.DeliveryStream(self, "Delivery Stream Explicit Customer Managed", + encryption_key=key, + destinations=[destination] +) +``` +See: [Data Protection](https://docs.aws.amazon.com/firehose/latest/dev/encryption.html) +in the *Kinesis Data Firehose Developer Guide*. +## Monitoring +Kinesis Data Firehose is integrated with CloudWatch, so you can monitor the performance of +your delivery streams via logs and metrics. +### Logs +Kinesis Data Firehose will send logs to CloudWatch when data transformation or data +delivery fails. The CDK will enable logging by default and create a CloudWatch LogGroup +and LogStream for your Delivery Stream. +When you create a destination, you can specify a log group. In this log group, The CDK +will create log streams where log events will be sent: +```python +import aws_cdk.aws_logs as logs +# bucket: s3.Bucket +log_group = logs.LogGroup(self, "Log Group") +destination = destinations.S3Bucket(bucket, + log_group=log_group +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination] +) +``` +Logging can also be disabled: +```python +# bucket: s3.Bucket +destination = destinations.S3Bucket(bucket, + logging=False +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination] +) +``` +See: [Monitoring using CloudWatch Logs](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-logs.html) +in the *Kinesis Data Firehose Developer Guide*. +### Metrics +Kinesis Data Firehose sends metrics to CloudWatch so that you can collect and analyze the +performance of the delivery stream, including data delivery, data ingestion, data +transformation, format conversion, API usage, encryption, and resource usage. You can then +use CloudWatch alarms to alert you, for example, when data freshness (the age of the +oldest record in the delivery stream) exceeds the buffering limit (indicating that data is +not being delivered to your destination), or when the rate of incoming records exceeds the +limit of records per second (indicating data is flowing into your delivery stream faster +than it is configured to process). +CDK provides methods for accessing delivery stream metrics with default configuration, +such as `metricIncomingBytes`, and `metricIncomingRecords` (see [`IDeliveryStream`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-kinesisfirehose.IDeliveryStream.html) +for a full list). 
CDK also provides a generic `metric` method that can be used to produce +metric configurations for any metric provided by Kinesis Data Firehose; the configurations +are pre-populated with the correct dimensions for the delivery stream. +```python +import aws_cdk.aws_cloudwatch as cloudwatch +# delivery_stream: firehose.DeliveryStream +# Alarm that triggers when the per-second average of incoming bytes exceeds 90% of the current service limit +incoming_bytes_percent_of_limit = cloudwatch.MathExpression( + expression="incomingBytes / 300 / bytePerSecLimit", + using_metrics={ + "incoming_bytes": delivery_stream.metric_incoming_bytes(statistic=cloudwatch.Statistic.SUM), + "byte_per_sec_limit": delivery_stream.metric("BytesPerSecondLimit") + } +) +cloudwatch.Alarm(self, "Alarm", + metric=incoming_bytes_percent_of_limit, + threshold=0.9, + evaluation_periods=3 +) +``` +See: [Monitoring Using CloudWatch Metrics](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-metrics.html) +in the *Kinesis Data Firehose Developer Guide*. +## Compression +Your data can automatically be compressed when it is delivered to S3 as either a final or +an intermediary/backup destination. Supported compression formats are: gzip, Snappy, +Hadoop-compatible Snappy, and ZIP, except for Redshift destinations, where Snappy +(regardless of Hadoop-compatibility) and ZIP are not supported. By default, data is +delivered to S3 without compression. +```python +# Compress data delivered to S3 using Snappy +# bucket: s3.Bucket +s3_destination = destinations.S3Bucket(bucket, + compression=destinations.Compression.SNAPPY +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[s3_destination] +) +``` +## Buffering +Incoming data is buffered before it is delivered to the specified destination. The +delivery stream will wait until the amount of incoming data has exceeded some threshold +(the "buffer size") or until the time since the last data delivery occurred exceeds some +threshold (the "buffer interval"), whichever happens first. You can configure these +thresholds based on the capabilities of the destination and your use-case. By default, the +buffer size is 5 MiB and the buffer interval is 5 minutes. +```python +# Increase the buffer interval and size to 10 minutes and 8 MiB, respectively +# bucket: s3.Bucket +destination = destinations.S3Bucket(bucket, + buffering_interval=Duration.minutes(10), + buffering_size=Size.mebibytes(8) +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination] +) +``` +See: [Data Delivery Frequency](https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#frequency) +in the *Kinesis Data Firehose Developer Guide*. +## Destination Encryption +Your data can be automatically encrypted when it is delivered to S3 as a final or an +intermediary/backup destination. Kinesis Data Firehose supports Amazon S3 server-side +encryption with AWS Key Management Service (AWS KMS) for encrypting delivered data in +Amazon S3. You can choose to not encrypt the data or to encrypt with a key from the list +of AWS KMS keys that you own. For more information, +see [Protecting Data Using Server-Side Encryption with AWS KMS–Managed Keys (SSE-KMS)](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html). +Data is not encrypted by default. 
+```python +# bucket: s3.Bucket +# key: kms.Key +destination = destinations.S3Bucket(bucket, + encryption_key=key +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination] +) +``` +## Backup +A delivery stream can be configured to back up data to S3 that it attempted to deliver to +the configured destination. Backed up data can be all the data that the delivery stream +attempted to deliver or just data that it failed to deliver (Redshift and S3 destinations +can only back up all data). CDK can create a new S3 bucket where it will back up data, or +you can provide a bucket where data will be backed up. You can also provide a prefix under +which your backed-up data will be placed within the bucket. By default, source data is not +backed up to S3. +```python +# Enable backup of all source records (to an S3 bucket created by CDK). +# bucket: s3.Bucket +# Explicitly provide an S3 bucket to which all source records will be backed up. +# backup_bucket: s3.Bucket +firehose.DeliveryStream(self, "Delivery Stream Backup All", + destinations=[ + destinations.S3Bucket(bucket, + s3_backup=destinations.DestinationS3BackupProps( + mode=destinations.BackupMode.ALL + ) + ) + ] +) +firehose.DeliveryStream(self, "Delivery Stream Backup All Explicit Bucket", + destinations=[ + destinations.S3Bucket(bucket, + s3_backup=destinations.DestinationS3BackupProps( + bucket=backup_bucket + ) + ) + ] +) +# Explicitly provide an S3 prefix under which all source records will be backed up. +firehose.DeliveryStream(self, "Delivery Stream Backup All Explicit Prefix", + destinations=[ + destinations.S3Bucket(bucket, + s3_backup=destinations.DestinationS3BackupProps( + mode=destinations.BackupMode.ALL, + data_output_prefix="mybackup" + ) + ) + ] +) +``` +If any Data Processing or Transformation is configured on your Delivery Stream, the source +records will be backed up in their original format. +## Data Processing/Transformation +Data can be transformed before being delivered to destinations. There are two types of +data processing for delivery streams: record transformation with AWS Lambda, and record +format conversion using a schema stored in an AWS Glue table. If both types of data +processing are configured, then the Lambda transformation is performed first. By default, +no data processing occurs. This construct library currently only supports data +transformation with AWS Lambda. See [#15501](https://github.com/aws/aws-cdk/issues/15501) +to track the status of adding support for record format conversion. +### Data transformation with AWS Lambda +To transform the data, Kinesis Data Firehose will call a Lambda function that you provide +and deliver the data returned in place of the source record. The function must return a +result that contains records in a specific format, including the following fields: +* `recordId` -- the ID of the input record that corresponds the results. +* `result` -- the status of the transformation of the record: "Ok" (success), "Dropped" + (not processed intentionally), or "ProcessingFailed" (not processed due to an error). +* `data` -- the transformed data, Base64-encoded. +The data is buffered up to 1 minute and up to 3 MiB by default before being sent to the +function, but can be configured using `bufferInterval` and `bufferSize` +in the processor configuration (see: [Buffering](#buffering)). 
If the function invocation +fails due to a network timeout or because of hitting an invocation limit, the invocation +is retried 3 times by default, but can be configured using `retries` in the processor +configuration. +```python +# bucket: s3.Bucket +# Provide a Lambda function that will transform records before delivery, with custom +# buffering and retry configuration +lambda_function = lambda_.Function(self, "Processor", + runtime=lambda_.Runtime.NODEJS_14_X, + handler="index.handler", + code=lambda_.Code.from_asset(path.join(__dirname, "process-records")) +) +lambda_processor = firehose.LambdaFunctionProcessor(lambda_function, + buffer_interval=Duration.minutes(5), + buffer_size=Size.mebibytes(5), + retries=5 +) +s3_destination = destinations.S3Bucket(bucket, + processor=lambda_processor +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[s3_destination] +) +``` +```python +import path as path +import aws_cdk.aws_kinesisfirehose_alpha as firehose +import aws_cdk.aws_kms as kms +import aws_cdk.aws_lambda_nodejs as lambdanodejs +import aws_cdk.aws_logs as logs +import aws_cdk.aws_s3 as s3 +import aws_cdk as cdk +import aws_cdk.aws_kinesisfirehose_destinations_alpha as destinations +app = cdk.App() +stack = cdk.Stack(app, "aws-cdk-firehose-delivery-stream-s3-all-properties") +bucket = s3.Bucket(stack, "Bucket", + removal_policy=cdk.RemovalPolicy.DESTROY, + auto_delete_objects=True +) +backup_bucket = s3.Bucket(stack, "BackupBucket", + removal_policy=cdk.RemovalPolicy.DESTROY, + auto_delete_objects=True +) +log_group = logs.LogGroup(stack, "LogGroup", + removal_policy=cdk.RemovalPolicy.DESTROY +) +data_processor_function = lambdanodejs.NodejsFunction(stack, "DataProcessorFunction", + entry=path.join(__dirname, "lambda-data-processor.js"), + timeout=cdk.Duration.minutes(1) +) +processor = firehose.LambdaFunctionProcessor(data_processor_function, + buffer_interval=cdk.Duration.seconds(60), + buffer_size=cdk.Size.mebibytes(1), + retries=1 +) +key = kms.Key(stack, "Key", + removal_policy=cdk.RemovalPolicy.DESTROY +) +backup_key = kms.Key(stack, "BackupKey", + removal_policy=cdk.RemovalPolicy.DESTROY +) +firehose.DeliveryStream(stack, "Delivery Stream", + destinations=[destinations.S3Bucket(bucket, + logging=True, + log_group=log_group, + processor=processor, + compression=destinations.Compression.GZIP, + data_output_prefix="regularPrefix", + error_output_prefix="errorPrefix", + buffering_interval=cdk.Duration.seconds(60), + buffering_size=cdk.Size.mebibytes(1), + encryption_key=key, + s3_backup=destinations.DestinationS3BackupProps( + mode=destinations.BackupMode.ALL, + bucket=backup_bucket, + compression=destinations.Compression.ZIP, + data_output_prefix="backupPrefix", + error_output_prefix="backupErrorPrefix", + buffering_interval=cdk.Duration.seconds(60), + buffering_size=cdk.Size.mebibytes(1), + encryption_key=backup_key + ) + )] +) +app.synth() +``` +See: [Data Transformation](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html) +in the *Kinesis Data Firehose Developer Guide*. +## Specifying an IAM role +The DeliveryStream class automatically creates IAM service roles with all the minimum +necessary permissions for Kinesis Data Firehose to access the resources referenced by your +delivery stream. One service role is created for the delivery stream that allows Kinesis +Data Firehose to read from a Kinesis data stream (if one is configured as the delivery +stream source) and for server-side encryption. 
Another service role is created for each +destination, which gives Kinesis Data Firehose write access to the destination resource, +as well as the ability to invoke data transformers and read schemas for record format +conversion. If you wish, you may specify your own IAM role for either the delivery stream +or the destination service role, or both. It must have the correct trust policy (it must +allow Kinesis Data Firehose to assume it) or delivery stream creation or data delivery +will fail. Other required permissions to destination resources, encryption keys, etc., +will be provided automatically. +```python +# Specify the roles created above when defining the destination and delivery stream. +# bucket: s3.Bucket +# Create service roles for the delivery stream and destination. +# These can be used for other purposes and granted access to different resources. +# They must include the Kinesis Data Firehose service principal in their trust policies. +# Two separate roles are shown below, but the same role can be used for both purposes. +delivery_stream_role = iam.Role(self, "Delivery Stream Role", + assumed_by=iam.ServicePrincipal("firehose.amazonaws.com") +) +destination_role = iam.Role(self, "Destination Role", + assumed_by=iam.ServicePrincipal("firehose.amazonaws.com") +) +destination = destinations.S3Bucket(bucket, role=destination_role) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination], + role=delivery_stream_role +) +``` +See [Controlling Access](https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html) +in the *Kinesis Data Firehose Developer Guide*. +## Granting application access to a delivery stream +IAM roles, users or groups which need to be able to work with delivery streams should be +granted IAM permissions. +Any object that implements the `IGrantable` interface (i.e., has an associated principal) +can be granted permissions to a delivery stream by calling: +* `grantPutRecords(principal)` - grants the principal the ability to put records onto the + delivery stream +* `grant(principal, ...actions)` - grants the principal permission to a custom set of + actions +```python +# Give the role permissions to write data to the delivery stream +# delivery_stream: firehose.DeliveryStream +lambda_role = iam.Role(self, "Role", + assumed_by=iam.ServicePrincipal("lambda.amazonaws.com") +) +delivery_stream.grant_put_records(lambda_role) +``` +The following write permissions are provided to a service principal by the +`grantPutRecords()` method: +* `firehose:PutRecord` +* `firehose:PutRecordBatch` +## Granting a delivery stream access to a resource +Conversely to the above, Kinesis Data Firehose requires permissions in order for delivery +streams to interact with resources that you own. For example, if an S3 bucket is specified +as a destination of a delivery stream, the delivery stream must be granted permissions to +put and get objects from the bucket. When using the built-in AWS service destinations +found in the `@aws-cdk/aws-kinesisfirehose-destinations` module, the CDK grants the +permissions automatically. However, custom or third-party destinations may require custom +permissions. 
In this case, use the delivery stream as an `IGrantable`, as follows: +```python +# delivery_stream: firehose.DeliveryStream +fn = lambda_.Function(self, "Function", + code=lambda_.Code.from_inline("exports.handler = (event) => {}"), + runtime=lambda_.Runtime.NODEJS_14_X, + handler="index.handler" +) +fn.grant_invoke(delivery_stream) +``` +## Multiple destinations +Though the delivery stream allows specifying an array of destinations, only one +destination per delivery stream is currently allowed. This limitation is enforced at CDK +synthesis time and will throw an error. + +%package help +Summary: Development documents and examples for aws-cdk.aws-kinesisfirehose-alpha +Provides: python3-aws-cdk.aws-kinesisfirehose-alpha-doc +%description help +<!--END STABILITY BANNER--> +[Amazon Kinesis Data Firehose](https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html) +is a service for fully-managed delivery of real-time streaming data to storage services +such as Amazon S3, Amazon Redshift, Amazon Elasticsearch, Splunk, or any custom HTTP +endpoint or third-party services such as Datadog, Dynatrace, LogicMonitor, MongoDB, New +Relic, and Sumo Logic. +Kinesis Data Firehose delivery streams are distinguished from Kinesis data streams in +their models of consumption. Whereas consumers read from a data stream by actively pulling +data from the stream, a delivery stream pushes data to its destination on a regular +cadence. This means that data streams are intended to have consumers that do on-demand +processing, like AWS Lambda or Amazon EC2. On the other hand, delivery streams are +intended to have destinations that are sources for offline processing and analytics, such +as Amazon S3 and Amazon Redshift. +This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aws-cdk) +project. It allows you to define Kinesis Data Firehose delivery streams. +## Defining a Delivery Stream +In order to define a Delivery Stream, you must specify a destination. An S3 bucket can be +used as a destination. More supported destinations are covered [below](#destinations). +```python +bucket = s3.Bucket(self, "Bucket") +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destinations.S3Bucket(bucket)] +) +``` +The above example defines the following resources: +* An S3 bucket +* A Kinesis Data Firehose delivery stream with Direct PUT as the source and CloudWatch + error logging turned on. +* An IAM role which gives the delivery stream permission to write to the S3 bucket. +## Sources +There are two main methods of sourcing input data: Kinesis Data Streams and via a "direct +put". +See: [Sending Data to a Delivery Stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-write.html) +in the *Kinesis Data Firehose Developer Guide*. +### Kinesis Data Stream +A delivery stream can read directly from a Kinesis data stream as a consumer of the data +stream. Configure this behaviour by providing a data stream in the `sourceStream` +property when constructing a delivery stream: +```python +# destination: firehose.IDestination +source_stream = kinesis.Stream(self, "Source Stream") +firehose.DeliveryStream(self, "Delivery Stream", + source_stream=source_stream, + destinations=[destination] +) +``` +### Direct Put +Data must be provided via "direct put", ie., by using a `PutRecord` or +`PutRecordBatch` API call. 
There are a number of ways of doing so, such as: +* Kinesis Agent: a standalone Java application that monitors and delivers files while + handling file rotation, checkpointing, and retries. See: [Writing to Kinesis Data Firehose Using Kinesis Agent](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-agents.html) + in the *Kinesis Data Firehose Developer Guide*. +* AWS SDK: a general purpose solution that allows you to deliver data to a delivery stream + from anywhere using Java, .NET, Node.js, Python, or Ruby. See: [Writing to Kinesis Data Firehose Using the AWS SDK](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-sdk.html) + in the *Kinesis Data Firehose Developer Guide*. +* CloudWatch Logs: subscribe to a log group and receive filtered log events directly into + a delivery stream. See: [logs-destinations](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-logs-destinations-readme.html). +* Eventbridge: add an event rule target to send events to a delivery stream based on the + rule filtering. See: [events-targets](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-events-targets-readme.html). +* SNS: add a subscription to send all notifications from the topic to a delivery + stream. See: [sns-subscriptions](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-sns-subscriptions-readme.html). +* IoT: add an action to an IoT rule to send various IoT information to a delivery stream +## Destinations +The following destinations are supported. See [kinesisfirehose-destinations](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-kinesisfirehose-destinations-readme.html) +for the implementations of these destinations. +### S3 +Defining a delivery stream with an S3 bucket destination: +```python +# bucket: s3.Bucket +s3_destination = destinations.S3Bucket(bucket) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[s3_destination] +) +``` +The S3 destination also supports custom dynamic prefixes. `dataOutputPrefix` +will be used for files successfully delivered to S3. `errorOutputPrefix` will be added to +failed records before writing them to S3. +```python +# bucket: s3.Bucket +s3_destination = destinations.S3Bucket(bucket, + data_output_prefix="myFirehose/DeliveredYear=!{timestamp:yyyy}/anyMonth/rand=!{firehose:random-string}", + error_output_prefix="myFirehoseFailures/!{firehose:error-output-type}/!{timestamp:yyyy}/anyMonth/!{timestamp:dd}" +) +``` +See: [Custom S3 Prefixes](https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html) +in the *Kinesis Data Firehose Developer Guide*. +## Server-side Encryption +Enabling server-side encryption (SSE) requires Kinesis Data Firehose to encrypt all data +sent to delivery stream when it is stored at rest. This means that data is encrypted +before being written to the service's internal storage layer and decrypted after it is +received from the internal storage layer. The service manages keys and cryptographic +operations so that sources and destinations do not need to, as the data is encrypted and +decrypted at the boundaries of the service (i.e., before the data is delivered to a +destination). By default, delivery streams do not have SSE enabled. +The Key Management Service keys (KMS keys) used for SSE can either be AWS-owned or +customer-managed. AWS-owned KMS keys are created, owned and managed by AWS for use in +multiple AWS accounts. As a customer, you cannot view, use, track, or manage these keys, +and you are not charged for their use. 
On the other hand, customer-managed KMS keys are +created and owned within your account and managed entirely by you. As a customer, you are +responsible for managing access, rotation, aliases, and deletion for these keys, and you +are changed for their use. +See: [AWS KMS keys](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#kms_keys) +in the *KMS Developer Guide*. +```python +# destination: firehose.IDestination +# SSE with an customer-managed key that is explicitly specified +# key: kms.Key +# SSE with an AWS-owned key +firehose.DeliveryStream(self, "Delivery Stream AWS Owned", + encryption=firehose.StreamEncryption.AWS_OWNED, + destinations=[destination] +) +# SSE with an customer-managed key that is created automatically by the CDK +firehose.DeliveryStream(self, "Delivery Stream Implicit Customer Managed", + encryption=firehose.StreamEncryption.CUSTOMER_MANAGED, + destinations=[destination] +) +firehose.DeliveryStream(self, "Delivery Stream Explicit Customer Managed", + encryption_key=key, + destinations=[destination] +) +``` +See: [Data Protection](https://docs.aws.amazon.com/firehose/latest/dev/encryption.html) +in the *Kinesis Data Firehose Developer Guide*. +## Monitoring +Kinesis Data Firehose is integrated with CloudWatch, so you can monitor the performance of +your delivery streams via logs and metrics. +### Logs +Kinesis Data Firehose will send logs to CloudWatch when data transformation or data +delivery fails. The CDK will enable logging by default and create a CloudWatch LogGroup +and LogStream for your Delivery Stream. +When you create a destination, you can specify a log group. In this log group, The CDK +will create log streams where log events will be sent: +```python +import aws_cdk.aws_logs as logs +# bucket: s3.Bucket +log_group = logs.LogGroup(self, "Log Group") +destination = destinations.S3Bucket(bucket, + log_group=log_group +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination] +) +``` +Logging can also be disabled: +```python +# bucket: s3.Bucket +destination = destinations.S3Bucket(bucket, + logging=False +) +firehose.DeliveryStream(self, "Delivery Stream", + destinations=[destination] +) +``` +See: [Monitoring using CloudWatch Logs](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-logs.html) +in the *Kinesis Data Firehose Developer Guide*. +### Metrics +Kinesis Data Firehose sends metrics to CloudWatch so that you can collect and analyze the +performance of the delivery stream, including data delivery, data ingestion, data +transformation, format conversion, API usage, encryption, and resource usage. You can then +use CloudWatch alarms to alert you, for example, when data freshness (the age of the +oldest record in the delivery stream) exceeds the buffering limit (indicating that data is +not being delivered to your destination), or when the rate of incoming records exceeds the +limit of records per second (indicating data is flowing into your delivery stream faster +than it is configured to process). +CDK provides methods for accessing delivery stream metrics with default configuration, +such as `metricIncomingBytes`, and `metricIncomingRecords` (see [`IDeliveryStream`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-kinesisfirehose.IDeliveryStream.html) +for a full list). 
CDK also provides a generic `metric` method that can be used to produce
+metric configurations for any metric provided by Kinesis Data Firehose; the configurations
+are pre-populated with the correct dimensions for the delivery stream.
+```python
+import aws_cdk.aws_cloudwatch as cloudwatch
+# delivery_stream: firehose.DeliveryStream
+# Alarm that triggers when the per-second average of incoming bytes exceeds 90% of the current service limit
+incoming_bytes_percent_of_limit = cloudwatch.MathExpression(
+    expression="incomingBytes / 300 / bytePerSecLimit",
+    using_metrics={
+        # The keys here must match the identifiers used in the expression above.
+        "incomingBytes": delivery_stream.metric_incoming_bytes(statistic=cloudwatch.Statistic.SUM),
+        "bytePerSecLimit": delivery_stream.metric("BytesPerSecondLimit")
+    }
+)
+cloudwatch.Alarm(self, "Alarm",
+    metric=incoming_bytes_percent_of_limit,
+    threshold=0.9,
+    evaluation_periods=3
+)
+```
+See: [Monitoring Using CloudWatch Metrics](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-metrics.html)
+in the *Kinesis Data Firehose Developer Guide*.
+## Compression
+Your data can automatically be compressed when it is delivered to S3 as either a final or
+an intermediary/backup destination. Supported compression formats are gzip, Snappy,
+Hadoop-compatible Snappy, and ZIP, except for Redshift destinations, where Snappy
+(regardless of Hadoop-compatibility) and ZIP are not supported. By default, data is
+delivered to S3 without compression.
+```python
+# Compress data delivered to S3 using Snappy
+# bucket: s3.Bucket
+s3_destination = destinations.S3Bucket(bucket,
+    compression=destinations.Compression.SNAPPY
+)
+firehose.DeliveryStream(self, "Delivery Stream",
+    destinations=[s3_destination]
+)
+```
+## Buffering
+Incoming data is buffered before it is delivered to the specified destination. The
+delivery stream will wait until the amount of incoming data has exceeded some threshold
+(the "buffer size") or until the time since the last data delivery occurred exceeds some
+threshold (the "buffer interval"), whichever happens first. You can configure these
+thresholds based on the capabilities of the destination and your use case. By default, the
+buffer size is 5 MiB and the buffer interval is 5 minutes.
+```python
+# Increase the buffer interval and size to 10 minutes and 8 MiB, respectively
+# bucket: s3.Bucket
+destination = destinations.S3Bucket(bucket,
+    buffering_interval=Duration.minutes(10),
+    buffering_size=Size.mebibytes(8)
+)
+firehose.DeliveryStream(self, "Delivery Stream",
+    destinations=[destination]
+)
+```
+See: [Data Delivery Frequency](https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#frequency)
+in the *Kinesis Data Firehose Developer Guide*.
+## Destination Encryption
+Your data can be automatically encrypted when it is delivered to S3 as a final or an
+intermediary/backup destination. Kinesis Data Firehose supports Amazon S3 server-side
+encryption with AWS Key Management Service (AWS KMS) for encrypting delivered data in
+Amazon S3. You can choose to not encrypt the data or to encrypt with a key from the list
+of AWS KMS keys that you own. For more information,
+see [Protecting Data Using Server-Side Encryption with AWS KMS–Managed Keys (SSE-KMS)](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html).
+Data is not encrypted by default.
+```python
+# bucket: s3.Bucket
+# key: kms.Key
+destination = destinations.S3Bucket(bucket,
+    encryption_key=key
+)
+firehose.DeliveryStream(self, "Delivery Stream",
+    destinations=[destination]
+)
+```
+## Backup
+A delivery stream can be configured to back up to S3 the data that it attempted to deliver
+to the configured destination. Backed-up data can be all the data that the delivery stream
+attempted to deliver or just data that it failed to deliver (Redshift and S3 destinations
+can only back up all data). CDK can create a new S3 bucket where it will back up data, or
+you can provide a bucket where data will be backed up. You can also provide a prefix under
+which your backed-up data will be placed within the bucket. By default, source data is not
+backed up to S3.
+```python
+# bucket: s3.Bucket
+# backup_bucket: s3.Bucket
+# Enable backup of all source records (to an S3 bucket created by CDK).
+firehose.DeliveryStream(self, "Delivery Stream Backup All",
+    destinations=[
+        destinations.S3Bucket(bucket,
+            s3_backup=destinations.DestinationS3BackupProps(
+                mode=destinations.BackupMode.ALL
+            )
+        )
+    ]
+)
+# Explicitly provide an S3 bucket to which all source records will be backed up.
+firehose.DeliveryStream(self, "Delivery Stream Backup All Explicit Bucket",
+    destinations=[
+        destinations.S3Bucket(bucket,
+            s3_backup=destinations.DestinationS3BackupProps(
+                bucket=backup_bucket
+            )
+        )
+    ]
+)
+# Explicitly provide an S3 prefix under which all source records will be backed up.
+firehose.DeliveryStream(self, "Delivery Stream Backup All Explicit Prefix",
+    destinations=[
+        destinations.S3Bucket(bucket,
+            s3_backup=destinations.DestinationS3BackupProps(
+                mode=destinations.BackupMode.ALL,
+                data_output_prefix="mybackup"
+            )
+        )
+    ]
+)
+```
+If any data processing or transformation is configured on your delivery stream, the source
+records will be backed up in their original format.
+## Data Processing/Transformation
+Data can be transformed before being delivered to destinations. There are two types of
+data processing for delivery streams: record transformation with AWS Lambda, and record
+format conversion using a schema stored in an AWS Glue table. If both types of data
+processing are configured, then the Lambda transformation is performed first. By default,
+no data processing occurs. This construct library currently only supports data
+transformation with AWS Lambda. See [#15501](https://github.com/aws/aws-cdk/issues/15501)
+to track the status of adding support for record format conversion.
+### Data transformation with AWS Lambda
+To transform the data, Kinesis Data Firehose will call a Lambda function that you provide
+and deliver the data returned in place of the source record. The function must return a
+result that contains records in a specific format, including the following fields:
+* `recordId` -- the ID of the input record that corresponds to the result.
+* `result` -- the status of the transformation of the record: "Ok" (success), "Dropped"
+  (not processed intentionally), or "ProcessingFailed" (not processed due to an error).
+* `data` -- the transformed data, Base64-encoded.
+The data is buffered up to 1 minute and up to 3 MiB by default before being sent to the
+function, but can be configured using `bufferInterval` and `bufferSize`
+in the processor configuration (see: [Buffering](#buffering)).
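+For illustration, a transformation function that returns records in this format might look
+like the following minimal sketch (the handler and its upper-casing transformation are
+illustrative only and are not part of this module):
+```python
+import base64
+
+def handler(event, context):
+    # Each incoming record carries a recordId and Base64-encoded data.
+    output = []
+    for record in event["records"]:
+        payload = base64.b64decode(record["data"]).decode("utf-8")
+        # Illustrative transformation: upper-case the payload.
+        transformed = payload.upper()
+        output.append({
+            "recordId": record["recordId"],
+            "result": "Ok",
+            # The returned data must be Base64-encoded again.
+            "data": base64.b64encode(transformed.encode("utf-8")).decode("utf-8")
+        })
+    return {"records": output}
+```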
If the function invocation
+fails due to a network timeout or because of hitting an invocation limit, the invocation
+is retried 3 times by default, but can be configured using `retries` in the processor
+configuration.
+```python
+import os.path as path
+# bucket: s3.Bucket
+# Provide a Lambda function that will transform records before delivery, with custom
+# buffering and retry configuration
+lambda_function = lambda_.Function(self, "Processor",
+    runtime=lambda_.Runtime.NODEJS_14_X,
+    handler="index.handler",
+    code=lambda_.Code.from_asset(path.join(path.dirname(__file__), "process-records"))
+)
+lambda_processor = firehose.LambdaFunctionProcessor(lambda_function,
+    buffer_interval=Duration.minutes(5),
+    buffer_size=Size.mebibytes(5),
+    retries=5
+)
+s3_destination = destinations.S3Bucket(bucket,
+    processor=lambda_processor
+)
+firehose.DeliveryStream(self, "Delivery Stream",
+    destinations=[s3_destination]
+)
+```
+A more complete example, combining the Lambda processor with logging, compression, custom
+prefixes, buffering, encryption, and backup, might look like the following:
+```python
+import os.path as path
+import aws_cdk.aws_kinesisfirehose_alpha as firehose
+import aws_cdk.aws_kms as kms
+import aws_cdk.aws_lambda_nodejs as lambdanodejs
+import aws_cdk.aws_logs as logs
+import aws_cdk.aws_s3 as s3
+import aws_cdk as cdk
+import aws_cdk.aws_kinesisfirehose_destinations_alpha as destinations
+app = cdk.App()
+stack = cdk.Stack(app, "aws-cdk-firehose-delivery-stream-s3-all-properties")
+bucket = s3.Bucket(stack, "Bucket",
+    removal_policy=cdk.RemovalPolicy.DESTROY,
+    auto_delete_objects=True
+)
+backup_bucket = s3.Bucket(stack, "BackupBucket",
+    removal_policy=cdk.RemovalPolicy.DESTROY,
+    auto_delete_objects=True
+)
+log_group = logs.LogGroup(stack, "LogGroup",
+    removal_policy=cdk.RemovalPolicy.DESTROY
+)
+data_processor_function = lambdanodejs.NodejsFunction(stack, "DataProcessorFunction",
+    entry=path.join(path.dirname(__file__), "lambda-data-processor.js"),
+    timeout=cdk.Duration.minutes(1)
+)
+processor = firehose.LambdaFunctionProcessor(data_processor_function,
+    buffer_interval=cdk.Duration.seconds(60),
+    buffer_size=cdk.Size.mebibytes(1),
+    retries=1
+)
+key = kms.Key(stack, "Key",
+    removal_policy=cdk.RemovalPolicy.DESTROY
+)
+backup_key = kms.Key(stack, "BackupKey",
+    removal_policy=cdk.RemovalPolicy.DESTROY
+)
+firehose.DeliveryStream(stack, "Delivery Stream",
+    destinations=[destinations.S3Bucket(bucket,
+        logging=True,
+        log_group=log_group,
+        processor=processor,
+        compression=destinations.Compression.GZIP,
+        data_output_prefix="regularPrefix",
+        error_output_prefix="errorPrefix",
+        buffering_interval=cdk.Duration.seconds(60),
+        buffering_size=cdk.Size.mebibytes(1),
+        encryption_key=key,
+        s3_backup=destinations.DestinationS3BackupProps(
+            mode=destinations.BackupMode.ALL,
+            bucket=backup_bucket,
+            compression=destinations.Compression.ZIP,
+            data_output_prefix="backupPrefix",
+            error_output_prefix="backupErrorPrefix",
+            buffering_interval=cdk.Duration.seconds(60),
+            buffering_size=cdk.Size.mebibytes(1),
+            encryption_key=backup_key
+        )
+    )]
+)
+app.synth()
+```
+See: [Data Transformation](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html)
+in the *Kinesis Data Firehose Developer Guide*.
+## Specifying an IAM role
+The DeliveryStream class automatically creates IAM service roles with the minimum
+necessary permissions for Kinesis Data Firehose to access the resources referenced by your
+delivery stream. One service role is created for the delivery stream itself, allowing
+Kinesis Data Firehose to read from a Kinesis data stream (if one is configured as the
+delivery stream source) and to use server-side encryption.
Another service role is created for each
+destination, which gives Kinesis Data Firehose write access to the destination resource,
+as well as the ability to invoke data transformers and read schemas for record format
+conversion. If you wish, you may specify your own IAM role for either the delivery stream
+or the destination service role, or both. Any role you provide must have the correct trust
+policy (it must allow Kinesis Data Firehose to assume it), or delivery stream creation or
+data delivery will fail. Other required permissions to destination resources, encryption
+keys, etc., will be provided automatically.
+```python
+# bucket: s3.Bucket
+# Create service roles for the delivery stream and destination.
+# These can be used for other purposes and granted access to different resources.
+# They must include the Kinesis Data Firehose service principal in their trust policies.
+# Two separate roles are shown below, but the same role can be used for both purposes.
+delivery_stream_role = iam.Role(self, "Delivery Stream Role",
+    assumed_by=iam.ServicePrincipal("firehose.amazonaws.com")
+)
+destination_role = iam.Role(self, "Destination Role",
+    assumed_by=iam.ServicePrincipal("firehose.amazonaws.com")
+)
+# Specify the roles created above when defining the destination and delivery stream.
+destination = destinations.S3Bucket(bucket, role=destination_role)
+firehose.DeliveryStream(self, "Delivery Stream",
+    destinations=[destination],
+    role=delivery_stream_role
+)
+```
+See: [Controlling Access](https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html)
+in the *Kinesis Data Firehose Developer Guide*.
+## Granting application access to a delivery stream
+IAM roles, users, or groups that need to work with delivery streams should be granted IAM
+permissions.
+Any object that implements the `IGrantable` interface (i.e., has an associated principal)
+can be granted permissions to a delivery stream by calling:
+* `grantPutRecords(principal)` - grants the principal the ability to put records onto the
+  delivery stream
+* `grant(principal, ...actions)` - grants the principal permission to a custom set of
+  actions
+```python
+# Give the role permissions to write data to the delivery stream
+# delivery_stream: firehose.DeliveryStream
+lambda_role = iam.Role(self, "Role",
+    assumed_by=iam.ServicePrincipal("lambda.amazonaws.com")
+)
+delivery_stream.grant_put_records(lambda_role)
+```
+The following write permissions are provided to a service principal by the
+`grantPutRecords()` method:
+* `firehose:PutRecord`
+* `firehose:PutRecordBatch`
+## Granting a delivery stream access to a resource
+Conversely to the above, Kinesis Data Firehose requires permissions in order for delivery
+streams to interact with resources that you own. For example, if an S3 bucket is specified
+as a destination of a delivery stream, the delivery stream must be granted permissions to
+put and get objects from the bucket. When using the built-in AWS service destinations
+found in the `@aws-cdk/aws-kinesisfirehose-destinations` module, the CDK grants the
+permissions automatically. However, custom or third-party destinations may require custom
+permissions.
In this case, use the delivery stream as an `IGrantable`, as follows: +```python +# delivery_stream: firehose.DeliveryStream +fn = lambda_.Function(self, "Function", + code=lambda_.Code.from_inline("exports.handler = (event) => {}"), + runtime=lambda_.Runtime.NODEJS_14_X, + handler="index.handler" +) +fn.grant_invoke(delivery_stream) +``` +## Multiple destinations +Though the delivery stream allows specifying an array of destinations, only one +destination per delivery stream is currently allowed. This limitation is enforced at CDK +synthesis time and will throw an error. + +%prep +%autosetup -n aws-cdk.aws-kinesisfirehose-alpha-2.81.0a0 + +%build +%py3_build + +%install +%py3_install +install -d -m755 %{buildroot}/%{_pkgdocdir} +if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi +if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi +if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi +if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi +pushd %{buildroot} +if [ -d usr/lib ]; then + find usr/lib -type f -printf "/%h/%f\n" >> filelist.lst +fi +if [ -d usr/lib64 ]; then + find usr/lib64 -type f -printf "/%h/%f\n" >> filelist.lst +fi +if [ -d usr/bin ]; then + find usr/bin -type f -printf "/%h/%f\n" >> filelist.lst +fi +if [ -d usr/sbin ]; then + find usr/sbin -type f -printf "/%h/%f\n" >> filelist.lst +fi +touch doclist.lst +if [ -d usr/share/man ]; then + find usr/share/man -type f -printf "/%h/%f.gz\n" >> doclist.lst +fi +popd +mv %{buildroot}/filelist.lst . +mv %{buildroot}/doclist.lst . + +%files -n python3-aws-cdk.aws-kinesisfirehose-alpha -f filelist.lst +%dir %{python3_sitelib}/* + +%files help -f doclist.lst +%{_docdir}/* + +%changelog +* Wed May 31 2023 Python_Bot <Python_Bot@openeuler.org> - 2.81.0a0-1 +- Package Spec generated @@ -0,0 +1 @@ +7c32bfae4e7605a7aec6ee144b47d1c0 aws-cdk.aws-kinesisfirehose-alpha-2.81.0a0.tar.gz |