Friday, June 6, 2014

Cassandra CQL3 - Range Queries

Range Queries using CQL3:

One of the major advantages of using Cassandra via Thrift was its hashmap-like structure. With CQL3, that doesn't appear to be available in a very straightforward way.

But do not worry, it is very simple. No, maps are not a perfect replacement, because it is not currently possible to do range queries on a map.

There is a better way to do it.


Let us create a column family called testrange

CREATE TABLE testrange (
  key TEXT,
  detail_key int,
  detal_value text,
  PRIMARY KEY (key, detail_key)
);

We define the primary key as key combined with detail_key. This forces all entries with the same key into a single row, with each distinct detail_key becoming a column, and lets us run =, <, and > queries on detail_key.


Let's try this now.


cqlsh:testkeyspace> describe columnfamily testrange;

CREATE TABLE testrange (
  key text,
  detail_key int,
  detal_value text,
  PRIMARY KEY (key, detail_key)
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='' AND
  dclocal_read_repair_chance=0.000000 AND
  gc_grace_seconds=864000 AND
  index_interval=128 AND
  read_repair_chance=0.100000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  default_time_to_live=0 AND
  speculative_retry='99.0PERCENTILE' AND
  memtable_flush_period_in_ms=0 AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};



Inserting a few records

cqlsh:testkeyspace> insert into testrange (key,detail_key,detal_value) VALUES ('1',1,'number one');
cqlsh:testkeyspace> insert into testrange (key,detail_key,detal_value) VALUES ('1',2,'number two');
cqlsh:testkeyspace> insert into testrange (key,detail_key,detal_value) VALUES ('1',3,'number three');
cqlsh:testkeyspace> insert into testrange (key,detail_key,detal_value) VALUES ('1',4,'number four');
cqlsh:testkeyspace> insert into testrange (key,detail_key,detal_value) VALUES ('1',5,'number five');
cqlsh:testkeyspace> select * from testrange;

 key | detail_key | detal_value
-----+------------+--------------
   1 |          1 |   number one
   1 |          2 |   number two
   1 |          3 | number three
   1 |          4 |  number four
   1 |          5 |  number five

(5 rows)


So far, so good... now let's execute the range queries.

cqlsh:testkeyspace> select * from testrange where key = '1' and detail_key > 2 and detail_key <=4;

 key | detail_key | detal_value
-----+------------+--------------
   1 |          3 | number three
   1 |          4 |  number four

(2 rows)




Cool, but how do we confirm that this is the same kind of range query that was offered by Cassandra Thrift?

Let's jump back to the CLI:

[default@testkeyspace] list testrange;
Using default limit of 100
Using default cell limit of 100
-------------------
RowKey: 1
=> (name=1:, value=, timestamp=1402037639440000)
=> (name=1:detal_value, value=6e756d626572206f6e65, timestamp=1402037639440000)
=> (name=2:, value=, timestamp=1402037649568000)
=> (name=2:detal_value, value=6e756d6265722074776f, timestamp=1402037649568000)
=> (name=3:, value=, timestamp=1402037659783000)
=> (name=3:detal_value, value=6e756d626572207468726565, timestamp=1402037659783000)
=> (name=4:, value=, timestamp=1402037668866000)
=> (name=4:detal_value, value=6e756d62657220666f7572, timestamp=1402037668866000)
=> (name=5:, value=, timestamp=1402037677374000)
=> (name=5:detal_value, value=6e756d6265722066697665, timestamp=1402037677374000)

1 Row Returned.
Elapsed time: 294 msec(s).
[default@testkeyspace]


So we have only one row, and all our detail_key values are columns, which is very similar to what we used to do earlier with Thrift.
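
Since detail_key is a clustering column, you can also slice it in reverse order or limit the slice. A small illustrative query (not from the session above, but it should work against the same table):

select * from testrange where key = '1' order by detail_key desc limit 2;

This should return the two highest detail_key entries (5 and 4) for that row.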



Apache Cassandra - Getting started with CQL 3

Getting started with CQL3

Let's step through some basics of Cassandra CQL3. I am going to assume you have already installed Cassandra and are ready to get into cqlsh.



<cassandra_home># bin/cqlsh
Connected to Test Cluster at localhost:9160.
[cqlsh 4.1.1 | Cassandra 2.0.8 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.
cqlsh>


Now we are ready to run some queries.


Let's create a keyspace to start with. A keyspace is the same as a database if you are coming from SQL land.

cqlsh> CREATE KEYSPACE testkeyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

Now the keyspace has been created.

Let's use the keyspace:

cqlsh> USE testkeyspace;
cqlsh:testkeyspace>

Now we have a keyspace with replication factor 1.

If you want to change your keyspace RF, you can alter it at any point in time:

ALTER KEYSPACE testkeyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2};

This will require you to have a two-node cluster set up.


Dropping keyspace


DROP KEYSPACE testkeyspace;

Yeah, simple so far.


Now let's create a simple table:

CREATE TABLE testtable(id text PRIMARY KEY, name text, avg varint, age int) WITH comment='Test table' AND read_repair_chance = 1.0;


Running a basic select query is very simple

cqlsh:testkeyspace> select * from testtable;

(0 rows)



Let's describe the table quickly:

cqlsh:testkeyspace> describe columnfamily testtable;

CREATE TABLE testtable (
  id text,
  age int,
  avg varint,
  name text,
  PRIMARY KEY (id)
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='Test table' AND
  dclocal_read_repair_chance=0.000000 AND
  gc_grace_seconds=864000 AND
  index_interval=128 AND
  read_repair_chance=1.000000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  default_time_to_live=0 AND
  speculative_retry='99.0PERCENTILE' AND
  memtable_flush_period_in_ms=0 AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};


A column family is similar to a table in SQL.


Now, let us insert some data into the column family:

cqlsh:testkeyspace> insert into testtable (id,age,avg,name) values('1',23,10,'Arunn');

and select

cqlsh:testkeyspace> select * from testtable;

 id | age | avg | name
----+-----+-----+-------
  1 |  23 |  10 | Arunn

(1 rows)




Altering a table and adding fields is simple

ALTER TABLE testtable ADD gender varchar;


cqlsh:testkeyspace> ALTER TABLE testtable ADD gender varchar;
cqlsh:testkeyspace> select * from testtable;

 id | age | avg | gender | name
----+-----+-----+--------+-------
  1 |  23 |  10 |   null | Arunn

(1 rows)


Updates are very simple too

cqlsh:testkeyspace> update testtable set gender = 'MALE' where id = '1';
cqlsh:testkeyspace> select * from testtable;

 id | age | avg | gender | name
----+-----+-----+--------+-------
  1 |  23 |  10 |   MALE | Arunn

(1 rows)


Now let's try a simple delete, and we are done with the basic operations.


cqlsh:testkeyspace> delete from testtable where id = '1';
cqlsh:testkeyspace> select * from testtable;

(0 rows)



Now we know how to:
  • Create a keyspace
  • Create a column family (CF)
  • Insert data
  • Alter a CF
  • Update a row
  • Delete a row



The next interesting thing to try is batch operations, which save network round-trips.

BEGIN BATCH
insert into testtable (id,age,avg,gender,name) values('2',30,19, 'MALE','Abc');
insert into testtable (id,age,avg,gender,name) values('3',40,16, 'MALE','Def');
update testtable set name = 'Ghi' where id = '3';
APPLY BATCH;



Counters are a very important column type in Cassandra

Counters
The counter type is used to define counter columns. A counter column is a column whose value is a 64-bit signed integer and on which two operations are supported: increment and decrement. Note that the value of a counter cannot be set directly. A counter does not exist until it is first incremented or decremented, and that first increment or decrement is made as if the previous value was 0. Deletion of counter columns is supported but has some limitations (see the Cassandra Wiki for more information).
The use of the counter type is limited in the following ways (a short CQL3 sketch follows the list):
  • It cannot be used for a column that is part of the PRIMARY KEY of a table.
  • A table that contains a counter can only contain counters. In other words, either all the columns of a table outside the PRIMARY KEY have the counter type, or none of them have it.
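
For illustration, here is a minimal CQL3 sketch of a counter table (the table and column names are made up, not from this post):

CREATE TABLE page_views (
  page text PRIMARY KEY,
  views counter
);

UPDATE page_views SET views = views + 1 WHERE page = '/home';
SELECT * FROM page_views WHERE page = '/home';

Note that you cannot INSERT into a counter column; the UPDATE above creates the counter on its first increment.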


Map column types are interesting for storing a collection of data. Cassandra also supports set and list types (a brief sketch of those appears after the map example below).

cqlsh:testkeyspace> create table testmaptable (id text PRIMARY KEY , clus_columns map<int,text>);
cqlsh:testkeyspace> select * from testmaptable;

(0 rows)

cqlsh:testkeyspace> insert into testmaptable (id,clus_columns) values ('1',{1:'number one'});
cqlsh:testkeyspace> select * from testmaptable;

 id | clus_columns
----+-------------------
  1 | {1: 'number one'}

(1 rows)

cqlsh:testkeyspace> update testmaptable set clus_columns = {2:'number two'} where id = '1';
cqlsh:testkeyspace> select * from testmaptable;

 id | clus_columns
----+-------------------
  1 | {2: 'number two'}

(1 rows)


cqlsh:testkeyspace> update testmaptable set clus_columns = clus_columns + {1:'number one'} where id = '1';
cqlsh:testkeyspace> select * from testmaptable;

 id | clus_columns
----+------------------------------------
  1 | {1: 'number one', 2: 'number two'}

(1 rows)

Let's take a look at how it looks in the cassandra-cli:
<cassandra_home> bin/cassandra-cli --host localhost
use testkeyspace;
[default@testkeyspace] list testmaptable;
Using default limit of 100
Using default cell limit of 100
-------------------
RowKey: 1
=> (name=, value=, timestamp=1402035929908000)
=> (name=clus_columns:00000001, value=6e756d626572206f6e65, timestamp=1402036016296000)
=> (name=clus_columns:00000002, value=6e756d6265722074776f, timestamp=1402035971589000)

1 Row Returned.
Elapsed time: 429 msec(s).
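
As mentioned above, sets and lists work much like maps. A minimal sketch (the testcollections table is made up purely for illustration):

create table testcollections (id text PRIMARY KEY, tags set<text>, events list<text>);
insert into testcollections (id, tags, events) values ('1', {'linux', 'db'}, ['started']);
update testcollections set tags = tags + {'nosql'}, events = events + ['stopped'] where id = '1';
select * from testcollections;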


Monday, October 1, 2012

Installing Cassandra


Introduction


This document aims to provide a few easy-to-follow steps to take the first-time user from installation to running a single-node Cassandra, and an overview of configuring a multinode cluster. Cassandra is meant to run on a cluster of nodes, but will run equally well on a single machine. This is a handy way of getting familiar with the software while avoiding the complexities of a larger system.

Step 0: Prerequisites and connection to the community


Cassandra requires the most stable version of Java 1.6 you can deploy. For Sun's jvm, this means at least u19; u21 is better. Cassandra also runs on the IBM jvm, and should run on jrockit as well.
The best way to ensure you always have up to date information on the project, releases, stability, bugs, and features is to subscribe to the users mailing list (subscription required) and participate in the #cassandra channel on IRC.

Step 1: Download Cassandra Kit


  • Download links for the latest stable release can always be found on the website.
  • Users of Debian or Debian-based derivatives can install the latest stable release in package form, see DebianPackaging for details.
  • Users of RPM-based distributions can get packages from Datastax.
  • If you are interested in building Cassandra from source, please refer to the How to Build page.
For more details about misc builds, please refer to the Cassandra versions and builds page.
  • If you plan to run the "snapshot" command on Cassandra, it is better to also install jna.jar. Please refer to the Backup Data section.

Step 2: Edit configuration files


Cassandra configuration files can be found in conf directory under the top directory of binary and source distributions. If you have installed cassandra from RPM packages, configuration files will be placed into /etc/cassandra/conf.

Step 2.1: Edit cassandra.yaml


The distribution's sample configuration conf/cassandra.yaml contains reasonable defaults for single-node operation, but you will need to make sure that the paths exist for data_file_directories, commitlog_directory, and saved_caches_directory.
Verify that storage_port and rpc_port do not conflict with other services on your computer. By default, Cassandra uses 7000 for storage_port and 9160 for rpc_port. The storage_port must be identical between Cassandra nodes in a cluster. Cassandra client applications will use rpc_port to connect to Cassandra.
It is a good idea to change cluster_name to avoid unnecessary conflicts with existing clusters.
initial_token: you can leave it blank, but I recommend setting it to 0 if you are configuring your first node. A minimal excerpt of these settings is sketched below.
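
The relevant portion of cassandra.yaml might look roughly like this (the cluster name and paths are just examples; keep your distribution's defaults unless you need to change them):

cluster_name: 'My Test Cluster'
initial_token: 0
storage_port: 7000
rpc_port: 9160
data_file_directories:
    - /var/lib/cassandra/data
commitlog_directory: /var/lib/cassandra/commitlog
saved_caches_directory: /var/lib/cassandra/saved_caches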

Step 2.2: Edit log4j-server.properties


conf/log4j-server.properties contains the path for the log file. Edit the line if you need to.
# Edit the next line to point to your logs directory
log4j.appender.R.File=/var/log/cassandra/system.log

Step 2.3: Edit cassandra-env.sh


Cassandra has a JMX (Java Management Extensions) interface, and the JMX_PORT is defined in conf/cassandra-env.sh. Edit the following line if you need to.
# Specifies the default port over which Cassandra will be available for
# JMX connections.
JMX_PORT="7199"

By default, Cassandra will allocate memory based on the physical memory your system has. For example, it will allocate a 1GB heap on a 2GB system, and a 2GB heap on an 8GB system. If you want to specify the Cassandra heap size, remove the leading pound sign (#) on the following lines and specify the memory size for them.
#MAX_HEAP_SIZE="4G"
#HEAP_NEWSIZE="800M"

If you are not familiar with Java GC, 1/4 of MAX_HEAP_SIZE may be a good starting point for HEAP_NEWSIZE.
Cassandra will need more than a few GB of heap for production use, but you can run it with a smaller footprint for a test drive. If you want to assign 128MB as the max, edit the lines as follows.
MAX_HEAP_SIZE="128M"
HEAP_NEWSIZE="32M"

If you face OutOfMemory exceptions or massive GCs with this configuration, increase these values. Don't start your production service with such a tiny heap configuration!
  • Note for Mac users:
    Some people running OS X have trouble getting Java 6 to work. If you've kept up with Apple's updates, Java 6 should already be installed (it comes in Mac OS X 10.5 Update 1). Unfortunately, Apple does not default to using it. What you have to do is change your JAVA_HOME environment setting to /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home and add /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin to the beginning of your PATH.

Step 3: Start up Cassandra


And now for the moment of truth: start up Cassandra by invoking bin/cassandra -f from the command line. The service should start in the foreground and log gratuitously to standard-out. Assuming you don't see messages with scary words like "error" or "fatal", or anything that looks like a Java stack trace, then chances are you've succeeded.
Press "Control-C" to stop Cassandra.
If you start up Cassandra without the "-f" option, it will run in the background, so you will need to kill the process to stop it.
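
For example (the pid file path is just an illustration):

# run in the foreground; press Control-C to stop
bin/cassandra -f

# run in the background, logging the process id to a file so it can be killed later
bin/cassandra -p /var/run/cassandra.pid
kill `cat /var/run/cassandra.pid`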

Step 4: Using cassandra-cli


bin/cassandra-cli is an interactive command-line interface for Cassandra. You can define schemas and store and fetch data with the tool. Run the following command to connect to your Cassandra instance.
bin/cassandra-cli -h host -p rpc_port

example:
% bin/cassandra-cli -h 127.0.0.1 -p 9160

Then you will see the following cassandra-cli prompt.
Connected to: "Test Cluster" on 127.0.0.1/9160
Welcome to Cassandra CLI version 1.0.7

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown] 

Tuesday, September 18, 2012

Apache Cassandra - Load Balancing the cluster


Load balancing


When adding new nodes to the cluster, data does not automatically get shared across the new nodes equally, so the load is not shared proportionately. This leaves the cluster completely unbalanced.
In order to make the data get shared equally, we need to shift the token ranges using the nodetool move command.
Token ranges must be calculated in a way that makes the data share almost equal on each node.
Here's a Python program which can be used to calculate new tokens for the nodes:

def tokens(nodes):
    for x in xrange(nodes):
        print 2 ** 127 / nodes * x
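
For example, running tokens(4) in a Python 2 shell (matching the snippet's syntax) would print one initial_token per node of a four-node cluster:

>>> tokens(4)
0
42535295865117307932921825928971026432
85070591730234615865843651857942052864
127605887595351923798765477786913079296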
Run this Python program to get a token for each node, then try the nodetool ring command; this will tell you the load on each of the nodes connected to the cluster. Recalculate and move nodes again if needed until you see nodetool ring display the load shared equally between the nodes.

In versions of Cassandra 0.7.* and lower, there's also nodetool loadbalance: essentially a convenience over decommission + bootstrap, only instead of telling the target node where to move on the ring it will choose its location based on the same heuristic as Token selection on bootstrap. You should not use this as it doesn't rebalance the entire ring.
The status of move and balancing operations can be monitored using nodetool with the netstats argument. (Cassandra 0.6.* and lower use the streams argument).

Apache Cassandra - Moving a node to a different token range


Moving nodes


nodetool move: move the target node to a given Token. Moving is both a convenience over and more efficient than decommission + bootstrap. After moving a node, nodetool cleanup should be run to remove any unnecessary data.
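
A sketch of the commands (the host name and token value are placeholders):

bin/nodetool -h <hostname> move <new_token>
bin/nodetool -h <hostname> cleanup
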
As with bootstrap, see Streaming for how to monitor progress.


Streaming :

Transfer


The following steps occur for Stream Transfers.
  1. Source has a list of ranges it must transfer to another node.
  2. Source copies the data in those ranges to sstable files in preparation for streaming. This is called anti-compaction (because compaction merges multiple sstable files into one, and this does the opposite).
  3. Source builds a list of PendingFile's which contains information on each sstable to be transfered.
  4. Source starts streaming the first file from the list, followed by the log "Waiting for transfer to $some_node to complete". The header for the stream contains information on the streamed file for the Destination to interpret what to do with the incoming stream.
  5. Destination receives the file, writes it to disk, and sends a FileStatus.
  6. On successful transfer, the Source streams the next file until it is done; on error, it re-streams the same file.

Request


  1. Destination compiles a list of ranges it needs from another node.
  2. Destination sends a StreamRequestMessage to the Source node with the list of ranges.
  3. Source prepares the SSTables for those ranges and creates the PendingFile's.
  4. Source starts streaming the first file in the list. The header for the first stream contains info of the current stream and a list of the remaining PendingFile's that fall in the requested ranges.
  5. Destination receives the stream and writes it to disk, followed by the log message "Streaming added org.apache.cassandra.io.sstable.SSTableReader(path='/var/lib/cassandra/data/Keyspace1/Standard1-e-1-Data.db')".
  6. Destination then takes the lead and requests the remaining files one at a time. If an error occurs, it re-requests the same file; otherwise, it continues with the next file until done.
  7. Source streams each of the requested files. The files are already anti-compacted, so it just streams them to the Destination.

Apache Cassandra - How to remove nodes from a live cluster

If you have decided to remove a node from a live cluster, you can follow one of the two options below, based on which one suits you better.

Nodetool is available in the cassandra home directory

{CASSANDRA_HOME}/bin/nodetool decommission
{CASSANDRA_HOME}/bin/nodetool removetoken

Removing nodes entirely


* Decommission :

You can take a node out of the cluster by running nodetool decommission on a live node. This will assign the ranges the old node was responsible for to other nodes, and replicate the appropriate data there. If decommission is used, the data will stream from the decommissioned node.
No data is removed automatically from the node being decommissioned, so if you want to put the node back into service at a different token on the ring, it should be removed manually.

* Remove Token :

You can execute nodetool removetoken on any available machine to remove a dead one. This will assign the token range the old node was responsible for to other nodes. If removetoken is used, the data will stream from the remaining replicas.
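
As a sketch (the host names and token are placeholders; the dead node's token can be read from nodetool ring):

# run against the live node you want to retire
bin/nodetool -h <node_to_remove> decommission

# run from any live node to remove a dead node's token
bin/nodetool -h <live_node> removetoken <token_of_dead_node>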

Tuesday, August 7, 2012

Cassandra Hector - Fetch all rows

How do you do a "select * from my_table" in Cassandra using the Hector library?

I did some googling around, and this seems to be working...





import java.util.Iterator;
import java.util.UUID;

import me.prettyprint.cassandra.model.QuorumAllConsistencyLevelPolicy;
import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.serializers.UUIDSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.RangeSlicesQuery;

public class Dumper {
    private final Cluster cluster;
    private final Keyspace keyspace;

    public Dumper() {
        this.cluster = HFactory.getOrCreateCluster("Name", "hostname");
        this.keyspace = HFactory.createKeyspace("Keyspace", cluster, new QuorumAllConsistencyLevelPolicy());
    }

    public void run() {
        int row_count = 100;

        RangeSlicesQuery<UUID, String, Long> rangeSlicesQuery = HFactory
            .createRangeSlicesQuery(keyspace, UUIDSerializer.get(), StringSerializer.get(), LongSerializer.get())
            .setColumnFamily("Column Family")
            .setRange(null, null, false, 10)
            .setRowCount(row_count);

        UUID last_key = null;

        while (true) {
            rangeSlicesQuery.setKeys(last_key, null);
            System.out.println(" > " + last_key);

            QueryResult<OrderedRows<UUID, String, Long>> result = rangeSlicesQuery.execute();
            OrderedRows<UUID, String, Long> rows = result.get();
            Iterator<Row<UUID, String, Long>> rowsIterator = rows.iterator();

            // we'll skip this first one, since it is the same as the last one from previous time we executed
            if (last_key != null && rowsIterator != null) rowsIterator.next();   

            while (rowsIterator.hasNext()) {
              Row<UUID, String, Long> row = rowsIterator.next();
              last_key = row.getKey();

              if (row.getColumnSlice().getColumns().isEmpty()) {
                continue;
              }


              System.out.println(row);
            }

            if (rows.getCount() < row_count)
                break;
        }
    }

    public static void main(String[] args) {
        new Dumper().run();
    }
}


This will page through the column family in pages of 100 rows. It will only fetch 10 columns for each row (you will want to page through very long rows too).
This is for a column family with UUIDs for row keys, strings for column names, and longs for values. Hopefully it is obvious how to change this.

Cassandra Replication Factor - What is it?


Replication


A Cassandra cluster always divides up the key space into ranges delimited by Tokens as described above, but additional replica placement is customizable via IReplicaPlacementStrategy in the configuration file. The standard strategies are
  • RackUnawareStrategy: replicas are always placed on the next (in increasing Token order) N-1 nodes along the ring
  • RackAwareStrategy: replica 2 is placed in the first node along the ring that belongs to a different data center than the first; the remaining N-2 replicas, if any, are placed on the first nodes along the ring in the same rack as the first
Note that with RackAwareStrategy, succeeding nodes along the ring should alternate data centers to avoid hot spots. For instance, if you have nodes A, B, C, and D in increasing Token order, and instead of alternating you place A and B in DC1, and C and D in DC2, then nodes C and A will have disproportionately more data on them because they will be the replica destination for every Token range in the other data center.
  • The corollary to this is, if you want to start with a single DC and add another later, when you add the second DC you should add as many nodes as you have in the first rather than adding a node or two at a time gradually.
Replication factor is not really intended to be changed in a live cluster either, but increasing it is conceptually simple: update the replication_factor from the CLI (see below), then run repair against each node in your cluster so that all the new replicas that are supposed to have the data, actually do.
Until repair is finished, you have 3 options:
  • read at ConsistencyLevel.QUORUM or ALL (depending on your existing replication factor) to make sure that a replica that actually has the data is consulted
  • continue reading at lower CL, accepting that some requests will fail (usually only the first for a given query, if ReadRepair is enabled)
  • take downtime while repair runs
The same options apply to changing replication strategy.
Reducing replication factor is easily done and only requires running cleanup afterwards to remove extra replicas.
To update the replication factor on a live cluster, forget about cassandra.yaml. Rather, you want to use cassandra-cli, and then repair each node as sketched after the command:
  • update keyspace Keyspace1 with strategy_options = {replication_factor:3};
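
And then, as noted above, repair must be run against each node so the new replicas actually receive their data. A sketch (the keyspace name matches the example above, the host is a placeholder):

bin/nodetool -h <host> repair Keyspace1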

Cassandra replication


About Replication in Cassandra

Replication is the process of storing copies of data on multiple nodes to ensure reliability and fault tolerance.
Cassandra stores copies, called replicas, of each row based on the row key. You set the number of replicas when you create a keyspace using the replica placement strategy. In addition to setting the number of replicas, this strategy sets the distribution of the replicas across the nodes in the cluster depending on the cluster’s topology.
The total number of replicas across the cluster is referred to as the replication factor. A replication factor of 1 means that there is only one copy of each row on one node. A replication factor of 2 means two copies of each row, where each copy is on a different node. All replicas are equally important; there is no primary or master replica. As a general rule, the replication factor should not exceed the number of nodes in the cluster. However, you can increase the replication factor and then add the desired number of nodes afterwards. When replication factor exceeds the number of nodes, writes are rejected, but reads are served as long as the desired consistency level can be met.
To determine the physical location of nodes and their proximity to each other, the replication strategy also relies on the cluster-configured snitch.

Replication Strategy

The available strategies are:

SimpleStrategy

Use SimpleStrategy for simple single data center clusters. This strategy is the default replica placement strategy when creating a keyspace using the Cassandra CLI. See Creating a Keyspace. When using the Cassandra Query Language interface, you must explicitly specify a strategy. See CREATE KEYSPACE.
SimpleStrategy places the first replica on a node determined by the partitioner. Additional replicas are placed on the next nodes clockwise in the ring without considering rack or data center location.


NetworkTopologyStrategy

Use NetworkTopologyStrategy when you have (or plan to have) your cluster deployed across multiple data centers. This strategy specifies how many replicas you want in each data center.
When deciding how many replicas to configure in each data center, the two primary considerations are (1) being able to satisfy reads locally, without incurring cross-datacenter latency, and (2) failure scenarios. The two most common ways to configure multiple data center clusters are:
  • Two replicas in each data center. This configuration tolerates the failure of a single node per replication group and still allows local reads at a consistency level of ONE.
  • Three replicas in each data center. This configuration tolerates the failure of one node per replication group at a strong consistency level of LOCAL_QUORUM, or tolerates multiple node failures per data center using consistency level ONE.
Asymmetrical replication groupings are also possible. For example, you can have three replicas per data center to serve real-time application requests and use a single replica for running analytics.
The NetworkTopologyStrategy determines replica placement independently within each data center as follows:
  • The first replica is placed according to the partitioner (same as with SimpleStrategy).
  • Additional replicas are placed by walking the ring clockwise until a node in a different rack is found. If no such node exists, additional replicas are placed in different nodes in the same rack.
NetworkTopologyStrategy attempts to place replicas on distinct racks because nodes in the same rack (or similar physical grouping) can fail at the same time due to power, cooling, or network issues.
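
For illustration, a CQL3 sketch of a keyspace using NetworkTopologyStrategy (the keyspace and data center names are made up; the data center names must match what your snitch reports):

CREATE KEYSPACE testntskeyspace WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 3, 'DC2': 2};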

Monday, July 11, 2011

What's new in Cassandra 0.8

What’s New in Cassandra 0.8, Part 2: Counters


One of the features making its debut in Cassandra 0.8.0 is distributed counters. They allow you to … count things. (Or sum things; the counter increment need not be 1, or even positive). But a lot of stuff, very quickly, which makes them invaluable for real-time analytical tasks.

Why Counters?

Prior to 0.8, Cassandra had no simple and efficient way to count. By
“counting,” we mean here to provide an atomic increment operation in a single column value, as opposed to counting the number of columns in a row, or rows in a column family, both of which were already supported.
If you had to count or sum things, available solutions previously included:
  • inserting a different column for each increment with a batch process to merge those
  • use an external synchronization like Zookeeper (preferably through the
    use of the Cages library for simplicity)
  • use another database such as redis to handle those counts
Those solutions all had one or more of the following problems:
  • unfriendly to develop against
  • poor performance
  • not scalable (in particular, none scales to multiple datacenter usage)
  • requires additional software
The new counters feature solves this lack of simple and efficient counting
facility without any of the above problems.

Using Counters

A counter is a specific kind of column whose user-visible value is a 64-bit signed
integer, though this is more complex internally. When a new value is written
to a given counter column, this new value is added to whatever was the
previous value of the counter.
To create a column family holding counters, you simply indicate to Cassandra
that the default_validation_class on that column family is
CounterColumnType. For instance, using the CLI, you can create such
a column family using:

[default@unknown] create keyspace test;
54900c80-9378-11e0-0000-242d50cf1f9d
Waiting for schema agreement...
... schemas agree across the cluster
[default@unknown] use test;
Authenticated to keyspace: test

[default@test] create column family counters with default_validation_class=CounterColumnType and key_validation_class=UTF8Type and comparator=UTF8Type;
6c7db090-9378-11e0-0000-242d50cf1f9d
Waiting for schema agreement...
... schemas agree across the cluster

Super column families holding counters are also supported the usual way,
by specifying column_type=Super.
Using counters is then straightforward:

[default@test] incr counters[row][c1];
Value incremented.
[default@test] incr counters[row][c2] by 3;
Value incremented.
[default@test] get counters[row];
=> (counter=c1, value=1)
=> (counter=c2, value=3)

Returned 2 results.
[default@test] decr counters[row][c2] by 4;
Value decremented.
[default@test] incr counters[row][c1] by -2;
Value incremented.
[default@test] get counters[row];
=> (counter=c1, value=-1)
=> (counter=c2, value=-1)
Returned 2 results.
[default@test] del counters[row][c1];
column removed.
[default@test] get counters[row];
=> (counter=c2, value=-1)
Returned 1 results.

Note that the CLI provides a decr (decrement) operation, but this
is simply syntactic sugar for incrementing by a negative number. The
usual consistency level trade-offs apply to counter operations.

Using CQL

Let us start by noting that the support for counters in CQL is not part of
0.8.0 (the official release at the time of this writing) but has been added
for the 0.8.1 release.
Considering the counters column family created above:

cqlsh> UPDATE counters SET c1 = c1 + 3, c2 = c2 - 4 WHERE key = 'row2';
cqlsh> select * from counters where key = 'row2';
     KEY | c1 | c2 |
    row2 |  3 | -4 |

Operational Considerations

Performance

Counters have been designed to allow for very fast writes. However, an increment
does involve a read on one of the replicas as part of replication. As a consequence,
counter increments are expected to be slightly slower than regular writes. Note
however that:
  • For each write, only one of the replicas has to perform a read, even with many replicas.
  • At ConsistencyLevel.ONE, this read is not part of the latency the client will
    observe, but is still part of the write itself. It follows that the
    latency of increments at CL.ONE is very good, but care should be taken to
    not overload the cluster by writing faster than it can handle.
    (In JMX, you can monitor the pending tasks on the REPLICATE_ON_WRITE stage.)
Counter reads use the same code path as regular reads and thus offer comparable performance.

Dealing with data loss

With regular column families, if an SSTable on disk is lost or corrupted (because
of disk failure, for instance), a standard way to deal with it is to remove
the problematic file and run repair to have the missing information pulled from
the other replicas.
This is unfortunately not as simple with counters. Currently, the only
safe way to handle the loss of an sstable for a counter column family
is to remove all data for that column family, restart the node with
-Dcassandra.renew_counter_id=true (or remove the NodeIdInfo
system sstables on versions earlier than 0.8.2) and run repair once
the node is up.
(The reason you must remove all the counter sstables, even undamaged
ones, is that each node maintains a sub-count of the counter to which
it adds new increments and for which other nodes trust it to have the
most up-to-date value. Wiping the data on A ensures the replicas have
recognized that A is missing its sub-count and will re-replicate to it
on repair.)

Other considerations

Internally, counters use server-side timestamps in order to deal with
deletions. This does mean that you will need to keep the Cassandra servers in
sync. Of course, using ntpd on a server deployment is good practice anyway, so this should not be an
important constraint.

Current limitations, known problems and the future

Besides the operational considerations above, Counters have a number of
limitations in their current form that you should be aware of:
  • If a write times out in Cassandra,
    the client cannot know if the write was persisted or not. This is not a
    problem for regular columns, where the recommended way to cope with such an
    exception is to replay the write, since writes are idempotent. For counters however, replaying the write
    in those situations may result in an over-count. On the other hand, not
    replaying it may mean the write never gets recorded.
    CASSANDRA-2783 is open to add an optional replay ID to counter writes.
  • Support for counter removal is exposed by the API, but is limited. If
    you perform in a short sequence a counter increment, followed by a delete and then by
    another increment, there is no guarantee that the end value will only be
    the value of the second increment (the deletion could be fully ignored). The only safe use of deletion is for permanent removal,
    where no new increment follows the deletion.
  • There is no support for time to live (TTL) on counter columns as there is
    for regular columns (see CASSANDRA-1952
    for more information on why).
  • There is no support for secondary indexes on counter columns.
  • At the time of this writing, you cannot have a counter column inside a column
    family of regular columns (and vice versa). The only way to use
    counters is to create a column family with
    default_validation_class=CounterColumnType, in which case all
    columns are counters
    (CASSANDRA-2614
    is open to lift this limitation).