Cassandra Architecture PDF
Cassandra
Overview
Relational Databases
• Transactions with ACID properties - Atomicity, Consistency, Isolation &
Durability
• Adherence to Strong Schema of data being written/read
• Real-time query management (when data size is under 10 terabytes)
• Execution of complex queries involving join & group by clauses
• Support for ad-hoc queries
• Not designed to work in modern distributed architecture
• Data is organized in third normal form (3NF) - storage and memory efficient
Third normal form does not scale
• Queries are unpredictable
• Users are impatient
• Data must be denormalized
• If data > memory, performance degrades
• Disk seeks are the worst
Problems with sharding
• Data is all over the place
• No more joins
• No more aggregations
• Everything must be denormalized
• Querying secondary index requires hitting every shard
• Adding shards requires manual movement of data
• Schema changes are hard to manage
Challenges with high availability
• Master failover is messy
• Multi DC failover is harder
• Downtime is frequent
• Changes to database settings
• Drive, power supply failures
• OS updates
NoSQL Databases
http://techblog.netflix.com/2011/11/benchmarking-cassandra-scalability-on.html
Cassandra transactions are atomic, isolated and durable
• Atomicity
• In Cassandra, a write operation is atomic at the partition level, meaning the
insertions or updates of two or more rows in the same partition are treated as one
write operation. A delete operation is also atomic at the partition level.
• Isolation
• Cassandra write and delete operations are performed with full row-level isolation.
This means that a write to a row within a single partition on a single node is only
visible to the client performing the operation – the operation is restricted to this
scope until it is complete.
• Durability
• All writes to a replica node are recorded both in memory and in a commit log on
disk before they are acknowledged as a success.
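As an illustration of partition-level atomicity, here is a minimal CQL sketch (the users_by_state table, its columns and values are hypothetical, not from the slides): both inserts target the partition state = 'CA', so they are applied as one write.
-- assumes a hypothetical table:
-- users_by_state (state text, city text, id uuid, name text, PRIMARY KEY ((state), city, id))
BEGIN BATCH
  INSERT INTO users_by_state (state, city, id, name) VALUES ('CA', 'Oakland', uuid(), 'Alice');
  INSERT INTO users_by_state (state, city, id, name) VALUES ('CA', 'San Jose', uuid(), 'Bob');
APPLY BATCH;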
DB Ranking
https://db-engines.com/en/ranking
Cassandra Use Cases
• Over 1000 companies use Cassandra today
• Accenture uses Cassandra for messaging
- 2 billion messages / day
• Netflix uses Cassandra for its catalog and playlists
- 10 million transactions / sec
• eBay - personalization, messaging and fraud detection
- Connects 100 million customers with 400 million items and stores 250 TB of data
Cassandra Performance
University of Toronto Study
Nodes (transactions/sec)
Load Type                               | 1         | 2         | 4         | 8          | 16         | 32
Load Process                            | 18,686.43 | 31,144.21 | 53,067.62 | 86,924.94  | 173,001.20 | 326,427.07
Read Mostly Workload                    | 11,173.75 | 18,137.80 | 39,481.33 | 65,963.30  | 116,363.93 | 221,073.15
Balanced Read/Write                     | 18,925.59 | 35,539.69 | 64,911.39 | 117,237.91 | 210,237.90 | 384,682.44
Read Modify Write                       | 10,555.73 | 19,919.52 | 37,418.16 | 69,221.07  | 141,715.80 | 256,165.66
Mixed Operational/Analytical Workload   | 4,455.63  | 9,343.11  | 19,737.82 | 36,177.48  | 73,600.53  | 120,755.00
Insert Mostly                           | 24,163.62 | 47,974.09 | 85,324.69 | 159,945.39 | NA         | NA
https://academy.datastax.com/planet-cassandra/nosql-performance-benchmarks
Cassandra Limitations
• Does not support query-time joins
• Cassandra recommends "join on write"
• Data duplication can scale, but joins cannot
• Does not support group by queries
• Supports only a limited set of aggregate functions such as sum and avg
• Does not support order by on ad-hoc columns
• Does not support where clauses on ad-hoc columns
• Cassandra recommends duplicating data based on the query requirements
• Secondary indexes are supported but are an anti-pattern
• Recommends using materialized views instead
• 3NF data modeling is an anti-pattern in Cassandra
• Recommends a query-first data model (see the sketch below)
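A minimal sketch of the query-first approach (table and column names are hypothetical): the same user data is duplicated into one table per query, and the application performs the "join on write".
-- serves: look up a user by id
CREATE TABLE users_by_id (
  id uuid PRIMARY KEY,
  name text,
  state text,
  city text
);
-- serves: list users in a given state, optionally narrowed by city
CREATE TABLE users_by_state (
  state text,
  city text,
  id uuid,
  name text,
  PRIMARY KEY ((state), city, id)
);
SELECT * FROM users_by_id WHERE id = 5132b130-ae79-11e4-ab27-0800200c9a66;
SELECT * FROM users_by_state WHERE state = 'CA';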
Node
• A node is a single server running the Cassandra JVM process
• A node can typically handle 3k-5k transactions / sec / core
• A node can manage about 1-3 TB of data
• Nodes generally have 8-32 GB of RAM
• Nodes are connected to the other nodes through 10 Gbps links
• Supports both SSDs and rotational disks in a JBOD architecture
Data Centers and Racks
Hash Ring
• Cassandra hash ring
• No config servers or ZooKeeper
• Data is partitioned around the ring
• Data is replicated to RF = N servers
• All nodes hold data and can handle write and read queries
• The location of the data is determined by the partition key (see the token example below)
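To see this in practice (keyspace, table and key are hypothetical), the CQL token() function shows the hash the partitioner computes for a partition key, and nodetool getendpoints lists the replica nodes that own it.
SELECT token(state), state FROM users_by_state WHERE state = 'CA' LIMIT 1;
$ bin/nodetool getendpoints my_keyspace users_by_state CA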
Replication
• Data is replicated automatically
• Specified by replication-factor
• If a machine is down, missing data is replayed
Multi DC
• Typical usage: client writes to local DC and
replicates async to other DC
• Replication factor per key space per data
center
• Datacenters can be physical or logical
(Example node properties: dc=DC1, rack=RAC1)
Dynamic Snitch
• Layered automatically with other snitches
• Maintains a pulse on each node’s performance
• Determines which node to query replicas from depending on node health
• Turned on by default for all snitches
Replication Strategy
• Cassandra supports two strategies out of the box:
• SimpleStrategy and NetworkTopologyStrategy
• SimpleStrategy places replicas at consecutive nodes around the ring, starting with the node indicated by the partitioner.
• The NetworkTopologyStrategy allows you to specify a different replication
factor for each data center. Within a data center, it allocates replicas to
different racks in order to maximize availability.
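For example (keyspace names, data center names and replication factors are placeholders), the strategy is chosen when the keyspace is created:
CREATE KEYSPACE demo_simple
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
CREATE KEYSPACE demo_multi_dc
  WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 3, 'DC2': 2};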
Consistency Model
• Per-query consistency
• ALL, QUORUM, ONE
• How many replicas must respond for the query to be considered successful
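In cqlsh the consistency level is set per session before issuing queries; drivers expose it per statement. A small sketch (table name hypothetical):
CONSISTENCY QUORUM;
SELECT * FROM users_by_state WHERE state = 'CA';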
Consistency Levels
Partition Index
Partition Index Sample
SSTable
Reading SSTable using Index Cache
int ● Signed ● 32 bits
PRIMARY KEY ((state), id)
PRIMARY KEY ((state), city, id)
PRIMARY KEY ((state), city, name, id)
Querying Clustering Columns
• Must provide partition key
• Clustering column can follow thereafter
• You can perform either equality or range on clustering columns
• All equality comparisons must come before inequality comparisons
• Since data is sorted on disk, range searches are a binary search followed
by a sequential read
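A sketch against the hypothetical users_by_state table (PRIMARY KEY ((state), city, id)) used earlier:
-- partition key plus equality on a clustering column
SELECT * FROM users_by_state WHERE state = 'CA' AND city = 'Oakland';
-- partition key plus a range on the first clustering column
SELECT * FROM users_by_state WHERE state = 'CA' AND city >= 'A' AND city < 'N';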
Allow Filtering
• ALLOW FILTERING relaxes the constraint that queries must restrict the partition key
• SELECT * FROM users
• You can then query on just clustering columns
• Causes Cassandra to scan all partitions in the table
• Do not use it
• Unless you really have to
• Best on small data sets
• Consider use of Spark!
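For instance (hypothetical table), restricting only a clustering column forces a scan of every partition and requires the explicit opt-in:
SELECT * FROM users_by_state WHERE city = 'Oakland' ALLOW FILTERING;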
Operations
Topics
• Adding Nodes
• Bootstrapping a Node into a cluster
• Removing a Node
• Replace a Downed Node
• Run a Repair Operation
• Dividing SSTables with sstablesplit
• Create snapshot
• Implement Multiple Data Centers
• Best Practices for Cluster Sizing
• Using Cassandra-stress Tool
Adding Node
• Data capacity problem
• Your data has outgrown the node's hardware capacity
• Reached traffic capacity
• Your application needs more rapid response with less latency
• Need more operational headroom
• For node repair, compaction, and other resource-intensive operations
Bootstrapping
• Simple process that brings a node up to speed
• The amount of time it takes to bootstrap depends on the amount of data
• Can be a long running process
• The node announces itself to the ring using a seed node
The Bootstrapping Process
• Calculate the range(s) of the new node and notify the ring of these pending ranges
• Calculate the node(s) that currently own these ranges and will no longer own them once the bootstrap completes
• Stream the data from these nodes to the bootstrapping node (monitor with nodetool netstats)
• Join the new node to the ring so it can serve traffic
Nodetool Cleanup
• Makes sure no data is left behind in SSTables that are outside the allocated token ranges
• It essentially copies the SSTables to new SSTables, keeping only valid data
• The compaction process will eventually clean this up anyway
• Not essential - it is more of a deterministic process
$ bin/nodetool [options] cleanup -- <keyspace> (<table>)
Use flags to specify:
-h host or IP address
-p port
-pw password
-u username
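A concrete invocation might look like this (host, credentials and keyspace are placeholders):
$ bin/nodetool -h 10.0.1.12 -u cassandra -pw cassandra cleanup my_keyspace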
Replace a Downed Node
• First, find the IP address of the downed node using the nodetool status command
• On the replacement node, open cassandra-env.sh
• Swap in the IP address of the dead node as the replace_address value in the JVM options, so the new node takes over the dead node's token ranges instead of bootstrapping new ones
• Use nodetool removenode to remove the dead node
• If necessary, force removal with nodetool assassinate
• You can monitor the process using the nodetool netstats command
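The JVM option is typically added to cassandra-env.sh on the replacement node before it is started (the IP address is a placeholder):
JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=10.0.1.17"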
What if the node was also a seed node?
• Need to add to list of seeds in cassandra.yaml
• Cassandra will not allow seed node to auto-bootstrap
• You will have to run repair on new seed node to do so
• Add a new node making the necessary changes to the cassandra.yaml
file
• Specify new seed node in cassandra.yaml
• Start Cassandra on new seed node
• Run nodetool repair on the new seed node to manually
bootstrap
• Remove the old seed node using nodetool removenode with the Host ID of the downed node
• Run nodetool cleanup on previously existing nodes
Repair Operation
• Repair is a deliberate process to cope with cluster entropy … a
consistency check
• Entropy can arise from nodes that were down longer than the hint window, dropped mutations, or other causes
• A repair operates on all of the nodes in the replica set by default
• Ensures that all replicas have identical copies of a given partition
• Consists of two phases
• Build Merkle tree of the data per partition
• Replicas then compare the differences between their trees and stream the
differences to each other as needed
A Merkle Tree Exchange
• Starts with the root of the tree (a list of one hash value)
• The origin sends the list of hashes at the current level
• The destination diffs the list of hashes against its own, then requests
subtrees that are different
• If there are no differences, the request can terminate
• Repeat steps 2 and 3 until leaf nodes are reached
• The origin sends the values of the keys in the resulting set
Why is Repair Necessary?
• A node’s data can get inconsistent over time (Repair is just a
maintenance action in this case)
• If a node goes down for some time, it misses writes and will need to
catch up
• Sometimes it is best to repair a node:
• If the node has been down longer than the length specified in
max_hint_window_in_ms, the node is out of sync
• Depending on the amount of data, might be faster to repair
• Sometimes it is better to bring the node back as a new node and
bootstrap
• If there is a significant amount of data, might be faster just to bring in a new
node and stream data just to that node
What are Incremental Repairs
• To avoid the need for constant tree
construction, incremental repairs have been
introduced
• The idea is to persist already-repaired data and only calculate Merkle trees for SSTables that have been created since
• This allows the repair process to stay
performant and lightweight
What are incremental repairs?
• Incremental repairs begin with the repair leader sending out a prepare message to the peers
• Each node builds a Merkle tree from the unrepaired sstables
• These can be distinguished by the repairedAt field in each sstable's metadata
• Once the leader receives a Merkle tree from each node, it compares the trees and issues streaming requests
• This is just as in the classic repair case
• Finally the leader issues an anti-compaction command
• Anti-compaction is the process of segregating repaired and unrepaired ranges
into separate sstables
• Repaired sstables are written with a new repairedAt field denoting the time of
repair
Best Practices for Repair
• Run repair weekly
• Run repair on a small number of nodes at a time
• Schedule for low usage hours
• Run repair on a partition or subrange of a partition
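Typical invocations following these practices (keyspace name and token values are placeholders):
Repair only the primary ranges owned by this node:
$ bin/nodetool repair -pr my_keyspace
Repair a specific token subrange:
$ bin/nodetool repair -st -9223372036854775808 -et -3074457345618258603 my_keyspace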
Repair Type - Primary and Secondary Ranges
• Primary Range (inside ring) - first node that
data is stored on based on partition key
• Secondary Range (outside rings) - additional
replicas of the same data
• What are the implications of repair?
SSTableSplit - Why?
• You ran a major compaction with nodetool compact
• Maybe you used SizeTieredCompactionStrategy for a major compaction
• This can result in an excessively large SSTable
• Good idea to split the table
• Using the size tiered compaction, we may have gotten some really large
files over time
• May find yourself with a 200GB file you need to split up
• It is an anti-compaction in a way
Usage: sstablesplit
• Stop Cassandra Node
$ sudo service cassandra stop
$ sstablesplit [options] filename [filename]*
Options:
--debug Displays stack traces
--no-snapshot Do not snapshot the SSTables before splitting
--size Maximum size in MB for output SSTables (default: 50)
--verbose Verbose output
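For example (the data file path and target size are placeholders), splitting a large SSTable into 100 MB pieces while the node is stopped:
$ sstablesplit --size 100 /var/lib/cassandra/data/my_keyspace/users_by_state-*/*-Data.db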
Snapshot
Why Backup? Why Snapshots?
• We do not back up like traditional databases, where we copy out all the data
• It is a distributed system; every server or node has a portion of the data
• SSTables are immutable, which is great! Makes them easy to back up.
• A snapshot represents the state of the data files at a given point in time
• Snapshots create hard links on the file system, as opposed to copying data
• Therefore very fast!
• Can consist of a single table, a single keyspace, or multiple keyspaces
What is a Snapshot
• Represents the state of the data files at a particular point in time
• Snapshot directory is created (this has pointers)
• Then you can either leave it there or copy it offline to an NFS mount or to S3
Incremental Backup
• Incremental backups create a hard link to every SSTable upon flush
• User must manually delete them after creating a new snapshot
• Incremental backups are disabled by default
• Configured in the cassandra.yaml file by setting incremental_backups to true
• Need a snapshot before taking incremental backups
• Snapshot information is stored in a snapshots directory under each table
directory
• A snapshot need only be stored offsite once
Location of Snapshot and Incremental Backups
• Snapshots and incremental backups are stored locally on each Cassandra node
• Vulnerable to hardware failures
• Commonly, the files are copied to an off-node location
• Open source programs like tablesnap are useful for backing up to S3
• Scripts can be used to automate copying backup files to another machine: cron + bash script, rsync, etc.
Auto Snapshot
• Critical safety factor
• If enabled, before a table is truncated or tables/keyspace is dropped, a
snapshot is taken
• A configuration in cassandra.yaml, default is true
• Recommended to keep it true
How to take Snapshot?
$ bin/nodetool [options] snapshot (-cf <table> | -t <tag>) -- <keyspace>
We can specify to take a snapshot of
• One or more keyspaces
• A specific table to back up
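For example (tag, keyspace and table names are placeholders):
Snapshot a single keyspace with a tag:
$ bin/nodetool snapshot -t before_upgrade -- my_keyspace
Snapshot a single table:
$ bin/nodetool snapshot -cf users_by_state -t before_upgrade -- my_keyspace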
Clean up Snapshots
• The $ bin/nodetool clearsnapshot command removes snapshots
• Takes the same options as other nodetool commands
• Specify the snapshot name and keyspace
• Not specifying a snapshot name removes all snapshots
• Remember to remove old snapshots before taking new ones - previous
snapshots are not automatically deleted
• To clear snapshots on all nodes at once, use parallel ssh utility - pssh,
clusterssh
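For example (tag and keyspace are placeholders):
Remove one named snapshot:
$ bin/nodetool clearsnapshot -t before_upgrade -- my_keyspace
Remove all snapshots on the node:
$ bin/nodetool clearsnapshot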
How to restore?
• Most common method is to delete current data files and copy the
snapshot and incremental files to the appropriate data directories
• If using incremental backups, copy the contents of the backups directory to
each table directory
• The table schema must already be present in order to restore this way
• Restart and repair the node after the file copying is done
• Another method is to use sstableloader
• Great if you are loading it into a different size cluster
• Must be careful about its use as it can add significant load to cluster while
loading
Cluster Wide Backup And Restore
• OpsCenter
• SSH Programs - pssh, clusterssh
• Honorable mention - tablesnap and tablestore
• For Cassandra backup to AWS S3
• Recovery
Multi Data Center Implementation
• Node - the virtual or physical host of a single Cassandra instance
• Rack - a logical grouping of physically related nodes
• Data Center - a logical grouping of a set of racks
• Enables geographically aware read and write request routing
• Each node belongs to one rack in one data center
• The identity of each node’s rack and data center may be configured in its
conf/cassandra-rackdc.properties
Adding a Secondary Data Center
• Ensures continuous availability of your data and application
• Live backup
• Improved performance
• Lower latency by serving requests from a nearby geographic location
• Analytics
• One DC is dedicated for transactional load and one for analytics
How clusters operate between data centers
• A data center is a grouping of nodes configured together for replication
purposes
• Data replicates across data centers automatically and transparently
• Consistency level can be specified at LOCAL level for read/write
operations
• Consistency level can also be specified as EACH
• EACH meaning - each data center has its own quorum
Implementing Multi Data Center Cluster
• Use NetworkTopologyStrategy rather than SimpleStrategy
• Use LOCAL_* consistency levels for read/write operations to limit latency
• If possible, define one rack for the entire cluster
• Specify the snitch
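Putting these together (keyspace, data center names and replication factors are placeholders):
CREATE KEYSPACE my_keyspace
  WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 3, 'DC2': 3};
-- in cqlsh, keep reads and writes local to the coordinator's data center
CONSISTENCY LOCAL_QUORUM;
SELECT * FROM my_keyspace.users_by_state WHERE state = 'CA';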
Cluster Sizing
Factors
• Application data model and use case
• Volume of data over time
• Velocity of data
• Replication Factor (RF)
1. Data Model and Use Cases
• What is the read/write bias?
• What are the SLAs?
• Do I need to tier out data? Hot / cold
2. Volume of Data
• Estimate the volume based on your application
3. Velocity
• How many writes per second
Example sizing assumptions:
• Replication Factor of 3
• Multiple data centers
• 25 ms read latency at the 95th percentile
• At max packet size, 50k writes / sec per node
• At max packet size, 40k reads / sec per node
• 4 TB per node to allow for compaction overhead
Cluster Sizing - Based on Volume
• (100G / day) x (30 days) x (6 months) = 18 TB
• (18 TB data) x (RF 3) = 54 TB
• 54 TB total data / (4 TB max per node) = 13.5, rounded up to 14 nodes
• (14 nodes) x (2 Data Center) = 28 nodes
Cluster Sizing - Sizing for Velocity
• (150 K writes/ sec load) / (50k writes/sec per node) = 3 nodes
• The node count required for volume already covers the write capacity
Cluster Sizing - Future Capacity
• Validate your assumptions often
• Monitor for changes over time
• Plan for increasing cluster size before you need it
• Be ready to draw down if needed
Cassandra Stress Tool
• Java-based utility built into Cassandra
• Used for basic benchmarking and load testing a Cassandra cluster
• Quickly determine how a schema performs
• Understand how your database scales
• Optimize your data model and settings
• Determine production capacity
Cassandra Stress
The YAML file is the configuration file for the stress tool. It is split into a few sections:
• DDL - for defining your schema
• Column distributions - for defining the shape and size of each column
globally and within each partition
• Insert Distributions - for defining how the data is written during the
stress test
• DML - for defining how the data is queried during the stress test
Cassandra Stress
DDL
• Define the keyspace and table information
• If the schema is not already defined, the stress tool will define it when it runs
• If the schema is already defined, then the stress tool needs to know only the keyspace and table names
Cassandra Stress YAML - Example
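A minimal sketch of the DDL portion of a stress profile (keyspace, table and column names are hypothetical):
keyspace: stress_demo
keyspace_definition: |
  CREATE KEYSPACE stress_demo
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
table: users_by_state
table_definition: |
  CREATE TABLE users_by_state (
    state text,
    city text,
    id uuid,
    name text,
    PRIMARY KEY ((state), city, id)
  );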
Stress Test: Column Distribution
• The columnspec section describes the different distributions to use for each column. These distributions model the size of the data in the column, the number of unique values, and the clustering of them within a given partition
• These distributions are used to auto generate data that looks like what
you would see in reality
Stress Test: Column Distribution
EXP(min..max) — An exponential distribution over the range [min..max]
EXTREME(min..max,shape) — An extreme value (Weibull) distribution over
the range [min..max]
GAUSSIAN(min..max,stdvrng) — A gaussian/normal distribution, where
mean=(min+max)/2, and stdev is (mean-min)/stdvrng
GAUSSIAN(min..max,mean,stdev) — A gaussian/normal distribution, with
explicitly defined mean and stdev
UNIFORM(min..max) — A uniform distribution over the range [min, max]
FIXED(val) — A fixed distribution, always returning the same value
Stress test: other column specs
Size distribution — Defines the distribution of sizes for text, blob, set and list
types (default of UNIFORM(4..8))
Population distribution — Defines the distribution of unique values for the
column values (default of UNIFORM(1..100B))
Cluster distribution — Defines the distribution for the number of clustering
prefixes within a given partition (default of FIXED(1))
Stress Test: column spec - example
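A sketch of a columnspec section for the hypothetical users_by_state table above (distributions chosen arbitrarily):
columnspec:
  - name: state
    size: fixed(2)                # two-letter state codes
    population: uniform(1..50)    # roughly 50 unique partition keys
  - name: city
    cluster: gaussian(1..20)      # cities per state within a partition
  - name: name
    size: gaussian(5..20)         # name length in characters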
Stress Test: insert distribution
• The insert section lets you specify how data is inserted during stress.
• For each insert operation you can specify the following
distributions/ratios:
• Partition distribution — The number of partitions to update per batch (default
FIXED(1))
• Select distribution ratio — The ratio of rows to insert into each partition, as a proportion of the total possible rows for the partition (as defined by the clustering distribution columns); default FIXED(1)/1
• Batch type — The type of CQL batch to use. Either LOGGED/UNLOGGED (default
LOGGED)
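A sketch of the corresponding insert section (values are illustrative):
insert:
  partitions: fixed(1)     # update one partition per batch
  select: fixed(1)/1       # insert every possible row of the partition
  batchtype: UNLOGGED      # type of CQL batch to use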
Stress Tool: DML
• You can specify any CQL query on the table by naming them under the
'queries' section.
• The 'fields' field specifies whether the bind variables should be picked from the same row or from across all rows in the partition
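A sketch of a queries section for the same hypothetical table; 'samerow' draws bind values from a single generated row, 'multirow' from any rows in the partition:
queries:
  users_in_city:
    cql: SELECT * FROM users_by_state WHERE state = ? AND city = ?
    fields: samerow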