Tuesday, March 9, 2010

Java implementation of: WTF is a SuperColumn? An Intro to the Cassandra Data Model

After reading "WTF is a SuperColumn? An Intro to the Cassandra Data Model", I decided to implement a Java client that inserts data into, and reads it back from, the data model presented in that article.

<!-- Keyspace for the blog example: three standard column families plus
     "Comments", which is declared as a super column family. -->
<Keyspace Name="BloggyAppy">
 <KeysCachedFraction>0.01</KeysCachedFraction>
 <!-- Authors and BlogEntries compare their column names as raw bytes. -->
 <ColumnFamily CompareWith="BytesType" Name="Authors"/>
 <ColumnFamily CompareWith="BytesType" Name="BlogEntries"/>
 <!-- TaggedPosts compares its column names as time-based UUIDs. -->
 <ColumnFamily CompareWith="TimeUUIDType" Name="TaggedPosts"/>
 <!-- Comments: super columns compared as time-based UUIDs, their
      sub-columns compared as raw bytes. -->
 <ColumnFamily CompareWith="TimeUUIDType" Name="Comments"
  CompareSubcolumnsWith="BytesType" ColumnType="Super"/>
</Keyspace>
Data model used

For simplicity, I removed some columns from the column families and created the following Java domain classes.


package domain;

public class Author {

  public Author(String username, String email) {
    this.username = username;
    this.email = email;
  }

  public String getUsername() {
    return username;
  }

  public void setUsername(String username) {
    this.username = username;
  }

  public String getEmail() {
    return email;
  }

  public void setEmail(String email) {
    this.email = email;
  }

  public int getTotalPosts() {
    return totalPosts;
  }

  public void setTotalPosts(int totalPosts) {
    this.totalPosts = totalPosts;
  }

  private String username;
  private String email;
  private int totalPosts;

}





package domain;

import java.util.ArrayList;

public class BlogEntry {

  public BlogEntry() {
    this.tag.add(new Tag("__notag__"));
  }

  public String getSlug() {
    return slug;
  }

  public void setSlug(String slug) {
    this.slug = slug;
  }

  public Author getAuthor() {
    return author;
  }

  public void setAuthor(Author author) {
    this.author = author;
  }

  public String getBody() {
    return body;
  }

  public void setBody(String body) {
    this.body = body;
  }

  public ArrayList<Tag> getTag() {
    return (ArrayList<Tag>tag.clone();
  }

  public String getTitle() {
    return title;
  }

  public void setTitle(String title) {
    this.title = title;
  }

  public void addTag(Tag tag) {
    if (!this.tag.contains(tag)) {
      this.tag.add(tag);
    }
  }

  private String slug;
  private Author author;
  private String body;
  private ArrayList<Tag> tag = new ArrayList<Tag>();;
  private String title;

}




package domain;

/**
 * Mutable value holder for a comment on a blog entry: who wrote it and what
 * was said.
 */
public class Comment {

  private String commenter;
  private String comment;

  /**
   * Creates a comment.
   *
   * @param commenter name of the person commenting
   * @param comment text of the comment
   */
  public Comment(String commenter, String comment) {
    this.commenter = commenter;
    this.comment = comment;
  }

  /** @return the name of the person who commented */
  public String getCommenter() {
    return this.commenter;
  }

  /** @param commenter the new commenter name */
  public void setCommenter(String commenter) {
    this.commenter = commenter;
  }

  /** @return the text of the comment */
  public String getComment() {
    return this.comment;
  }

  /** @param comment the new comment text */
  public void setComment(String comment) {
    this.comment = comment;
  }

}




package domain;

public class Tag {

  public Tag(String name) {
    this.name = name;
  }

  public boolean equals(Object obj) {
    return this.name.equals(((Tagobj).getName());
  }

  public int hashCode() {
    return this.name.hashCode();
  }

  public String getName() {
    return this.name;
  }

  private String name;

}



package client;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.service.Cassandra;
import org.apache.cassandra.service.Column;
import org.apache.cassandra.service.ColumnOrSuperColumn;
import org.apache.cassandra.service.ColumnParent;
import org.apache.cassandra.service.ColumnPath;
import org.apache.cassandra.service.ConsistencyLevel;
import org.apache.cassandra.service.SlicePredicate;
import org.apache.cassandra.service.SliceRange;
import org.apache.cassandra.service.SuperColumn;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.safehaus.uuid.UUID;
import org.safehaus.uuid.UUIDGenerator;

import domain.Author;
import domain.BlogEntry;
import domain.Comment;
import domain.Tag;

public class BlogClient {

