awscdk.awskinesisanalyticsflinkalpha 2.159.0a0
Kinesis Analytics Flink
---
The APIs of higher level constructs in this module are experimental and under active development.
They are subject to non-backward compatible changes or removal in any future version. These are
not subject to the Semantic Versioning model and breaking changes will be
announced in the release notes. This means that while you may use them, you may need to update
your source code when upgrading to a newer version of this package.
This package provides constructs for creating Kinesis Analytics Flink
applications. To learn more about using managed Flink applications, see
the AWS developer
guide.
Creating Flink Applications
To create a new Flink application, use the Application construct:
import os.path as path
import aws_cdk.aws_cloudwatch as cloudwatch
import aws_cdk as core
import aws_cdk.aws_kinesisanalytics_flink_alpha as flink

app = core.App()
stack = core.Stack(app, "FlinkAppTest")

# Use the local "code-asset" path next to this file as the application code.
flink_app = flink.Application(stack, "App",
    code=flink.ApplicationCode.from_asset(path.join(path.dirname(__file__), "code-asset")),
    runtime=flink.Runtime.FLINK_1_19
)

# Alarm on the application's full-restarts metric.
cloudwatch.Alarm(stack, "Alarm",
    metric=flink_app.metric_full_restarts(),
    evaluation_periods=1,
    threshold=3
)

app.synth()
The code property can use from_asset, as shown above, to reference a local jar
file that the CDK uploads to S3, or from_bucket to reference a file that is already in S3.
import os.path as path
import aws_cdk.aws_s3_assets as assets
import aws_cdk as core
import aws_cdk.aws_kinesisanalytics_flink_alpha as flink

app = core.App()
stack = core.Stack(app, "FlinkAppCodeFromBucketTest")

# Upload the local "code-asset" path as an S3 asset, then reference it by bucket and key.
asset = assets.Asset(stack, "CodeAsset",
    path=path.join(path.dirname(__file__), "code-asset")
)
bucket = asset.bucket
file_key = asset.s3_object_key

flink.Application(stack, "App",
    code=flink.ApplicationCode.from_bucket(bucket, file_key),
    runtime=flink.Runtime.FLINK_1_19
)

app.synth()
The property_groups property provides a way of passing arbitrary runtime
properties to your Flink application. You can use the
aws-kinesisanalytics-runtime library to retrieve these
properties.
# bucket: s3.Bucket
flink_app = flink.Application(self, "Application",
    property_groups={
        "FlinkApplicationProperties": {
            "input_stream_name": "my-input-kinesis-stream",
            "output_stream_name": "my-output-kinesis-stream"
        }
    },
    # ...
    runtime=flink.Runtime.FLINK_1_19,
    code=flink.ApplicationCode.from_bucket(bucket, "my-app.jar")
)
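The mapping passed as property_groups is keyed by group ID. The following is a minimal, standard-library-only sketch of how such a mapping could be flattened into a list of entries with PropertyGroupId and PropertyMap fields and then queried. That list shape is an assumption based on how the service's API describes property groups. The helper names to_property_group_list and get_property are hypothetical and are not part of the CDK or of the aws-kinesisanalytics-runtime library.

```python
from typing import Dict, List, Optional

PropertyGroups = Dict[str, Dict[str, str]]


def to_property_group_list(groups: PropertyGroups) -> List[dict]:
    """Convert {group_id: {key: value}} into a list of
    {"PropertyGroupId": ..., "PropertyMap": ...} entries (assumed shape)."""
    return [
        {"PropertyGroupId": group_id, "PropertyMap": dict(props)}
        for group_id, props in groups.items()
    ]


def get_property(entries: List[dict], group_id: str, key: str) -> Optional[str]:
    """Return the value for key in the given group, or None if either is missing."""
    for entry in entries:
        if entry["PropertyGroupId"] == group_id:
            return entry["PropertyMap"].get(key)
    return None


# Same values as the property_groups argument in the example above.
property_groups = {
    "FlinkApplicationProperties": {
        "input_stream_name": "my-input-kinesis-stream",
        "output_stream_name": "my-output-kinesis-stream",
    }
}

entries = to_property_group_list(property_groups)
input_stream = get_property(entries, "FlinkApplicationProperties", "input_stream_name")
print(input_stream)
```

Keeping property names such as input_stream_name in one group makes it straightforward for the application code to look them up by group ID and key at startup.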
Flink applications also have specific configuration for passing parameters
when the Flink job starts. These include parameters for checkpointing,
snapshotting, monitoring, and parallelism.
from aws_cdk import Duration
import aws_cdk.aws_logs as logs

# bucket: s3.Bucket
flink_app = flink.Application(self, "Application",
    code=flink.ApplicationCode.from_bucket(bucket, "my-app.jar"),
    runtime=flink.Runtime.FLINK_1_19,
    checkpointing_enabled=True,  # default is true
    checkpoint_interval=Duration.seconds(30),  # default is 1 minute
    min_pause_between_checkpoints=Duration.seconds(10),  # default is 5 seconds
    log_level=flink.LogLevel.ERROR,  # default is INFO
    metrics_level=flink.MetricsLevel.PARALLELISM,  # default is APPLICATION
    auto_scaling_enabled=False,  # default is true
    parallelism=32,  # default is 1
    parallelism_per_kpu=2,  # default is 1
    snapshots_enabled=False,  # default is true
    log_group=logs.LogGroup(self, "LogGroup")
)
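The parallelism and parallelism_per_kpu settings together determine how many Kinesis Processing Units (KPUs) the job's tasks occupy. The sketch below assumes the relationship KPUs = ceil(parallelism / parallelism_per_kpu) and ignores any extra KPU the service may allocate for orchestration, so treat the result as a rough estimate rather than a billing figure. The function name estimate_kpus is hypothetical and is not part of the CDK.

```python
import math


def estimate_kpus(parallelism: int, parallelism_per_kpu: int) -> int:
    """Estimate the KPUs needed to run `parallelism` parallel tasks when each
    KPU hosts `parallelism_per_kpu` of them (assumed relationship)."""
    if parallelism < 1 or parallelism_per_kpu < 1:
        raise ValueError("parallelism and parallelism_per_kpu must be >= 1")
    return math.ceil(parallelism / parallelism_per_kpu)


# Values from the example above: parallelism=32, parallelism_per_kpu=2.
kpus = estimate_kpus(parallelism=32, parallelism_per_kpu=2)
print(kpus)
```

Under this assumption, raising parallelism_per_kpu packs more tasks onto each KPU and lowers the estimate, at the cost of fewer resources per task.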
Flink applications can optionally be deployed in a VPC:
# bucket: s3.Bucket
# vpc: ec2.Vpc
flink_app = flink.Application(self, "Application",
    code=flink.ApplicationCode.from_bucket(bucket, "my-app.jar"),
    runtime=flink.Runtime.FLINK_1_19,
    vpc=vpc
)