[Bug]: KinesisIO source on FlinkRunner initializes the same splits twice #31313

Open

hlteoh37 opened this issue May 16, 2024 · 10 comments

@hlteoh37
Contributor

hlteoh37 commented May 16, 2024

What happened?

Bug description

Setup details:

  • FlinkRunner (Flink 1.15.4) (also replicated with Flink 1.18.2)
  • KinesisIO (from beam-sdks-java-io-amazon-web-services2)
  • Beam version 2.56.0
  • Pipeline attached mode (false)

Bug details:

  • When restoring from a snapshot on Flink, the org.apache.beam.sdk.io.aws2.kinesis.KinesisReader is assigned the same splits twice: once with the snapshot state, and once without. This leads to duplicate data being processed.

Replication steps:

  1. Start a Flink job with KinesisIO source.
  2. Stop the Flink job with a savepoint.
  3. Start the same Flink job from savepoint.

Logs:

  • From the Flink TaskManager (worker node) log dump below, we can see that the splits for shardId-000000000000 to shardId-000000000003 are first initialized with checkpoint state AFTER_SEQUENCE_NUMBER (correct).
  • Following that, they are initialized again without checkpoint state, using AT_TIMESTAMP (incorrect).
2024-05-16 12:29:43,263 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.aws2.kinesis.KinesisSource@269f1286, splitState.isNull=false, checkpointMark=null}]
2024-05-16 12:29:43,264 WARN  org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSourceReader [] - AutoWatermarkInterval is not set, watermarks will be emitted at a default interval of 200 ms
2024-05-16 12:29:43,264 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: KDS Source/Read(KinesisSource) -> Flat Map -> ParDo(Anonymous)/ParMultiDo(Anonymous) -> KDS Sink/ParDo(Anonymous)/ParMultiDo(Anonymous) (1/1)#0 (9e813ffa491b6a4d44e7860742f1576b) switched from INITIALIZING to RUNNING.
2024-05-16 12:29:43,265 WARN  org.apache.beam.sdk.coders.SerializableCoder                 [] - Can't verify serialized elements of type KinesisReaderCheckpoint have well defined equals method. This may produce incorrect results on some PipelineRunner implementations
2024-05-16 12:29:43,265 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisSource            [] - Got checkpoint mark [Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000000: 49651326501327272958725198009666143092350196858539212802 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000001: 49651326500635949857570748692274909048439253764329701394 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000002: 49651326503155934065004709107267236287428903985330782242 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000003: 49651326499966927501614829993237864477127027156192329778 0]
2024-05-16 12:29:43,266 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisSource            [] - Creating new reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000000: 49651326501327272958725198009666143092350196858539212802 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000001: 49651326500635949857570748692274909048439253764329701394 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000002: 49651326503155934065004709107267236287428903985330782242 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000003: 49651326499966927501614829993237864477127027156192329778 0]
2024-05-16 12:29:43,268 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisReader            [] - Starting reader using [Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000000: 49651326501327272958725198009666143092350196858539212802 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000001: 49651326500635949857570748692274909048439253764329701394 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000002: 49651326503155934065004709107267236287428903985330782242 0, Checkpoint AFTER_SEQUENCE_NUMBER for stream ExampleInputStream, shard shardId-000000000003: 49651326499966927501614829993237864477127027156192329778 0]
2024-05-16 12:29:43,272 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000000, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326501327272958725198009666143092350196858539212802)
2024-05-16 12:29:43,795 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000001, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326500635949857570748692274909048439253764329701394)
2024-05-16 12:29:43,891 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000002, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326503155934065004709107267236287428903985330782242)
2024-05-16 12:29:43,983 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000003, ShardIteratorType=AT_SEQUENCE_NUMBER, StartingSequenceNumber=49651326499966927501614829993237864477127027156192329778)
2024-05-16 12:29:44,080 INFO  org.apache.beam.sdk.io.aws2.kinesis.ShardReadersPool         [] - Starting to read ExampleInputStream stream from [shardId-000000000000, shardId-000000000001, shardId-000000000002, shardId-000000000003] shards
2024-05-16 12:29:44,266 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Adding splits [FlinkSourceSplit{splitIndex=0, beamSource=org.apache.beam.sdk.io.aws2.kinesis.KinesisSource@2a249887, splitState.isNull=true, checkpointMark=null}]
2024-05-16 12:29:44,266 INFO  org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase [] - Received NoMoreSplits signal from enumerator.
2024-05-16 12:29:44,898 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisSource            [] - No checkpointMark specified, fall back to initial [Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000000: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000001: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000002: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000003: null null]
2024-05-16 12:29:44,898 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisSource            [] - Creating new reader using [Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000000: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000001: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000002: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000003: null null]
2024-05-16 12:29:44,899 INFO  org.apache.beam.sdk.io.aws2.kinesis.KinesisReader            [] - Starting reader using [Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000000: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000001: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000002: null null, Checkpoint AT_TIMESTAMP for stream ExampleInputStream, shard shardId-000000000003: null null]
2024-05-16 12:29:44,899 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000000, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,311 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000001, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,412 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000002, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,514 INFO  org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient  [] - Starting getIterator request GetShardIteratorRequest(StreamName=ExampleInputStream, ShardId=shardId-000000000003, ShardIteratorType=AT_TIMESTAMP, Timestamp=2024-05-16T10:59:38.912Z)
2024-05-16 12:29:45,620 INFO  org.apache.beam.sdk.io.aws2.kinesis.ShardReadersPool         [] - Starting to read ExampleInputStream stream from [shardId-000000000000, shardId-000000000001, shardId-000000000002, shardId-000000000003] shards
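A quick way to spot this symptom in a TaskManager log is to group the "Starting getIterator request" lines by shard: within one restore, each shard should show up only once. The sketch below is a hypothetical helper (not part of Beam); its class name and regular expression are assumptions derived only from the SimplifiedKinesisClient log lines above, so run it on the log excerpt that covers a single restore.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: groups SimplifiedKinesisClient getIterator log lines by shard. */
public class ShardIteratorLogScan {

    // Captures ShardId and ShardIteratorType from lines such as
    // "GetShardIteratorRequest(StreamName=..., ShardId=shardId-000000000000, ShardIteratorType=AT_TIMESTAMP, ...)".
    private static final Pattern REQUEST = Pattern.compile(
            "GetShardIteratorRequest\\(.*?ShardId=([^,]+), ShardIteratorType=([A-Z_]+)");

    /** Maps each shard (sorted by id) to the iterator types requested for it, in log order. */
    static Map<String, List<String>> iteratorTypesByShard(List<String> logLines) {
        Map<String, List<String>> byShard = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = REQUEST.matcher(line);
            if (m.find()) {
                byShard.computeIfAbsent(m.group(1), shard -> new ArrayList<>()).add(m.group(2));
            }
        }
        return byShard;
    }

    /** Shards whose iterator was requested more than once, i.e. that were initialized twice. */
    static List<String> duplicatedShards(Map<String, List<String>> byShard) {
        List<String> result = new ArrayList<>();
        byShard.forEach((shard, types) -> {
            if (types.size() > 1) {
                result.add(shard);
            }
        });
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Usage: java ShardIteratorLogScan path/to/taskmanager.log
        Map<String, List<String>> byShard = iteratorTypesByShard(Files.readAllLines(Path.of(args[0])));
        for (String shard : duplicatedShards(byShard)) {
            System.out.println(shard + " -> " + byShard.get(shard));
        }
    }
}
```

On the log above, every shard would be reported with both AT_SEQUENCE_NUMBER and AT_TIMESTAMP.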

Issue Priority

Priority: 3 (minor)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@hlteoh37
Contributor Author

Adding a dump of the Flink code used for the replication here:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.flink.example</groupId>
    <artifactId>beam</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>Apache Flink Beam Application</name>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.15.2</flink.version>
        <logback.version>1.4.14</logback.version>
        <main-class>org.apache.flink.example.BeamApplication</main-class>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-math3</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-flink-1.15</artifactId>
            <version>2.56.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
            <version>2.56.0</version>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-core</exclude>
                                    <exclude>org.apache.flink:flink-annotations</exclude>
                                    <exclude>org.apache.flink:flink-metrics-core</exclude>
                                    <exclude>org.apache.flink:flink-shaded-*</exclude>
                                    <exclude>org.apache.flink:flink-table-*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${main-class}</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Java class

package org.apache.flink.example;


import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.joda.time.Duration;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;

import java.nio.charset.StandardCharsets;

public class BeamApplication {

    public static void main(final String... args) throws Exception {
        FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setAttachedMode(false);

        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply("KDS Source", KinesisIO.read()
                        .withStreamName("ExampleInputStream")
                        .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
                        .withInitialTimestampInStream(org.joda.time.Instant.now().minus(Duration.standardMinutes(30)))
                        .withClientConfiguration(ClientConfiguration.builder()
                                .region(Region.US_EAST_1)
                                .build()))
                .apply(ParDo.of(new DoFn<KinesisRecord, String>() {
                    @ProcessElement
                    public void processElement(@Element KinesisRecord record, OutputReceiver<String> out) {
                        System.out.println(record.toString());
                        out.output(record.toString());
                    }
                }))
                .apply("KDS Sink", KinesisIO.<String>write()
                        .withStreamName("ExampleOutputStream")
                        .withClientConfiguration(ClientConfiguration.builder()
                                .region(Region.US_EAST_1)
                                .build())
                        .withSerializer((SerializableFunction<String, byte[]>) input -> input.getBytes(StandardCharsets.UTF_8))
                        .withPartitioner(KinesisPartitioner.explicitRandomPartitioner(1))
                );


        pipeline.run();

    }
}

@akashk99

@je-ik, is this the same issue as #30903?

I noticed you fixed that one and the problem statement seems similar, but please let me know if this is something different, as I am getting duplicated data on 2.56 when restoring from a Flink savepoint.

@je-ik
Contributor

je-ik commented May 24, 2024

I suppose this is a similar but different issue, probably caused by the same underlying bug. #30903 fixed Impulse only. Does using --experiments=beam_fn_api fix the issue?

@akashk99

@je-ik Hi, yes, that did fix the issue. Thank you! For my understanding, what exactly does this option do? And should I expect any performance degradation?

@akashk99

Actually, I am noticing a lot of backpressure with this approach, even though downstream operators have low CPU usage. Is the fix for the root cause relatively straightforward, in which case I could implement it in a fork of the repo, or is it more involved?

@je-ik
Contributor

je-ik commented May 24, 2024

I don't know the root cause; it seems that Flink does not send the snapshot state after a restore from a savepoint. I observed this with Impulse (I suspected that it affects only bounded sources running in unbounded mode, but that does not seem to be the case). It might be a Beam bug or a Flink bug.

> Hi, yes that did fix the issue. thank you! For my understanding, what does this option do exactly? And should I expect any performance degradation?

The flag turns on a different expansion for the Read transform: it uses a splittable DoFn (SDF), which is based on Impulse, which was fixed earlier. Performance should be similar to the classic Read.
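For reference, a minimal sketch of the two usual ways to turn the experiment on in Java: passing --experiments=beam_fn_api as a program argument, or adding it to the options in code. It assumes the Beam Java SDK core on the classpath (pulled in transitively by the pom.xml above) and uses ExperimentalOptions.addExperiment and ExperimentalOptions.hasExperiment; the class name is hypothetical.

```java
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/** Hypothetical example: two ways to enable the beam_fn_api experiment. */
public class FnApiExperimentExample {

    static final String FN_API = "beam_fn_api";

    /** Parses command-line style arguments, e.g. "--experiments=beam_fn_api". */
    static PipelineOptions fromArgs(String... args) {
        return PipelineOptionsFactory.fromArgs(args).create();
    }

    /** Adds the experiment to options that were built in code. */
    static PipelineOptions programmatic() {
        PipelineOptions options = PipelineOptionsFactory.create();
        // as() returns a view backed by the same options, so the change is visible on 'options'.
        ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), FN_API);
        return options;
    }

    /** True if the experiment is present in the options' experiments list. */
    static boolean usesFnApi(PipelineOptions options) {
        return ExperimentalOptions.hasExperiment(options, FN_API);
    }

    public static void main(String[] args) {
        System.out.println(usesFnApi(fromArgs("--experiments=beam_fn_api")));
        System.out.println(usesFnApi(programmatic()));
    }
}
```

In the reproducer from the first comment, the programmatic variant would amount to calling ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "beam_fn_api") before Pipeline.create(options).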

@akashk99

I'm surprised this is a bug, considering that restoring from a Flink savepoint is a pretty common use case. Is it possible that some configuration is missing somewhere? I haven't been able to find anyone else online experiencing this same issue, but I was able to replicate it using both Kinesis and Kafka. Given how common the use case is, I'm not 100% sure this is in fact a bug; it is more likely some user error on my part.

I can make do without savepoints by using Kafka offset commits and consumer groups to ensure no data is lost, but I can't figure out a way to avoid losing data that is windowed but not yet triggered when the Flink application is stopped. Maybe you know of a solution to that problem?

It seems like a lot of the subtasks aren't being utilized when stripping IDs with beam_fn_api, even though there are 20 shards and the parallelism is 24 (in theory, there should be only 4 idle subtasks).

Screenshot 2024-05-24 at 12 48 32 PM

@je-ik
Contributor

je-ik commented May 25, 2024

> Im surprised this is a bug considering restoring from a flink savepoint is a pretty common use case, is it possible there some configuration missing somewhere? I havent been able to find anyone else online experiencing this same issue but I was able to replicate it using both kinesis and kafka. Given how common of a use case it is, Im not 100% sure I believe this is in fact a bug and most likely some user error on my part.

Can you please provide a minimal example and setup to reproduce the behavior?

> I can make do without savepoints by utilizing kafka offset commits and consumer groups to ensure no data is lost, but cant figure out a way to not lose data that is windowed but not triggered when the flink application is stopped. Maybe you know of a solution to that problem?

You can drain the pipeline; see https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#terminating-a-job
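Compared with the flink stop -p ... command used in this thread, the difference is the --drain flag. As we read the Flink CLI documentation linked above, draining sends MAX_WATERMARK before the final savepoint is taken, so pending event-time windows and timers fire; the same page recommends draining only when the job is terminated permanently, since resuming a drained job could lead to incorrect results. The sketch below only assembles the CLI arguments (it does not start Flink itself); the class name, paths, and job id are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that builds the arguments for a `flink stop` invocation. */
public class FlinkStopCommand {

    /**
     * @param flinkBin     path to the flink CLI, e.g. "./flink/bin/flink"
     * @param savepointDir target directory for the final savepoint
     * @param jobId        id of the job to stop
     * @param drain        if true, adds --drain so MAX_WATERMARK is sent before the savepoint
     */
    static List<String> stop(String flinkBin, String savepointDir, String jobId, boolean drain) {
        List<String> cmd = new ArrayList<>(List.of(flinkBin, "stop", "--savepointPath", savepointDir));
        if (drain) {
            cmd.add("--drain");
        }
        cmd.add(jobId); // the job id is the positional argument and goes last
        return cmd;
    }

    public static void main(String[] args) throws Exception {
        List<String> cmd = stop("./flink/bin/flink", "flink/savepoints", "<job-id>", true);
        System.out.println(String.join(" ", cmd));
        // To actually run it against a cluster:
        // new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }
}
```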

> it seems like a lot of the subtasks arent being utilized when stripping IDs with beam_fn_api despite the number of shards being 20 and parallelism being 24 (in theory should only be 4 idle subtasks)

This is related to how Flink computes target splits. It is affected by the maximum parallelism (which is computed automatically if not specified). You can try increasing it via --maxParallelism=32768 (32768 is the maximum value); this could make the assignment more balanced.
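A rough illustration of why the maximum parallelism matters, assuming Flink's key-group arithmetic as we understand it from KeyGroupRangeAssignment: the default max parallelism is 1.5 × parallelism rounded up to a power of two and clamped to [128, 32768], and key group k is owned by subtask k × parallelism / maxParallelism. This is a stdlib-only re-implementation for illustration, not Flink's code, and the class name is hypothetical. It only shows how evenly key groups are spread over the subtasks; with just 20 shards hashed into key groups, several shards can still land on one subtask while others stay idle, at any max parallelism.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;

/** Illustrative re-implementation of the key-group-to-subtask arithmetic (hypothetical class). */
public class KeyGroupSpread {

    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    /** Default max parallelism: next power of two of 1.5 * parallelism, clamped to [128, 32768]. */
    static int defaultMaxParallelism(int parallelism) {
        return Math.min(Math.max(roundUpToPowerOfTwo(parallelism + parallelism / 2), 128), 32768);
    }

    /** Index of the subtask that owns the given key group. */
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return (int) ((long) keyGroup * parallelism / maxParallelism);
    }

    /** Number of key groups owned by each subtask. */
    static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            counts[subtaskForKeyGroup(maxParallelism, parallelism, keyGroup)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int parallelism = 24;
        for (int maxP : new int[] {defaultMaxParallelism(parallelism), 32768}) {
            IntSummaryStatistics s = Arrays.stream(keyGroupsPerSubtask(maxP, parallelism)).summaryStatistics();
            System.out.printf("maxParallelism=%d: %d..%d key groups per subtask%n", maxP, s.getMin(), s.getMax());
        }
    }
}
```

For parallelism 24 the default max parallelism works out to 128, which gives each subtask 5 or 6 key groups (a 20% difference); at 32768 the spread is 1365 to 1366.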

@akashk99

Thanks for the suggestions, I will give them a try. I believe the first comment of the ticket provides a simple pipeline that exhibits this behavior on the Flink runner, but if that doesn't work, I'm happy to provide another. The example also submits the job in detached mode, which may be related, although I have seen similar behavior without it. I appreciate your help looking into this; if there's anything I can assist with, please let me know.

@akashk99

To mimic the local setup I used:

  1. I ran flink/start-cluster.sh
  2. I submitted the job using the flink run command with the -d flag.
  3. I stopped the job with a savepoint: ./flink/bin/flink stop -p flink/savepoints cf78a44e6b10ab7062d3c02bb7d4e052
  4. I restarted the job using flink run with the savepoint path.

When doing this, I looked inside the TaskManager logs, searched for "Starting getIterator request", and saw 6 log lines timestamped at the moment my app restarted: 3 at sequence number and 3 at latest. I am not sure why the latest ones are showing up, and I didn't see anything in the source code that would cause this.

I also switched to Kafka and noticed the same behavior, so it seems to be related to the runner. I was unable to fix the performance issues with beam_fn_api and noticed that the backpressure was causing my data to come in waves. The CPU chart was very cyclic, with peaks of 99% CPU and troughs of 8% CPU, which leads me to believe that this pipeline option was causing some sort of buildup followed by a rush of data that made the CPU spike.

I can make do with Kafka offset commits for now, but if there are any pointers on how to fix this in the Beam source code, I'd be happy to take a look and even submit a PR to be included in version 2.57. That said, I'm still hoping the issue is somewhere on my end and can be fixed fairly easily.

Projects
None yet
Development

No branches or pull requests

3 participants