Apache Cassandra 是一个基于列的,分布式 NoSQL 数据库。 一直到最近为止,通过 Mule 与 Cassandra 数据库交互的唯一方式,是在组件中重用已有的Java客户端, 比如 Hector 或 Astyanax。现在 Mule 的 Cassandra DB Module 提供了消息处理器来插入,修改,查询和删除Cassandra中的数据。
为了突出 Cassandra 模块的一些特性,我准备展示一下如何实现一套简单的账户管理API. 这套 API 将允许客户端对账户执行 CRUD 操作, 其行为就像是一个LDAP 目录。
Cassandra Module 使用 Java 的 map机制来定义如何在Cassandra 的键空间中插入和获取数据。本例中,我们将使用Mule的JSON转换器来处理通过HTTP获取或发送的数据。让我们看一下账户数据是什么样:
{ "Accounts":{ "engineering":{ "joe@acmesoft.com":{ "Name":"Joe Developer", "Password":"286755fad04869ca523320acce0dc6a4", "passwordAge": 731400 }, "jane@acmesoft.com":{ "Name":"Jane Developer", "Password":"10b222970537b97919db36ec757370d2", "passwordAge": 10082400 }, "john@acmesoft.com":{ "Name":"Jane Developer", "Password":"10b222970537b97919db36ec757370d2", "passwordAge": 1080000 } }, "operations":{ "bill@acmesoft.com":{ "Name":"Bill SysAdmin", "Password":"f1f16683f3e0208131b46d37a79c8921", "passwordAge": 4343100 }, "jill@acmesoft.com":{ "Name":"Jill NetworkAdmin", "Password":"32a3571fa12b39266a58d42234836839", "passwordAge": 41923143 } } } }
当我们把这段JSON格式的数据持久化到Cassandra时,列名会叫 “Accounts”,每一个编制单元将是一个行的键 (即, “Engineering” 和“Operations”) ,而账户信息如用户名、密码以及最后一次密码修改距今的时间等将被容纳在一个超级列中。
让我们配置Mule流程来通过HTTP持久化这些数据。
<flow name="AccountCreate" doc:name="AccountsCreate"> <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" path="account/create" mimeType="application/json" /> <json:json-to-object-transformer returnClass="java.util.Map"/> <cassandradb:insert config-ref="CassandraDB" /> <json:object-to-json-transformer /> </flow>
这个流程将把我们刚才看到的通过HTTP获取到的JSON格式的账户信息转换成Map,使用Cassandra connector’s 的“insert” 消息处理器来持久化数据,然后把有效载荷转换回JSON格式并返回给客户端。
Cassandra的一个优势,同时也是挑战,是所有的数据都保存为byte数组。这使得数据的存储非常灵活,不过数据的类型信息也因此丢失了。Cassandra通过使用 Hector 的序列化让你在从列拉取数据的时候指定如何对数据进行转换。
我们通过两个API的查询来看一下这是如何工作的。第一个查询让我们可以通过email地址查询用户,返回的结果是row key的map。
<flow name="AccountGet" doc:name="AccountGet"> <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" path="account/get" doc:name="HTTP"/> <cassandradb:get config-ref="CassandraDB" columnPath= "Accounts:#[message.inboundProperties['http.relative.path'].split('/')[1]]" rowKey= "#[message.inboundProperties['http.relative.path'].split('/')[0]]"/> <json:object-to-json-transformer doc:name="Object to JSON"/> </flow>
上面的流程接受我们从HTTP获得的JSON数据,转换成Map,然后使用Cassandra的 connector 的“insert” 消息处理器来持久化数据,最后将有效数据转回JSON格式并返回给客户端。
Cassandra的一个优势,同时也是挑战,是所有的数据都保存为byte数组。这使得数据的存储非常灵活,不过数据的类型信息也因此丢失了。Cassandra通过使用 Hector 的序列化让你在从列拉取数据的时候指定如何对数据进行转换。
我们通过两个API的查询来看一下这是如何工作的。第一个查询让我们可以通过email地址查询用户,返回的结果是row key的map。
<flow name="AccountGet" doc:name="AccountGet"> <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" path="account/get" doc:name="HTTP"/> <cassandradb:get config-ref="CassandraDB" columnPath= "Accounts:#[message.inboundProperties['http.relative.path'].split('/')[1]]" rowKey= "#[message.inboundProperties['http.relative.path'].split('/')[0]]"/> <json:object-to-json-transformer doc:name="Object to JSON"/> </flow>
【注:这一段与上一段有很多重复】
我们用 Mule的表达式语言 来解析URI。这样我们可以得到columnPath和rowKey。在这个例子,columnPath为“operators”,rowKey为“bill@acmesoft.com”。我们可以通过下面的URL查询Bill的account:
http://localhost:8081/account/get/operations/bill@acmesoft.com
{ "bill@acmesoft.com":{ "passwordAge":"\u0000BE<", "Name":"Bill SysAdmin", "Password":"f1f16683f3e0208131b46d37a79c8921" } }
passwordAge是一个字符串,而不是整数。这是因为如果不指定显示的列序列化,Cassandra默认用的是字符串的序列化。我们来解决这个问题:
<flow name="AccountGet" doc:name="AccountGet"> <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" path="account/get" /> <cassandradb:get config-ref="CassandraDB" columnPath="Accounts:#[message.inboundProperties['http.relative.path'].split('/')[1]]" rowKey="#[message.inboundProperties['http.relative.path'].split('/')[0]]" doc:name="Cassandradb"> <cassandradb:column-serializers> <cassandradb:column-serializer key="passwordAge" type="java.lang.Integer"/> </cassandradb:column-serializers> </cassandradb:get> <json:object-to-json-transformer/> </flow>
现在我们刷新一下URL,结果应该像下面这样:
{ "bill@acmesoft.com":{ "passwordAge":4343100, "Name":"Bill SysAdmin", "Password":"f1f16683f3e0208131b46d37a79c8921" } }
列系列化仅支持那些Hector支持的 数据类型 。
Cassandra模块还支持通过列分片来查询。下面的flow返回给定的 organizational unit 的所有account (row key):
<flow name="AccountList" doc:name="AccountsList"> <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" path="account/list"/> <cassandradb:get-slice config-ref="CassandraDB" rowKey="#[message.inboundProperties['http.relative.path'].split('/')[0]]" columnParent="Accounts" count="100"> <cassandradb:column-serializer key="passwordAge" type="java.lang.Integer"/> </cassandradb:get-slice> <json:object-to-json-transformer/> </flow>
这将最多返回100个结果。例如,http://localhost:8081/account/list/operations 的返回是下面这样的:
[ { "bill@acmesoft.com":{ "passwordAge":4343100, "Name":"Bill SysAdmin", "Password":"f1f16683f3e0208131b46d37a79c8921" } }, { "jill@acmesoft.com":{ "passwordAge":41923143, "Name":"Jill NetworkAdmin", "Password":"32a3571fa12b39266a58d42234836839" } } ]
删除列同样很简单。下面的flow演示了如何从一个row中删除一列:
<flow name="AccountDelete" doc:name="AccountGet"> <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" path="account/delete"/> <cassandradb:remove config-ref="CassandraDB" columnPath="Accounts:#[message.inboundProperties['http.relative.path'].split('/')[1]]" rowKey="#[message.inboundProperties['http.relative.path'].split('/')[0]]" doc:name="Cassandradb"/> <json:object-to-json-transformer /> </flow>
下面的URL可以删除Bill的account:
http://localhost:8081/account/delete/operations/bill@acmesoft.com
Cassandra在NoSQL领域是一个强大的竞争者。它尤其适合于需要跨数据中心的大数据集。我们这里提到的一些希望能加入到Cassandra模块的特性,在Cassandra backed Mule的 object store 中可以找到支持,同样也支持 CQL 作为可选的查询机制。
评论删除后,数据将无法恢复
评论(0)