Tuesday, March 9, 2010

Java implementation of: WTF is a SuperColumn? An Intro to the Cassandra Data Model

After reading "WTF is a SuperColumn? An Intro to the Cassandra Data Model", I decided to implement, in Java, a client that inserts data into, and reads data from, the data model presented there.

<Keyspace Name="BloggyAppy">
 <KeysCachedFraction>0.01</KeysCachedFraction>
 <ColumnFamily CompareWith="BytesType" Name="Authors"/>
 <ColumnFamily CompareWith="BytesType" Name="BlogEntries"/>
 <ColumnFamily CompareWith="TimeUUIDType" Name="TaggedPosts"/>
 <ColumnFamily CompareWith="TimeUUIDType" Name="Comments"
  CompareSubcolumnsWith="BytesType" ColumnType="Super"/>
</Keyspace>
Data model used

For simplicity, I removed some columns from the column families and created the following Java domain classes.


package domain;

public class Author {

  public Author(String username, String email) {
    this.username = username;
    this.email = email;
  }

  public String getUsername() {
    return username;
  }

  public void setUsername(String username) {
    this.username = username;
  }

  public String getEmail() {
    return email;
  }

  public void setEmail(String email) {
    this.email = email;
  }

  public int getTotalPosts() {
    return totalPosts;
  }

  public void setTotalPosts(int totalPosts) {
    this.totalPosts = totalPosts;
  }

  private String username;
  private String email;
  private int totalPosts;

}





package domain;

import java.util.ArrayList;

public class BlogEntry {

  public BlogEntry() {
    this.tag.add(new Tag("__notag__"));
  }

  public String getSlug() {
    return slug;
  }

  public void setSlug(String slug) {
    this.slug = slug;
  }

  public Author getAuthor() {
    return author;
  }

  public void setAuthor(Author author) {
    this.author = author;
  }

  public String getBody() {
    return body;
  }

  public void setBody(String body) {
    this.body = body;
  }

  public ArrayList<Tag> getTag() {
    return (ArrayList<Tag>tag.clone();
  }

  public String getTitle() {
    return title;
  }

  public void setTitle(String title) {
    this.title = title;
  }

  public void addTag(Tag tag) {
    if (!this.tag.contains(tag)) {
      this.tag.add(tag);
    }
  }

  private String slug;
  private Author author;
  private String body;
  private ArrayList<Tag> tag = new ArrayList<Tag>();;
  private String title;

}




package domain;

/**
 * A reader comment on a blog entry: who wrote it and what they wrote.
 */
public class Comment {

  private String commenter;
  private String comment;

  /**
   * Creates a comment.
   *
   * @param commenter name of the person commenting
   * @param comment the comment text
   */
  public Comment(String commenter, String comment) {
    this.commenter = commenter;
    this.comment = comment;
  }

  /** @return name of the person who wrote the comment */
  public String getCommenter() {
    return this.commenter;
  }

  /** @return the comment text */
  public String getComment() {
    return this.comment;
  }

  /** @param author replacement commenter name */
  public void setCommenter(String author) {
    this.commenter = author;
  }

  /** @param text replacement comment text */
  public void setComment(String text) {
    this.comment = text;
  }
}




package domain;

public class Tag {

  public Tag(String name) {
    this.name = name;
  }

  public boolean equals(Object obj) {
    return this.name.equals(((Tagobj).getName());
  }

  public int hashCode() {
    return this.name.hashCode();
  }

  public String getName() {
    return this.name;
  }

  private String name;

}



package client;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.service.Cassandra;
import org.apache.cassandra.service.Column;
import org.apache.cassandra.service.ColumnOrSuperColumn;
import org.apache.cassandra.service.ColumnParent;
import org.apache.cassandra.service.ColumnPath;
import org.apache.cassandra.service.ConsistencyLevel;
import org.apache.cassandra.service.SlicePredicate;
import org.apache.cassandra.service.SliceRange;
import org.apache.cassandra.service.SuperColumn;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.safehaus.uuid.UUID;
import org.safehaus.uuid.UUIDGenerator;

import domain.Author;
import domain.BlogEntry;
import domain.Comment;
import domain.Tag;

public class BlogClient {

