Proper resource management examples
You should always look for ways to properly and efficiently handle data streams. Review the following example operations for ideas and best practices. Whether a specific example applies depends on your connector use case.
Directly sending input data and converting responses
This example demonstrates a simple connector use case where input data can be sent directly to the target system, and the response can be directly converted to a payload. This assumes that the target system is capable of handling "bad" input data (e.g., data that is too large or in the wrong format). This isn't always possible, but when it is, the connector implementation is relatively simple.
Example: Directly sending input data and converting responses
/**
 * Sample update operation that demonstrates a simple connector use case where the input can be sent directly to the
 * target system and the response can be used as output for the connector operation.
 */
public class DirectInputStreamSampleOperation extends BaseUpdateOperation {

    protected DirectInputStreamSampleOperation(OperationContext context) {
        super(context);
    }

    @Override
    protected void executeUpdate(UpdateRequest request, OperationResponse response) {
        SampleClient<InputStream, InputStream> client = new SampleClient<>();
        for (ObjectData input : request) {
            // open the input data stream
            InputStream inputData = input.getData();
            // declare outside the try so it can be closed in the finally
            InputStream outputData = null;
            try {
                // use the client to send the input data directly to the target system
                outputData = client.send(inputData);
                // other logic
                // convert the output stream directly to a payload
                response.addResult(input, OperationStatus.SUCCESS, "0", "ok", PayloadUtil.toPayload(outputData));
            } catch (Exception e) {
                // log the failure using the document logger for this specific input
                input.getLogger().log(Level.SEVERE, "unable to process input", e);
                // add an application error for this input since we were unable to add the successful result
                // you can optionally attempt to include the output data as the payload, but care needs to be
                // taken to handle failures adding the payload
                response.addResult(input, OperationStatus.APPLICATION_ERROR, "-1", e.getMessage(), null);
            } finally {
                // close the streams in a finally block in case there's an exception. addResult() will close the
                // payload and the associated stream but if there's "other logic" that might fail between when the
                // stream is opened and when the result is added, it should be closed in a finally as well
                IOUtil.closeQuietly(inputData, outputData);
            }
        }
    }
}
What does this code do well?
This solution addresses the previous operation’s problems in a few ways:
- The input and output data streams are never loaded into memory, and they are properly released in a finally block.
- The operation includes some simple error handling to add an application error for any exception encountered while processing an individual input. The error applies only to that input, and the connector is free to continue processing any remaining input documents.
- The exception is logged using the document logger for this input as opposed to the response logger (which applies to all documents) or the container logger (which should generally be avoided). The sketch below illustrates the distinction.
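To make the distinction concrete, here's a minimal sketch of the logging scopes. The input.getLogger() call comes from the example above; the response logger line is hypothetical and shown commented out for contrast.

for (ObjectData input : request) {
    try {
        // process the input
    } catch (Exception e) {
        // document logger: the message is attached to this specific input only
        input.getLogger().log(Level.SEVERE, "unable to process input", e);
        // response logger (illustrative only): applies to all documents in the operation,
        // so reserve it for operation-wide conditions rather than per-document errors
        // response.getLogger().log(Level.WARNING, "operation-wide condition");
    }
}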
Using a client output stream to write the request
In this example, instead of accepting an input stream containing the request, the client provides an output stream to which the connector can write the request.
Example: Using a client output stream to write the request
/**
 * Sample update operation that demonstrates how to send data to a client that provides an active output stream to
 * write the request. This is a common alternative to accepting an input stream directly. Ideally, as demonstrated in
 * this example, the input data is written directly to the target system without any intermediate copies.
 */
public class OutputStreamSampleOperation extends BaseUpdateOperation {

    protected OutputStreamSampleOperation(OperationContext context) {
        super(context);
    }

    @Override
    protected void executeUpdate(UpdateRequest request, OperationResponse response) {
        for (ObjectData input : request) {
            // open the input data stream
            InputStream inputData = input.getData();
            // obtain an output stream from the client using a try with resources to ensure the output stream is
            // properly closed
            try (OutputStream output = getOutputStream()) {
                // copy the entire input stream to the target output stream, this does not close either stream
                StreamUtil.copy(inputData, output);
                // add an empty result since the client provided no response and there's no metadata to track
                response.addEmptyResult(input, OperationStatus.SUCCESS, "OK", "Success!");
            } catch (Exception e) {
                // add a result with a FAILURE status for this input. depending on the use case, an APPLICATION_ERROR
                // may be more appropriate here
                ResponseUtil.addExceptionFailure(response, input, e);
            } finally {
                // ensure the input stream is released
                IOUtil.closeQuietly(inputData);
            }
        }
    }

    /**
     * Simulates a client that provides an {@link OutputStream} where the request should be written. This is only an
     * example so it just returns null.
     *
     * @return output stream for the target system
     */
    private static OutputStream getOutputStream() {
        return null;
    }
}
What does this code do well?
- Similar to the previous example, the input data stream is never loaded into memory and is closed in a finally block. You'll notice the output stream is not closed in the finally block. Instead, a try-with-resources statement is used to manage the output stream. This is an equally effective approach to ensuring that resources are properly released.
- To "send" the request, we're using StreamUtil to efficiently copy the input data stream to the output data stream using an appropriate buffer. Since the client in this example does not return a response, we're adding an empty result with a SUCCESS status to acknowledge that the connector has successfully processed the input document.
- An individual input's failure does not prevent any remaining inputs from processing, thanks to proper error handling. Errors are handled by using ResponseUtil to add a FAILURE result for the specific input that failed.

The ResponseUtil.addExceptionFailure method handles logging the error to the document logger. As a result, the logging doesn't need to be done explicitly in the catch block as it was in the previous example. The sketch below approximates its behavior.
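For reference, here's a rough sketch of what ResponseUtil.addExceptionFailure(response, input, e) accomplishes, based on the behavior described above. This is an approximation, not the actual SDK source, and the "-1" status code is illustrative.

// approximately equivalent to ResponseUtil.addExceptionFailure(response, input, e);
// the status code shown here is illustrative, not the code the utility actually uses
input.getLogger().log(Level.SEVERE, "unable to process input", e);
response.addResult(input, OperationStatus.FAILURE, "-1", e.getMessage(), null);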
Not every use case is straightforward. Let’s examine some more complicated examples that work with JSON data.
Implementing a size-limited update operation
This example demonstrates how to implement a size-limited update operation using the SizeLimitedUpdateOperation class. This base class is available as of the Connector SDK version 2.0.2.
Example: Implementing a size-limited update operation
/**
 * Sample {@link SizeLimitedUpdateOperation} that demonstrates binding the document input streams to {@link JsonNode}
 * instances. These instances are held entirely in memory but since this operation extends
 * {@link SizeLimitedUpdateOperation}, any "large" inputs are filtered out before reaching this operation. Only a
 * SINGLE input should be held in memory at a time. If a batch of all documents is required,
 * {@link SizeLimitedBatchJsonOperation} demonstrates how to combine documents using a streaming json writer.
 */
public class SizeLimitedJsonOperation extends SizeLimitedUpdateOperation {

    /**
     * This mapper enables auto close source so the input stream does not need to be closed in a finally block here.
     * Note, if other classes have access to the mapper, this feature can be disabled so caution should be used.
     */
    private static final ObjectMapper MAPPER = new ObjectMapper().enable(Feature.AUTO_CLOSE_SOURCE);

    protected SizeLimitedJsonOperation(OperationContext context) {
        super(context);
    }

    @Override
    protected void executeSizeLimitedUpdate(UpdateRequest request, OperationResponse response) {
        SampleClient<JsonNode, Void> client = new SampleClient<>();
        for (ObjectData input : request) {
            try {
                // open the input data stream and parse to an in memory json node. the object mapper is configured to
                // close the input stream when the parsing is complete
                JsonNode json = MAPPER.readTree(input.getData());
                // this example expects an object so if the input is not an object, add an APPLICATION_ERROR
                if (!json.isObject()) {
                    response.addResult(input, OperationStatus.APPLICATION_ERROR, "-1", "expected object",
                            JsonPayloadUtil.toPayload(json));
                    continue;
                }
                // safely cast to an object and add an additional field to the input
                ((ObjectNode) json).put("additionalField", "123");
                // send the modified json document to the target system
                client.send(json);
                // add an empty result since the client provided no response
                // TODO size limited response
                response.addEmptyResult(input, OperationStatus.SUCCESS, "OK", "Success!");
            } catch (IOException e) {
                // this exception is specific to this input so use the input specific logger
                input.getLogger().log(Level.SEVERE, "unable to process input", e);
                // add an application error for this input. note that the status code is different than the
                // application error added above. this allows process developers to distinguish between different
                // errors.
                response.addResult(input, OperationStatus.APPLICATION_ERROR, "-2", "invalid input", null);
            }
        }
    }
}
What does this code do well?
- Extending the SizeLimitedUpdateOperation class allows the connector operation to safely load individual input data streams into memory so that it can add additional fields to the input data before sending the data to the target system. This is an alternative to extending com.boomi.connector.util.BaseUpdateOperation or directly implementing com.boomi.connector.api.Operation.
- This extension is allowed because the parent class filters out any "large" input documents. A "large" document is anything larger than 1 MB, and this is not currently configurable. The expectation is that the connector only processes a single document at a time and does not attempt to load all of the input documents into memory.
- You may notice that the input data stream is not closed in a finally block. In this example, the object mapper used to parse the input stream has been explicitly configured to close the stream when parsing is complete. This is an acceptable solution, but you should use caution. Any code with access to the object mapper can disable the feature. In addition, different versions of the Jackson library may not provide the same guarantees.
- Closing the input stream manually in a finally block is always acceptable when you're unsure whether the parser guarantees the stream is released, as shown in the sketch after this list.
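If you'd rather not depend on the mapper's auto close behavior, the loop body can close the stream explicitly. A minimal sketch using the same APIs as the example above:

for (ObjectData input : request) {
    // open the stream explicitly so ownership is obvious
    InputStream inputData = input.getData();
    try {
        JsonNode json = MAPPER.readTree(inputData);
        // process the parsed input as in the example above
    } catch (IOException e) {
        input.getLogger().log(Level.SEVERE, "unable to process input", e);
        response.addResult(input, OperationStatus.APPLICATION_ERROR, "-2", "invalid input", null);
    } finally {
        // releasing the stream here is safe even if the mapper already closed it
        IOUtil.closeQuietly(inputData);
    }
}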
Building a batch of input documents and using temporary output streams
This example expands upon the previous example to demonstrate how to build a batch of input documents without loading every input data stream into memory and consuming the heap. It also demonstrates the use of a temporary output stream.
Example: Building a batch of input documents and using temporary output streams
/**
 * Sample update operation that builds a JSON array containing the input documents in a memory efficient manner.
 * Individual documents that exceed the allowed size are filtered out by
 * {@link SizeLimitedUpdateOperation#execute(com.boomi.connector.api.OperationRequest, OperationResponse)}. A streaming
 * writer is used to build the array to avoid loading multiple input documents into memory at the same time.
 */
public class SizeLimitedBatchJsonOperation extends SizeLimitedUpdateOperation {

    /**
     * auto close is disabled because this example uses a temporary output stream that needs to remain open until
     * converted to an input stream
     */
    private static final ObjectMapper MAPPER = new ObjectMapper().disable(Feature.AUTO_CLOSE_TARGET);

    protected SizeLimitedBatchJsonOperation(OperationContext context) {
        super(context);
    }

    @Override
    protected void executeSizeLimitedUpdate(UpdateRequest request, OperationResponse response) {
        SampleClient<InputStream, Void> client = new SampleClient<>();
        // declare these resources outside the try so they can be closed in the finally
        OutputStream tempOutputStream = null;
        InputStream requestPayload = null;
        JsonGenerator generator = null;
        try {
            List<ObjectData> processed = new ArrayList<>();
            // ideally, this writes to the output stream of the target system but if that's not possible a temporary
            // stream can be used. this stream is memory efficient and will not be held in memory
            tempOutputStream = getContext().createTempOutputStream();
            // create a generator that will write to the temp stream
            generator = MAPPER.getFactory().createGenerator(tempOutputStream);
            // start the array
            generator.writeStartArray();
            for (ObjectData input : request) {
                // open the input data stream
                InputStream inputData = input.getData();
                try {
                    // ok to hold a single input in memory because large inputs have been filtered out
                    JsonNode json = MAPPER.readTree(inputData);
                    // start the object for this input
                    generator.writeStartObject();
                    // write the original object
                    generator.writeObjectField("object", json);
                    // write some additional fields not present in the input data
                    generator.writeStringField("additionalField1", "abc");
                    generator.writeStringField("additionalField2", "123");
                    // end the object for this input
                    generator.writeEndObject();
                    // keep track of processed input so a combined result can be added later
                    processed.add(input);
                } finally {
                    IOUtil.closeQuietly(inputData);
                }
            }
            // end the array
            generator.writeEndArray();
            // flush and close the generator, the close is significant and should not be done quietly
            generator.flush();
            generator.close();
            // convert the temporary output stream to an input stream
            requestPayload = getContext().tempOutputStreamToInputStream(tempOutputStream);
            // send the payload to the target system
            client.send(requestPayload);
            // add a single result for all of the inputs
            response.addCombinedResult(processed, OperationStatus.SUCCESS, "0", "ok", null);
        } catch (IOException e) {
            // this will fail all of the input documents which is appropriate since there's nowhere to write them
            throw new ConnectorException(e);
        } finally {
            // ensure the temp output stream, json generator, and request payload are closed
            IOUtil.closeQuietly(tempOutputStream, generator, requestPayload);
        }
    }
}
What does this code do well?
- Ideally, the connector writes directly to the target output stream as seen in OutputStreamSampleOperation, but that's not always possible. In this case, the client only accepts an input stream containing the entire batch, so a temporary stream is obtained from the operation context. This stream efficiently handles large and small documents and can be converted to an input stream. A JSON generator is used to efficiently write each document to the output stream.
- This code uses a similar pattern to the previous examples. We declare resources outside of the try block so they can be initialized in the try and closed in the finally block. You may notice that the generator appears to be closed twice (it's closed in the try and again in the finally block). This is because the close, when writing data, is often a significant part of the write operation (discussed later). There's also a subtle nuance with the object mapper configuration. The auto close target feature is explicitly disabled because that feature closes the output stream when the generator is closed. That's generally good behavior, although the same cautions about allowing the mapper to manage the stream apply here as well. Be aware that the temporary output stream can only be converted to an input stream while it's still open. If we allowed the mapper to close the output stream, we'd get an error when creating the request payload stream, as illustrated in the sketch after this list.
- We're using a combined result to create a many-to-one relationship for the input and output. This allows us to create a single combined result for all of the inputs. This is often useful in scenarios like this one, where the inputs are sent to the target system in a single batch and only a single response is provided.
- We're throwing a ConnectorException when an IOException is encountered for any input. This is a shortcut to tell the Atom to fail any remaining documents. This may make sense in certain use cases, but you should take caution because, as previously discussed, the failure of a single input document does not always imply a failure for all input documents.
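To illustrate the close ordering nuance, here's a sketch of the failure that disabling the auto close target feature prevents. The sequence below assumes a default mapper (the feature is enabled by default in Jackson):

// sketch: with AUTO_CLOSE_TARGET enabled (the Jackson default), closing the generator
// also closes the underlying temp output stream
ObjectMapper defaultMapper = new ObjectMapper();
JsonGenerator gen = defaultMapper.getFactory().createGenerator(tempOutputStream);
gen.writeStartArray();
gen.writeEndArray();
gen.close(); // closes tempOutputStream as well
// the conversion below would now fail because the temp stream is no longer open
// requestPayload = getContext().tempOutputStreamToInputStream(tempOutputStream);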
Splitting responses into documents
This example is effectively the reverse of the previous example. Instead of combining multiple inputs into a single batch and returning a single response, we’re sending each input individually and then “splitting” the response into many documents.
Example: Splitting responses into documents
/**
 * Sample update operation that demonstrates the use of a {@link JsonSplitter} to create multiple payloads from a
 * single stream in a memory efficient manner. In this example, the root json document is assumed to be a json array.
 * If the root document is an object, {@link JsonArraySplitter} can be used to split an array that is a top level
 * property of the object.
 */
public class SplitJsonResponseOperation extends BaseUpdateOperation {

    private static final String ERROR_MESSAGE = "unable to process input";

    protected SplitJsonResponseOperation(OperationContext context) {
        super(context);
    }

    @Override
    protected void executeUpdate(UpdateRequest request, OperationResponse response) {
        SampleClient<InputStream, InputStream> client = new SampleClient<>();
        for (ObjectData input : request) {
            // open the input data stream
            InputStream inputData = input.getData();
            // declare splitter outside the try so it can be closed in the finally
            JsonSplitter splitter = null;
            try {
                // send the input data to the target system and get the output data
                InputStream outputData = client.send(inputData);
                // assign a new splitter to the previously declared variable
                splitter = new JsonRootArraySplitter(outputData);
                // add a partial result for each item in the json array
                for (Payload payload : splitter) {
                    response.addPartialResult(input, OperationStatus.SUCCESS, "OK", "Success!", payload);
                }
            } catch (Exception e) {
                // this exception is specific to this input so use the input specific logger
                input.getLogger().log(Level.SEVERE, ERROR_MESSAGE, e);
                // add a partial result to indicate that an exception occurred while preserving any previous payloads
                response.addPartialResult(input, OperationStatus.APPLICATION_ERROR, "-1", ERROR_MESSAGE, null);
            } finally {
                // ensure the input data and splitter are closed, the output data is closed when the splitter is closed
                IOUtil.closeQuietly(inputData, splitter);
            }
            // finish the partial result to complete processing of this input
            response.finishPartialResult(input);
        }
    }
}
What does this code do well?
- The Connector SDK utilities provide a few different splitters. In this example, the JsonRootArraySplitter class is used to split a JSON document where the array to split is at the root of the document. There's also a JsonArraySplitter class that can be used to split an array that is a field in the root object, and an XmlSplitter class that can be used to split XML documents. See the illustration after this list.
- The input data stream and splitter are declared outside the try and closed in the finally block, but the output data stream is not. This is because closing the splitter closes the input stream that it's splitting.
- The error handling in this example is of particular interest. You'll notice that we're adding partial results for each payload produced by the splitter, but there's also a partial result added if an exception occurs. We could have just as easily added a "full" result or thrown an exception, but we're adding the exception as a partial application error so that process designers can still access any successful documents that were added before the exception occurred. This is especially important for operations that add a lot of partial results but are likely to fail at some point. This typically happens when the target API requires a request for each "page" but the page requests are prone to failures (timeouts, service unavailable, etc.). The error result uses a different status code so that process designers can differentiate between successful and error documents.
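For instance (using illustrative data), if the target system returns the root array [{"id":1},{"id":2}], the JsonRootArraySplitter produces two payloads, {"id":1} and {"id":2}, and each one is added as a partial result for the originating input.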
Implementing a custom Payload class
We’ve discussed modifying request data streams and splitting response data streams. In this example, we’ll take a look at how to efficiently modify an input stream containing the response from the target system.
To accomplish this, we'll implement a custom Payload class. Payloads represent the output data when adding a result to the OperationResponse class. In the previous examples, we used the Boomi payload utilities to create simple payloads. However, custom payload implementations allow for more robust response handling. The output stream is passed into the payload's writeTo method, allowing the data to be written directly to the Atom write store without any additional copies of the data.
Example: Implementing a custom Payload class
/**
 * Sample update operation that demonstrates how to convert the response from a target system to a new format using a
 * memory efficient custom {@link Payload}.
 */
public class CustomJsonPayloadOperation extends BaseUpdateOperation {

    protected CustomJsonPayloadOperation(OperationContext context) {
        super(context);
    }

    @Override
    protected void executeUpdate(UpdateRequest request, OperationResponse response) {
        SampleClient<InputStream, InputStream> client = new SampleClient<>();
        for (ObjectData input : request) {
            // open the input data stream
            InputStream inputData = input.getData();
            // declare payload outside the try so it can be closed in the finally
            Payload payload = null;
            try {
                // send the input data to the target system and get the output data
                InputStream outputData = client.send(inputData);
                // convert the output data to a custom payload
                payload = new CustomPayload(outputData);
                // add a successful result with the custom payload
                response.addResult(input, OperationStatus.SUCCESS, "OK", "Success!", payload);
            } finally {
                // ensure the input data and payload are closed, the output data is closed when the payload is closed
                IOUtil.closeQuietly(inputData, payload);
            }
        }
    }

    /**
     * Custom payload that modifies a stream containing json data in a memory efficient manner. The parsing logic is
     * intentionally naive because realistic parsing logic would detract from the example and ultimately the logic
     * will be use case specific. In this example, the use case is to convert a json document to a flat structure with
     * every field from the original json. The values for the fields will be either the original value x10 for numeric
     * fields or an attempted string conversion of the original value.
     *
     * For example, something like
     *
     * {"x":1,"a":"b","g":false,"sub":{"y":2,"b":"c"}}
     *
     * becomes something like
     *
     * {"x":10,"a":"b","g":"false","sub":null,"y":20,"b":"c"}
     */
    private static class CustomPayload extends BasePayload {

        private static final JsonFactory JSON_FACTORY = new JsonFactory();

        private final InputStream _content;

        /**
         * Creates a new instance. Closing the payload will close the content stream.
         */
        private CustomPayload(InputStream content) {
            _content = content;
        }

        @Override
        public void writeTo(OutputStream out) throws IOException {
            // open the parser and generator as managed resources that are guaranteed to be closed
            try (JsonParser parser = JSON_FACTORY.createParser(_content);
                    JsonGenerator generator = JSON_FACTORY.createGenerator(out)) {
                // start the root object for the payload
                generator.writeStartObject();
                // convert the original content to the modified content in a memory efficient manner
                while (parser.nextToken() != null) {
                    if (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
                        // write the current field name
                        generator.writeFieldName(parser.getCurrentName());
                        // grab the value token
                        JsonToken valueToken = parser.nextToken();
                        // this is intentionally naive parsing that demonstrates writing the value
                        if (valueToken.isNumeric()) {
                            generator.writeNumber(parser.getValueAsLong() * 10);
                        } else {
                            generator.writeString(parser.getValueAsString());
                        }
                    }
                }
                // end the root object
                generator.writeEndObject();
                // flush the generator, close is handled by the try
                generator.flush();
            }
        }

        @Override
        public void close() throws IOException {
            // close the original content stream when the payload is closed
            _content.close();
        }
    }
}
What does this code do well?
Most of this code is similar to the previous example, and it is effective for the following reasons:
- Resources are declared outside the try and closed in the finally block.
- Closing the payload closes the output data stream, so it does not need to be explicitly closed.
- The custom payload uses both a streaming parser and generator so that the original content is read efficiently and the new content is written efficiently.
- The try-with-resources statement is used again to ensure that both the parser and generator are properly released.
Ultimately, the logic in the custom payload is completely use case dependent, but this example is a good starting point for any complex conversions.
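As a closing note, a writeTo implementation like this one is easy to exercise in isolation. Here's a minimal sketch, assuming the payload class is made accessible (e.g., package-private) for testing and that the snippet runs inside a method declared to throw IOException:

// feed the payload a known document and capture the converted output
InputStream original = new ByteArrayInputStream(
        "{\"x\":1,\"a\":\"b\"}".getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream converted = new ByteArrayOutputStream();
try (Payload payload = new CustomPayload(original)) {
    payload.writeTo(converted);
}
// prints {"x":10,"a":"b"} per the conversion rules documented on CustomPayload
System.out.println(converted.toString("UTF-8"));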