A Look at Cassandra's Client APIs

Posted by 小编辑 on 2010/03/24 15:31

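I recently spent some time trying out Cassandra by importing data from Oracle into it. I ran into problems, solved them along the way, and learned a lot. Apart from designing a sensible data model, most of the work was interacting with Cassandra through its API.

Cassandra was designed with Thrift support from the start, which means we can develop against it in many languages.

For development against Cassandra, that is the benefit of Thrift: multi-language support. The downside is just as obvious: the Thrift API is too bare-bones to be used directly in a production environment.

The Cassandra wiki also lists higher-level APIs built on top of the Thrift API for a range of languages; see http://wiki.apache.org/cassandra/ClientExamples

This post only covers the following two Java clients:

1 Thrift Java API

2 Hector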

Thrift Java API

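This is the simplest API and it ships with Cassandra: it is included in apache-cassandra-0.5.1.jar and can be used directly. Alternatively, we can install Thrift ourselves and generate the code from the cassandra.thrift file.

If you are going to use Cassandra, you need to understand the Thrift API, since all the higher-level APIs are wrappers around it.

Features provided

Inserting data

Inserting data requires specifying the keyspace, ColumnFamily, Column, key, value, timestamp, and consistency level. (If you need background on Cassandra's data model, see the article 《大话Cassandra数据模型》.)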

/**
 * Insert a Column consisting of (column_path.column, value, timestamp) at the given column_path.column_family and optional
 * column_path.super_column. Note that column_path.column is here required, since a SuperColumn cannot directly contain binary
 * values -- it can only contain sub-Columns.
 *
 * @param keyspace
 * @param key
 * @param column_path
 * @param value
 * @param timestamp
 * @param consistency_level
 */
public void insert(String keyspace, String key, ColumnPath column_path, byte[] value, long timestamp, int consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;

/**
 * Insert Columns or SuperColumns across different Column Families for the same row key. batch_mutation is a
 * map<string, list<ColumnOrSuperColumn>> -- a map which pairs column family names with the relevant ColumnOrSuperColumn
 * objects to insert.
 *
 * @param keyspace
 * @param key
 * @param cfmap
 * @param consistency_level
 */
public void batch_insert(String keyspace, String key, Map<String,List<ColumnOrSuperColumn>> cfmap, int consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;
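
To make the parameters of insert more concrete, here is a minimal in-memory sketch of how a value is addressed. It is not Cassandra code: it simply models one keyspace as nested sorted maps (column family, then row key, then column name, then value plus timestamp). The names DataModelSketch, Cell, insert and get are hypothetical and exist only for this illustration, and the consistency level is left out because there is only a single in-memory copy.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in for one keyspace; an illustration only, not the Cassandra API. */
class DataModelSketch {
    /** A stored column value together with the client-supplied timestamp. */
    static final class Cell {
        final byte[] value;
        final long timestamp;

        Cell(byte[] value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // column family -> row key -> column name -> cell
    private final Map<String, Map<String, Map<String, Cell>>> families = new TreeMap<>();

    /**
     * Mirrors the shape of insert(key, column_path, value, timestamp).
     * This sketch overwrites unconditionally; conflict resolution is out of scope here.
     */
    void insert(String key, String columnFamily, String column, byte[] value, long timestamp) {
        families.computeIfAbsent(columnFamily, cf -> new TreeMap<>())
                .computeIfAbsent(key, k -> new TreeMap<>())
                .put(column, new Cell(value, timestamp));
    }

    /** Returns the cell at the exact path, or null if nothing is stored there. */
    Cell get(String key, String columnFamily, String column) {
        Map<String, Map<String, Cell>> rows = families.get(columnFamily);
        if (rows == null) {
            return null;
        }
        Map<String, Cell> columns = rows.get(key);
        return columns == null ? null : columns.get(column);
    }

    public static void main(String[] args) {
        DataModelSketch keyspace1 = new DataModelSketch();
        keyspace1.insert("user42", "Standard1", "url",
                "http://example.org".getBytes(StandardCharsets.UTF_8), System.currentTimeMillis());
        Cell cell = keyspace1.get("user42", "Standard1", "url");
        System.out.println(new String(cell.value, StandardCharsets.UTF_8));
    }
}
```

Every level of the path has to be supplied before a value can be stored or found, which is why the Thrift calls take so many arguments.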

Reading data

Fetch a single value identified by an exact path.

/**
 * Get the Column or SuperColumn at the given column_path. If no value is present, NotFoundException is thrown. (This is
 * the only method that can throw an exception under non-failure conditions.)
 *
 * @param keyspace
 * @param key
 * @param column_path
 * @param consistency_level
 */
public ColumnOrSuperColumn get(String keyspace, String key, ColumnPath column_path, int consistency_level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException;

/**
 * Perform a get for column_path in parallel on the given list<string> keys. The return value maps keys to the
 * ColumnOrSuperColumn found. If no value corresponding to a key is present, the key will still be in the map, but both
 * the column and super_column references of the ColumnOrSuperColumn object it maps to will be null.
 *
 * @param keyspace
 * @param keys
 * @param column_path
 * @param consistency_level
 */
public Map<String,ColumnOrSuperColumn> multiget(String keyspace, List<String> keys, ColumnPath column_path, int consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;

Fetch data under a given keyspace, key, ColumnFamily, and SuperColumn (specified only if there is one), returning only the Columns whose names match the condition given by the SlicePredicate.

/**
 * Get the group of columns contained by column_parent (either a ColumnFamily name or a ColumnFamily/SuperColumn name
 * pair) specified by the given SlicePredicate. If no matching values are found, an empty list is returned.
 *
 * @param keyspace
 * @param key
 * @param column_parent
 * @param predicate
 * @param consistency_level
 */
public List<ColumnOrSuperColumn> get_slice(String keyspace, String key, ColumnParent column_parent, SlicePredicate predicate, int consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;

/**
 * Performs a get_slice for column_parent and predicate for the given keys in parallel.
 *
 * @param keyspace
 * @param keys
 * @param column_parent
 * @param predicate
 * @param consistency_level
 */
public Map<String,List<ColumnOrSuperColumn>> multiget_slice(String keyspace, List<String> keys, ColumnParent column_parent, SlicePredicate predicate, int consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;
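
What a slice over column names means can be shown with a small in-memory sketch. It assumes that a range predicate is described by a start name, a finish name, a reversed flag and a count, that an empty bound means "unbounded", and that in reversed mode the start is the upper bound. The class SliceSketch and its slice method are hypothetical, and column names are plain strings sorted in String order here, whereas a real ColumnFamily sorts names with its configured comparator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory illustration of slicing a row's columns by name; not the Cassandra API. */
class SliceSketch {
    /**
     * Returns up to count column names between start and finish (both inclusive).
     * An empty string means "unbounded" on that side. When reversed is true the names are
     * walked in descending order, so start is the upper bound and finish the lower bound.
     * Bounds given in the wrong order make the underlying map throw IllegalArgumentException.
     */
    static List<String> slice(NavigableMap<String, String> columns,
                              String start, String finish, boolean reversed, int count) {
        NavigableMap<String, String> view = reversed ? columns.descendingMap() : columns;
        if (!start.isEmpty()) {
            view = view.tailMap(start, true);
        }
        if (!finish.isEmpty()) {
            view = view.headMap(finish, true);
        }
        List<String> names = new ArrayList<>();
        for (String name : view.keySet()) {
            if (names.size() == count) {
                break;
            }
            names.add(name);
        }
        return names;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> row = new TreeMap<>();
        for (String name : new String[] {"a", "b", "c", "d"}) {
            row.put(name, "value-" + name);
        }
        System.out.println(slice(row, "", "", false, 10)); // whole row, ascending
        System.out.println(slice(row, "b", "c", false, 10)); // names from b to c
        System.out.println(slice(row, "", "", true, 2)); // last two names, descending
    }
}
```

Passing two empty bounds with a large enough count is the usual way to read a whole row, which is what the sample program later in this post does.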

Query a range of keys (this requires the order-preserving partitioner).

/**
 * @deprecated; use get_range_slice instead
 *
 * @param keyspace
 * @param column_family
 * @param start
 * @param finish
 * @param count
 * @param consistency_level
 */
public List<String> get_key_range(String keyspace, String column_family, String start, String finish, int count, int consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;

/**
 * returns a subset of columns for a range of keys.
 *
 * @param keyspace
 * @param column_parent
 * @param predicate
 * @param start_key
 * @param finish_key
 * @param row_count
 * @param consistency_level
 */
public List<KeySlice> get_range_slice(String keyspace, ColumnParent column_parent, SlicePredicate predicate, String start_key, String finish_key, int row_count, int consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;
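
Why key ranges depend on the partitioner can be seen in a small sketch. It assumes that a hash-based partitioner places rows by the MD5 digest of the key, while an order-preserving one places them by the key itself. PartitionerSketch and its methods are hypothetical names, and the sketch only sorts keys in memory; it does not talk to a cluster.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Compares key order with hash-token order; an illustration only. */
class PartitionerSketch {
    /** Token for a hash-based layout: the MD5 digest of the key as a non-negative integer. */
    static BigInteger md5Token(String key) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return new BigInteger(1, md5.digest(key.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should be available on every Java platform", e);
        }
    }

    /** Keys as an order-preserving layout would store them, restricted to [start, finish]. */
    static List<String> rangeByKey(List<String> keys, String start, String finish) {
        TreeMap<String, String> ordered = new TreeMap<>();
        for (String key : keys) {
            ordered.put(key, key);
        }
        return new ArrayList<>(ordered.subMap(start, true, finish, true).keySet());
    }

    /** All keys as a hash-based layout would store them: sorted by token, not by key. */
    static List<String> orderByToken(List<String> keys) {
        TreeMap<BigInteger, String> byToken = new TreeMap<>();
        for (String key : keys) {
            byToken.put(md5Token(key), key);
        }
        return new ArrayList<>(byToken.values());
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("user5", "user1", "user3", "user2", "user4");
        System.out.println("key order, user2..user4: " + rangeByKey(keys, "user2", "user4"));
        System.out.println("token order, all keys:   " + orderByToken(keys));
    }
}
```

In key order, the rows between two keys sit next to each other, so a start and finish key describe a meaningful range. In token order, neighbouring rows are unrelated, so the same start and finish keys do not select anything useful.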

Query system information.

/**
 * get property whose value is of type string.
 *
 * @param property
 */
public String get_string_property(String property) throws TException;

/**
 * get property whose value is list of strings.
 *
 * @param property
 */
public List<String> get_string_list_property(String property) throws TException;

/**
 * describe specified keyspace
 *
 * @param keyspace
 */
public Map<String,Map<String,String>> describe_keyspace(String keyspace) throws NotFoundException, TException;

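These operations let us inspect the state of the system.

One of the more interesting pieces of information is the token map, which tells us which Cassandra services are available to serve requests.

Deleting data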

/**
 * Remove data from the row specified by key at the granularity specified by column_path, and the given timestamp. Note
 * that all the values in column_path besides column_path.column_family are truly optional: you can remove the entire
 * row by just specifying the ColumnFamily, or you can remove a SuperColumn or a single Column by specifying those levels too.
 *
 * @param keyspace
 * @param key
 * @param column_path
 * @param timestamp
 * @param consistency_level
 */
public void remove(String keyspace, String key, ColumnPath column_path, long timestamp, int consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException;

Note that, because of the consistency model, a delete does not immediately remove the data from every machine; the replicas do converge eventually.
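
The behaviour of a delete that reaches only some replicas can be pictured with the following sketch. It assumes the usual description of Cassandra deletes: a delete is recorded as a timestamped marker (a tombstone), and replicas keep whichever version of a column carries the newer timestamp. TombstoneSketch, Replica and syncTo are hypothetical names; the synchronisation step here is a plain copy rather than Cassandra's actual repair mechanisms, and ties between equal timestamps are ignored.

```java
import java.util.HashMap;
import java.util.Map;

/** Two in-memory replicas reconciling a delete by timestamp; an illustration only. */
class TombstoneSketch {
    /** A value plus timestamp; a null value is a tombstone marking a delete. */
    static final class Versioned {
        final String value;
        final long timestamp;

        Versioned(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** One replica: keeps, per column, the version with the highest timestamp seen so far. */
    static final class Replica {
        private final Map<String, Versioned> columns = new HashMap<>();

        void apply(String column, Versioned incoming) {
            Versioned current = columns.get(column);
            if (current == null || incoming.timestamp > current.timestamp) {
                columns.put(column, incoming);
            }
        }

        void insert(String column, String value, long timestamp) {
            apply(column, new Versioned(value, timestamp));
        }

        void remove(String column, long timestamp) {
            apply(column, new Versioned(null, timestamp)); // store a tombstone
        }

        /** Returns the live value, or null if the column is absent or deleted. */
        String read(String column) {
            Versioned v = columns.get(column);
            return v == null ? null : v.value;
        }

        /** Pushes every version held here, tombstones included, to another replica. */
        void syncTo(Replica other) {
            for (Map.Entry<String, Versioned> e : columns.entrySet()) {
                other.apply(e.getKey(), e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Replica a = new Replica();
        Replica b = new Replica();
        a.insert("url", "http://example.org", 1L);
        b.insert("url", "http://example.org", 1L);

        a.remove("url", 2L); // the delete reaches replica a only
        System.out.println("b before sync: " + b.read("url")); // still the old value
        a.syncTo(b);
        System.out.println("b after sync:  " + b.read("url")); // deleted everywhere
    }
}
```

Keeping the tombstone instead of dropping the column outright is what lets the delete win later against a replica that still holds the old value.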

Sample program

import java.util.List;
import java.io.UnsupportedEncodingException;

import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.TException;
import org.apache.cassandra.service.*;

public class CClient
{
    public static void main(String[] args)
    throws TException, InvalidRequestException, UnavailableException, TimedOutException, UnsupportedEncodingException, NotFoundException
    {
        TTransport tr = new TSocket("localhost", 9160);
        TProtocol proto = new TBinaryProtocol(tr);
        Cassandra.Client client = new Cassandra.Client(proto);
        tr.open();

        // row key (the name of a blog)
        String key_user_id = "逖靖寒的世界";

        // insert data: two columns, "网址" (URL) and "作者" (author)
        long timestamp = System.currentTimeMillis();
        client.insert("Keyspace1",
                      key_user_id,
                      new ColumnPath("Standard1", null, "网址".getBytes("UTF-8")),
                      "http://gpcuster.cnblogs.com".getBytes("UTF-8"),
                      timestamp,
                      ConsistencyLevel.ONE);
        client.insert("Keyspace1",
                      key_user_id,
                      new ColumnPath("Standard1", null, "作者".getBytes("UTF-8")),
                      "逖靖寒".getBytes("UTF-8"),
                      timestamp,
                      ConsistencyLevel.ONE);

        // read single column; the name must match a column inserted above,
        // otherwise get throws NotFoundException
        ColumnPath path = new ColumnPath("Standard1", null, "网址".getBytes("UTF-8"));
        System.out.println(client.get("Keyspace1", key_user_id, path, ConsistencyLevel.ONE));

        // read entire row (up to 10 columns)
        SlicePredicate predicate = new SlicePredicate(null, new SliceRange(new byte[0], new byte[0], false, 10));
        ColumnParent parent = new ColumnParent("Standard1", null);
        List<ColumnOrSuperColumn> results = client.get_slice("Keyspace1", key_user_id, parent, predicate, ConsistencyLevel.ONE);
        for (ColumnOrSuperColumn result : results)
        {
            Column column = result.column;
            System.out.println(new String(column.name, "UTF-8") + " -> " + new String(column.value, "UTF-8"));
        }

        tr.close();
    }
}

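Pros and cons

Pros: simple and efficient.

Cons: minimal functionality. It provides no connection pooling, error handling, or similar features, so it is not suitable for direct use in production.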
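
Connection pooling is the easiest of these gaps to picture. Here is a minimal sketch of the idea, assuming that a "connection" is any object that is expensive to create and safe to reuse. SimplePool and its methods are hypothetical names. A real pool for the Thrift client would hold objects wrapping an open TTransport together with its Cassandra.Client, and would also have to detect and replace broken connections, which this sketch does not attempt.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

/** A fixed-size pool of reusable connections; an illustration only. */
class SimplePool<T> {
    private final BlockingQueue<T> idle;

    /** Eagerly creates size connections using the given factory. */
    SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    /** Hands out an idle connection, or null if every connection is currently borrowed. */
    T borrow() {
        return idle.poll();
    }

    /** Gives a borrowed connection back; call this from a finally block. */
    void release(T connection) {
        idle.offer(connection);
    }

    /** Number of connections currently available. */
    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        // StringBuilder stands in for a real connection object.
        SimplePool<StringBuilder> pool = new SimplePool<>(2, StringBuilder::new);
        StringBuilder connection = pool.borrow();
        try {
            connection.append("request sent");
            System.out.println("idle while borrowed: " + pool.idleCount());
        } finally {
            pool.release(connection);
        }
        System.out.println("idle after release: " + pool.idleCount());
    }
}
```

The queue is thread-safe, so several threads can borrow and release concurrently, as long as each borrowed connection is used by one thread at a time.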

Hector

Hector is a Java client that wraps the Thrift Java API and provides a higher-level abstraction.

Sample program

package me.prettyprint.cassandra.service;

import static me.prettyprint.cassandra.utils.StringUtils.bytes;
import static me.prettyprint.cassandra.utils.StringUtils.string;

import org.apache.cassandra.service.Column;
import org.apache.cassandra.service.ColumnPath;

public class ExampleClient {

  public static void main(String[] args) throws IllegalStateException, PoolExhaustedException,
      Exception {
    CassandraClientPool pool = CassandraClientPoolFactory.INSTANCE.get();
    CassandraClient client = pool.borrowClient("localhost", 9160);
    // A load balanced version would look like this:
    // CassandraClient client = pool.borrowClient(new String[] {"cas1:9160", "cas2:9160", "cas3:9160"});

    try {
      Keyspace keyspace = client.getKeyspace("Keyspace1");
      ColumnPath columnPath = new ColumnPath("Standard1", null, bytes("网址"));

      // insert
      keyspace.insert("逖靖寒的世界", columnPath, bytes("http://gpcuster.cnblogs.com"));

      // read
      Column col = keyspace.getColumn("逖靖寒的世界", columnPath);

      System.out.println("Read from cassandra: " + string(col.getValue()));

    } finally {
      // return client to pool. do it in a finally block to make sure it's executed
      pool.releaseClient(client);
    }
  }
}

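Pros

1 Connection pooling.

2 Error handling: when an operation fails, Hector uses the system information (the token map) to connect automatically to another Cassandra service.

3 An easy-to-use programming interface.

4 JMX support.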
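
The failover idea in point 2 can be sketched in a few lines. This is not Hector's implementation: FailoverSketch and withFailover are hypothetical names, and the host list is passed in by hand rather than derived from the token map. The operation is any function that takes a host and either returns a result or throws.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Tries an operation against several hosts in turn; an illustration only. */
class FailoverSketch {
    /**
     * Runs the operation against each host in order and returns the first successful result.
     * If every host fails, throws IllegalStateException with the last failure as its cause.
     */
    static <R> R withFailover(List<String> hosts, Function<String, R> operation) {
        RuntimeException last = null;
        for (String host : hosts) {
            try {
                return operation.apply(host);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next host
            }
        }
        throw new IllegalStateException("all hosts failed: " + hosts, last);
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("cas1:9160", "cas2:9160", "cas3:9160");
        // Simulated operation: the first host is down, the others answer.
        String result = withFailover(hosts, host -> {
            if (host.startsWith("cas1")) {
                throw new RuntimeException(host + " is unreachable");
            }
            return "served by " + host;
        });
        System.out.println(result);
    }
}
```

When every host is failing, for example because all of them are overloaded, this loop can only report an error after trying each one in turn.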

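Cons

1 No support for multithreaded environments.

2 The Keyspace wrapper does too much (data validation and re-wrapping of data); for large volumes of operations, this overhead needs to be taken into account.

3 Error handling is not very forgiving: if every Cassandra service is very busy, the operation still ends in failure after several failed attempts.

Summary

Hector is already a reasonably adequate Java client, but it still lacks some features, such as:

1 Thread safety.

2 Automatic multithreaded queries and inserts to improve throughput.

3 Friendlier error handling.

4 Less wrapping.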

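Comments

cnkenly:
Can Cassandra run queries, something like SQL?
e.g. select * from db.table where field like '%key%'

zhoujy:
It seems to be supported. Cassandra has a selectstatement class; I have looked up related material online but still don't know how to use it, and I'm still looking into it...

黄平俊:
Cassandra is not a relational database, so SQL-like queries are not possible; data is accessed in a key-value fashion.

Liren:
Could you give an example of a KeyRange query? Mine always fails.

liuhello:
Thrift connects directly over a socket, and the connection can be reused. A few simple lines of code are enough to support a connection pool (or, put another way, long-lived connections).

囧南风囧:
This has been all the rage lately; technology really does move on day by day.

开源中国隔壁老王:
What advantages does NoSQL have over SQL?

cmzx3444:
How do you do pagination?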