Where’s My Data — A Unique Encounter with Flink Streaming’s Kinesis Connector | by Seth Saperstein

For years now, Lyft has been not only a proponent of but also a contributor to Apache Flink. Lyft's pipelines have evolved drastically over the years, yet, time and time again, we run into unique cases that stretch Flink to its breaking point. This is one of those times.

While Lyft runs many streaming applications, the one in question is a persistence job. Simply put, it streams data from Kinesis, performs some serialization and transformation, and writes to S3 every few minutes.

[Figure: Flink pipeline for persisting data from Kinesis to S3.]

The job persists a hefty majority of the events generated at Lyft, averaging 80 gigabytes per minute and running at a parallelism of 1800, making it one of Lyft's largest streaming jobs.

Let's start at the end, shall we?

Data Engineer: "Alert! My reports aren't being generated! The upstream data is not available to generate them on!"

Platform Engineer: "I'm on it! Looks like our streaming application to persist data is up and running, but I hardly see any data being written either!"

Like any good engineers would, we pulled out our runbooks and carefully performed the well-detailed steps:

Platform Engineer: "Let me roll back the seemingly innocuous change we just deployed."
Platform Engineer: "No luck."
Platform Engineer: "Ok, let me try turning it off and on again."
Platform Engineer: "No luck."
Platform Engineer: "Ok, let me try performing a hard reset and we'll backfill later."
Platform Engineer: "It worked!"

This pattern unfortunately repeated on a roughly biweekly cadence for over a month.
It certainly warranted further investigation. Throughout the rest of this article, we'll dissect the root cause of this behavior and its relation to the inner workings of the Flink Kinesis Consumer.

To start, we asked the following questions:

1. Why did the job suddenly have such low throughput?
2. Why did the issue persist after both rolling back changes and starting from an earlier checkpoint?

Our initial investigation yielded the following:

Kinesis Per Shard Iterator Age

The iterator age, or the time between the current processing time and the time at which the event arrived on the Kinesis shard, during the outage is pictured below.

[Figure: Per-shard iterator age. Shards 1 through 9 show an increase from ~3 minutes followed by a gap, while shard 10 alone reports an iterator age of ~5 days and no iterator age appears for the other shards. Metrics have been regenerated with far fewer shards, as the original metrics fell out of retention.]

While difficult to see, the graph above indicates that for all shards but one, the iterator age is a regular ~3 minutes. There is then a gap in metrics, followed by a single shard reporting an iterator age lag of ~5 days.

Digging deeper, we see that the single shard in question exhibited the following behavior days prior:

[Figure: Shard 10's iterator age is steady at ~3 minutes until ~12:46pm on 7/26, followed by a gap in metrics over several days and then an increase to an iterator age lag of ~5 days.]

Lo and behold, it appears our issue did not begin at the time of detection, when downstream users noticed missing data, but days prior.

At this point, we shifted our investigation to five days earlier, corresponding to the time at which the iterator age metric for the shard above went absent.
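As a quick illustration of the metric itself, iterator age is just the difference between the current processing time and the time a record arrived on the shard. A minimal sketch with hypothetical timestamps (the 7/26 date mirrors the incident timeline; everything else here is made up for illustration):

```python
from datetime import datetime, timedelta, timezone

def iterator_age(processing_time, shard_arrival_time):
    """Gap between when a record is processed and when it arrived on the shard."""
    return processing_time - shard_arrival_time

now = datetime(2022, 7, 31, 12, 0, tzinfo=timezone.utc)

# A healthy shard: records consumed ~3 minutes after arriving.
healthy = iterator_age(now, now - timedelta(minutes=3))

# The stuck shard: nothing consumed since ~12:46pm on 7/26, so the age
# grows without bound until consumption resumes.
stuck = iterator_age(now, datetime(2022, 7, 26, 12, 46, tzinfo=timezone.utc))
```

Note that the metric can only be reported while records are actually being fetched, which is why a fully stuck shard shows a gap rather than a growing line.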
There appeared to be a handful of irregular metrics at exactly this time, including the following:

CPU Usage & CPU Throttling

[Figure: Task manager CPU usage, showing a series of task managers spiking to or near 100% CPU for ~10 minutes before returning to normal levels of ~5%. Metrics have been regenerated with fewer task managers, as the original metrics fell out of retention.]

As shown above, during this period several of the 1800 task managers reached 100% CPU usage with significant CPU throttling. Furthermore, for task managers experiencing prolonged periods of CPU throttling, we saw a gap of up to 10 minutes in the Kinesis consumer's logs:

[Figure: A subtask exhibiting a significant gap in logging in a timer-based thread.]

Before we begin the next part of the story, we'll need to develop a deeper understanding of the Flink Kinesis Consumer. For the sake of simplicity, let's assume there is one Kinesis shard per Flink subtask. A subtask, in this context, is one parallel slice of a task: a unit of execution that can be scheduled.
Now, from a high level, most assume consumption from a single shard occurs as follows:

[Figure: Flink Kinesis Consumer processing a single Kinesis shard.]

Indeed, this may have been the case at inception, but further optimizations could and should be made.

What if we want to read and write asynchronously?

[Figure: Flink Kinesis Consumer processing a single Kinesis shard asynchronously.]

The multithreaded approach above elegantly solves this issue; however, further issues arise.

What if we want to maintain event time alignment in the source?

Both of the models above are inherently flawed with regard to a concept known as event time alignment, which prevents any single subtask from processing data with an event time far ahead of the event times of other subtasks' records.

As discussed in this Flink improvement proposal, unaligned event times at the source operator can lead to uncontrollable growth in state.

Let's define some terminology that will help with the examples below:

Local watermark: Corresponds to a single subtask and closely tracks its event time, indicating to downstream operators that no elements with a timestamp older than or equal to the watermark timestamp should arrive at the operator.

Global watermark: The minimum of all subtasks' local watermarks.

Watermark "lookahead": The amount of time by which one subtask may process data ahead of the global watermark.

Example

The following illustrates the relationship between the watermark-related timestamps defined above.

At first, subtask 1's event time, and thus its watermark, lags behind subtask 2's.
The red mark indicates how far ahead any subtask's watermark can be of the global watermark, i.e. the minimum of all local watermarks.

After some processing, subtask 2's watermark becomes the new global watermark.

Lastly, subtask 1 gets far enough ahead of the global watermark that it is prevented from processing further data, keeping the subtasks' watermarks relatively aligned.

Now, let's see how this concept is implemented in the context of our consumer.

[Figure: Flink Kinesis Consumer accounting for event time alignment.]

RecordEmitter: Compares the watermark of each record to the global watermark to decide whether to emit the record downstream or wait.

PeriodicWatermarkEmitter: This thread periodically emits the watermark corresponding to the last emitted record.

WatermarkSyncCallback: This thread periodically reports the subtask's local watermark to the job manager and receives in return the global watermark across all subtasks.

While elegant, this model has yet another flaw that needs resolving.

What if a shard has no more data?

Under the current model, if a shard stops being written to for any reason, the corresponding subtask's local watermark will not progress and thus holds back the global watermark. If this occurs, other subtasks will eventually be prevented from processing data by the event time alignment mechanism.

So how do we handle the case of an idle shard?
Essentially, we ignore the subtask.

Example

Subtask 1's watermark starts as the global watermark.

Subtask 2 has no data to consume from its shard, and its watermark becomes the global watermark.

After a period of time in which subtask 2 has consumed no data from its shard, it is ignored.

To handle idle shards, we perform three actions:

1. Mark the subtask as idle so that its watermarks, or lack thereof, are ignored by downstream operators.
2. Prevent the subtask's local watermark from impacting the global watermark.
3. When calculating the global watermark for the other subtasks, consider only local watermarks updated in the last 60 seconds.

[Figure: Flink Kinesis Consumer under idle shard conditions.]

At this point, we have the understanding required to continue to the next chapter. It's worth noting that much of this multithreading and event time alignment complexity is the motivation behind the Flink source refactor, which implements these concepts as first-class citizens in Flink.

There's one final piece of the puzzle to uncover before explaining what happened: the logs! The logs periodically show the local watermark, global watermark, and idle status of each subtask.

In our case, we see the following sequence of events for the subtask and shard in question:

1. The local and global watermarks are emitted normally.
2. During CPU throttling, no logs are reported by the consumer for several minutes.
3. The global watermark moves backwards in time by several minutes.
4. The subtask is marked idle.
5. The subtask does not emit a record, due to the event time alignment mechanism.
6. The global watermark does not update for the subtask, as the source is idle.
7. Logs with the same local watermark, global watermark, and idle status are reported indefinitely.

It's often thought that global watermarks can only move forward in time, not backward, and yet here we are.
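The alignment and idle-shard bookkeeping described above can be sketched as a simplified model. This is illustrative Python, not Flink's actual implementation; the class name, constants, and units are assumptions made for the sketch:

```python
IDLE_TIMEOUT = 60.0   # seconds; matches the 60 second freshness window described above
LOOKAHEAD = 600.0     # assumed lookahead: how far a local watermark may lead the global

class WatermarkTracker:
    """Toy model of the job manager's global watermark aggregation."""

    def __init__(self):
        self.locals = {}  # subtask id -> (local watermark, wall time of last update)

    def update(self, subtask, watermark, now):
        self.locals[subtask] = (watermark, now)

    def global_watermark(self, now):
        # Only watermarks refreshed within the last 60 seconds count, so a
        # throttled or idle subtask silently drops out of the minimum.
        fresh = [wm for wm, t in self.locals.values() if now - t <= IDLE_TIMEOUT]
        return min(fresh) if fresh else None

def may_emit(local_wm, global_wm):
    # Event time alignment: a subtask may emit only while its watermark is
    # within LOOKAHEAD of the global watermark.
    return local_wm - global_wm <= LOOKAHEAD
```

Watermark values are arbitrary units here; in Flink they are epoch milliseconds. The key behavior to notice is that a subtask that stops reporting is simply dropped from the minimum, which is what allows the global watermark to jump forward, and later appear to move backward when the subtask reports again.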
In the next chapter, we'll put the pieces together to explain the series of strange phenomena we've seen so far.

At this point, we possess a handful of curious observations.

Days prior to the restart:

1. High CPU throttling occurs for an extended period of time.
2. After the throttling, the global watermark moves backward in time.
3. After the throttling, a single subtask is marked idle and does not emit new records even when its shard contains more data.
4. That shard's iterator age is absent from the metrics for ~5 days.

At the time of the application restart, 5 days later:

1. The single shard is the only shard whose iterator age appears in the metrics.
2. The application's throughput is <1% of expected, regardless of whether it starts from an earlier checkpoint or savepoint.

This brings us to the first of many questions that explain the behavior.

Why is the subtask not emitting data when the shard contains data?

The subtask does not emit data because it has entered the following deadlock state:

1. Subtask A's local watermark is too far ahead of the global watermark (which corresponds to subtask B's watermark) for the event time alignment mechanism to allow a new record to be emitted.
2. Subtask A is marked idle and thus does not receive the updated global watermark from the job manager.

The next obvious question is how each of these conditions is met.

#1: How does the subtask get too far ahead of the global watermark?

While a subtask's watermark can, under normal circumstances, get too far ahead (in our case, 10 minutes ahead) of the slowest subtask's watermark due to skew, that is not what happened here.
As a result of normal processing, the subtask's local watermark is a few minutes ahead of the global watermark, something to be expected with 1800 subtasks. In this case, the subtask gets too far ahead because the global watermark moves backwards in time by several minutes.

What causes the global watermark to move backwards in time?

The global watermark moves backwards as a result of prolonged CPU throttling across a series of subtasks. As both the logs and CPU throttling metrics suggest, many subtasks hit a stop-the-world scenario in which no operations occur at all. The next example shows how this leads to a global watermark that appears to reverse in time.

Example

To start, subtask 1's watermark is the global watermark.

Subtask 2 is CPU throttled and stops all operations. Its watermark becomes the global watermark.

Subtask 2, still CPU throttled, does not update its watermark with the job manager for more than 60 seconds. Its watermark is thus ignored in the global watermark calculation, and subtask 1's local watermark becomes the global watermark.

Subtask 2, resuming shard consumption after CPU throttling, updates its local watermark with the job manager. The global watermark has thus moved backwards as a result of the CPU throttling.

Because the global watermark moved backwards, as opposed to the local watermark moving forward naturally, a subtask can instantaneously find itself unable to emit records, meeting condition #1 of the deadlock state.

#2: Why is the subtask marked idle?

A Kinesis Consumer subtask is marked idle when two conditions hold:

1. The emit queue is empty.
2. No records have been written to the emit queue in the last 60 seconds.

Extended CPU throttling is a unique case in which these conditions can hold true even while records are still being written to the shard by a producer.
The conditions hold as follows:

1. According to the logs, the queue is empty when waking from CPU throttling, and thus was empty when entering it: a chance occurrence whose probability increases with a smaller queue size and higher Kinesis poll times, as we have configured.
2. Due to the CPU throttling, no records have been written to the emit queue for several minutes.

Through this logic we understand why the subtask is marked idle; however, an important question remains.

Why does the subtask remain idle?

Shortly after CPU throttling stops, we'd expect the emit queue to fill and relieve us of our idle state. This would allow the global watermark to be updated for the subtask, and our subtask would resume emitting data as usual, no longer hindered by the event time alignment mechanism.

Note, however, that the above states the conditions for a subtask to be marked idle. It does not state the conditions for a subtask to be marked not idle. To be marked not idle, the emitted watermark must have progressed, which, as discussed above, cannot happen in the deadlock state.

At this point, we have a single subtask in a deadlock state for 5 days. No data is being consumed from its corresponding shard.
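To make the deadlock concrete, here is a minimal Python sketch of the two interacting rules. The class, field names, and lookahead value are hypothetical, not Flink's:

```python
LOOKAHEAD = 600.0  # assumed alignment lookahead, in arbitrary event time units

class Subtask:
    """Hypothetical subtask state; field names are illustrative, not Flink's."""

    def __init__(self, local_wm):
        self.local_wm = local_wm
        self.idle = False
        self.cached_global_wm = None  # last global watermark received from the job manager

    def sync(self, global_wm):
        # Prior to the fix in FLINK-29099, an idle subtask stopped refreshing
        # its cached view of the global watermark.
        if not self.idle:
            self.cached_global_wm = global_wm

    def try_emit(self):
        # Event time alignment: hold the record if we are too far ahead.
        return self.local_wm - self.cached_global_wm <= LOOKAHEAD

a = Subtask(local_wm=1200.0)

# 1. Prolonged CPU throttling elsewhere regresses the global watermark.
a.sync(global_wm=400.0)
assert a.try_emit() is False   # 800 ahead > 600 lookahead: may not emit

# 2. Nothing reaches the emit queue for 60 seconds, so the subtask is marked idle.
a.idle = True

# 3. The rest of the job recovers and the global watermark advances past ours,
#    but, being idle, we never receive the update: deadlock.
a.sync(global_wm=2000.0)
assert a.try_emit() is False   # still compared against the stale 400.0
```

Each rule is sensible in isolation; it is only their combination, triggered by the watermark regression, that locks the subtask permanently.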
All other behavior appears normal, including consumption of the other shards by their corresponding subtasks.

What occurs during a deploy?

On deploy, Lyft's FlinkK8sOperator performs the following:

1. Takes a savepoint
2. Stops the job
3. Starts a new cluster
4. Submits the job to the new cluster with the savepoint

From the Flink Kinesis Consumer's perspective, the following occurs on submission of the new job:

1. Shards are assigned to subtasks
2. Previous shard offsets are distributed from the savepoint
3. Subtasks begin consuming from shards
4. Subtasks recalculate the global watermark
5. Data processing continues normally

Example

Prior to deployment, subtask 2 was in a deadlock state while subtask 1 moved ahead for several days, carrying both the local and global watermark.

After deployment, the local watermarks were unchanged, but the global watermark was recalculated.

At this point, subtask 2, whose local watermark now corresponded to the global watermark, began processing the past 5 days' worth of data, while subtask 1 was restricted by the event time alignment mechanism.

For this reason, we see metrics for only a single shard's iterator age: these metrics are produced only when records are retrieved from the corresponding Kinesis shard.

The issue was temporarily alleviated by the "hard reset", i.e. starting the job without a savepoint. As no state is associated with the job upon initialization, each subtask begins reading from the latest record of its shard. The deadlock is thus relieved, and the event time alignment mechanism does not prevent any subtask from processing its corresponding shard.

To mitigate data loss, we started a separate job in which each subtask begins reading its corresponding shard from a specific point in time. In our case, this was set to a point in time prior to the original deployment.

We originally assumed we had mitigated all data loss; however, this was not the case. There was but one fickle shard we failed to save. Recall that a single subtask was in a deadlock state for 5 days.
The backfill to mitigate the incident prevented data loss only from the time of deployment onward. As a result, data loss occurred for 1 of the 1800 shards, corresponding to 1/1800 of the data, over a 5 day period.

As the Kinesis stream had a 7 day retention period, 2 days after the "hard reset" the unprocessed data in that shard had unfortunately fallen out of retention, well before we concluded the aforementioned root cause.

To wrap up our investigation, we asked how we could detect such a case sooner, as well as how to avoid this possibility in the future. We took a multi-pronged approach and performed the following:

1. Root Cause Fix [FLINK-29099]: Alleviate the deadlock by retrieving the updated global watermark even when the subtask is idle.
2. Additional Monitoring: On a per-shard level, monitor that shards being produced to are also being consumed from by the corresponding Flink application.
3. Mitigate CPU Throttling: Given that this application had occasional high levels of CPU usage and thus CPU throttling, we decomposed it into smaller applications and performed additional analysis into the root cause of the high CPU usage.

Here at Lyft, we know this unique issue will not be our last. In fact, new and exciting challenges arise every day. As such, we plan to continue to share these experiences with the engineering community.

Additionally, we are actively looking for innovative engineers to help us take our streaming platforms to the next level! I would love to connect with you to share more about what we do and discuss opportunities that excite you!

Relevant Posts

Get insights on how we overcame data skewness in streaming applications at Lyft.

Learn more about various use cases of streaming at Lyft in this blog post.

Check out this blog post on the evolution of streaming pipelines in Lyft's Marketplace.