  private static void readEntireRowAuthor(Cassandra.Client client,
      Author authorthrows Exception {

    SlicePredicate predicate = new SlicePredicate(null, new SliceRange(
        new byte[0]new byte[0], false, 10));

    ColumnParent parent = new ColumnParent("Authors"null);

    List<ColumnOrSuperColumn> results = client.get_slice("BloggyAppy",
        author.getUsername(), parent, predicate, ConsistencyLevel.ONE);

    for (ColumnOrSuperColumn result : results) {
      Column column = result.column;
      System.out.println(new String(column.name, "UTF-8"" -> "
          new String(column.value, "UTF-8"));
    }
  }

  private static void readEntireRowBlogEntry(Cassandra.Client client,
      String slugthrows Exception {

    SlicePredicate predicate = new SlicePredicate(null, new SliceRange(
        new byte[0]new byte[0], false, 10));

    ColumnParent parent = new ColumnParent("BlogEntries"null);

    List<ColumnOrSuperColumn> results = client.get_slice("BloggyAppy",
        slug, parent, predicate, ConsistencyLevel.ONE);

    for (ColumnOrSuperColumn result : results) {
      Column column = result.column;
      System.out.println(new String(column.name, "UTF-8"" -> "
          new String(column.value, "UTF-8"));
    }
  }

  private static void readEntireRowTaggedPost(Cassandra.Client client,
      String tagthrows Exception {

    SlicePredicate predicate = new SlicePredicate(null, new SliceRange(
        new byte[0]new byte[0], false, 10));

    ColumnParent parent = new ColumnParent("TaggedPosts"null);

    List<ColumnOrSuperColumn> results = client.get_slice("BloggyAppy", tag,
        parent, predicate, ConsistencyLevel.ONE);

    for (ColumnOrSuperColumn result : results) {
      Column column = result.column;
      UUID uuid = new UUID(column.name);
      System.out.println(uuid.toString() " -> "
          new String(column.value, "UTF-8"));
    }
  }

  private static void readEntireRowComment(Cassandra.Client client,
      BlogEntry blogEntrythrows Exception {

    SlicePredicate predicate = new SlicePredicate(null, new SliceRange(
        new byte[0]new byte[0], false, 10));

    ColumnParent parent = new ColumnParent("Comments"null);

    List<ColumnOrSuperColumn> results = client.get_slice("BloggyAppy",
        blogEntry.getSlug(), parent, predicate, ConsistencyLevel.ONE);

    for (ColumnOrSuperColumn result : results) {
      SuperColumn superColumn = result.super_column;
      UUID uuid = new UUID(superColumn.name);
      System.out.println(uuid.toString() " -> ");
      for (Column column : superColumn.columns) {
        System.out.println("\t" new String(column.name, "UTF-8")
            " -> " new String(column.value, "UTF-8"));
      }
    }
  }

  private static void readSingleColumnAuthor(Cassandra.Client client,
      Author authorthrows Exception {
    ColumnPath path = new ColumnPath("Authors", null, "email"
        .getBytes("UTF-8"));
    ColumnOrSuperColumn colOrSupCol = client.get("BloggyAppy", author
        .getUsername(), path, ConsistencyLevel.ONE);
    Column column = colOrSupCol.getColumn();

    System.out.println(colOrSupCol);
    System.out.println("name:" new String(column.getName()));
    System.out.println("value:" new String(column.getValue()));
  }

  private static void insertAuthor(Cassandra.Client client, Author author)
      throws Exception {
    long timestamp = System.currentTimeMillis();

    client.insert("BloggyAppy", author.getUsername()new ColumnPath(
        "Authors", null, "email".getBytes("UTF-8")), author.getEmail()
        .getBytes("UTF-8"), timestamp, ConsistencyLevel.ONE);

    client.insert("BloggyAppy", author.getUsername()new ColumnPath(
        "Authors", null, "totalPosts".getBytes("UTF-8")), Integer
        .toString(author.getTotalPosts()).getBytes("UTF-8"), timestamp,
        ConsistencyLevel.ONE);
  }

  private static void insertBlogEntry(Cassandra.Client client,
      BlogEntry blogEntrythrows Exception {
    long timestamp = System.currentTimeMillis();

    client.insert("BloggyAppy", blogEntry.getSlug()new ColumnPath(
        "BlogEntries", null, "author".getBytes("UTF-8")), blogEntry
        .getAuthor().getUsername().getBytes("UTF-8"), timestamp,
        ConsistencyLevel.ONE);
    client.insert("BloggyAppy", blogEntry.getSlug()new ColumnPath(
        "BlogEntries", null, "body".getBytes("UTF-8")), blogEntry
        .getBody().getBytes("UTF-8"), timestamp, ConsistencyLevel.ONE);
    client.insert("BloggyAppy", blogEntry.getSlug()new ColumnPath(
        "BlogEntries", null, "title".getBytes("UTF-8")), blogEntry
        .getTitle().getBytes("UTF-8"), timestamp, ConsistencyLevel.ONE);

    StringBuilder sb = new StringBuilder();
    for (Tag tag : blogEntry.getTag()) {
      sb.append(tag.getName());
      sb.append(" ");
    }
    client.insert("BloggyAppy", blogEntry.getSlug()new ColumnPath(
        "BlogEntries", null, "tags".getBytes("UTF-8")), sb.toString()
        .trim().getBytes("UTF-8"), timestamp, ConsistencyLevel.ONE);

  }

  private static void insertTaggedPost(Cassandra.Client client, String tag,
      BlogEntry blogEntrythrows Exception {
    long timestamp = System.currentTimeMillis();

    UUID uu = UUIDGenerator.getInstance().generateTimeBasedUUID();

    client.insert("BloggyAppy", tag, new ColumnPath("TaggedPosts", null, uu
        .toByteArray()), blogEntry.getSlug().getBytes("UTF-8"),
        timestamp, ConsistencyLevel.ONE);
  }

  private static void insertComment(Cassandra.Client client,
      BlogEntry blogEntry, Comment commentthrows Exception {
    long timestamp = System.currentTimeMillis();

    UUID uuid = UUIDGenerator.getInstance().generateTimeBasedUUID();

    Column colCommenter = new Column("commenter".getBytes("UTF-8"), comment
        .getCommenter().getBytes("UTF-8"), timestamp);
    Column colComment = new Column("comment".getBytes("UTF-8"), comment
        .getComment().getBytes("UTF-8"), timestamp);

    SuperColumn s = new SuperColumn();
    s.name = uuid.toByteArray();
    s.addToColumns(colCommenter);
    s.addToColumns(colComment);

    ColumnOrSuperColumn csc = new ColumnOrSuperColumn();
    csc.super_column = s;

    List<ColumnOrSuperColumn> columns = new ArrayList<ColumnOrSuperColumn>();
    columns.add(csc);

    Map<String, List<ColumnOrSuperColumn>> superCol = 
      new HashMap<String, List<ColumnOrSuperColumn>>();
    superCol.put("Comments", columns);

    client.batch_insert("BloggyAppy", blogEntry.getSlug(), superCol,
        ConsistencyLevel.ONE);
  }

  public static void main(String[] argsthrows Exception {

    TTransport trA = new TSocket("serverA"9160);
    TProtocol protoA = new TBinaryProtocol(trA);
    Cassandra.Client clientA = new Cassandra.Client(protoA);
    trA.open();

    TTransport trB = new TSocket("serverB"9160);
    TProtocol protoB = new TBinaryProtocol(trB);
    Cassandra.Client clientB = new Cassandra.Client(protoB);
    trB.open();

    // insert data
    Author author = new Author("pedro""pedro@yahoo.com");
    author.setTotalPosts(0);

    // insertAuthor(clientA, author);
    // readSingleColumnAuthor(client, author);
    readEntireRowAuthor(clientB, author);

    BlogEntry blogEntry = new BlogEntry();
    blogEntry.setAuthor(author);
    blogEntry.setBody("Este es el cuerpo del post de testeo.");
    blogEntry.setSlug("post-de-testeo");
    blogEntry.addTag(new Tag("test"));
    blogEntry.addTag(new Tag("post"));
    blogEntry.setTitle("Post de testeo");

    // insertBlogEntry(client, blogEntry);
    // readEntireRowBlogEntry(client, blogEntry.getSlug());

    // insertTaggedPost(client, "test", blogEntry);
    // insertTaggedPost(client, "__notag__", blogEntry);
    // readEntireRowTaggedPost(client, "test");
    // readEntireRowTaggedPost(client, "__notag__");

    // Comment comment = new Comment("Pedro Marini", "El post lo veo bien");
    // Comment comment = new Comment("Lolo Grep",
    // "Para cuando lo esta listo?");
    // insertComment(client, blogEntry, comment);
    // readEntireRowComment(clientA, blogEntry);

    trA.close();
    trB.close();
  }

}



11 comments:

  1. Thanks for the example!

    ReplyDelete
  2. Great post! Thanks! But when I run it in Eclipse, I get this error:

    Exception in thread "main" org.apache.thrift.transport.TTransportException: java.net.UnknownHostException: serverA
    at org.apache.thrift.transport.TSocket.open(TSocket.java:185)
    at client.BlogClient.main(BlogClient.java:206)
    Caused by: java.net.UnknownHostException: serverA
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:177)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366)
    at java.net.Socket.connect(Socket.java:525)
    at java.net.Socket.connect(Socket.java:475)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:180)
    ... 1 more

    What am I doing wrong?
    I think I didn't install Thrift for Java. How can I do that?

    Thanks in advance!

    ReplyDelete
  3. Hi Jesus. You have to change 'serverA' to the name or IP of your Cassandra server.

    ReplyDelete
  4. Thank you very much rodrigoap!
    Now it's working perfectly!
    Great article!

    ReplyDelete
  5. I tried to use it against cassandra version 0.6.0-beta3. However, the package seems to have changed and the client code could not compile.

    Any idea on how to change the client to work with version 0.6.0 ?

    Thanks

    ReplyDelete
  6. I will write another entry for Cassandra 0.6.0

    ReplyDelete
  7. Could you give me complete details on installing Thrift on Red Hat Linux?

    ReplyDelete
  8. thanks for the post

    ReplyDelete
  9. At last! I found a good post like this.. Thanks for this informative post! By the way, can you write a post about facebook seo importance? Thanks again!

    ReplyDelete
  10. I can't find the batch_insert API in the client;
    I am using Cassandra version 1.0.7.

    ReplyDelete
  11. @Rodrigo, I have updated it to Cassandra 1.1 and inserted 1 author, 1 blog entry, some tags, and 1 comment

    with

    Author author = new Author("pedro", "pedro@yahoo.com");
    author.setTotalPosts(0);

    clientA.set_keyspace("BloggyAppy");

    System.out.println("read author");
    //insertAuthor(clientA, author);
    //readSingleColumnAuthor(clientA, author);
    readEntireRowAuthor(clientA, author);

    BlogEntry blogEntry = new BlogEntry();
    blogEntry.setAuthor(author);
    blogEntry.setBody("Este es el cuerpo del post de testeo.");
    blogEntry.setSlug("post-de-testeo");
    blogEntry.addTag(new Tag("test"));
    blogEntry.addTag(new Tag("post"));
    blogEntry.setTitle("Post de testeo");

    System.out.println("read blogentry");
    // insertBlogEntry(clientA, blogEntry);
    readEntireRowBlogEntry(clientA, blogEntry.getSlug());

    System.out.println("read taggedposts");
    // insertTaggedPost(clientA, "test", blogEntry);
    //insertTaggedPost(clientA, "t", blogEntry);
    //insertTaggedPost(clientA, "__notag__", blogEntry);
    readEntireRowTaggedPost(clientA, "test");
    readEntireRowTaggedPost(clientA, "__notag__");


    System.out.println("read comments");
    Comment comment = new Comment("Pedro Marini", "El post lo veo bien");
    //insertComment(clientA, blogEntry, comment);
    readEntireRowComment(clientA, blogEntry);

    but the output is:

    read author
    email -> pedro@yahoo.com
    totalPosts ->
    read blogentry
    author -> pedro
    body -> Este es el cuerpo del post de testeo.
    tags -> __notag__ test post
    title -> Post de testeo
    read taggedposts
    read comments

    Tags and comments can't be read, although they have been inserted.

    [default@BloggyAppy] List TaggedPosts;
    Using default limit of 100
    -------------------
    RowKey: 706f73742d64652d74657374656f
    => (column=c29f7a98-f311-e19d-54e1-7d0347ed0500, value=706f73742d64652d74657374656f, timestamp=1336470733544)
    => (column=c362cb98-f311-e19d-54e1-7d0347ed0500, value=706f73742d64652d74657374656f, timestamp=1336470733548)
    => (column=c3b0ec98-f311-e19d-54e1-7d0347ed0500, value=706f73742d64652d74657374656f, timestamp=1336470733550)

    1 Row Returned.
    Elapsed time: 8 msec(s).

    [default@BloggyAppy] List Comments;
    Using default limit of 100
    -------------------
    RowKey: 436f6d6d656e7473
    => (super_column=c4261d98-f311-e19d-54e1-7d0347ed0500,
    (column=comment, value=456c20706f7374206c6f2076656f206269656e, timestamp=1336470733553)
    (column=commenter, value=506564726f204d6172696e69, timestamp=1336470733553))

    1 Row Returned.
    Elapsed time: 13 msec(s).

    ReplyDelete