DBMS (BCS403)
MODULE 5
There are a number of concurrency control techniques that are used to ensure the noninterference or isolation property of concurrently executing transactions: two-phase locking techniques for concurrency control, concurrency control based on timestamp ordering, multiversion concurrency control techniques, validation concurrency control techniques, and granularity of data items and multiple granularity locking.
Binary Locks:
A binary lock can have two states or values: locked and unlocked (or 1 and 0,
for simplicity). A distinct lock is associated with each database item X.
If the value of the lock on X is 1, item X cannot be accessed by a database operation
that requests the item.
If the value of the lock on X is 0, the item can be accessed when requested, and the lock
value is changed to 1.
Two operations, lock_item and unlock_item, are used with binary locking.
A transaction requests access to an item X by first issuing a lock_item(X)
operation. If LOCK(X) = 1, the transaction is forced to wait.
If LOCK(X) = 0, it is set to 1 (the transaction locks the item) and the transaction is allowed
to access item X.
When the transaction is through using the item, it issues an unlock_item(X)
operation, which sets LOCK(X) back to 0 (unlocks the item) so that X may be
accessed by other transactions. Hence, a binary lock enforces mutual exclusion on the
data item.

lock_item(X):
B:  if LOCK(X) = 0                      (* item is unlocked *)
        then LOCK(X) ← 1                (* lock the item *)
        else begin
                 wait (until LOCK(X) = 0
                       and the lock manager wakes up the transaction);
                 go to B
             end;

unlock_item(X):
    LOCK(X) ← 0;                        (* unlock the item *)
    if any transactions are waiting
        then wake up one of the waiting transactions;
The DBMS has a lock manager subsystem to keep track of and control access to locks.
If the simple binary locking scheme described here is used, every transaction must obey the
following rules:
1. A transaction T must issue the operation lock_item(X) before any read_item(X) or
write_item(X) operations are performed in T.
2. A transaction T must issue the operation unlock_item(X) after all read_item(X) and
write_item(X) operations are completed in T.
3. A transaction T will not issue a lock_item(X) operation if it already holds the lock on
item X.
4. A transaction T will not issue an unlock_item(X) operation unless it already holds the
lock on item X.
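The waiting and wake-up behaviour described in the pseudocode and rules above can be illustrated with a minimal Python sketch (not part of the original notes); it assumes a single in-memory lock table protected by a threading.Condition:

import threading

class BinaryLockManager:
    """Minimal sketch of binary locks: LOCK(X) is either 0 (unlocked) or 1 (locked)."""
    def __init__(self):
        self.lock_table = {}                  # item -> 0/1
        self.cond = threading.Condition()     # lets waiting transactions be woken up

    def lock_item(self, x):
        with self.cond:
            # wait until LOCK(X) = 0, then set it to 1
            while self.lock_table.get(x, 0) == 1:
                self.cond.wait()              # transaction is forced to wait
            self.lock_table[x] = 1

    def unlock_item(self, x):
        with self.cond:
            self.lock_table[x] = 0
            self.cond.notify()                # wake up one of the waiting transactions

A transaction would call lock_item('X') before any read_item(X) or write_item(X) and unlock_item('X') afterwards, as required by rules 1 and 2.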
The preceding binary locking scheme is too restrictive for database items because at most
one transaction can hold a lock on a given item. We should allow several transactions to
access the same item X if they all access X for reading purposes only. This is because read
operations on the same item by different transactions are not conflicting.
A different type of lock, called a multiple-mode lock, is used. In this scheme—called
shared/exclusive or read/write locks—there are three locking operations: read_lock(X),
write_lock(X), and unlock(X).
A lock associated with an item X, LOCK(X), now has three possible states: read-locked,
write-locked, or unlocked.
A read-locked item is also called share-locked because other transactions are allowed to read the item, whereas a write-locked item is called exclusive-locked because a single transaction exclusively holds the lock on the item.
When we use the shared/exclusive locking scheme, the system must enforce the
following rules:
1. A transaction T must issue the operation read_lock(X) or write_lock(X) before any
read_item(X) operation is performed in T.
2. A transaction T must issue the operation write_lock(X) before any write_item(X)
operation is performed in T.
3. A transaction T must issue the operation unlock(X) after all read_item(X) and
write_item(X) operations are completed in T.
4. A transaction T will not issue a read_lock(X) operation if it already holds a read
(shared) lock or a write (exclusive) lock on item X. This rule may be relaxed for
downgrading of locks.
5. A transaction T will not issue a write_lock(X) operation if it already holds a read
(shared) lock or write (exclusive) lock on item X. This rule may also be relaxed for
upgrading of locks.
6. A transaction T will not issue an unlock(X) operation unless it already holds a read
(shared) lock or a write (exclusive) lock on item X.
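The shared/exclusive scheme is usually implemented by keeping, for each locked item, its state and the number of readers. A Python sketch of such a lock manager follows (an illustration only, assuming each lock table entry stores the state and a reader count):

import threading

class SharedExclusiveLockManager:
    """Sketch of read/write locks; each table entry is [state, number_of_readers]."""
    UNLOCKED, READ, WRITE = "unlocked", "read-locked", "write-locked"

    def __init__(self):
        self.table = {}                       # item -> [state, number_of_readers]
        self.cond = threading.Condition()

    def read_lock(self, x):
        with self.cond:
            while self.table.get(x, [self.UNLOCKED, 0])[0] == self.WRITE:
                self.cond.wait()              # wait while another transaction write-locks X
            state, readers = self.table.get(x, [self.UNLOCKED, 0])
            self.table[x] = [self.READ, readers + 1]

    def write_lock(self, x):
        with self.cond:
            while self.table.get(x, [self.UNLOCKED, 0])[0] != self.UNLOCKED:
                self.cond.wait()              # wait until no reader or writer holds X
            self.table[x] = [self.WRITE, 0]

    def unlock(self, x):
        with self.cond:
            state, readers = self.table.get(x, [self.UNLOCKED, 0])
            if state == self.READ and readers > 1:
                self.table[x] = [self.READ, readers - 1]   # other readers still hold X
            else:
                self.table[x] = [self.UNLOCKED, 0]
            self.cond.notify_all()            # let waiting transactions retry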
Two-Phase Locking (2PL): A transaction is said to follow the two-phase locking protocol if all locking operations (read_lock, write_lock) precede the first unlock operation in the transaction. Such a transaction can be divided into two phases:
1. expanding or growing (first) phase, during which new locks on items can be acquired
but none can be released; and
2. shrinking (second) phase, during which existing locks can be released but no new
locks can be acquired.
If lock conversion is allowed, then upgrading of locks (from read-locked to write-locked)
must be done during the expanding phase, and downgrading of locks (from write-locked to
read-locked) must be done in the shrinking phase.
Fig: Initial values: X = 20, Y = 30. Result of serial schedule T1 followed by T2: X = 50, Y = 80. Result of serial schedule T2 followed by T1: X = 70, Y = 50.
Transactions T1 and T2 in Figure do not follow the two-phase locking protocol because the
write_lock(X) operation follows the unlock(Y) operation in T1, and similarly the write_lock(Y)
operation follows the unlock(X) operation in T2.
Transactions T1′ and T2′ are the same as T1 and T2 in the figure but follow the two-phase locking protocol. Note that they can produce a deadlock.
A variation known as conservative 2PL (or static 2PL) requires a transaction to lock
all the items it accesses before the transaction begins execution, by predeclaring its
read-set and write-set. The read-set of a transaction is the set of all items that the
transaction reads, and the write-set is the set of all items that it writes.
The most popular variation of 2PL is strict 2PL, which guarantees strict schedules.
In this variation, a transaction T does not release any of its exclusive (write) locks
until after it commits or aborts. Hence, no other transaction can read or write an
item that is written by T unless T has committed, leading to a strict schedule for
recoverability. Strict 2PL is not deadlock-free.
A more restrictive variation of strict 2PL is rigorous 2PL, which also guarantees
strict schedules. In this variation, a transaction T does not release any of its locks
(exclusive or shared) until after it commits or aborts, and so it is easier to
implement than strict 2PL.
Usually the concurrency control subsystem itself is responsible for generating the
read_lock and write_lock requests. For example, suppose the system is to enforce
the strict 2PL protocol. Then, whenever transaction T issues a read_item(X), the
system calls the read_lock(X) operation on behalf of T. If the state of LOCK(X) is
write_locked by some other transaction T′, the system places T in the waiting queue
for item X; otherwise, it grants the read_lock(X) request and permits the
read_item(X) operation of T to execute. On the other hand, if transaction T issues a
write_item(X), the system calls the write_lock(X) operation on behalf of T.
Deadlock occurs when each transaction T in a set of two or more transactions is waiting for
some item that is locked by some other transaction T′ in the set. Hence, each transaction in
the set is in a waiting queue, waiting for one of the other transactions in the set to release the
lock on an item. But because the other transaction is also waiting, it will never release the
lock.
Fig 2.1 (a) A partial schedule of T1′ and T2′ that is in a state of deadlock. (b) A wait-for graph for the partial schedule in (a).
Deadlock Prevention: One simple scheme, the no waiting algorithm, never makes a transaction wait: if a transaction is unable to obtain a lock, it is immediately aborted and then restarted after a certain time delay without checking whether a deadlock will actually occur or not.
The cautious waiting algorithm was proposed to try to reduce the number of needless
aborts/restarts. Suppose that transaction Ti tries to lock an item X but is not able to do so
because X is locked by some other transaction Tj with a conflicting lock. The cautious
waiting rule is as follows:
■ Cautious waiting: If Tj is not blocked (not waiting for some other locked item), then Ti is
blocked and allowed to wait; otherwise abort Ti.
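The cautious waiting rule itself is a one-line decision. A Python sketch of that decision follows (an illustration; the blocked dictionary and the abort/block_and_wait callbacks are assumptions supplied by a hypothetical transaction manager):

def on_lock_conflict(Ti, Tj, blocked, abort, block_and_wait):
    """Ti requested an item held by Tj with a conflicting lock (cautious waiting).

    blocked: dict mapping a transaction to True if it is currently waiting for a lock.
    abort, block_and_wait: callbacks provided by the (hypothetical) transaction manager.
    """
    if not blocked.get(Tj, False):
        block_and_wait(Ti)    # Tj is not blocked: Ti is allowed to wait
    else:
        abort(Ti)             # Tj is itself waiting: abort Ti to avoid a possible deadlock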
Deadlock Detection:
An alternative approach to dealing with deadlock is deadlock detection, where the system
checks if a state of deadlock actually exists.
1. wait-for graph: simple way to detect a state of deadlock is for the system to construct
and maintain a wait-for graph. One node is created in the wait-for graph for each
transaction that is currently executing. Whenever a transaction Ti is waiting to lock an
item X that is currently locked by a transaction Tj, a directed edge (Ti → Tj) is created in the wait-for graph. When Tj releases the lock(s) on the items that Ti was waiting for, the directed edge is dropped from the wait-for graph, as shown in Fig 2.1(b). The system is in a state of deadlock if and only if the wait-for graph has a cycle.
2. victim selection: If the system is in a state of deadlock, some of the transactions
causing the deadlock must be aborted. Choosing which transactions to abort is known
as victim selection. The algorithm for victim selection should generally avoid selecting
transactions that have been running for a long time and that have performed many
updates, and it should try instead to select transactions that have not made many
changes (younger transactions).
3. Timeouts: Another simple scheme to deal with deadlock is the use of timeouts. This
method is practical because of its low overhead and simplicity. In this method, if a
transaction waits for a period longer than a system-defined timeout period, the system
assumes that the transaction may be deadlocked and aborts it regardless of whether a
deadlock actually exists.
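Detecting deadlock therefore reduces to detecting a cycle in the wait-for graph. A small Python sketch (illustration only; the graph is assumed to be a dictionary mapping each transaction to the set of transactions it waits for):

def has_deadlock(wait_for):
    """Return True if the wait-for graph (dict: txn -> set of txns it waits for) has a cycle."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {t: WHITE for t in wait_for}

    def dfs(t):
        color[t] = GRAY
        for nxt in wait_for.get(t, ()):
            if color.get(nxt, WHITE) == GRAY:        # back edge -> cycle -> deadlock
                return True
            if color.get(nxt, WHITE) == WHITE and dfs(nxt):
                return True
        color[t] = BLACK
        return False

    return any(color[t] == WHITE and dfs(t) for t in wait_for)

# Example: T1' waits for T2' and T2' waits for T1' (the situation in Fig 2.1)
print(has_deadlock({"T1'": {"T2'"}, "T2'": {"T1'"}}))    # True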
Starvation.
Another problem that may occur when we use locking is starvation, which occurs when a
transaction cannot proceed for an indefinite period of time while other transactions in the
system continue normally.
This may occur if the waiting scheme for locked items is unfair in that it gives priority to
some transactions over others.
One solution for starvation is to have a fair waiting scheme, such as using a first-come-first-served queue; transactions can lock an item in the order in which they originally requested the lock.
Another scheme allows some transactions to have priority over others but increases the
priority of a transaction the longer it waits, until it eventually gets the highest priority and
proceeds.
Starvation can also occur because of victim selection if the algorithm selects the same
transaction as victim repeatedly, thus causing it to abort and never finish execution.
The use of locking, combined with the 2PL protocol, guarantees serializability of schedules.
The serializable schedules produced by 2PL have their equivalent serial schedules based on
the order in which executing transactions lock the items they acquire.
If a transaction needs an item that is already locked, it may be forced to wait until the item is
released.
Some transactions may be aborted and restarted because of the deadlock problem.
A different approach to concurrency control involves using transaction timestamps to order
transaction execution for an equivalent serial schedule.
1.2.1 Timestamps
A timestamp is a unique identifier created by the DBMS to identify a transaction.
Typically, timestamp values are assigned in the order in which the transactions are
submitted to the system, so a timestamp can be thought of as the transaction start time. We
will refer to the timestamp of transaction T as TS(T).
Concurrency control techniques based on timestamp ordering do not use locks; hence,
deadlocks cannot occur.
Timestamps can be generated in several ways. One possibility is to use a counter that is
incremented each time its value is assigned to a transaction. The transaction timestamps are
numbered 1, 2, 3, … in this scheme. A computer counter has a finite maximum value, so the
system must periodically reset the counter to zero when no transactions are executing for
some short period of time.
Another way to implement timestamps is to use the current date/time value of the system
clock and ensure that no two timestamp values are generated during the same tick of the
clock.
The Timestamp Ordering Algorithm: The idea of this scheme is to enforce an equivalent serial order on the transactions based on their timestamps. The algorithm associates with each database item X two timestamp values: read_TS(X), the largest timestamp among all the timestamps of transactions that have successfully read item X, and write_TS(X), the largest of all the timestamps of transactions that have successfully written item X. The algorithm must ensure that the timestamp order of transaction execution is not violated.
The concurrency control algorithm must check whether conflicting operations violate the
timestamp ordering in the following two cases:
1. Whenever a transaction T issues a write_item(X) operation, the following check is
performed:
a. If read_TS(X) > TS(T) or if write_TS(X) > TS(T), then abort and roll back T and
reject the operation. This should be done because some younger transaction
with a timestamp greater than TS(T)—and hence after T in the timestamp
ordering—has already read or written the value of item X before T had a
chance to write X, thus violating the timestamp ordering.
b. If the condition in part (a) does not occur, then execute the write_item(X)
operation of T and set write_TS(X) to TS(T).
2. Whenever a transaction T issues a read_item(X) operation, the following check is
performed:
a. If write_TS(X) > TS(T), then abort and roll back T and reject the operation. This
should be done because some younger transaction with timestamp greater
than TS(T)—and hence after T in the timestamp ordering—has already
written the value of item X before T had a chance to read X.
b. If write_TS(X) ≤ TS(T), then execute the read_item(X) operation of T and set
read_TS(X) to the larger of TS(T) and the current read_TS(X).
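These two checks can be written almost verbatim in code. A minimal Python sketch of basic timestamp ordering follows (an illustration; read_TS and write_TS are kept in plain dictionaries, and aborting is simplified to raising an exception):

class TimestampOrdering:
    """Sketch of basic TO: read_TS(X) and write_TS(X) kept per item."""
    def __init__(self):
        self.read_ts = {}    # item -> largest TS of a transaction that read it
        self.write_ts = {}   # item -> largest TS of a transaction that wrote it

    def write_item(self, ts_T, x):
        if self.read_ts.get(x, 0) > ts_T or self.write_ts.get(x, 0) > ts_T:
            raise Exception(f"abort T with TS {ts_T}")     # a younger transaction already used X
        self.write_ts[x] = ts_T                            # execute the write

    def read_item(self, ts_T, x):
        if self.write_ts.get(x, 0) > ts_T:
            raise Exception(f"abort T with TS {ts_T}")     # a younger transaction already wrote X
        self.read_ts[x] = max(self.read_ts.get(x, 0), ts_T)   # execute the read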
Thomas's Write Rule: A modification of the basic TO algorithm, known as Thomas's write rule, does not enforce conflict serializability, but it rejects fewer write operations by modifying the checks for the write_item(X) operation as follows:
1. If read_TS(X) > TS(T), then abort and roll back T and reject the operation.
2. If write_TS(X) > TS(T), then do not execute the write operation but continue
processing. This is because some transaction with timestamp greater than TS(T)—
and hence after T in the timestamp ordering—has already written the value of X.
Thus, we must ignore the write_item(X) operation of T because it is already outdated
and obsolete. Notice that any conflict arising from this situation would be detected by
case (1).
3. If neither the condition in part (1) nor the condition in part (2) occurs, then execute
the write_item(X) operation of T and set write_TS(X) to TS(T).
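Only the write check changes under this rule. A short sketch extending the TimestampOrdering sketch above (an illustration, not a complete scheduler):

def write_item_thomas(state, ts_T, x):
    """Thomas's write rule applied to the read_ts/write_ts dictionaries of the sketch above."""
    if state.read_ts.get(x, 0) > ts_T:
        raise Exception(f"abort T with TS {ts_T}")   # rule 1: a younger transaction read X
    if state.write_ts.get(x, 0) > ts_T:
        return                                       # rule 2: obsolete write, ignore it
    state.write_ts[x] = ts_T                         # rule 3: execute the write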
Multiversion Technique Based on Timestamp Ordering: In this method, several versions X1, X2, … , Xk of each data item X are maintained. For each version Xi, the system keeps the value of the version and two timestamps: read_TS(Xi), the largest of all the timestamps of transactions that have successfully read version Xi, and write_TS(Xi), the timestamp of the transaction that wrote the value of version Xi.
Whenever a transaction T is allowed to execute a write_item(X) operation, a new version
Xk+1 of item X is created, with both the write_TS(Xk+1) and the read_TS(Xk+1) set to TS(T).
Correspondingly, when a transaction T is allowed to read the value of version Xi, the value of
read_TS(Xi) is set to the larger of the current read_TS(Xi) and TS(T).
To ensure serializability, the following rules are used:
1. If transaction T issues a write_item(X) operation, and version i of X has the highest
write_TS(Xi) of all versions of X that is also less than or equal to TS(T), and read_TS(Xi)
> TS(T), then abort and roll back transaction T; otherwise, create a new version Xj of
X with read_TS(Xj) = write_TS(Xj) = TS(T).
2. If transaction T issues a read_item(X) operation, find the version i of X that has the
highest write_TS(Xi) of all versions of X that is also less than or equal to TS(T); then
return the value of Xi to transaction T, and set the value of read_TS(Xi) to the larger of
TS(T) and the current read_TS(Xi).
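A Python sketch of this version-selection logic follows (illustration only; each item maps to a list of versions, and each version is a dictionary with value, read_ts, and write_ts fields, which are assumptions of this sketch):

def mv_read_item(versions, ts_T, x):
    """Rule 2: read from the version with the largest write_TS <= TS(T)."""
    xi = max((v for v in versions[x] if v["write_ts"] <= ts_T),
             key=lambda v: v["write_ts"])
    xi["read_ts"] = max(xi["read_ts"], ts_T)
    return xi["value"]

def mv_write_item(versions, ts_T, x, value):
    """Rule 1: abort if a younger transaction already read the version T would overwrite."""
    xi = max((v for v in versions[x] if v["write_ts"] <= ts_T),
             key=lambda v: v["write_ts"])
    if xi["read_ts"] > ts_T:
        raise Exception(f"abort T with TS {ts_T}")
    versions[x].append({"value": value, "read_ts": ts_T, "write_ts": ts_T})

# Example: item X starts with one committed version written at timestamp 0
versions = {"X": [{"value": 20, "read_ts": 0, "write_ts": 0}]}
print(mv_read_item(versions, 5, "X"))    # 20
mv_write_item(versions, 5, "X", 25)      # creates a new version with timestamps 5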
We can describe the relationship between read and write locks in the standard scheme by
means of the lock compatibility table.
The idea behind multiversion 2PL is to allow other transactions T′ to read an item X while a
single transaction T holds a write lock on X. This is accomplished by allowing two versions
for each item X; one version, the committed version, must always have been written by
some committed transaction.
The second local version X′ can be created when a transaction T acquires a write lock on X.
Other transactions can continue to read the committed version of X while T holds the write
lock.
Transaction T can write the value of X′ as needed, without affecting the value of the
committed version X. However, once T is ready to commit, it must obtain a certify lock on all
items that it currently holds write locks on before it can commit; this is another form of lock
upgrading.
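For reference, the lock compatibility tables mentioned above are the standard ones ("yes" means the two locks can be held on the same item by two different transactions):

Read/write locks (standard shared/exclusive scheme):
            Read    Write
Read        yes     no
Write       no      no

Read/write/certify locks (multiversion 2PL):
            Read    Write   Certify
Read        yes     yes     no
Write       yes     no      no
Certify     no      no      no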
Validation (Optimistic) Concurrency Control: In validation-based (optimistic) techniques, no checking is done while the transaction is executing. A transaction proceeds through three phases: a read phase, during which it reads items and performs its updates on local copies; a validation phase, during which its updates are checked against those of concurrently executing or recently committed transactions; and a write phase, during which its updates are applied to the database if validation succeeds; otherwise, the transaction is restarted.
The validation phase for Ti checks that, for each such transaction Tj that is either recently
committed or is in its validation phase, one of the following conditions holds:
1. Transaction Tj completes its write phase before Ti starts its read phase.
2. Ti starts its write phase after Tj completes its write phase, and the read_set of Ti has
no items in common with the write_set of Tj.
3. Both the read_set and write_set of Ti have no items in common with the write_set of
Tj, and Tj completes its read phase before Ti completes its read phase.
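A Python sketch of this validation test follows (illustration; each transaction is represented by a simple record holding its read_set, write_set, and phase boundary timestamps, which are assumptions of this sketch):

def validate(Ti, others):
    """Return True if Ti passes validation against each recently committed / validating Tj."""
    for Tj in others:
        if Tj["write_phase_end"] < Ti["read_phase_start"]:
            continue                                            # condition 1
        if (Tj["write_phase_end"] < Ti["write_phase_start"]
                and not (Ti["read_set"] & Tj["write_set"])):
            continue                                            # condition 2
        if (not (Ti["read_set"] & Tj["write_set"])
                and not (Ti["write_set"] & Tj["write_set"])
                and Tj["read_phase_end"] < Ti["read_phase_end"]):
            continue                                            # condition 3
        return False                                            # no condition holds: abort Ti
    return True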
The basic definition of snapshot isolation is that a transaction sees the data items that it
reads based on the committed values of the items in the database snapshot (or database
state) when the transaction starts.
Snapshot isolation will ensure that the phantom record problem does not occur, since the
database transaction, or, in some cases, the database statement, will only see the records that
were committed in the database at the time the transaction started.
In this scheme, read operations do not require read locks to be applied to the items, thus
reducing the overhead associated with two-phase locking. However, write operations do
require write locks.
Thus, for transactions that have many reads, the performance is much better than 2PL.
When writes do occur, the system will have to keep track of older versions of the updated
items in a temporary version store (sometimes known as tempstore), with the timestamps
of when the version was created.
Granularity of Data Items and Multiple Granularity Locking: The size (granularity) of the data items that can be locked ranges from fine to coarse; a database item could be one of the following:
• A database record
• A field value of a database record
• A disk block
• A whole file
• The whole database
Figure shows a simple granularity hierarchy with a database containing two files, each file containing
several disk pages, and each page containing several records.
This can be used to illustrate a multiple granularity level 2PL protocol, with
shared/exclusive locking modes, where a lock can be requested at any level. However,
additional types of locks will be needed to support such a protocol efficiently.
To make multiple granularity level locking practical, additional types of locks, called
intention locks, are needed. The idea behind intention locks is for a transaction to indicate,
along the path from the root to the desired node, what type of lock (shared or exclusive) it
will require from one of the node’s descendants. There are three types of intention locks:
1. Intention-shared (IS) indicates that one or more shared locks will be requested
on some descendant node(s).
2. Intention-exclusive (IX) indicates that one or more exclusive locks will be
requested on some descendant node(s).
3. Shared-intention-exclusive (SIX) indicates that the current node is locked in
shared mode but that one or more exclusive locks will be requested on some
descendant node(s).
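For reference, the compatibility matrix for these lock modes (the table referred to as Figure 21.8 in rule 1 below) is the standard one; "yes" means the two modes can be held on the same node by two different transactions:

        IS     IX     S      SIX    X
IS      yes    yes    yes    yes    no
IX      yes    yes    no     no     no
S       yes    no     yes    no     no
SIX     yes    no     no     no     no
X       no     no     no     no     no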
The multiple granularity locking (MGL) protocol consists of the following rules:
1. The lock compatibility (based on Figure 21.8) must be adhered to.
2. The root of the tree must be locked first, in any mode.
3. A node N can be locked by a transaction T in S or IS mode only if the parent of node N is already locked by transaction T in either IS or IX mode.
4. A node N can be locked by a transaction T in X, IX, or SIX mode only if the parent of
node N is already locked by transaction T in either IX or SIX mode.
5. A transaction T can lock a node only if it has not unlocked any node (to enforce the
2PL protocol).
6. A transaction T can unlock a node, N, only if none of the children of node N are
currently locked by T.
To illustrate the MGL protocol with the database hierarchy in Figure 21.7, consider the
following three transactions:
1. T1 wants to update record r111 and record r211.
2. T2 wants to update all records on page p12.
3. T3 wants to read record r11j and the entire f2 file.
Figure 21.9 shows a possible serializable schedule for these three transactions. Only the lock
and unlock operations are shown. The notation <lock_type>(<item>) is used to display the
locking operations in the schedule.
The multiple granularity level protocol is especially suited when processing a mix of
transactions that include,
(1) short transactions that access only a few items (records or fields)
(2) long transactions that access entire files. In this environment, less transaction blocking
and less locking overhead are incurred by such a protocol when compared to a single-level
granularity locking approach.
NOSQL Databases and Big Data Storage Systems: Consider a free e-mail application that serves many millions of users, each of whom can have thousands of e-mail messages. There is a need for a storage system that can manage all these e-mails; a structured relational SQL system may not be appropriate because
(1) SQL systems offer too many services (powerful query language, concurrency control,
etc.), which this application may not need; and
(2) a structured data model such as the traditional relational model may be too restrictive.
Consider an application such as Facebook, with millions of users who submit posts, many
with images and videos; then these posts must be displayed on pages of other users using
the social media relationships among the users. User profiles, user relationships, and posts
must all be stored in a huge collection of data stores, and the appropriate posts must be made
available to the sets of users that have signed up to see these posts.
Google developed a proprietary NOSQL system known as BigTable, which is used in many
of Google’s applications that require vast amounts of data storage, such as Gmail, Google
Maps, and Web site indexing.
Amazon developed a NOSQL system called DynamoDB that is available through Amazon’s
cloud services.
Other software companies started developing their own solutions and making them
available to users who need these capabilities—for example, MongoDB and CouchDB,
which are classified as document-based NOSQL systems or document stores.
Another category of NOSQL systems is the graph-based NOSQL systems, or graph
databases; these include Neo4J and GraphBase, among others.
NOSQL Characteristics Related to Distributed Databases and Distributed Systems:
1. Scalability: In NOSQL systems, horizontal scalability is generally used; the distributed system is expanded by adding more nodes for data storage and processing as the volume of data and the demand increase.
2. Availability, Replication, and Eventual Consistency: Many applications require continuous system availability, so data is replicated over two or more nodes. Replication improves data availability and read performance, but it makes updating more cumbersome because an update must be applied to every copy of the replicated data items; this can slow down write performance if serializable consistency is required.
3. Replication Models: Two major replication models are used in NOSQL systems:
master-slave and master-master replication.
a. Master-slave replication requires one copy to be the master copy; all write
operations must be applied to the master copy and then propagated to the
slave copies, usually using eventual consistency (the slave copies will
eventually be the same as the master copy).
b. The master-master replication allows reads and writes at any of the replicas
but may not guarantee that reads at nodes that store different copies see the
same values. Different users may write the same data item concurrently at
different nodes of the system, so the values of the item will be temporarily
inconsistent.
4. Sharding of Files: In many NOSQL applications, files (or collections of data objects)
can have many millions of records (or documents or objects), and these records can
be accessed concurrently by thousands of users. So it is not practical to store the
whole file in one node. Sharding (also known as horizontal partitioning) of the file
records is often employed in NOSQL systems. This serves to distribute the load of
accessing the file records to multiple nodes.
5. High-Performance Data Access: In many NOSQL applications, it is necessary to find
individual records or objects (data items) from among the millions of data records or
objects in a file. To achieve this, most systems use one of two techniques: hashing or
range partitioning on object keys. The majority of accesses to an object will be by
providing the key value rather than by using complex query conditions.
In hashing, a hash function h(K) is applied to the key K, and the location of the object
with key K is determined by the value of h(K). In range partitioning, the location is
determined via a range of key values; for example, location i would hold the objects
whose key values K are in the range Ki_min ≤ K ≤ Ki_max.
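A minimal Python sketch contrasting the two techniques follows (illustration only; the node names, node count, and range boundaries are made-up values):

import hashlib

NODES = ["node0", "node1", "node2", "node3"]

def hash_partition(key):
    """Hashing: h(K) decides which node stores the object with key K."""
    h = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
    return NODES[h % len(NODES)]

# Range partitioning: location i holds keys with Ki_min <= K <= Ki_max
RANGES = [(1, 1_000_000), (1_000_001, 2_000_000), (2_000_001, 3_000_000), (3_000_001, 4_000_000)]

def range_partition(key):
    for node, (lo, hi) in zip(NODES, RANGES):
        if lo <= key <= hi:
            return node
    raise KeyError("key outside all ranges")

print(hash_partition(1_234_567), range_partition(1_234_567))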
Categories of NOSQL Systems: NOSQL systems are generally characterized into the following four major categories, with some additional categories listed afterward:
1. Document-based NOSQL systems: These systems store data in the form of
documents using well-known formats, such as JSON (JavaScript Object Notation).
Documents are accessible via their document id, but can also be accessed rapidly
using other indexes.
2. NOSQL key-value stores: These systems have a simple data model based on fast
access by the key to the value associated with the key; the value can be a record or an
object or a document or even have a more complex data structure.
3. Column-based or wide column NOSQL systems: These systems partition a table by
column into column families where each column family is stored in its own files. They
also allow versioning of data values.
4. Graph-based NOSQL systems: Data is represented as graphs, and related nodes can
be found by traversing the edges using path expressions.
Additional categories can be added as follows to include some systems that are not easily
categorized into the above four categories, as well as some other types of systems that have
been available even before the term NOSQL became widely used.
5. Hybrid NOSQL systems: These systems have characteristics from two or more of the
above four categories.
6. Object databases: Database systems that were based on the object data model were
known originally as object-oriented databases (OODBs) but are now referred to as
object databases (ODBs).
7. XML databases: A new language namely, XML (Extensible Markup Language) has
emerged as the standard for structuring and exchanging data over the Web in text
files. Another language that can be used for the same purpose is JSON (JavaScript
Object Notation).
Even keyword-based search engines store large amounts of data with fast search access, so
the stored data can be considered as large NOSQL big data stores.
The CAP Theorem: The three letters in CAP refer to three desirable properties of distributed systems with replicated data:
1. Consistency (among replicated copies),
2. Availability (of the system for read and write operations), and
3. Partition tolerance (in the face of the nodes in the system being partitioned by a network fault).
Availability means that each read or write request for a data item will either be processed
successfully or will receive a message that the operation cannot be completed.
Partition tolerance means that the system can continue operating if the network connecting
the nodes has a fault that results in two or more partitions, where the nodes in each partition
can only communicate among each other.
Consistency means that the nodes will have the same copies of a replicated data item visible
for various transactions.
It is important to note here that the use of the word consistency in CAP and its use in ACID do
not refer to the same identical concept.
In CAP, the term consistency refers to the consistency of the values in different copies of the
same data item in a replicated distributed system. In ACID, it refers to the fact that a
transaction will not violate the integrity constraints specified on the database schema.
However, if we consider that the consistency of replicated copies is a specified constraint,
then the two uses of the term consistency would be related.
The CAP theorem states that it is not possible to guarantee all three of the desirable
properties—consistency, availability, and partition tolerance—at the same time in a
distributed system with data replication. If this is the case, then the distributed system
designer would have to choose two properties out of the three to guarantee.
It is generally assumed that in many traditional (SQL) applications, guaranteeing consistency
through the ACID properties is important. On the other hand, in a NOSQL distributed data
store, a weaker consistency level is often acceptable, and guaranteeing the other two
properties (availability, partition tolerance) is important. Hence, weaker consistency levels
are often used in NOSQL systems instead of guaranteeing serializability. In particular, a form
of consistency known as eventual consistency is often adopted in NOSQL systems.
Document-Based NOSQL Systems and MongoDB: Document-based or document-oriented NOSQL systems typically store data as collections of similar documents. These types of systems are also sometimes known as
document stores. The individual documents somewhat resemble complex or XML
documents, but a major difference between document-based systems versus object and
object-relational systems and XML is that there is no requirement to specify a schema—
rather, the documents are specified as self-describing data. Although the documents in a
collection should be similar, they can have different data elements (attributes), and new
documents can have new data elements that do not exist in any of the current documents in
the collection.
Create another document collection called worker to hold information about the
EMPLOYEEs who work on each project; for example:
db.createCollection("worker", { capped : true, size : 5242880, max : 2000 } )
Each document in a collection has a unique ObjectId field, called _id, which is automatically
indexed in the collection unless the user explicitly requests no index for the _id field. The
value of ObjectId can be specified by the user, or it can be system-generated if the user does
not specify an _id field for a particular document.
System-generated ObjectIds have a specific format, which combines the timestamp when the
object is created (4 bytes, in an internal MongoDB format), the node id (3 bytes), the process
id (2 bytes), and a counter (3 bytes) into a 12-byte Id value.
User-generated ObjectIds can have any value specified by the user as long as it uniquely identifies the document, so these Ids are similar to primary keys in relational systems.
Fig: Normalized documents.
MongoDB CRUD Operations: Documents can be created and inserted into their collections using the insert operation, and removed using the remove operation, whose condition specifies the documents to be deleted based on some of the fields in the collection documents. There is also an update operation, which has
a condition to select certain documents, and a $set clause to specify the update. It is also
possible to use the update operation to replace an existing document with another one but
keep the same ObjectId.
For read queries, the main command is called find, and the format is:
db.<collection_name>.find(<condition>)
General Boolean conditions can be specified as <condition>, and the documents in the
collection that return true are selected for the query result.
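For illustration, roughly equivalent calls through the Python driver (pymongo) might look as follows; the worker collection, the field names, and the connection string are assumptions, not part of the notes, and the driver's method names (insert_one, delete_one) differ slightly from the mongo shell's insert/remove:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")    # assumed local server
db = client["company"]                               # assumed database name

# insert a document into the worker collection
db.worker.insert_one({"_id": "W1", "Ssn": "123456789", "PNO": 1, "Hours": 32.5})

# read: find documents matching a Boolean condition
for doc in db.worker.find({"PNO": 1}):
    print(doc)

# update: select by condition, apply a $set clause
db.worker.update_one({"_id": "W1"}, {"$set": {"Hours": 20.0}})

# delete (remove) documents matching a condition
db.worker.delete_one({"_id": "W1"})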
Replication in MongoDB.
The concept of replica set is used in MongoDB to create multiple copies of the same data set
on different nodes in the distributed system, and it uses a variation of the master-slave
approach for replication. For example, suppose that we want to replicate a particular
document collection C.
A replica set will have one primary copy of the collection C stored in one node N1, and at
least one secondary copy (replica) of C stored at another node N2. Additional copies can be
stored in nodes N3, N4, etc., as needed, but the cost of storage and update (write) increases
with the number of replicas.
The total number of participants in a replica set must be at least three, so if only one
secondary copy is needed, a participant in the replica set known as an arbiter must run on
the third node N3.
The arbiter does not hold a replica of the collection but participates in elections to choose a
new primary if the node storing the current primary copy fails. If the total number of
members in a replica set is n (one primary plus i secondaries, for a total of n = i + 1), then n
must be an odd number; if it is not, an arbiter is added to ensure the election process works
correctly if the primary fails.
In MongoDB replication, all write operations must be applied to the primary copy and then
propagated to the secondaries. For read operations, the user can choose the particular read
preference for their application.
The default read preference processes all reads at the primary copy, so all read and write
operations are performed at the primary node. In this case, secondary copies are mainly to
make sure that the system continues operation if the primary fails, and MongoDB can ensure
that every read request gets the latest document value.
To increase read performance, it is possible to set the read preference so that read requests
can be processed at any replica (primary or secondary); however, a read at a secondary is not
guaranteed to get the latest version of a document because there can be a delay in
propagating writes from the primary to the secondaries.
Sharding in MongoDB.
When a collection holds a very large number of documents or requires a large storage space,
storing all the documents in one node can lead to performance problems, particularly if there
are many user operations accessing the documents concurrently using various CRUD
operations.
Sharding of the documents in the collection—also known as horizontal partitioning—
divides the documents into disjoint partitions known as shards. This allows the system to
add more nodes as needed by a process known as horizontal scaling of the distributed
system, and to store the shards of the collection on different nodes to achieve load balancing.
Each node will process only those operations pertaining to the documents in the shard
stored at that node. Also, each shard will contain fewer documents than if the entire
collection were stored at one node, thus further improving performance.
There are two ways to partition a collection into shards in MongoDB—range partitioning
and hash partitioning. Both require that the user specify a particular document field to be
used as the basis for partitioning the documents into shards.
The partitioning field—known as the shard key in MongoDB—must have two
characteristics: it must exist in every document in the collection, and it must have an index.
The ObjectId can be used, but any other field possessing these two characteristics can also
be used as the basis for sharding. The values of the shard key are divided into chunks either
through range partitioning or hash partitioning, and the documents are partitioned based on
the chunks of shard key values.
Range partitioning creates the chunks by specifying a range of key values; for example, if
the shard key values ranged from one to ten million, it is possible to create ten ranges—1 to
1,000,000; 1,000,001 to 2,000,000; … ; 9,000,001 to 10,000,000—and each chunk would
contain the key values in one range. Hash partitioning applies a hash function h(K) to each
shard key K, and the partitioning of keys into chunks is based on the hash values.
In general, if range queries are commonly applied to a collection (for example, retrieving all
documents whose shard key value is between 200 and 400), then range partitioning is
preferred because each range query will typically be submitted to a single node that contains
all the required documents in one shard. If most searches retrieve one document at a time,
hash partitioning may be preferable because it randomizes the distribution of shard key
values into chunks.
When sharding is used, MongoDB queries are submitted to a module called the query
router, which keeps track of which nodes contain which shards based on the particular
partitioning method used on the shard keys. The query (CRUD operation) will be routed to
the nodes that contain the shards that hold the documents that the query is requesting.
If the system cannot determine which shards hold the required documents, the query will be
submitted to all the nodes that hold shards of the collection.
Sharding and replication are used together; sharding focuses on improving performance via
load balancing and horizontal scalability, whereas replication focuses on ensuring system
availability when certain nodes fail in the distributed system.
NOSQL Key-Value Stores: Key-value stores focus on high performance, availability, and scalability by storing data in a distributed storage system. The data model is relatively simple: in many of these systems, the value is just a string or array of bytes, and it is up to the application rather than the key-value store to interpret the structure of the data value. In other cases, some
standard formatted data is allowed; for example, structured data rows (tuples) similar to
relational data, or semistructured data using JSON or some other self-describing data format.
Different key-value stores can thus store unstructured, semistructured, or structured data
items.
The main characteristic of key-value stores is the fact that every value (data item) must be
associated with a unique key, and that retrieving the value by supplying the key must be very
fast.
There are many systems that fall under the key-value store label, so rather than provide a lot
of details on one particular system, we will give a brief introductory overview for some of
these systems and their characteristics.
DynamoDB Overview: The basic data model in DynamoDB uses the concepts of tables, items, and attributes. A table does not have a schema; it only requires a primary key, which is used for fast access to the items in the table and can be one of the following two types:
■ A single attribute. The DynamoDB system will use this attribute to build a hash index on
the items in the table. This is called a hash type primary key. The items are not ordered in
storage on the value of the hash attribute.
■ A pair of attributes. This is called a hash and range type primary key. The primary key will
be a pair of attributes (A, B): attribute A will be used for hashing, and because there will be
multiple items with the same value of A, the B values will be used for ordering the records
with the same A value. A table with this type of key can have additional secondary indexes
defined on its attributes. For example, if we want to store multiple versions of some type of
items in a table, we could use ItemID as hash and Date or Timestamp (when the version was
created) as range in a hash and range type primary key.
Voldemort Key-Value Distributed Data Store: Voldemort is an open source key-value distributed data store. Its features include the following:
■ Simple basic operations. A collection of (key, value) pairs is kept in a Voldemort store s. The operation s.put(k, v) inserts an item as a key-value pair with key k and value v. The operation
s.delete(k) deletes the item whose key is k from the store, and the operation v = s.get(k)
retrieves the value v associated with key k. The application can use these basic operations to
build its own requirements. At the basic storage level, both keys and values are arrays of
bytes (strings).
■ High-level formatted data values. The values v in the (k, v) items can be specified in JSON
(JavaScript Object Notation), and the system will convert between JSON and the internal
storage format. Other data object formats can also be specified if the application provides
the conversion (also known as serialization) between the user format and the storage
format as a Serializer class.
The Serializer class must be provided by the user and will include operations to convert the
user format into a string of bytes for storage as a value, and to convert back a string (array
of bytes) retrieved via s.get(k) into the user format. Voldemort has some built-in serializers
for formats other than JSON.
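The put/get/delete interface and the serialization step can be sketched in a few lines of Python (a toy in-memory store for illustration, not the Voldemort API itself):

import json

class KeyValueStore:
    """Toy store: keys and values are kept as strings of bytes, as at the basic storage level."""
    def __init__(self, serializer=json):
        self._data = {}               # key (bytes) -> value (bytes)
        self._ser = serializer        # converts user format <-> stored bytes

    def put(self, k, v):
        self._data[str(k).encode()] = self._ser.dumps(v).encode()

    def get(self, k):
        raw = self._data.get(str(k).encode())
        return None if raw is None else self._ser.loads(raw.decode())

    def delete(self, k):
        self._data.pop(str(k).encode(), None)

s = KeyValueStore()
s.put("emp:1", {"Lname": "Smith", "Fname": "John"})
print(s.get("emp:1"))     # {'Lname': 'Smith', 'Fname': 'John'}
s.delete("emp:1")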
■ Consistent hashing for distributing (key, value) pairs. A variation of the data
distribution algorithm known as consistent hashing is used in Voldemort for data
distribution among the nodes in the distributed cluster of nodes.
A hash function h(k) is applied to the key k of each (k, v) pair, and h(k) determines where the
item will be stored. The method assumes that h(k) is an integer value, usually in the range 0
to Hmax = 2^n − 1, where n is chosen based on the desired range for the hash values. This
method is best visualized by considering the range of all possible integer hash values 0 to
Hmax to be evenly distributed on a circle (or ring).
The nodes in the distributed system are then also located on the same ring; usually each node
will have several locations on the ring. The positioning of the points on the ring that
represent the nodes is done in a pseudorandom manner.
An item (k, v) will be stored on the node whose position in the ring follows the position of
h(k) on the ring in a clockwise direction. In Figure below, we assume there are three nodes in
the distributed cluster labeled A, B, and C, where node C has a bigger capacity than nodes A
and B. In a typical system, there will be many more nodes. On the circle, two instances each
of A and B are placed, and three instances of C (because of its higher capacity), in a
pseudorandom manner to cover the circle. Figure below, indicates which (k, v) items are
placed in which nodes based on the h(k) values.
Fig: Ring having three nodes A, B, and C, with C having greater capacity. The h(k) values that map to the circle points in range 1 have their (k, v) items stored in node A, range 2 in node B, and range 3 in node C.
The h(k) values that fall in the parts of the circle marked as range 1 in Figure above, will have
their (k, v) items stored in node A because that is the node whose label follows h(k) on the
ring in a clockwise direction; those in range 2 are stored in node B; and those in range 3 are
stored in node C.
This scheme allows horizontal scalability because when a new node is added to the
distributed system, it can be added in one or more locations on the ring depending on the
node capacity. Only a limited percentage of the (k, v) items will be reassigned to the new
node from the existing nodes based on the consistent hashing placement algorithm.
Also, those items assigned to the new node may not all come from only one of the existing
nodes because the new node can have multiple locations on the ring.
For example, if a node D is added and it has two placements on the ring as shown in Figure
below, then some of the items from nodes B and C would be moved to node D. The items
whose keys hash to range 4 on the circle (see Figure below) would be migrated to node D.
Fig: Adding a node D to the ring. Items in range 4 are moved to node D from node B (range 2 is reduced) and node C (range 3 is reduced).
This scheme also allows replication by placing the number of specified replicas of an item on
successive nodes on the ring in a clockwise direction.
The sharding is built into the method, and different items in the store (file) are located on
different nodes in the distributed cluster, which means the items are horizontally partitioned
(sharded) among the nodes in the distributed system.
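The placement rule (store each (k, v) item at the first node position that follows h(k) clockwise on the ring) can be sketched in Python as follows; the node names, capacities, and hash function are illustrative assumptions:

import bisect
import hashlib

def h(key, space=2**32):
    """Hash a key onto the ring 0 .. space-1."""
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16) % space

class ConsistentHashRing:
    def __init__(self, nodes):
        # nodes: dict of node name -> number of positions on the ring (capacity)
        self.positions = sorted((h(f"{name}#{i}"), name)
                                for name, count in nodes.items()
                                for i in range(count))

    def node_for(self, key):
        ring = [p for p, _ in self.positions]
        idx = bisect.bisect_right(ring, h(key)) % len(ring)   # first position clockwise
        return self.positions[idx][1]

ring = ConsistentHashRing({"A": 2, "B": 2, "C": 3})   # C gets more positions (higher capacity)
print(ring.node_for("k1"), ring.node_for("k2"))

# adding node D only remaps the keys whose h(k) now falls just before D's positions
ring = ConsistentHashRing({"A": 2, "B": 2, "C": 3, "D": 2})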
■ Consistency and versioning. Voldemort uses a method similar to the one developed for
DynamoDB for consistency in the presence of replicas. Basically, concurrent write
operations are allowed by different processes so there could exist two or more different
values associated with the same key at different nodes when items are replicated.
Consistency is achieved when the item is read by using a technique known as versioning and
read repair.
Concurrent writes are allowed, but each write is associated with a vector clock value. When
a read occurs, it is possible that different versions of the same value (associated with the
same key) are read from different nodes.
If the system can reconcile to a single final value, it will pass that value to the read; otherwise,
more than one version can be passed back to the application, which will reconcile the various
versions into one version based on the application semantics and give this reconciled value
back to the nodes.
Hbase Data Model and Versioning: Hbase is a column-based (wide column) NOSQL system. Its data model organizes data using the concepts of namespaces, tables, column families, column qualifiers, columns, rows, and data cells. Data is stored in a self-describing form by associating columns with data values, where data values are strings.
Hbase also stores multiple versions of a data item, with a timestamp associated with each
version, so versions and timestamps are also part of the Hbase data model.
As with other NOSQL systems, unique keys are associated with stored data items for fast
access, but the keys identify cells in the storage system. Because the focus is on high
performance when storing huge amounts of data, the data model includes some storage-
related concepts.
It is important to note that the use of the words table, row, and column is not identical to their
use in relational databases, but the uses are related.
■ Tables and Rows. Data in Hbase is stored in tables, and each table has a table name. Data
in a table is stored as self-describing rows. Each row has a unique row key, and row keys
are strings that must have the property that they can be lexicographically ordered, so
characters that do not have a lexicographic order in the character set cannot be used as part
of a row key.
■ Column Families, Column Qualifiers, and Columns. A table is associated with one or
more column families. Each column family will have a name, and the column families
associated with a table must be specified when the table is created and cannot be changed
later. The create statement in the examples below shows how a table may be created; the table name is
followed by the names of the column families associated with the table.
When the data is loaded into a table, each column family can be associated with many
column qualifiers, but the column qualifiers are not specified as part of creating a table. So
the column qualifiers make the model a self-describing data model because the qualifiers can
be dynamically specified as new rows are created and inserted into the table.
A column is specified by a combination of ColumnFamily:ColumnQualifier. Basically, column
families are a way of grouping together related columns (attributes in relational
terminology) for storage purposes, except that the column qualifier names are not specified
during table creation.
Rather, they are specified when the data is created and stored in rows, so the data is self-describing, since any column qualifier name can be used in a new row of data that is added using the put statement.
However, it is important that the application programmers know which column qualifiers
belong to each column family, even though they have the flexibility to create new column
qualifiers on the fly when new data rows are created.
The concept of column family is somewhat similar to vertical partitioning, because columns
(attributes) that are accessed together because they belong to the same column family are
stored in the same files. Each column family of a table is stored in its own files using the HDFS
file system.
■ Versions and Timestamps. Hbase can keep several versions of a data item, along with
the timestamp associated with each version. The timestamp is a long integer number that
represents the system time when the version was created, so newer versions have larger
timestamp values.
Hbase uses midnight ‘January 1, 1970 UTC’ as timestamp value zero, and uses a long integer
that measures the number of milliseconds since that time as the system timestamp value
(this is similar to the value returned by the Java utility java.util.Date.getTime() and is also
used in MongoDB).
It is also possible for the user to define the timestamp value explicitly in a Date format rather
than using the system-generated timestamp.
■ Cells. A cell holds a basic data item in Hbase. The key (address) of a cell is specified by a
combination of (table, rowid, columnfamily, columnqualifier, timestamp).
If timestamp is left out, the latest version of the item is retrieved unless a default number of
versions is specified, say the latest three versions.
The default number of versions to be retrieved, as well as the default number of versions that
the system needs to keep, are parameters that can be specified during table creation.
■ Namespaces. A namespace is a collection of tables. A namespace basically specifies a
collection of one or more tables that are typically used together by user applications, and it
corresponds to a database that contains a collection of tables in relational terminology.
Examples in Hbase.
(a) Creating a table called EMPLOYEE with three column families: Name, Address, and
Details.
create 'EMPLOYEE', 'Name', 'Address', 'Details'
(b) Inserting some data into the EMPLOYEE table; different rows can have different self-describing
column qualifiers (Fname, Lname, Nickname, Mname, Minit, Suffix, … for column family
Name; Job, Review, Supervisor, Salary for column family Details).
Hbase only provides low-level CRUD operations. It is the responsibility of the application
programs to implement more complex operations, such as joins between rows in different
tables. The create operation creates a new table and specifies one or more column families
associated with that table, but it does not specify the column qualifiers, as we discussed
earlier. The put operation is used for inserting new data or new versions of existing data
items. The get operation is for retrieving the data associated with a single row in a table, and
the scan operation retrieves all the rows.
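The cell-addressing scheme described above can be mimicked with a Python dictionary (a toy model of the Hbase data model for illustration, not the Hbase client API):

import time

class CellStore:
    """Toy model: cells keyed by (table, rowkey, column_family, column_qualifier, timestamp)."""
    def __init__(self):
        self.cells = {}

    def put(self, table, row, cf, qualifier, value, ts=None):
        ts = ts if ts is not None else int(time.time() * 1000)   # milliseconds, like Hbase
        self.cells[(table, row, cf, qualifier, ts)] = value

    def get(self, table, row, cf, qualifier, versions=1):
        matches = sorted(((k[4], v) for k, v in self.cells.items()
                          if k[:4] == (table, row, cf, qualifier)), reverse=True)
        return matches[:versions]        # newest versions first (larger timestamps)

store = CellStore()
store.put("EMPLOYEE", "row1", "Name", "Fname", "John")
store.put("EMPLOYEE", "row1", "Details", "Job", "Engineer")
print(store.get("EMPLOYEE", "row1", "Name", "Fname"))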
Graph Databases and Neo4j.
We will focus our discussion on one particular system, Neo4j, which is used in many
applications. Neo4j is an open source system, and it is implemented in Java.
The data model in Neo4j organizes data using the concepts of nodes and relationships; both nodes and relationships can have properties that store the data items associated with them.
We will just show the high-level syntax for creating nodes and relationships; to do so, we will
use the Neo4j CREATE command, which is part of the high-level declarative query language
Cypher.
Neo4j has many options and variations for creating nodes and relationships using various
scripting interfaces, but a full discussion is outside the scope of our presentation.
■ Labels and properties. When a node is created, the node label can be specified. It is also
possible to create nodes without any labels.
In the create command in (a) below, the node labels are EMPLOYEE, DEPARTMENT, PROJECT, and
LOCATION, and the created nodes correspond to some of the data from the COMPANY
database with a few modifications; for example, we use EmpId instead of SSN, and we only
include a small subset of the data for illustration purposes.
Properties are enclosed in curly brackets { … }. It is possible for some nodes to have multiple labels; for example, the same node can be labeled as PERSON and EMPLOYEE and MANAGER
by listing all the labels separated by the colon symbol as follows:
PERSON:EMPLOYEE:MANAGER.
Having multiple labels is similar to an entity belonging to an entity type (PERSON) plus some
subclasses of PERSON (namely EMPLOYEE and MANAGER) in the EER model (see Chapter
4) but can also be used for other purposes.
■ Relationships and relationship types. The create statement in (b) shows a few example
relationships in Neo4j based on the COMPANY database. The → specifies the direction of the
relationship, but the relationship can be traversed in either direction. The relationship types
(labels) in (b) are WorksFor, Manager, LocatedIn, and WorksOn; only relationships with
the relationship type WorksOn have properties (Hours) in (b).
■ Paths. A path specifies a traversal of part of the graph. It is typically used as part of a query
to specify a pattern, where the query will retrieve from the graph data that matches the
pattern. A path is typically specified by a start node, followed by one or more relationships,
leading to one or more end nodes that satisfy the pattern.
■ Optional Schema. A schema is optional in Neo4j. Graphs can be created and used without
a schema, but in Neo4j version 2.0, a few schema-related functions were added. The main
features related to schema creation involve creating indexes and constraints based on the
labels and properties. For example, it is possible to create the equivalent of a key constraint
on a property of a label, so all nodes in the collection of nodes associated with the label must
have unique values for that property.
■ Indexing and node identifiers. When a node is created, the Neo4j system creates an
internal unique system-defined identifier for each node. To retrieve individual nodes using
other properties of the nodes efficiently, the user can create indexes for the collection of
nodes that have a particular label. Typically, one or more of the properties of the nodes in
that collection can be indexed. For example, Empid can be used to index nodes with the
EMPLOYEE label, Dno to index the nodes with the DEPARTMENT label, and Pno to index the
nodes with the PROJECT label.
Examples in Neo4j using the Cypher language. (a) Creating some nodes. (b) Creating some
relationships. (c) Basic syntax of Cypher queries. (d) Examples of Cypher queries.
(a) creating some nodes for the COMPANY data (from Figure 5.6):
CREATE (e1:EMPLOYEE {Empid: '1', Lname: 'Smith', Fname: 'John', Minit: 'B'})
CREATE (e2:EMPLOYEE {Empid: '2', Lname: 'Wong', Fname: 'Franklin'})
CREATE (e3:EMPLOYEE {Empid: '3', Lname: 'Zelaya', Fname: 'Alicia'})
CREATE (e4:EMPLOYEE {Empid: '4', Lname: 'Wallace', Fname: 'Jennifer', Minit: 'S'})
…
CREATE (d1:DEPARTMENT {Dno: '5', Dname: 'Research'})
CREATE (d2:DEPARTMENT {Dno: '4', Dname: 'Administration'})
…
Query 1 in Figure (d) shows how to use the MATCH and RETURN clauses in a query; the query retrieves the locations for department number 5. MATCH specifies the pattern and the query variables (d and loc), and RETURN specifies the query result to be retrieved by referring to the query variables.
Query 2 has three variables (e, w, and p), and returns the projects and hours per week
that the employee with Empid = 2 works on.
Query 3, on the other hand, returns the employees who work on the project with Pno = 2, along with their hours per week.
Query 4 illustrates the ORDER BY clause and returns all employees and the projects
they work on, sorted by Ename. It is also possible to limit the number of returned
results by using the LIMIT clause as in query 5, which only returns the first 10 answers.
Query 6 illustrates the use of WITH and aggregation, although the WITH clause can be
used to separate clauses in a query even if there is no aggregation.
Query 6 also illustrates the WHERE clause to specify additional conditions, and the
query returns the employees who work on more than two projects, as well as the
number of projects each employee works on. It is also common to return the nodes
and relationships themselves in the query result, rather than the property values of
the nodes as in the previous queries.
Query 7 is similar to query 5 but returns the nodes and relationships only, and so the
query result can be displayed as a graph using Neo4j’s visualization tool. It is also
possible to add or remove labels and properties from nodes.
Query 8 shows how to add more properties to a node by adding a Job property to an
employee node.
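As an illustration of the MATCH/RETURN pattern in Query 1, a hedged sketch using the official Python driver (neo4j) follows; the connection settings are assumptions, while the DEPARTMENT label, Dno property, LocatedIn relationship type, and variables d and loc follow the notes:

from neo4j import GraphDatabase

# connection details are assumptions for illustration
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

query = """
MATCH (d:DEPARTMENT {Dno: '5'})-[:LocatedIn]->(loc)
RETURN d, loc
"""

with driver.session() as session:
    for record in session.run(query):      # MATCH binds d and loc; RETURN sends them back
        print(record["d"], record["loc"])

driver.close()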
Neo4j Interfaces and Distributed System Characteristics:
■ Enterprise edition versus community edition. Both editions support the Neo4j graph data model and storage system, the Cypher query language, and several other interfaces, including a REST (Representational State Transfer) API. In addition, both editions support ACID properties. The enterprise edition
supports additional features for enhancing performance, such as caching and clustering of
data and locking.
■ Graph visualization interface. Neo4j has a graph visualization interface, so that a subset
of the nodes and edges in a database graph can be displayed as a graph. This tool can be used
to visualize query results in a graph representation.
■ Master-slave replication. Neo4j can be configured on a cluster of distributed system
nodes (computers), where one node is designated the master node. The data and indexes are
fully replicated on each node in the cluster.
Various ways of synchronizing the data between master and slave nodes can be configured
in the distributed cluster.
■ Caching. A main memory cache can be configured to store the graph data for improved
performance.
■ Logical logs. Logs can be maintained to recover from failures.