New – Amazon Keyspaces (for Apache Cassandra) is Now Generally Available

We introduced Amazon Managed Apache Cassandra Service (MCS) in preview at re:Invent last year. In the few months that passed, the service introduced many new features, and it is generally available today with a new name: Amazon Keyspaces (for Apache Cassandra).

Amazon Keyspaces is built on Apache Cassandra, and you can use it as a fully managed, serverless database. Your applications can read and write data from Amazon Keyspaces using your existing Cassandra Query Language (CQL) code, with little or no changes. For each table, you can select the best configuration depending on your use case:

  • With on-demand, you pay based on the actual reads and writes you perform. This is the best option for unpredictable workloads.
  • With provisioned capacity, you can reduce your costs for predictable workloads by configuring capacity settings up front. You can further optimize costs by enabling auto scaling, which updates your provisioned capacity settings automatically as your traffic changes throughout the day.

Using Amazon Keyspaces
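One of the first “serious” applications I built as a kid was an archive for my books. I’d like to rebuild it now as a serverless API, using Amazon Keyspaces to store the data, an AWS Lambda function for the business logic, and an Amazon API Gateway HTTP API in front of it.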

With Amazon Keyspaces, your data is stored in keyspaces and tables. A keyspace gives you a way to group related tables together. In the blog post for the preview, I used the console to configure my data model. Now, I can also use AWS CloudFormation to manage my keyspaces and tables as code. For example, I can create a bookstore keyspace and a books table with this CloudFormation template:

AWSTemplateFormatVersion: '2010-09-09'
Description: Amazon Keyspaces for Apache Cassandra example

Resources:

  BookstoreKeyspace:
    Type: AWS::Cassandra::Keyspace
    Properties: 
      KeyspaceName: bookstore

  BooksTable:
    Type: AWS::Cassandra::Table
    Properties: 
      TableName: books
      KeyspaceName: !Ref BookstoreKeyspace
      PartitionKeyColumns: 
        - ColumnName: isbn
          ColumnType: text
      RegularColumns: 
        - ColumnName: title
          ColumnType: text
        - ColumnName: author
          ColumnType: text
        - ColumnName: pages
          ColumnType: int
        - ColumnName: year_of_publication
          ColumnType: int

Outputs:
  BookstoreKeyspaceName:
    Description: "Keyspace name"
    Value: !Ref BookstoreKeyspace # Or !Select [0, !Split ["|", !Ref BooksTable]]
  BooksTableName:
    Description: "Table name"
    Value: !Select [1, !Split ["|", !Ref BooksTable]]

If you don’t specify a name for a keyspace or a table in the template, CloudFormation generates a unique name for you. Note that names generated this way may contain uppercase characters, which fall outside the usual Cassandra conventions, so you need to put those names between double quotes when using Cassandra Query Language (CQL).
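The quoting rule above can be sketched in Java. This is a minimal, hypothetical helper (the class and method names are mine, not part of the driver or of Amazon Keyspaces), assuming the usual CQL convention that a double quote inside a quoted identifier is escaped by doubling it. The table name in `main` is made up for illustration.

```java
// Hypothetical helper for illustration only; not part of any driver API.
final class CqlIdentifiers {

    // Wraps a keyspace or table name in double quotes so that CQL keeps its case.
    // A double quote inside the name is escaped by doubling it.
    static String quote(String name) {
        return "\"" + name.replace("\"", "\"\"") + "\"";
    }

    public static void main(String[] args) {
        // Made-up example of a generated, mixed-case table name.
        String table = "BooksTable1A2B3C4D5E6F";
        System.out.println("SELECT * FROM " + quote(table));
    }
}
```

Without the quotes, Cassandra folds unquoted identifiers to lowercase, so a mixed-case generated name would not be found.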

When the creation of the stack is complete, I see the new bookstore keyspace in the console:

Selecting the books table, I see an overview of its configuration, including the partition key, the clustering columns, and all the columns, along with the option to change the capacity mode for the table from on-demand to provisioned:

For authentication and authorization, Amazon Keyspaces supports AWS Identity and Access Management (IAM) identity-based policies that you can use with IAM users, groups, and roles. Here’s a list of actions, resources, and conditions that you can use in IAM policies with Amazon Keyspaces. You can now also manage access to resources based on tags.

You can use IAM roles using AWS Signature Version 4 Process (SigV4) with this open source authentication plugin for the DataStax Java driver. In this way you can run your applications inside an Amazon Elastic Compute Cloud (EC2) instance, a container managed by Amazon ECS or Amazon Elastic Kubernetes Service, or a Lambda function, and leverage IAM roles for authentication and authorization to Amazon Keyspaces, without the need to manage credentials. Here’s a sample application that you can test on an EC2 instance with an associated IAM role giving access to Amazon Keyspaces.

Going back to my books API, I create all the resources I need, including a keyspace and a table, with the following AWS Serverless Application Model (SAM) template.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Sample Books API using Cassandra as database

Globals:
  Function:
    Timeout: 30

Resources:

  BookstoreKeyspace:
    Type: AWS::Cassandra::Keyspace

  BooksTable:
    Type: AWS::Cassandra::Table
    Properties: 
      KeyspaceName: !Ref BookstoreKeyspace
      PartitionKeyColumns: 
        - ColumnName: isbn
          ColumnType: text
      RegularColumns: 
        - ColumnName: title
          ColumnType: text
        - ColumnName: author
          ColumnType: text
        - ColumnName: pages
          ColumnType: int
        - ColumnName: year_of_publication
          ColumnType: int

  BooksFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: BooksFunction
      Handler: books.App::handleRequest
      Runtime: java11
      MemorySize: 2048
      Policies:
        - Statement:
          - Effect: Allow
            Action:
            - cassandra:Select
            Resource:
              - !Sub "arn:aws:cassandra:${AWS::Region}:${AWS::AccountId}:/keyspace/system*"
              - !Join
                - ""
                - - !Sub "arn:aws:cassandra:${AWS::Region}:${AWS::AccountId}:/keyspace/${BookstoreKeyspace}/table/"
                  - !Select [1, !Split ["|", !Ref BooksTable]] # !Ref BooksTable returns "Keyspace|Table"
          - Effect: Allow
            Action:
            - cassandra:Modify
            Resource:
              - !Join
                - ""
                - - !Sub "arn:aws:cassandra:${AWS::Region}:${AWS::AccountId}:/keyspace/${BookstoreKeyspace}/table/"
                  - !Select [1, !Split ["|", !Ref BooksTable]] # !Ref BooksTable returns "Keyspace|Table"
      Environment:
        Variables:
          KEYSPACE_TABLE: !Ref BooksTable # !Ref BooksTable returns "Keyspace|Table"
      Events:
        GetAllBooks:
          Type: HttpApi
          Properties:
            Method: GET
            Path: /books
        GetBookByIsbn:
          Type: HttpApi
          Properties:
            Method: GET
            Path: /books/{isbn}
        PostNewBook:
          Type: HttpApi
          Properties:
            Method: POST
            Path: /books

Outputs:
  BookstoreKeyspaceName:
    Description: "Keyspace name"
    Value: !Ref BookstoreKeyspace # Or !Select [0, !Split ["|", !Ref BooksTable]]
  BooksTableName:
    Description: "Table name"
    Value: !Select [1, !Split ["|", !Ref BooksTable]]
  BooksApi:
    Description: "API Gateway HTTP API endpoint URL"
    Value: !Sub "https://${ServerlessHttpApi}.execute-api.${AWS::Region}.amazonaws.com/"
  BooksFunction:
    Description: "Books Lambda Function ARN"
    Value: !GetAtt BooksFunction.Arn
  BooksFunctionIamRole:
    Description: "Implicit IAM Role created for Books function"
    Value: !GetAtt BooksFunctionRole.Arn

In this template I don’t specify the keyspace and table names, so CloudFormation generates unique names automatically. The function IAM policy gives access to read (cassandra:Select) and write (cassandra:Modify) only to the books table. I am using CloudFormation Fn::Select and Fn::Split intrinsic functions to get the table name, because !Ref on the table returns a "Keyspace|Table" string. The driver also needs read access to the system* keyspaces.
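To make the Fn::Split, Fn::Select, and Fn::Join combination in the policy easier to follow, here is a rough Java equivalent of what the template computes. It is only a sketch: the class and method names are hypothetical, and the account ID in `main` is a placeholder.

```java
// Hypothetical illustration of how the template composes the table ARN.
final class KeyspacesArns {

    // refValue is the "Keyspace|Table" string that !Ref returns for the table.
    static String tableArn(String region, String accountId, String refValue) {
        int separator = refValue.indexOf('|');
        if (separator <= 0 || separator == refValue.length() - 1) {
            throw new IllegalArgumentException("Expected \"Keyspace|Table\" but got: " + refValue);
        }
        String keyspace = refValue.substring(0, separator); // like !Select [0, !Split ...]
        String table = refValue.substring(separator + 1);   // like !Select [1, !Split ...]
        // Like !Join with an empty delimiter.
        return String.join("", "arn:aws:cassandra:", region, ":", accountId,
                ":/keyspace/", keyspace, "/table/", table);
    }

    public static void main(String[] args) {
        // 123456789012 is a placeholder account ID.
        System.out.println(tableArn("eu-west-1", "123456789012", "bookstore|books"));
    }
}
```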

To use the authentication plugin for the DataStax Java driver that supports IAM roles, I write the Lambda function in Java, using the APIGatewayV2ProxyRequestEvent and APIGatewayV2ProxyResponseEvent classes to communicate with the HTTP API created by the API Gateway.

package books;

import java.net.InetSocketAddress;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
import javax.net.ssl.SSLContext;

import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;

import software.aws.mcs.auth.SigV4AuthProvider;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.events.APIGatewayV2ProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayV2ProxyResponseEvent;

public class App implements RequestHandler<APIGatewayV2ProxyRequestEvent, APIGatewayV2ProxyResponseEvent> {
    
    JSONParser parser = new JSONParser();
    String[] keyspace_table = System.getenv("KEYSPACE_TABLE").split("\\|"); // "|" is a regex metacharacter, so it must be escaped
    String keyspace = keyspace_table[0];
    String table = keyspace_table[1];
    CqlSession session = getSession();
    PreparedStatement selectBookByIsbn = session.prepare("select * from \"" + table + "\" where isbn = ?");
    PreparedStatement selectAllBooks = session.prepare("select * from \"" + table + "\"");
    PreparedStatement insertBook = session.prepare("insert into \"" + table + "\" "
    + "(isbn, title, author, pages, year_of_publication) " + "values (?, ?, ?, ?, ?)");
    
    public APIGatewayV2ProxyResponseEvent handleRequest(APIGatewayV2ProxyRequestEvent request, Context context) {
        
        LambdaLogger logger = context.getLogger();
        
        String responseBody;
        int statusCode = 200;
        
        String routeKey = request.getRequestContext().getRouteKey();
        logger.log("routeKey = '" + routeKey + "'");
        
        if (routeKey.equals("GET /books")) {
            ResultSet rs = execute(selectAllBooks.bind());
            StringJoiner jsonString = new StringJoiner(", ", "[ ", " ]");
            for (Row row : rs) {
                String json = row2json(row);
                jsonString.add(json);
            }
            responseBody = jsonString.toString();
        } else if (routeKey.equals("GET /books/{isbn}")) {
            String isbn = request.getPathParameters().get("isbn");
            logger.log("isbn: '" + isbn + "'");
            ResultSet rs = execute(selectBookByIsbn.bind(isbn));
            if (rs.getAvailableWithoutFetching() == 1) {
                responseBody = row2json(rs.one());
            } else {
                statusCode = 404;
                responseBody = "{\"message\": \"not found\"}";
            }
        } else if (routeKey.equals("POST /books")) {
            String body = request.getBody();
            logger.log("Body: '" + body + "'");
            JSONObject requestJsonObject = null;
            if (body != null) {
                try {
                    requestJsonObject = (JSONObject) parser.parse(body);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                if (requestJsonObject != null) {
                    int i = 0;
                    BoundStatement boundStatement = insertBook.bind()
                    .setString(i++, (String) requestJsonObject.get("isbn"))
                    .setString(i++, (String) requestJsonObject.get("title"))
                    .setString(i++, (String) requestJsonObject.get("author"))
                    .setInt(i++, ((Long) requestJsonObject.get("pages")).intValue())
                    .setInt(i++, ((Long) requestJsonObject.get("year_of_publication")).intValue())
                    .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
                    ResultSet rs = execute(boundStatement);
                    statusCode = 201;
                    responseBody = body;
                } else {
                    statusCode = 400;
                    responseBody = "{\"message\": \"JSON parse error\"}";
                }
            } else {
                statusCode = 400;
                responseBody = "{\"message\": \"body missing\"}";
            }
        } else {
            statusCode = 405;
            responseBody = "{\"message\": \"not implemented\"}";
        }
        
        Map<String, String> headers = new HashMap<>();
        headers.put("Content-Type", "application/json");
        
        APIGatewayV2ProxyResponseEvent response = new APIGatewayV2ProxyResponseEvent();
        response.setStatusCode(statusCode);
        response.setHeaders(headers);
        response.setBody(responseBody);
        
        return response;
    }
    
    private String getStringColumn(Row row, String columnName) {
        return "\"" + columnName + "\": \"" + row.getString(columnName) + "\"";
    }
    
    private String getIntColumn(Row row, String columnName) {
        return "\"" + columnName + "\": " + row.getInt(columnName);
    }
    
    private String row2json(Row row) {
        StringJoiner jsonString = new StringJoiner(", ", "{ ", " }");
        jsonString.add(getStringColumn(row, "isbn"));
        jsonString.add(getStringColumn(row, "title"));
        jsonString.add(getStringColumn(row, "author"));
        jsonString.add(getIntColumn(row, "pages"));
        jsonString.add(getIntColumn(row, "year_of_publication"));
        return jsonString.toString();
    }
    
    private ResultSet execute(BoundStatement bs) {
        final int MAX_RETRIES = 3;
        ResultSet rs = null;
        int retries = 0;

        do {
            try {
                rs = session.execute(bs);
            } catch (Exception e) {
                e.printStackTrace();
                session = getSession(); // New session
            }
        } while (rs == null && retries++ < MAX_RETRIES);
        return rs;
    }
    
    private CqlSession getSession() {
        
        System.setProperty("javax.net.ssl.trustStore", "./cassandra_truststore.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "amazon");
        
        String region = System.getenv("AWS_REGION");
        String endpoint = "cassandra." + region + ".amazonaws.com";
        
        System.out.println("region: " + region);
        System.out.println("endpoint: " + endpoint);
        System.out.println("keyspace: " + keyspace);
        System.out.println("table: " + table);
        
        SigV4AuthProvider provider = new SigV4AuthProvider(region);
        List<InetSocketAddress> contactPoints = Collections.singletonList(new InetSocketAddress(endpoint, 9142));
        
        CqlSession session;
                
        try {
            session = CqlSession.builder().addContactPoints(contactPoints).withSslContext(SSLContext.getDefault())
            .withLocalDatacenter(region).withAuthProvider(provider).withKeyspace("\"" + keyspace + "\"")
            .build();
        } catch (NoSuchAlgorithmException e) {
            session = null;
            e.printStackTrace();
        }
        
        return session;
    }
}

To connect to Amazon Keyspaces with TLS/SSL using the Java driver, I need to include a trustStore in the JVM arguments. When using the Cassandra Java Client Driver in a Lambda function, I can’t pass parameters to the JVM, so I pass the same options as system properties, and specify the SSL context when creating the CQL session with the withSslContext(SSLContext.getDefault()) parameter. Note that I also have to configure the pom.xml file, used by Apache Maven, to include the trustStore file as a dependency.

System.setProperty("javax.net.ssl.trustStore", "./cassandra_truststore.jks");
System.setProperty("javax.net.ssl.trustStorePassword", "amazon");
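If the trustStore file is not packaged where the function expects it, the failure can be hard to diagnose. As a small sketch (the class and method names are hypothetical, not something the post's function uses), the two properties could be set only after checking that the file is readable. Because the JVM builds its default SSL context once and then caches it, this needs to run before the first call to SSLContext.getDefault().

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: sets the trust store system properties only if the file is readable.
final class TrustStoreConfig {

    // Returns true if the properties were set, false if the file could not be read.
    static boolean configure(Path trustStore, String password) {
        if (!Files.isReadable(trustStore)) {
            return false;
        }
        System.setProperty("javax.net.ssl.trustStore", trustStore.toString());
        System.setProperty("javax.net.ssl.trustStorePassword", password);
        return true;
    }

    public static void main(String[] args) {
        // Same file name and password as in the function code above.
        boolean configured = configure(Paths.get("./cassandra_truststore.jks"), "amazon");
        System.out.println(configured ? "trust store configured" : "trust store file not readable");
    }
}
```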

Now, I can use a tool like curl or Postman to test my books API. First, I take the endpoint of the API from the output of the CloudFormation stack. At the beginning there are no books stored in the books table, and if I do an HTTP GET on the resource, I get an empty JSON list. For readability, I am removing all HTTP headers from the output.

$ curl -i https://a1b2c3d4e5.execute-api.eu-west-1.amazonaws.com/books

HTTP/1.1 200 OK
[]

In the code, I am using a PreparedStatement to run a CQL statement to select all rows from the books table. The names of the keyspace and of the table are passed to the Lambda function in an environment variable, as described in the SAM template above.
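One detail to watch when reading that environment variable in Java: String.split takes a regular expression, and an unescaped "|" is the alternation operator, so it splits between every character instead of at the separator. A minimal sketch of a safer parser follows. The class name is hypothetical, and the "bookstore|books" fallback value is only there so the example runs outside Lambda.

```java
import java.util.regex.Pattern;

// Hypothetical value class for the "Keyspace|Table" string passed in KEYSPACE_TABLE.
final class KeyspaceTable {
    final String keyspace;
    final String table;

    private KeyspaceTable(String keyspace, String table) {
        this.keyspace = keyspace;
        this.table = table;
    }

    static KeyspaceTable parse(String value) {
        // Pattern.quote makes "|" a literal separator instead of regex alternation.
        String[] parts = value.split(Pattern.quote("|"), -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException("Expected \"Keyspace|Table\" but got: " + value);
        }
        return new KeyspaceTable(parts[0], parts[1]);
    }

    public static void main(String[] args) {
        // Fallback value is an assumption for local runs.
        String value = System.getenv().getOrDefault("KEYSPACE_TABLE", "bookstore|books");
        KeyspaceTable kt = parse(value);
        System.out.println("keyspace: " + kt.keyspace + ", table: " + kt.table);
    }
}
```

Failing fast on a malformed value surfaces a misconfigured template at cold start rather than as an obscure CQL error later.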

Let’s use the API to add a book, by doing an HTTP POST on the resource.

$ curl -i -d '{ "isbn": "978-0201896831", "title": "The Art of Computer Programming, Vol. 1: Fundamental Algorithms (3rd Edition)", "author": "Donald E. Knuth", "pages": 672, "year_of_publication": 1997 }' -H "Content-Type: application/json" -X POST https://a1b2c3d4e5.execute-api.eu-west-1.amazonaws.com/books

HTTP/1.1 201 Created
{ "isbn": "978-0201896831", "title": "The Art of Computer Programming, Vol. 1: Fundamental Algorithms (3rd Edition)", "author": "Donald E. Knuth", "pages": 672, "year_of_publication": 1997 }

I can check that the data has been inserted in the table using the CQL Editor in the console, where I select all the rows in the table.

I repeat the previous HTTP GET to get the list of the books, and I see the one I just created.

$ curl -i https://a1b2c3d4e5.execute-api.eu-west-1.amazonaws.com/books

HTTP/1.1 200 OK
[ { "isbn": "978-0201896831", "title": "The Art of Computer Programming, Vol. 1: Fundamental Algorithms (3rd Edition)", "author": "Donald E. Knuth", "pages": 672, "year_of_publication": 1997 } ]

I can get a single book by ISBN, because the isbn column is the primary key of the table and I can use it in the where condition of a select statement.

$ curl -i https://a1b2c3d4e5.execute-api.eu-west-1.amazonaws.com/books/978-0201896831

HTTP/1.1 200 OK
{ "isbn": "978-0201896831", "title": "The Art of Computer Programming, Vol. 1: Fundamental Algorithms (3rd Edition)", "author": "Donald E. Knuth", "pages": 672, "year_of_publication": 1997 }

If there is no book with that ISBN, I return a “not found” message:

$ curl -i https://a1b2c3d4e5.execute-api.eu-west-1.amazonaws.com/books/1234567890

HTTP/1.1 404 Not Found
{"message": "not found"}
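The same calls can be made from Java instead of curl. The sketch below uses the java.net.http client that ships with Java 11 and only builds the requests. The class and method names are hypothetical, and the base URL in `main` is the placeholder endpoint used in the curl examples. To actually send a request, I would pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical request builders for the three routes of the books API.
final class BooksApiRequests {

    // GET /books
    static HttpRequest listBooks(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/books")).GET().build();
    }

    // GET /books/{isbn}
    static HttpRequest getBook(String baseUrl, String isbn) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/books/" + isbn)).GET().build();
    }

    // POST /books with a JSON body
    static HttpRequest postBook(String baseUrl, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/books"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder endpoint, as in the curl examples.
        String baseUrl = "https://a1b2c3d4e5.execute-api.eu-west-1.amazonaws.com";
        HttpRequest request = getBook(baseUrl, "978-0201896831");
        System.out.println(request.method() + " " + request.uri());
    }
}
```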

It works! We just built a fully serverless API using CQL to read and write data using temporary security credentials, managing the whole infrastructure, including the database table, as code.

Available Now
Amazon Keyspaces (for Apache Cassandra) is ready for your applications. Please see this table for regional availability. You can find more information on how to use Keyspaces in the documentation. In this post, I built a new application, but you can get lots of benefits by migrating your current tables to a fully managed environment. For migrating data, you can now use cqlsh as described in this post.

Let me know what you are going to use it for!

Danilo


Source: AWS News