  private static void readEntireRowAuthor(Cassandra.Client client,
      Author authorthrows Exception {

    SlicePredicate predicate = new SlicePredicate(null, new SliceRange(
        new byte[0]new byte[0], false, 10));

    ColumnParent parent = new ColumnParent("Authors"null);

    List<ColumnOrSuperColumn> results = client.get_slice("BloggyAppy",
        author.getUsername(), parent, predicate, ConsistencyLevel.ONE);

    for (ColumnOrSuperColumn result : results) {
      Column column = result.column;
      System.out.println(new String(column.name, "UTF-8"" -> "
          new String(column.value, "UTF-8"));
    }
  }

  private static void readEntireRowBlogEntry(Cassandra.Client client,
      String slugthrows Exception {

    SlicePredicate predicate = new SlicePredicate(null, new SliceRange(
        new byte[0]new byte[0], false, 10));

    ColumnParent parent = new ColumnParent("BlogEntries"null);

    List<ColumnOrSuperColumn> results = client.get_slice("BloggyAppy",
        slug, parent, predicate, ConsistencyLevel.ONE);

    for (ColumnOrSuperColumn result : results) {
      Column column = result.column;
      System.out.println(new String(column.name, "UTF-8"" -> "
          new String(column.value, "UTF-8"));
    }
  }

  private static void readEntireRowTaggedPost(Cassandra.Client client,
      String tagthrows Exception {

    SlicePredicate predicate = new SlicePredicate(null, new SliceRange(
        new byte[0]new byte[0], false, 10));

    ColumnParent parent = new ColumnParent("TaggedPosts"null);

    List<ColumnOrSuperColumn> results = client.get_slice("BloggyAppy", tag,
        parent, predicate, ConsistencyLevel.ONE);

    for (ColumnOrSuperColumn result : results) {
      Column column = result.column;
      UUID uuid = new UUID(column.name);
      System.out.println(uuid.toString() " -> "
          new String(column.value, "UTF-8"));
    }
  }

  private static void readEntireRowComment(Cassandra.Client client,
      BlogEntry blogEntrythrows Exception {

    SlicePredicate predicate = new SlicePredicate(null, new SliceRange(
        new byte[0]new byte[0], false, 10));

    ColumnParent parent = new ColumnParent("Comments"null);

    List<ColumnOrSuperColumn> results = client.get_slice("BloggyAppy",
        blogEntry.getSlug(), parent, predicate, ConsistencyLevel.ONE);

    for (ColumnOrSuperColumn result : results) {
      SuperColumn superColumn = result.super_column;
      UUID uuid = new UUID(superColumn.name);
      System.out.println(uuid.toString() " -> ");
      for (Column column : superColumn.columns) {
        System.out.println("\t" new String(column.name, "UTF-8")
            " -> " new String(column.value, "UTF-8"));
      }
    }
  }

  private static void readSingleColumnAuthor(Cassandra.Client client,
      Author authorthrows Exception {
    ColumnPath path = new ColumnPath("Authors", null, "email"
        .getBytes("UTF-8"));
    ColumnOrSuperColumn colOrSupCol = client.get("BloggyAppy", author
        .getUsername(), path, ConsistencyLevel.ONE);
    Column column = colOrSupCol.getColumn();

    System.out.println(colOrSupCol);
    System.out.println("name:" new String(column.getName()));
    System.out.println("value:" new String(column.getValue()));
  }

  private static void insertAuthor(Cassandra.Client client, Author author)
      throws Exception {
    long timestamp = System.currentTimeMillis();

    client.insert("BloggyAppy", author.getUsername()new ColumnPath(
        "Authors", null, "email".getBytes("UTF-8")), author.getEmail()
        .getBytes("UTF-8"), timestamp, ConsistencyLevel.ONE);

    client.insert("BloggyAppy", author.getUsername()new ColumnPath(
        "Authors", null, "totalPosts".getBytes("UTF-8")), Integer
        .toString(author.getTotalPosts()).getBytes("UTF-8"), timestamp,
        ConsistencyLevel.ONE);
  }

  private static void insertBlogEntry(Cassandra.Client client,
      BlogEntry blogEntrythrows Exception {
    long timestamp = System.currentTimeMillis();

    client.insert("BloggyAppy", blogEntry.getSlug()new ColumnPath(
        "BlogEntries", null, "author".getBytes("UTF-8")), blogEntry
        .getAuthor().getUsername().getBytes("UTF-8"), timestamp,
        ConsistencyLevel.ONE);
    client.insert("BloggyAppy", blogEntry.getSlug()new ColumnPath(
        "BlogEntries", null, "body".getBytes("UTF-8")), blogEntry
        .getBody().getBytes("UTF-8"), timestamp, ConsistencyLevel.ONE);
    client.insert("BloggyAppy", blogEntry.getSlug()new ColumnPath(
        "BlogEntries", null, "title".getBytes("UTF-8")), blogEntry
        .getTitle().getBytes("UTF-8"), timestamp, ConsistencyLevel.ONE);

    StringBuilder sb = new StringBuilder();
    for (Tag tag : blogEntry.getTag()) {
      sb.append(tag.getName());
      sb.append(" ");
    }
    client.insert("BloggyAppy", blogEntry.getSlug()new ColumnPath(
        "BlogEntries", null, "tags".getBytes("UTF-8")), sb.toString()
        .trim().getBytes("UTF-8"), timestamp, ConsistencyLevel.ONE);

  }

  private static void insertTaggedPost(Cassandra.Client client, String tag,
      BlogEntry blogEntrythrows Exception {
    long timestamp = System.currentTimeMillis();

    UUID uu = UUIDGenerator.getInstance().generateTimeBasedUUID();

    client.insert("BloggyAppy", tag, new ColumnPath("TaggedPosts", null, uu
        .toByteArray()), blogEntry.getSlug().getBytes("UTF-8"),
        timestamp, ConsistencyLevel.ONE);
  }

  private static void insertComment(Cassandra.Client client,
      BlogEntry blogEntry, Comment commentthrows Exception {
    long timestamp = System.currentTimeMillis();

    UUID uuid = UUIDGenerator.getInstance().generateTimeBasedUUID();

    Column colCommenter = new Column("commenter".getBytes("UTF-8"), comment
        .getCommenter().getBytes("UTF-8"), timestamp);
    Column colComment = new Column("comment".getBytes("UTF-8"), comment
        .getComment().getBytes("UTF-8"), timestamp);

    SuperColumn s = new SuperColumn();
    s.name = uuid.toByteArray();
    s.addToColumns(colCommenter);
    s.addToColumns(colComment);

    ColumnOrSuperColumn csc = new ColumnOrSuperColumn();
    csc.super_column = s;

    List<ColumnOrSuperColumn> columns = new ArrayList<ColumnOrSuperColumn>();
    columns.add(csc);

    Map<String, List<ColumnOrSuperColumn>> superCol = 
      new HashMap<String, List<ColumnOrSuperColumn>>();
    superCol.put("Comments", columns);

    client.batch_insert("BloggyAppy", blogEntry.getSlug(), superCol,
        ConsistencyLevel.ONE);
  }

  public static void main(String[] argsthrows Exception {

    TTransport trA = new TSocket("serverA"9160);
    TProtocol protoA = new TBinaryProtocol(trA);
    Cassandra.Client clientA = new Cassandra.Client(protoA);
    trA.open();

    TTransport trB = new TSocket("serverB"9160);
    TProtocol protoB = new TBinaryProtocol(trB);
    Cassandra.Client clientB = new Cassandra.Client(protoB);
    trB.open();

    // insert data
    Author author = new Author("pedro""pedro@yahoo.com");
    author.setTotalPosts(0);

    // insertAuthor(clientA, author);
    // readSingleColumnAuthor(client, author);
    readEntireRowAuthor(clientB, author);

    BlogEntry blogEntry = new BlogEntry();
    blogEntry.setAuthor(author);
    blogEntry.setBody("Este es el cuerpo del post de testeo.");
    blogEntry.setSlug("post-de-testeo");
    blogEntry.addTag(new Tag("test"));
    blogEntry.addTag(new Tag("post"));
    blogEntry.setTitle("Post de testeo");

    // insertBlogEntry(client, blogEntry);
    // readEntireRowBlogEntry(client, blogEntry.getSlug());

    // insertTaggedPost(client, "test", blogEntry);
    // insertTaggedPost(client, "__notag__", blogEntry);
    // readEntireRowTaggedPost(client, "test");
    // readEntireRowTaggedPost(client, "__notag__");

    // Comment comment = new Comment("Pedro Marini", "El post lo veo bien");
    // Comment comment = new Comment("Lolo Grep",
    // "Para cuando lo esta listo?");
    // insertComment(client, blogEntry, comment);
    // readEntireRowComment(clientA, blogEntry);

    trA.close();
    trB.close();
  }

}



First post.

This is the first post.