By Ohad Shacham, Yonatan Gottesman, Edward Bortnikov Scalable Systems Research, Verizon/Oath
Omid, an open source transaction processing platform for Big Data, was born as a research project at Yahoo (now part of Verizon), and became an Apache Incubator project in 2015. Omid complements Apache HBase, a distributed key-value store in Apache Hadoop suite, with a capability to clip multiple operations into logically indivisible (atomic) units named transactions. This programming model has been extremely popular since the dawn of SQL databases, and has more recently become indispensable in the NoSQL world. For example, it is the centerpiece for dynamic content indexing of search and media products at Verizon, powering a web-scale content management platform since 2015.
Today, we are excited to share a new chapter in Omid’s history. Thanks to its scalability, reliability, and speed, Omid has been selected as transaction management provider for Apache Phoenix, a real-time converged OLTP and analytics platform for Hadoop. Phoenix provides a standard SQL interface to HBase key-value storage, which is much simpler and in many cases more performant than the native HBase API. With Phoenix, big data and machine learning developers get the best of all worlds: increased productivity coupled with high scalability. Phoenix is designed to scale to 10,000 query processing nodes in one instance and is expected to process hundreds of thousands or even millions of transactions per second (tps). It is widely used in the industry, including by Alibaba, Bloomberg, PubMatic, Salesforce, Sogou and many others.
We have just released a new and significantly improved version of Omid (1.0.0), the first major release since its original launch. We have extended the system with multiple functional and performance features to power a modern SQL database technology, ready for deployment on both private and public cloud platforms.
A few of the significant innovations include:
Protocol re-design for low latency
The early version of Omid was designed for use in web-scale data pipeline systems, which are throughput-oriented by nature. We re-engineered Omid’s internals to now support new ultra-low-latency OLTP (online transaction processing) applications, like messaging and algo-trading. The new protocol, Omid Low Latency (Omid LL), dissipates Omid’s major architectural bottleneck. It reduces the latency of short transactions by 5 times under light load, and by 10 to 100 times under heavy load. It also scales the overall system throughput to 550,000 tps while remaining within real-time latency SLAs. The figure below illustrates Omid LL scaling versus the previous version of Omid, for short and long transactions.
Throughput vs latency, transaction size=1 op
Throughput vs latency, transaction size=10 ops
Figure 1. Omid LL scaling versus legacy Omid. The throughput scales beyond 550,000 tps while the latency remains flat (low milliseconds).
ANSI SQL support
Phoenix provides secondary indexes for SQL tables — a centerpiece tool for efficient access to data by multiple keys. The CREATE INDEX command is on-demand; it is not allowed to block already deployed applications. We added Omid support for accomplishing this without impeding concurrent database operations or sacrificing consistency. We further introduced a mechanism to avoid recursive read-your-own-writes scenarios in complex queries, like “INSERT INTO T … SELECT FROM T …” statements. This was achieved by extending Omid’s traditional Snapshot Isolation consistency model, which provides single-read-point-single-write-point semantics, with multiple read and write points.
Performance improvements
Phoenix extensively employs stored procedures implemented as HBase filters in order to eliminate the overhead of multiple round-trips to the data store. We integrated Omid’s code within such HBase-resident procedures, allowing for a smooth integration with Phoenix and also reduced the overhead of transactional reads (for example, filtering out redundant data versions).
We collaborated closely with the Phoenix developer community while working on this project, and contributed code to Phoenix that made Omid’s integration possible. We look forward to seeing Omid’s adoption through a wide range of Phoenix applications. We always welcome new developers to join the community and help push Omid forward!
Huge milesone for the Scalable Systems team! Congrats!
By Dmitry Basin, Edward Bortnikov, Anastasia Braginsky, Eshcar Hillel, Idit Keidar, Hagar Meir, Gali Sheffi
Real-time analytics applications are on the rise. Modern decision support and machine intelligence engines strive to continuously ingest large volumes of data while providing up-to-date insights with minimum delay. For example, in Flurry Analytics, an Oath service which provides mobile developers with rich tools to explore user behavior in real time, it only takes seconds to reflect the events that happened on mobile devices in its numerous dashboards. The scalability demand is immense – as of late 2017, the Flurry SDK was installed on 2.6B devices and monitored 1M+ mobile apps. Mobile data hits the Flurry backend at a huge rate, updates statistics across hundreds of dimensions, and becomes queryable immediately. Flurry harnesses the open-source distributed interactive analytics engine named Druid to ingest data and serve queries at this massive rate.
In order to minimize delays before data becomes available for analysis, technologies like Druid should avoid maintaining separate systems for data ingestion and query serving, and instead strive to do both within the same system. Doing so is nontrivial since one cannot compromise on overall correctness when multiple conflicting operations execute in parallel on modern multi-core CPUs. A promising approach is using concurrent data structure (CDS) algorithms which adapt traditional data structures to multiprocessor hardware. CDS implementations are thread-safe – that is, developers can use them exactly as sequential code while maintaining strong theoretical correctness guarantees. In recent years, CDS algorithms enabled dramatic application performance scaling and became popular programming tools. For example, Java programmers can use the ConcurrentNavigableMap JDK implementations for the concurrent ordered key-value map abstraction that is instrumental in systems like Druid.
Today, we are excited to share Oak, a new open source project from Oath, available under the Apache License 2.0. The project was created by the Scalable Systems team at Yahoo Research. It extends upon our earlier research work, named KiWi.
Oak is a Java package that implements OakMap – a concurrent ordered key-value map. OakMap’s API is similar to Java’s ConcurrentNavigableMap. Java developers will find it easy to switch most of their applications to it. OakMap provides the safety guarantees specified by ConcurrentNavigableMap’s programming model. However, it scales with the RAM and CPU resources well beyond the best-in-class ConcurrentNavigableMap implementations. For example, it compares favorably to Doug Lea’s seminal ConcurrentSkipListMap, which is used by multiple big data platforms, including Apache HBase, Druid, EVCache, etc. Our benchmarks show that OakMap harnesses 3x more memory, and runs 3x-5x faster on analytics workloads.
OakMap’s implementation is very different from traditional implementations such as ConcurrentSkipListMap. While the latter maintains all keys and values as individual Java objects, OakMap stores them in very large memory buffers allocated beyond the JVM-managed memory heap (hence the name Oak - abbr. Off-heap Allocated Keys). The access to the key-value pairs is provided by a lightweight two-level on-heap index. At its lower level, the references to keys are stored in contiguous chunks, each responsible for a distinct key range. The chunks themselves, which dominate the index footprint, are accessed through a lightweight top-level ConcurrentSkipListMap. The figure below illustrates OakMap’s data organization.
OakMap structure.
The maintenance of OakMap’s chunked index in a concurrent setting is the crux of its complexity as well as the key for its efficiency. Experiments have shown that our algorithm is advantageous in multiple ways:
1. Memory scaling. OakMap’s custom off-heap memory allocation alleviates the garbage collection (GC) overhead that plagues Java applications. Despite the permanent progress, modern Java GC algorithms do not practically scale beyond a few tens of GBs of memory, whereas OakMap scales beyond 128GB of off-heap RAM.
2. Query speed. The chunk-based layout increases data locality, which speeds up both single-key lookups and range scans. All queries enjoy efficient, cache-friendly access, in contrast with permanent dereferencing in object-based maps. On top of these basic merits, OakMap provides safe direct access to its chunks, which avoids an extra copy for rebuilding the original key and value objects. Our benchmarks demonstrate OakMap’s performance benefits versus ConcurrentSkipListMap:
A) Up to 2x throughput for ascending scans.
B) Up to 5x throughput for descending scans.
C) Up to 3x throughput for lookups.
3. Update speed. Beyond avoiding the GC overhead typical for write-intensive workloads, OakMap optimizes the incremental maintenance of big complex values – for example, aggregate data sketches, which are indispensable in systems like Druid. It adopts in situ computation on objects embedded in its internal chunks to avoid unnecessary data copy, yet again. In our benchmarks, OakMap achieves up to 1.8x data ingestion rate versus ConcurrentSkipListMap.
With key-value maps being an extremely generic abstraction, it is easy to envision a variety of use cases for OakMap in large-scale analytics and machine learning applications – such as unstructured key-value storage, structured databases, in-memory caches, parameter servers, etc. For example, we are already working with the Druid community on rebuilding Druid’s core Incremental Index component around OakMap, in order to boost its scalability and performance.
We look forward to growing the Oak community! We invite you to explore the project, use OakMap in your applications, raise issues, suggest improvements, and contribute code. If you have any questions, please feel free to send us a note on the Oak developers list: [email protected]. It would be great to hear from you!
Congrats to Eshcar Hillel and Anastasia Braginsky with the presentation of their HBase work accepted to the prestigious Committers Track at Hadoop Summit Europe! Don’t miss Eshcar’s talk at the conference - Dublin, 13 Apr.
Title: HBase on Steroids with In-Memory Flush and Compaction
Abstract:
Real-time HBase application performance depends critically on the amount of I/O in the datapath. We describe an optimization of HBase for high-churn applications that frequently insert/update/delete the same keys - for example, high-speed queuing systems, e-commerce, etc. Our work improves upon the traditional log-sequenced-merge (LSM) design of HBase storage, which absorbs updates in memory and merges them with on-disk data in the background. We introduce Accordion - a novel in-memory compaction algorithm (JIRA HBase-13408), which exploits the churn to eliminate data redundancies in-place, before flushing the data to disk. This minimizes the number of disk writes and maximizes the number of reads served from RAM. With this optimization, HBase performance comes close to in-memory databases, featuring high speed and high predictability without giving up on data durability guarantees. Extensive evaluation shows up to 3.5x improvement in tail random-access latency and up to 3x improvement in scan throughput under realistic scenarios.
Ohad Shacham has just presented Omid to the developer community at the Haifa Big Data meetup, with the emphasis on recent innovation in scalability and reliability. Great talk!
Congrats to Sasha Spiegelman (our ex-intern), Guy Gueta and Idit Keidar with their paper “Transactional Data Structure Libraries” accepted to PLDI (#1 conference in design and implementation of programming languages). The acceptance rate was 16.1% this year. To be presented at PLDI 2016 in Santa Barbara, CA, June 2016.
Title Transactional Data Structure Libraries
Abstract
We introduce transactions into libraries of concurrent data
structures; such transactions can be used to ensure atomicity
of sequences of data structure operations. The resulting
transactional data structures strike a balance between the
ease-of-programming of transactions and the efficiency of
custom-tailored data structures. We exemplify this concept
by designing and implementing a library supporting transactions
on any number of maps, sets (implemented as skiplists)
and queues. Our library offers efficient and scalable transactions,
which are an order of magnitude faster than state-of-the-art
transactional memory toolkits. Moreover, our approach
treats stand-alone data structure operations (like put
and enqueue) as first class citizens, and allows them to execute
with virtually no overhead, at the speed of the original
data structure library.
We are thrilled to partner with the world’s
largest esports company, ESL to deliver their core content to Yahoo Esports starting with the Intel® Extreme Masters World Championship in
Katowice, March 4-6, 2016.
This weekend, we will bring fans inside the
action of one of the industry’s largest championship events, which had over 75
million sessions last year. Fans will enjoy live tournament action, in-depth
coverage and summaries from our experts, real-time scoring and all of this
delivered in HD quality on our global streaming video platform.
ESL’s events have consistently broken viewership
records, and the company continues to deliver the most legendary esports
moments across multiple games and platforms. For the live stream of the Intel
Extreme Masters, we will provide ESL’s audience a world class viewing
experience, delivering HD video quality with its video technology that live
streamed the NFL to over 15 million people around the world last October.
“We’re very excited to see this partnership kick
off at our most important event of the year, the Intel Extreme Masters World
Championship in Katowice, Poland.” said Nik Adams, Senior VP Global Sales &
Business Development at ESL. “Bringing our content out on Yahoo Esports channels
will give many more viewers around the world more opportunities and greater
flexibility in watching the world’s best esports competitions.”
Read more about our launch here. We have hired
some of the top talent in the industry including Andrea Rene, Travis Gafford,
Taylor Cocke, and Dylan Walker to bring unique perspectives on the
personalities, key matchups, upcoming releases and many of the untold stories
surrounding one of the fastest growing online sports.
We are going to shatter all esports streaming records this weekend.
By Andy Feng(@afeng76), Jun Shi and Mridul Jain (@mridul_jain), Yahoo Big ML Team
Introduction
Deep learning (DL) is a critical capability required by Yahoo product teams (ex. Flickr, Image Search) to gain intelligence from massive amounts of online data. Many existing DL frameworks require a separated cluster for deep learning, and multiple programs have to be created for a typical machine learning pipeline (see Figure 1). The separated clusters require large datasets to be transferred among them, and introduce unwanted system complexity and latency for end-to-end learning.
Figure 1: ML Pipeline with multiple programs on separated clusters
As discussed in our earlier Tumblr post, we believe that deep learning should be conducted in the same cluster along with existing data processing pipelines to support feature engineering and traditional (non-deep) machine learning. We created CaffeOnSpark to allow deep learning training and testing to be embedded into Spark applications (see Figure 2).
Figure 2: ML Pipeline with single program on one cluster
CaffeOnSpark: API & Configuration and CLI
CaffeOnSpark is designed to be a Spark deep learning package. Spark MLlib supported a variety of non-deep learning algorithms for classification, regression, clustering, recommendation, and so on. Deep learning is a key capacity that Spark MLlib lacks currently, and CaffeOnSpark is designed to fill that gap. CaffeOnSpark API supports dataframes so that you can easily interface with a training dataset that was prepared using a Spark application, and extract the predictions from the model or features from intermediate layers for results and data analysis using MLLib or SQL.
Figure 3: CaffeOnSpark as a Spark Deep Learning package
Scala program in Figure 4 illustrates how CaffeOnSpark and MLlib work together:
L1-L4 … You initialize a Spark context, and use it to create CaffeOnSpark and configuration object.
L5-L6 … You use CaffeOnSpark to conduct DNN training with a training dataset on HDFS.
L7-L8 …. The learned DL model is applied to extract features from a feature dataset on HDFS.
L9-L12 … MLlib uses the extracted features to perform non-deep learning (more specifically logistic regression for classification).
L13 … You could save the classification model onto HDFS.
As illustrated in Figure 4, CaffeOnSpark enables deep learning steps to be seamlessly embedded in Spark applications. It eliminates unwanted data movement in traditional solutions (as illustrated in Figure 1), and enables deep learning to be conducted on big-data clusters directly. Direct access to big-data and massive computation power are critical for DL to find meaningful insights in a timely manner.
CaffeOnSpark uses the configuration files for solvers and neural network as in standard Caffe uses. As illustrated in our example, the neural network will have a MemoryData layer with 2 extra parameters:
CaffeOnSpark applications will be launched by standard Spark commands, such as spark-submit. Here are 2 examples of spark-submit commands. The first command uses CaffeOnSpark to train a DNN model saved onto HDFS. The second command is a custom Spark application that embedded CaffeOnSpark along with MLlib.
Figure 5 describes the system architecture of CaffeOnSpark. We launch Caffe engines on GPU devices or CPU devices within the Spark executor, via invoking a JNI layer with fine-grain memory management. Unlike traditional Spark applications, CaffeOnSpark executors communicate to each other via MPI allreduce style interface via TCP/Ethernet or RDMA/Infiniband. This Spark+MPI architecture enables CaffeOnSpark to achieve similar performance as dedicated deep learning clusters.
Many deep learning jobs are long running, and it is important to handle potential system failures. CaffeOnSpark enables training state being snapshotted periodically, and thus we could resume from previous state after a failure of a CaffeOnSpark job.
Open Source
In the last several quarters, Yahoo has applied CaffeOnSpark on several projects, and we have received much positive feedback from our internal users. Flickr teams, for example, made significant improvements on image recognition accuracy with CaffeOnSpark by training with millions of photos from the Yahoo Webscope Flickr Creative Commons 100M dataset on Hadoop clusters.
CaffeOnSpark is beneficial to deep learning community and the Spark community. In order to advance the fields of deep learning and artificial intelligence, Yahoo is happy to release CaffeOnSpark at github.com/yahoo/CaffeOnSpark under Apache 2.0 license.
CaffeOnSpark can be tested on an AWS EC2 cloud or on your own Spark clusters. Please find the detailed instructions at Yahoo github repository, and share your feedback at [email protected]. Our goal is to make CaffeOnSpark widely available to deep learning scientists and researchers, and we welcome contributions from the community to make that happen. .
It is hard to believe that 10 years have already passed since Hadoop was started at Yahoo. We initially applied it to web search, but since then, Hadoop has become central to everything we do at the company. Today, Hadoop is the de facto platform for processing and storing big data for thousands of companies around the world, including most of the Fortune 500. It has also given birth to a thriving industry around it, comprised of a number of companies who have built their businesses on the platform and continue to invest and innovate to expand its capabilities.
At Yahoo, Hadoop remains a cornerstone technology on which virtually every part of our business relies on to power our world-class products, and deliver user experiences that delight more than a billion users worldwide. Whether it is content personalization for increasing engagement, ad targeting and optimization for serving the right ad to the right consumer, new revenue streams from native ads and mobile search monetization, data processing pipelines, mail anti-spam or search assist and analytics – Hadoop touches them all.
When it comes to scale, Yahoo still boasts one of the largest Hadoop deployments in the world. From a footprint standpoint, we maintain over 35,000 Hadoop servers as a central hosted platform running across 16 clusters with a combined 600 petabytes in storage capacity (HDFS), allowing us to execute 34 million monthly compute jobs on the platform.
But we aren’t stopping there, and actively collaborate with the Hadoop community to further push the scalability boundaries and advance technological innovation. We have used MapReduce historically to power batch-oriented processing, but continue to invest in and adopt low latency data processing stacks on top of Hadoop, such as Storm for stream processing, and Tez and Spark for faster batch processing.
What’s more, the applications of these innovations have spanned the gamut – from cool and fun features, like Flickr’s Magic View to one of our most exciting recent projects that involves combining Apache Spark and Caffe. The project allows us to leverage GPUs to power deep learning on Hadoop clusters. This custom deployment bridges the gap between HPC (High Performance Computing) and big data, and is helping position Yahoo as a frontrunner in the next generation of computing and machine learning.
We’re delighted by the impact the platform has made to the big data movement, and can’t wait to see what the next 10 years has in store.
Cheers!
Happy birthday, congrats! Great work by many current and former Yahoo’s.
By Edward Bortnikov, Idit Keidar, Ohad Shacham (Search Systems, Yahoo Labs), and Francisco Perez-Sorrosal (Yahoo Search)
Omid, discussed in detail in our previous posts, offers transactional access to data persistently stored in HBase. Here, we explain how Omid is made highly available (HA). Omid’s availability is obviously critical for smooth operation of its applications, and should thus not be inferior to the availability guarantees of the underlying HBase store. High availability is a brand new feature in Omid.
In very-high-end Omid-powered applications, the conjunction of Omid and HBase is expected to work round the clock, and exhibit a mean-time-to-recover (MTTR) of just a few seconds. Moreover, any measures taken for high availability should not hamper performance in the normal, fault-free, case.
Omid supports both HA and non-HA modes. The latter serves settings in which the system administrator prefers manual recovery, and a longer MTTR can be tolerated; for example, these can be non-critical infrastructures where the additional resources for running a backup TSO cannot be spared.
High Availability via Primary-Backup Architecture
Omid is designed around a centralized transaction processing service (transaction status oracle, or TSO), which is responsible for serializing transaction commit points and resolving inter-transaction conflicts. This design renders the TSO critical for the entire system’s availability. Our focus is thus on the high-availability architecture behind the TSO. As most HA solutions, it is expected to satisfy two requirements: (1) low MTTR, and (2) negligible impact on the system’s mainstream (failure-free) operation.
Omid’s HA solution is based on the primary-backup paradigm: the TSO is implemented as a process pair consisting of a primary process and a backup process. The former serves all client requests, whereas the latter is in hot-standby mode, ready to take over if the primary fails. The process of transferring the client-serving responsibilities from the primary to the backup is called failover. Failure detection is timeout-based – namely, if the primary TSO does not re-assert its existence within a configured period, it is deemed failed, and the backup starts acting as a new primary.
Note that the primary and backup run independently on different machines, and the time it takes the primary to inform the backup that it is alive can be unpredictable due to processing delays (e.g., garbage-collection stalls, long I/O operations) and unexpected network failures. On the other hand, in order to provide a low MTTR, we cannot set the timeout conservatively so as to ensure that a live primary is never detected as faulty. We therefore have to account for the case that the backup performs a failover and takes over the service while the primary is operational. To this end, we use a Zookeeper object to track the current primary. The primary regularly re-asserts itself, unless it sees that it has been supplanted; the backup constantly tracks this object, and if the current primary becomes stale, updates the object to reflect the fact that it is now the primary.
The primary TSO advertises its identity to clients, also via Zookeeper. This way, the Omid library learns about the new primary upon failover and facilitates reconnection. Client applications must learn about the output of the pending commit requests to the old primary before retrying the transaction in order to avoid data corruption.
The Failover Challenge
A reliable system must honor all the operations successfully completed in the past, regardless of failovers. Namely, if a transaction receives a success response to its commit request, then future transactions must observe its updates. On the other hand, if a transaction aborts for whatever reason, then no future transaction should see its updates.
In Omid, the TSO allocates monotonically increasing commit timestamps to committing transactions. In addition, when a transaction begins, it obtains a read timestamp, which reflects the commit timestamp of the last transaction to commit before it began. The transaction then reads the latest version of each object that does not exceed its read timestamp. As explained in our first post, this yields a correctness semantics called snapshot isolation. The critical part of system state that affects correctness is the persistent commit table (CT), which reliably stores the mapping from transaction ids (txid) to commit timestamps (commit ts). The state recorded in the CT captures the system’s guarantee to its clients. As described in the previous post, a transaction is committed if and only if a (txid, commit ts) pair for it exists in the CT. Today, we will scrutinize this approach in a failure-prone world.
The key challenge faced by HA systems is known as split brain in the theory of distributed systems - the risk for conflicting updates occurring independently at distinct places. In primary-backup systems, split-brain manifests when the backup detects the primary as faulty whereas the latter is either still operating or the operations undertaken by it are in the process of taking effect. If treated naively, such lack of synchronization may lead to race conditions that ultimately affect the system’s correctness.
Let us take a closer look at this challenge now. There are scenarios in which the primary TSO can be falsely detected as failed, for example, due to a Java garbage collection stalls. The system therefore can end up with two concurrent TSO’s. The primary TSO therefore actively checks if a backup has replaced it, and if so, “commits suicide”, i.e., halts. However, it is still possible to have a (short) window between the failover and the primary’s discovery of the emergence of a new primary.
When a TSO fails, there may be some pending transactions that began with it (i.e., performed their begin transaction using this TSO) and did not commit (they might have either not attempted to commit yet, or may have attempted to commit with the old TSO, but the TSO did not complete logging their commit in the CT). Such pending transactions are deemed aborted.
To prevent new transactions from seeing partial updates of transactions handled by the old TSO, the new TSO needs to employ timestamps that exceed all those committed (or that might still be committed) by the old TSO. However, this separation is challenged by the potential concurrency of two TSOs. For example, if a TSO fails immediately after issuing a write to the CT that takes nonzero time, an old transaction may end up committing after the new TSO has begun handling new transactions. Unless handled carefully, this can cause a new transaction to see partial updates of an old one, as illustrated in the diagram below. To avoid this scenario, we must ensure that once a new transaction obtains a read timestamp, the commit/abort status of all transactions with smaller commit timestamps does not change.
One way to address the above challenge is via mutual exclusion, that is, making sure that at most one TSO commits operations at a time. However, this solution would entail synchronization upon each commit, not only at failover times, which would adversely affect the system’s performance. We therefore forgo this option, and implement a different HA algorithm in Omid. This algorithm does not incur any penalty in failure-free scenarios.
HA Algorithm
The failover algorithm in Omid tolerates temporary overlap between the primary and backup TSO’s activity periods. To ensure correctness despite of such periods, we first have to ensure that the transactions committed by the old TSO and the new TSO are safely separated in time. Namely, (1) all the timestamps assigned by the new TSO exceed all those assigned by the old one, and (2) after a transaction with read timestamp tsr begins, no transaction that will end up with a commit timestamp tsc < tsr can update any additional data items (though it may still commit after this time). Beyond that, we have to allow the new TSO to safely figure out the status of pending transaction served by the old TSO. Recall from our previous post that in Omid, transactions write their updates tentatively before committing, and upon commit, update the written entries with their commit timestamp. Therefore, our failover mechanism has to ensure (3) when a transaction reads a tentative update, it can determine whether this update will be committed with a timestamp smaller than its read timestamp or not.
One way to meet (1) and (2) is to have the TSO publish the read timestamp it allots as part of initiating a transaction (e.g., via Zookeeper). Before committing, a TSO would check this location. If a timestamp greater than its last committed is detected, it would deduce that failover has happened, abort the transaction attempting to commit, and halt. This approach is plausible but would cast synchronization overhead on every begin and commit operation. Instead, the HA algorithm implemented in Omid uses locally-checkable leases. Leases are essentially locks that live for a limited time. With them, we can both detect TSO failures and allocate timestamp ranges in big chunks, thereby eliminating the synchronization overhead most of the time.
The challenge of meeting (3) is that transactions cannot consult the old TSO process, as it might have failed. In order to prevent in-flight writes of the old TSO to the CT from “changing the history” retroactively, we allow transactions served by the new TSO to proactively abort ones coming from the previous TSO. Specifically, when a read encounters a tentative update by a transaction that is not present in the CT, it forces that transaction to abort. We call this invalidation and illustrate it in the following figure. Invalidation is used judiciously only when failover might be taking place, as discussed in the next section of this post.
Technically, the client performs the invalidation using an atomic read-modify-write (RMW) operation (put-if-absent flavor) to the CT, which adds an attribute to the CT record marking that the incomplete transaction has an “invalid” status. Any subsequent attempt to commit it (by adding it to the CT) will see this record, and thus fail. In addition, every read of a tentative update must check its invalid field in the CT, and ignore the update if the transaction has already been invalidated.
Implementation Details
Let us now dive into some implementation details, and see how they guarantee the system’s safety. The TSO process pair, namely the primary and the backup, coordinate their actions via two shared Zookeeper znodes. One serves for allocating timestamp ranges called epochs. A TSO claims ownership of an epoch before allocating timestamps of this range to transactions. Upon failover, the new primary picks the next epoch in a way that ensures property (1) above.
The second znode implements the lease. The lease is active for 𝞓 time, for some parameter 𝞓 whose production default is 10 seconds. The lease is claimed via an atomic znode RMW request. The backup TSO polls the lease every 𝞓 time and tries to claim it if it has expired. The primary TSO tries to reclaim it before it expires, and halts if it cannot renew the lease prior to a guard period 𝞓’ < 𝞓 before its expiration; by default, 𝞓’ = ⅓ 𝞓. Before initiating a write to the CT, the TSO checks the lease, and does not attempt to commit the transaction within the guard period. Since the TSO still holds the lease when initiating the commit, and since the commit is initiated after all writes that are part of the transaction complete, property (2) above is also ensured.
If the write to CT is slow and overruns the guard period, it is still possible for the commit of an old transaction to overlap the execution of a newer one by a new TSO, as illustrated in the figure above. So extra measures are needed in order to ensure property (3). To this end, we use the aforedescribed invalidation mechanism. However, we limit its scope as follows: a read needs to invalidate a transaction only if it pertains to an earlier epoch, and a commit needs to check whether the transaction has been invalidated only if its lease has expired. These scenarios can only happen upon failover, and therefore the synchronization overhead it induces does not affect mainstream commits.
In order to determine whether it should perform invalidation, the TSO checks the lease after every write to the CT. Note the lease check is a local operation, comparing the local clock to the obtained lease time. If the lease has expired, the TSO halts immediately, without returning any response to its client. From the client’s perspective, this is equivalent to a TSO crash in the course of a commit request, i.e., an abort. Adding this check after writing to the CT enables the TSO to write to the CT using regular write operations, avoiding expensive read-modify-writes, without having to worry about the invalidation path. Note that since failure detection is also lease-based, it is guaranteed that the old TSO can overtake at most one “rogue” in flight write to the CT in parallel with the new primary taking over.
Summary
We presented Omid’s high-availability (HA) algorithm for 24x7 applications, as well as the configuration necessary to operate in the HA mode. Our algorithm provides HA at no cost to mainstream data access. In the next post, we will discuss Omid’s performance.
Omid-HA is out, at last! Thanks to all the colleagues for driving.