Druid Deprecation and ClickHouse Adoption at Lyft | by Ritesh Varyani

What is ClickHouse?ClickHouse is an open-source, column-oriented database for online analytical processing. One of ClickHouse’s standout factors is its high performance—due to a combination of factors such as column-based data storage & processing, data compression, and indexing.Initial Use CaseIn 2020, while the data platform team was managing Druid, the marketplace team considered a new set of requirements:Data produced is immediately available for querying in near real-timeLatencies are sub-second for business dashboardingIngestion for quick slice and dice of datasets. (For example: How many rides in the last 2 hours in the SF region?)Nested data supportSupport for both real-time and batch ingestionNative data deduplication at destinationWhile the latest version of Druid would provide us with some of these features, such as nested joins (v0.18), other requirements such as deduplication at destination would not be well satisfied. Using our existing stack, we considered performing deduplication at the streaming layer instead of at the destination.However, two main reasons prevented us from pursuing this idea:We would want to perform this at the Destination Storage layer to deduplicate data between the stream and batch loads.Streaming solutions require setting up a mutability window per entity (ex. 24 hours per ride). This was a hard requirement from the business end due to possible scenarios of updating a past transactional entity already written to storage. This was coupled with the need of the entity to be queryable as soon as possible (at the end of a ride, for example, if not earlier).The conclusion was that we should be able to overwrite records as needed using the dedupe facility. With this information at top of mind in 2020, ClickHouse emerged as an option that satisfied the above requirements out of the box and was adopted by the marketplace team.ClickHouse gained momentum with our marketplace use cases, leading us to a series of questions — should we expand to other use cases? Can ClickHouse support our Druid use cases? Should we continue to run both systems at Lyft or consolidate into one?After a careful and deep analysis of cost, infrastructure management, and overall feature benefits, we decided to expand on ClickHouse and sunset Druid, migrating existing Druid use cases to ClickHouse. The following points expand on some benefits we saw with ClickHouse over Druid:Simplified infrastructure management—with Lyft’s pivot towards leaner teams and architecture, there was a preference for converging on a system with less management and maintenance requirements. Druid, due to its modular design, turned out to be a more complex system to maintain.Reduced learning curve—our users are well versed in Python and SQL semantics compared to Java, etc. With more familiar language and tooling, onboarding their use cases would take less time. For example, defining sorting key and engine type in their object definitions with TTL would be a shorter learning curve compared to defining these as a Druid specification.Data deduplication—covered in the “Initial Use Case” section above.Cost—for Lyft, as a Kubernetes Compute company, running ClickHouse over Druid would reduce our compute footprint by a large margin. Running much leaner with better TTL definitions at 1/8th of the cost was a big plus.Specialized engines—Replicated*, Replacing* and Kafka Engine types in ClickHouse allow Lyft to natively manage Kafka pull-based ingestion and also maintain high availability (HA) due to replication across nodes.We created a benchmarking test suite, and while still serving queries off of Druid, provisioned our real-time & batch ingestion in ClickHouse, and ran tests in a controlled environment comparing query performance between ClickHouse and Druid.We involved our stakeholder users in these tests and took queries running in our Druid production system, dynamically transpiled them to ClickHouse syntax, and fired both queries against Druid and ClickHouse respectively. We compared the query latencies and identified bottlenecks in ClickHouse.For a couple of our experimentation use cases in ClickHouse, we observed unreliable (spiky, higher) latency performance due to the sorting key of the table and the query resulting in full scans of the table. Since the shape of the query was consistent and the data queried could be pre-aggregated, adding projections in ClickHouse helped.Projections are essentially precomputed views of your data, which compute only on the columns needed and rather than running a full table scan, ClickHouse just scans the projection column. In addition to improving the query performance for the couple experimentation use cases, projections helped reduce our I/O as well.We measured correctness (by row counts returned and the diff of exact table results) and latency, and used a tiered migration serving 1%, 5%, 10%, 20%, 50% and then 100% from ClickHouse. We eventually also realized latency gains which is discussed in later sections.Overall, our migration experience went smoothly. Use cases for campaigns, experimentation, forecasting, and spend tracking underwent this migration process. We communicated with customer teams and ensured correct read query translations from Druid to ClickHouse, as well as running the above benchmarking tests and performance analysis. Throughout the process, we worked on some query optimizations (see the“ClickHouse Query Optimizations” section below) for certain scenarios where the latency was higher than desired.Current ClickHouse Architecture at LyftWe use Altinity’s Kubernetes Operator to deploy the ClickHouse clusters. Currently, we run the 21.7 version of ClickHouse and have plans to upgrade to the latest version. The storage which we use are co-located EBS volumes in our stateful Kubernetes cluster. We run our clusters in HA mode with general-purpose AWS M5-type compute instances with our database objects being replicated across nodes. We do not use sharding on our clusters currently but there are plans to optimize the cluster performance as we scale more.ClickHouse data ingestion is discussed in detail below. ClickHouse read querying is served through our internal Proxy with ACLs and visualization tools such as Mode.For our ClickHouse infrastructure, we handle ingestion through three separate pipelines.Kafka → ClickHouse: this is primarily used by our services which rely on a pub-sub model. ClickHouse is one of the subscribers to this data. There is native support in ClickHouse for the KafkaTableEngine, which uses a pull-based mechanism to read from Kafka cluster topics. We ingest up to 2 billion records per day into ClickHouse from Kafka-based ingestion.Kinesis → Flink → ClickHouse: this ingestion scheme populates our events data in ClickHouse. Events teams, who need their data in ClickHouse for analytics, onboard through this ingestion mechanism. Lyft generates about 600 million rows per day from Flink ingestion alone, in ClickHouse.Trino → Cron → ClickHouse: we also support batch ingestion from our offline systems through Trino. This is primarily used to export our marketplace health derived datasets for quick slice and dice in determining marketplace health.Moving forward, we will explore merging our real-time pipelines to ingest everything through Kafka for simplified architecture and costs.Sorting KeyThe sorting key helps determine how data is physically organized on disk and speeds up query execution times. When data is sorted on particular column(s) frequently used in queries, the database can skip large portions of irrelevant data during query execution. For our time-series based datasets, many of the tables are sorted on event_time (or occurred_at time). This also helps with time-based range queries in the system. Along with reduced I/O due to sequential access of such data, sorting keys strongly help with query performance.Choosing the right sorting key can easily be answered based on the type of queries that will run on those datasets.Skip IndicesWhen querying data with filters where the sorting key is not defined, we risk a full scan of each column in order to apply the WHERE clause. To evaluate these non-indexed queries, we make use of Skip indices where ClickHouse uses the index file to understand which blocks of data can be skipped. At Lyft, we primarily have used minmax indices to increase our query performance.ALTER TABLE <database>.<table> ON cluster <cluster> ADD INDEX logged_at_idx (logged_at) TYPE minmax GRANULARITY 8192We then materialize this example index on already existing data. This allows ClickHouse to skip unnecessary data blocks and minimize I/O operations when running the queries, lowering our end-to-end latencies.ProjectionsOne of our customer requirements for the migration was to maintain, or lower, the latency. While most of the transpiled ClickHouse queries ran with a faster execution time, our health queries that regularly polled the latest experiment timestamps were much slower.We utilized the power of ClickHouse projections, specifically creating and materializing the projection SELECT max(occurred_at) to pre-compute and store the latest timestamp. With this projection, only a couple thousand rows are scanned rather than the entire table, speeding up our health checks from ~20 seconds to sub-second.Today, we serve the following use cases on ClickHouse:Market healthPolicy reporting for bikes & scootersSpend trackingForecasting and market signalingExperimentationCampaignsAt Lyft, we ingest tens of millions of rows and execute millions of read queries in ClickHouse daily with volume continuing to increase. On a monthly basis, this means reading and writing more than 25TB of data.While our migration process went smoothly, some pain points arose with our updated system:Query caching performance—we have sometimes seen largely variable latencies, making it hard to promise SLAs for certain workloads to customers. Using query cache with appropriate cache size and TTL helps. Initially, when the cache is getting hydrated, the query performance can vary but the latency spikes are very short-lived.Kafka issues with MSK integration—we use Kafka Table Engine extensively, which is a pull-based, native supported ingestion mechanism in ClickHouse. In Kafka Table Engine, the authentication scheme used for SASL is SCRAM-SHA-256 to ingest from in-house Kafka. librdkafka is a C library used by ClickHouse for data ingestion from Kafka. While trying to ingest from Amazon Managed Kafka (AWS MSK) for a new use case, the SASL mechanism AWS_MSK_IAM (AWS’s SASL mechanism) is not supported in librdkafka (Confluent). The solution over here is to try Kafka Connect / MSK Connect, which we will tackle once we upgrade ClickHouse.Ingestion pipeline resiliency—our Flink ingestion into ClickHouse is a push-based model and when ZooKeeper is doing conflict resolutions, it can mark the table as readonly, causing failures in the push model. We will explore a better push-based approach with Kafka Connect into ClickHouse and use Kafka between Flink and ClickHouse to stream the writes and store in Kafka while the Kafka Connect can batch write into ClickHouse.There are three main areas for expansion when it comes to ClickHouse at Lyft:Stabilize batch ingestion architecture with streaming Kinesis ingestion through Kafka—we are working to stabilize our batch ingestion architecture to a more resilient orchestration platform, Apache Airflow, largely used at Lyft.Move Flink SQL to ClickHouse—certain Flink transformations can be directly done in destination in ClickHouse. We plan to leverage for multiple new use cases in ClickHouse.Retire ZooKeeper—we are currently using Apache ZooKeeper for state management in ClickHouse. We will soon be upgrading ClickHouse and exploring ClickHouse Keeper to reduce external component dependencies.