Java FaaS sample to perform actions in AWS QLDB.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ctw.consent</groupId>
<artifactId>customer</artifactId>
<version>0.1.0</version>
<packaging>jar</packaging>
<parent>
<artifactId>dcm-qldb</artifactId>
<groupId>com.ctw.mom</groupId>
<version>0.1.0</version>
</parent>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jersey.version>2.28</jersey.version>
<slf4j.version>1.7.21</slf4j.version>
<log4j2.version>2.5</log4j2.version>
</properties>
<dependencies>
<dependency>
<groupId>com.amazonaws.serverless</groupId>
<artifactId>aws-serverless-java-container-jersey</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>${jersey.version}</version>
<!-- excluding redundant javax.inject dependency -->
<exclusions>
<exclusion>
<groupId>org.glassfish.hk2</groupId>
<artifactId>hk2-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.glassfish.hk2</groupId>
<artifactId>hk2-api</artifactId>
<version>2.5.0-b42</version>
<!-- excluding redundant javax.inject dependency -->
<exclusions>
<exclusion>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.qldb</groupId>
<artifactId>amazon-qldb-driver-java</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>1.11.620</version>
</dependency>
<dependency>
<groupId>com.amazon.ion</groupId>
<artifactId>ion-java</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-ion</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j2.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn" name="MyApp" packages="">
<Appenders>
<!-- <File name="MyFile" fileName="app.log">-->
<!-- <PatternLayout>-->
<!-- <Pattern>%d %p %c{1.} [%t] %m%n</Pattern>-->
<!-- </PatternLayout>-->
<!-- </File>-->
<!-- <Async name="Async">-->
<!-- <AppenderRef ref="MyFile"/>-->
<!-- </Async>-->
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Logger name="com.ctw" level="info">
<AppenderRef ref="Console"/>
</Logger>
<Root level="error">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
StreamLambdaHandler.java
package com.ctw.consent;
import com.amazonaws.serverless.proxy.jersey.JerseyLambdaContainerHandler;
import com.amazonaws.serverless.proxy.model.AwsProxyRequest;
import com.amazonaws.serverless.proxy.model.AwsProxyResponse;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class StreamLambdaHandler implements RequestStreamHandler {
private static final String PACK_SCAN= "com.ctw.consent";
private static final ResourceConfig jerseyApplication = new ResourceConfig()
.packages(PACK_SCAN)
.register(JacksonFeature.class);
private static final JerseyLambdaContainerHandler<AwsProxyRequest, AwsProxyResponse> handler
= JerseyLambdaContainerHandler.getAwsProxyHandler(jerseyApplication);
@Override
public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
handler.proxyStream(inputStream, outputStream, context);
// just in case it wasn't closed by the mapper
outputStream.close();
}
}
OperationHelper.java
package com.ctw.consent.util;
import com.amazon.ion.IonStruct;
import software.amazon.qldb.Result;
import java.util.ArrayList;
import java.util.List;
public class OperationHelper {
public static List<IonStruct> toIonStructs(Result result) {
final List<IonStruct> documentList = new ArrayList<>();
result.iterator().forEachRemaining(row -> documentList.add((IonStruct) row));
return documentList;
}
}
CustomerService.java
package com.ctw.consent.service;
import com.amazon.ion.IonSystem;
import com.amazon.ion.system.IonSystemBuilder;
import com.ctw.consent.dao.Constants;
import com.ctw.consent.dao.CustomerRepository;
import com.ctw.consent.model.Customer;
import com.ctw.consent.model.DocumentID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class CustomerService {
private static final Logger LOGGER = LoggerFactory.getLogger(CustomerService.class);
private static final IonSystem ion = IonSystemBuilder.standard().build();
private CustomerRepository customerRepository;
public CustomerService() {
this.customerRepository = new CustomerRepository();
}
public List<Customer> findAll() throws Exception {
return customerRepository.findAll();
}
public DocumentID add(Customer customer) throws Exception {
LOGGER.info(String.format("Inserting customer %s", customer));
try {
String documentID = this.customerRepository.insert(Constants.MAPPER.writeValueAsIonValue(customer));
return Constants.MAPPER.readValue(documentID, DocumentID.class);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw e;
}
}
}
DocumentID.java
package com.ctw.consent.model;
public class DocumentID {
private String documentId;
public DocumentID() {
}
public String getDocumentId() {
return documentId;
}
public void setDocumentId(String documentId) {
this.documentId = documentId;
}
}
Customer.java
package com.ctw.consent.model;
import com.amazon.ion.IonBool;
import com.amazon.ion.IonString;
import com.amazon.ion.IonStruct;
import com.amazon.ion.IonSystem;
import com.amazon.ion.system.IonSystemBuilder;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"document_id",
"consent",
"vin",
"brand"
})
public class Customer {
private static final IonSystem system = IonSystemBuilder.standard().build();
@JsonProperty("vin")
public String vin;
@JsonProperty("consent")
public Boolean consent;
@JsonProperty("brand")
public String brand;
@JsonProperty("document_id")
private String documentID;
public Customer(String documentID, String vin, Boolean consent, String brand) {
this.documentID = documentID;
this.vin = vin;
this.consent = consent;
this.brand = brand;
}
public Customer() {
}
public static Customer decode(IonStruct struct) {
return new Customer(((IonString) struct.get("id")).stringValue(), ((IonString) struct.get("vin")).stringValue(), ((IonBool) struct.get("consent")).booleanValue(), ((IonString) struct.get("brand")).stringValue());
}
public String getDocumentID() {
return documentID;
}
public void setDocumentID(String documentID) {
this.documentID = documentID;
}
public String getVin() {
return vin;
}
public void setVin(String vin) {
this.vin = vin;
}
public Boolean getConsent() {
return consent;
}
public void setConsent(Boolean consent) {
this.consent = consent;
}
public String getBrand() {
return brand;
}
public void setBrand(String brand) {
this.brand = brand;
}
@Override
public String toString() {
return "Customer{" +
"vin='" + vin + '\'' +
", consent=" + consent +
", brand='" + brand + '\'' +
", documentID='" + documentID + '\'' +
'}';
}
}
CustomerRepository.java
package com.ctw.consent.dao;
import com.amazon.ion.IonString;
import com.amazon.ion.IonStruct;
import com.amazon.ion.IonValue;
import com.ctw.consent.model.Customer;
import com.ctw.consent.util.OperationHelper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.qldb.PooledQldbDriver;
import software.amazon.qldb.Result;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
public class CustomerRepository {
private final Logger LOGGER = LogManager.getLogger(CustomerRepository.class);
public String insert(IonValue data) throws Exception {
try (PooledQldbDriver session = ConnectToLedger.createQldbSession()) {
LOGGER.info(String.format("Appending new document to %s", Constants.TABLE_CONSENTS));
final String query = String.format("INSERT INTO %s ?", Constants.TABLE_CONSENTS);
final List<IonValue> parameters = Collections.singletonList(data);
Result result = session.getSession().execute(query, parameters);
if (result.isEmpty()) {
throw new Exception("No documents updated when at least one document expected.");
}
List<IonStruct> cursor = OperationHelper.toIonStructs(result);
Optional<com.amazon.ion.IonStruct> resultDocumentID = cursor.stream().findFirst();
if (resultDocumentID.isPresent()) {
return resultDocumentID.get().toPrettyString();
} else {
throw new Exception("No document id was find to return, check log files.");
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw e;
}
}
public String update(Customer Customer, String documentID) throws Exception {
LOGGER.info(String.format("Updating new document to %s by document id %s", Constants.TABLE_CONSENTS, documentID));
try (PooledQldbDriver session = ConnectToLedger.createQldbSession()) {
final StringBuilder SQL = new StringBuilder();
SQL.append("UPDATE " + Constants.TABLE_CONSENTS + " AS t BY pid SET t.vin = ?,");
SQL.append("t.vin = ?,");
SQL.append("t.consent = ?,");
SQL.append("t.brand = ?,");
SQL.append("WHERE pid = ?");
final List<IonValue> parameters = new ArrayList<>();
parameters.add(Constants.MAPPER.writeValueAsIonValue(Customer.getVin()));
parameters.add(Constants.MAPPER.writeValueAsIonValue(Customer.getConsent()));
parameters.add(Constants.MAPPER.writeValueAsIonValue(Customer.getBrand()));
parameters.add(Constants.MAPPER.writeValueAsIonValue(documentID));
Result result = session.getSession().execute(SQL.toString(), parameters);
if (result.isEmpty()) {
throw new Exception("No documents updated when at least one document expected.");
}
List<IonStruct> cursor = OperationHelper.toIonStructs(result);
Optional<IonStruct> resultDocumentID = cursor.stream().findFirst();
if (resultDocumentID.isPresent()) {
return resultDocumentID.get().toPrettyString();
} else {
throw new Exception("No document id was find to return, check log files.");
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw e;
}
}
public List<Customer> findAll() throws Exception {
try (PooledQldbDriver session = ConnectToLedger.createQldbSession()) {
final String query = String.format("SELECT id, c.* FROM %s AS c BY id", Constants.TABLE_CONSENTS);
Result result = session.getSession().execute(query);
List<IonStruct> cursor = OperationHelper.toIonStructs(result);
List<Customer> data = new ArrayList<>();
for(IonStruct c : cursor) {
Customer customer = Customer.decode(c);
data.add(new Customer(customer.getDocumentID(), customer.getVin(), customer.getConsent(), customer.getBrand()));
}
return data;
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw e;
}
}
public Customer findById(String documentID) throws Exception {
try (PooledQldbDriver session = ConnectToLedger.createQldbSession()) {
final String query = String.format("SELECT * FROM %s AS c BY id WHERE id = ?", Constants.TABLE_CONSENTS);
final List<IonValue> parameters = Collections.singletonList(Constants.MAPPER.writeValueAsIonValue(documentID));
Result result = session.getSession().execute(query, parameters);
if (result.isEmpty()) {
throw new Exception(String.format("No car history found for DOC ID %s.", documentID));
}
List<IonStruct> cursor = OperationHelper.toIonStructs(result);
return Customer.decode(cursor.get(0));
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw e;
}
}
public Customer findByVIN(String vin) throws Exception {
try (PooledQldbDriver session = ConnectToLedger.createQldbSession()) {
final String query = String.format("SELECT pid, c.* from %s AS c BY pid WHERE c.vin = ?", Constants.TABLE_CONSENTS);
final List<IonValue> parameters = Collections.singletonList(Constants.MAPPER.writeValueAsIonValue(vin));
Result result = session.getSession().execute(query, parameters);
if (result.isEmpty()) {
throw new Exception(String.format("No car history found for VIN %s.", vin));
}
List<IonStruct> cursor = OperationHelper.toIonStructs(result);
return Customer.decode(cursor.get(0));
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw e;
}
}
}
Constants.java
package com.ctw.consent.dao;
import com.fasterxml.jackson.dataformat.ion.IonObjectMapper;
public class Constants {
public static final String LEDGER_NAME = "di-ledger";
public static final String REGION = "us-east-1";
public static final int RETRY_LIMIT = 4;
public static final IonObjectMapper MAPPER = new IonObjectMapper();
public static final String TABLE_CONSENTS = "Consents";
public static final String ROLE = "arn:aws:iam::720205669273:role/ccc-administration/OwnFull";
public static final String ACCESS_KEY = "";
public static final String PRIVATE_KEY = "";
}
ConnectToLedger.java
package com.ctw.consent.dao;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.services.qldbsession.AmazonQLDBSessionClientBuilder;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import software.amazon.qldb.PooledQldbDriver;
public final class ConnectToLedger {
public static AWSCredentialsProvider credentialsProvider;
public static PooledQldbDriver createQldbSession() {
AmazonQLDBSessionClientBuilder builder = AmazonQLDBSessionClientBuilder.standard().withRegion(Constants.REGION);
return PooledQldbDriver.builder()
.withLedger(Constants.LEDGER_NAME)
.withRetryLimit(Constants.RETRY_LIMIT)
.withSessionClientBuilder(builder)
.build();
}
// CONNECT TO A LEDGER USING ASSUME ROLE API
public PooledQldbDriver createQldbSessionAssumeRole() {
BasicAWSCredentials basic = new BasicAWSCredentials(Constants.ACCESS_KEY, Constants.PRIVATE_KEY);
AWSSecurityTokenServiceClientBuilder stsBuilder = AWSSecurityTokenServiceClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(basic))
.withRegion(Constants.REGION);
AWSSecurityTokenService sts = stsBuilder.build();
AssumeRoleRequest assumeRequest = new AssumeRoleRequest().withRoleArn(Constants.ROLE).withRoleSessionName("trs");
AssumeRoleResult assumeResult = sts.assumeRole(assumeRequest);
BasicSessionCredentials cred = new BasicSessionCredentials(assumeResult.getCredentials().getAccessKeyId(),
assumeResult.getCredentials().getSecretAccessKey(),
assumeResult.getCredentials().getSessionToken());
AmazonQLDBSessionClientBuilder builder = AmazonQLDBSessionClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(cred))
.withRegion(Constants.REGION);
return PooledQldbDriver.builder()
.withLedger(Constants.LEDGER_NAME)
.withRetryLimit(Constants.RETRY_LIMIT)
.withSessionClientBuilder(builder)
.build();
}
}
CustomerResource.java
package com.ctw.consent.api;
import com.ctw.consent.model.Customer;
import com.ctw.consent.service.CustomerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@Path("/customers")
public class CustomerResource {
private static final Logger LOGGER = LoggerFactory.getLogger(CustomerResource.class);
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response findAll() {
try {
LOGGER.info("Getting all consents");
CustomerService customerService = new CustomerService();
return Response.status(200).entity(customerService.findAll()).build();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return Response.status(500).entity(e.getMessage()).build();
}
}
@POST
@Consumes(MediaType.APPLICATION_JSON)
public Response insert(Customer form) {
try {
if (validateUserConsent(form)) {
LOGGER.info("Adding user consent {}", form);
CustomerService customerService = new CustomerService();
customerService.add(form);
} else {
return Response.status(400).build();
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return Response.status(500).entity(e.getMessage()).build();
}
return Response.status(200).build();
}
private Boolean validateUserConsent(Customer form) {
return form.getVin() != null && !"".equals(form.getVin());
}
}
sam.yaml This file is used to create a serverless application that you can package and deploy in the AWS Cloud. You can get details on https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/what-is-sam.html
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: AWS Serverless Jersey API - Customer Endpoint
Resources:
CustomerFunction:
Type: AWS::Serverless::Function
Properties:
Handler: com.ctw.consent.StreamLambdaHandler::handleRequest
Runtime: java8
CodeUri: target/customer-0.1.0.jar
MemorySize: 512
Policies:
- AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/AmazonQLDBFullAccess
Timeout: 15
Events:
GetResource:
Type: Api
Properties:
Path: /{proxy+}
Method: any
Outputs:
CustomerApi:
Description: URL for application
Value: !Sub 'https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Stage/customers'
Export:
Name: CarDataCustomerApi
After set up a standard Java project with the source code, you have to get your AWS roles and private keys and then
replace them in Constants.java class.
Deploying
Run the script deploy.sh, once it's finished, you can access you AWS web console, navigate to the gateway api service and test it.
deploy.sh
#!/bin/bash
stack='DI-STACK'
bucket='java-prototype-di'
echo 'Building project'
mvn -T 2C clean install -DskipTests=true
#echo 'Delete previous stack'
#aws cloudformation delete-stack --stack-name $stack
sleep 5
echo 'Uploading artifact to S3 bucket' $bucket
aws cloudformation package --template-file sam.yaml --output-template-file output-sam.yaml --s3-bucket $bucket
sleep 5
echo 'Deploying function as a service'
aws cloudformation deploy --template-file output-sam.yaml --stack-name $stack --capabilities CAPABILITY_IAM
sleep 10
aws cloudformation describe-stacks --stack-name $stack