Implementing the Listen operation

You can create Listen operations for a connector by implementing the com.boomi.connector.api.listen.ListenOperation interface.

To implement a Listen operation for a connector, you need version 2.0 or later of the Connector SDK, and the connector must use the com.boomi.connector.api.ListenConnector interface.

note

The Connector SDK artifacts from the public repository include a connector-sdk-samples-sources.jar file with source code for several connector samples that you can reference when building a connector. It includes sample JSON Operation classes, sample Listener classes, and more. Refer to the Connector SDK artifacts topic for more information.

Code sample

Here is a simple implementation of the Listen operation with comments included for context:

Example: Implementing Listen operation
/**
 * Simple {@link ListenOperation} implementation. Instances are managed by a {@link SimpleListenManager}.
 */
public class SimpleListenOperation extends BaseListenOperation<SimpleListenManager> {

    private SimpleListenManager _manager;
    private SingleMessageHandler _messageHandler;

    /**
     * Creates a new instance using the provided operation context
     */
    protected SimpleListenOperation(OperationContext context) {
        super(context);
    }

    /**
     * Starts the listen operation by registering with the manager. The manager has already been started so the consumer
     * is connected.
     */
    @Override
    public void start(Listener listener, SimpleListenManager manager) {
        // stash the manager and handler so they can be used to stop the operation
        _manager = manager;
        _messageHandler = new SimpleMessageHandler(listener);
        // register the message handler
        manager.getConsumer().register(_messageHandler);
    }

    /**
     * Stops the listen operation by deregistering with the manager
     */
    @Override
    public void stop() {
        _manager.getConsumer().deregister(_messageHandler);
    }

}

Implementing Listener batching

Using batches, Listen operations can submit multiple payloads for a single process execution. To achieve this, have the listener instance create one or more batches; a listener can hold several active batches at once. Submitting a batch launches a process execution with the batch's payloads as the initial documents. The listener then attempts to close the batch.

note

The Listener PayloadBatch implementation handles any exception that occurs when it closes the batch after submission. If an exception occurs, the listener logs a warning at the FINE level.

Consider the following when you implement Listener batching:

  • By default, an individual listener instance can have 5 "active" batches: either 1 default batch plus 4 indexed batches, or 5 indexed batches. Once you reach the limit, you must submit a batch before creating a new one. You can submit a batch normally, which executes the process, or submit it as failed, which records the failure in process reporting but does not execute the process.

  • Atom owners can increase or decrease the maximum number of batches an individual listener can create using the Maximum Number of Active Batches per Listener container property. This property is located in the Advanced tab of the Atom Properties panel.

    To learn more, refer to the Properties panel, Advanced tab page.

  • An individual batch can hold up to 1 MB in memory; beyond that, its content is moved to disk. This limit is not currently configurable for local Runtimes or runtime clusters. You can increase it only for low latency processes in the cloud, using Atom Output Overflow Size in Cloud Management.

  • The document properties Listener Batch Index and Listener Batch Payload Sequence let SDK connector users retrieve generic metadata for payloads submitted using listener batch functionality.

    note

    To learn more about meta information and document properties, refer to Meta information document properties.

    The Listener Batch Index is an alphabetic, numeric, or alphanumeric value dependent upon the value provided in listener.getBatch(<BatchId>), while Listener Batch Payload Sequence is always a numerical value and represents the order in which a payload is added to a listener batch.
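
The limit and metadata rules above can be modelled with a small, self-contained sketch. It does not use the Connector SDK: BatchRegistrySketch and SketchBatch are hypothetical stand-ins, and both the IllegalStateException thrown at the limit and the 1-based payload sequence are assumptions made for illustration, not documented SDK behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a listener batch; not part of the Connector SDK. */
class SketchBatch {
    private final String index;                       // models "Listener Batch Index"
    private final List<String> payloads = new ArrayList<>();
    private final Runnable onSubmit;

    SketchBatch(String index, Runnable onSubmit) {
        this.index = index;
        this.onSubmit = onSubmit;
    }

    /** Adds a payload and returns its position in the batch (1-based by assumption). */
    int add(String payload) {
        payloads.add(payload);
        return payloads.size();                       // models "Listener Batch Payload Sequence"
    }

    String getIndex() {
        return index;
    }

    int getCount() {
        return payloads.size();
    }

    /** Submitting releases the batch's slot so that a new batch can be created. */
    void submit() {
        onSubmit.run();
    }
}

/** Hypothetical model of the per-listener limit on active batches. */
class BatchRegistrySketch {
    private final int maxActive;
    private final Map<String, SketchBatch> active = new LinkedHashMap<>();

    BatchRegistrySketch(int maxActive) {
        this.maxActive = maxActive;
    }

    /** Returns the active batch for the index, creating it if the limit allows. */
    SketchBatch getBatch(String index) {
        SketchBatch existing = active.get(index);
        if (existing != null) {
            return existing;
        }
        if (active.size() >= maxActive) {
            // assumption: the real listener may signal this condition differently
            throw new IllegalStateException("active batch limit reached: " + maxActive);
        }
        SketchBatch created = new SketchBatch(index, () -> active.remove(index));
        active.put(index, created);
        return created;
    }

    int activeCount() {
        return active.size();
    }
}
```

With a limit of 5, a sixth distinct index is rejected until one of the five active batches has been submitted, which mirrors the "submit before creating new ones" rule.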

Code samples

Here is a simple implementation of Listener batching using the default PayloadBatch class. Refer to the comments included for context:

Example: Implementing batching using PayloadBatch class
/**
 * Sample {@link ListenOperation} that uses the default {@link PayloadBatch} to group a user defined number of messages
 * into a single process execution.
 */
public class BatchByCountListenOperation extends BaseConsumerListenOperation {

    private final long _limit;

    /**
     * Creates a new instance using the provided context
     */
    protected BatchByCountListenOperation(OperationContext context) {
        super(context);
        _limit = context.getOperationProperties().getLongProperty("limit");
    }

    /**
     * Creates new {@link SingleMessageHandler} that will add each received message to a batch and submit all of the
     * messages when a limit has been reached. The message stream will be closed.
     */
    @Override
    protected MessageHandler getMessageHandler(final Listener listener) {
        return new SingleMessageHandler() {
            @Override
            public void onMessage(InputStream message) {
                try {
                    PayloadBatch batch = listener.getBatch();
                    batch.add(PayloadUtil.toPayload(message));
                    // submit as soon as the batch holds the configured number of messages
                    if (batch.getCount() >= _limit) {
                        batch.submit();
                    }
                } finally {
                    IOUtil.closeQuietly(message);
                }
            }
        };
    }

}
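
To see how count-based batching groups messages into executions, here is a self-contained sketch that does not depend on the Connector SDK. CountBatcherSketch is a hypothetical stand-in: it submits as soon as the count reaches the limit, and it assumes that a fresh, empty batch is used after each submission.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of count-based batching; not part of the Connector SDK. */
class CountBatcherSketch {
    private final int limit;
    private List<String> current = new ArrayList<>();
    private final List<List<String>> executions = new ArrayList<>();

    CountBatcherSketch(int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be at least 1");
        }
        this.limit = limit;
    }

    /** Adds the message to the current batch and submits once the limit is reached. */
    void onMessage(String message) {
        current.add(message);
        if (current.size() >= limit) {
            executions.add(current);       // models batch.submit(): one process execution
            current = new ArrayList<>();   // assumption: a fresh batch follows a submission
        }
    }

    /** Each inner list holds the initial documents of one modelled execution. */
    List<List<String>> getExecutions() {
        return executions;
    }

    /** Number of messages waiting in the batch that has not been submitted yet. */
    int getPendingCount() {
        return current.size();
    }
}
```

With a limit of 3, seven messages produce two executions of three documents each and leave one message pending. A real operation may also need a policy for flushing such a partial batch, for example when the operation stops.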

The following code sample demonstrates how to interact with a consumer API that has a more complicated handler interface. The MultiMessageListenOperation class extends the base operation and creates a multi-message handler that uses indexed batches to group messages by a consumer-provided ID. Refer to the included comments for context.

Example: Implementing batching using MultiMessageListenOperation
/**
 * Sample {@link ListenOperation} that demonstrates how to interact with a consumer API with a more complicated handler
 * interface. This example uses indexed batches to group messages by consumer provided ID.
 */
public class MultiMessageListenOperation extends BaseConsumerListenOperation {

    /**
     * Creates a new instance using the provided context
     */
    protected MultiMessageListenOperation(OperationContext context) {
        super(context);
    }

    /**
     * Creates a new {@link MultiMessageHandler} that uses indexed batches.
     */
    @Override
    protected MessageHandler getMessageHandler(Listener listener) {
        return new SampleMultiMessageHandler(listener);
    }

    /**
     * Sample {@link MultiMessageHandler} that demonstrates how to use {@link IndexedPayloadBatch} instances to group
     * messages by an ID. The ID can be of any type but a String is used in this example.
     */
    private static class SampleMultiMessageHandler implements MultiMessageHandler {

        private final Listener _listener;

        /**
         * Creates a new instance using the provided {@link Listener}.
         */
        private SampleMultiMessageHandler(Listener listener) {
            _listener = listener;
        }

        /**
         * Obtains a batch for the provided message group id and adds a payload for each message to the batch. If this
         * is the first group of messages received for this id, a new batch will be created. Otherwise, the existing
         * batch will be used. All message streams are closed either by the batch after being successfully added or in
         * the finally block if an exception occurs. This method does not submit any executions.
         */
        @Override
        public void onMessage(String messageGroupId, Iterable<InputStream> messages) {
            try {
                IndexedPayloadBatch<String> batch = _listener.getBatch(messageGroupId);
                for (InputStream message : messages) {
                    batch.add(PayloadUtil.toPayload(message));
                }
            } finally {
                IOUtil.closeQuietly(messages);
            }
        }

        /**
         * Submits all of the previously received messages for the provided group id. If no messages have been received,
         * nothing will be submitted. In a more realistic scenario, attempting to submit an empty batch would likely
         * correspond with some sort of notification or exception.
         */
        @Override
        public void onCommit(String messageGroupId) {
            IndexedPayloadBatch<String> batch = _listener.getBatch(messageGroupId);
            if (batch.getCount() > 0) {
                batch.submit();
            }
        }

        /**
         * Fails all of the previously received messages for the provided group id. This effectively discards the batch
         * and its documents but a record of the failure will be available in process reporting.
         */
        @Override
        public void onFailure(String messageGroupId, Exception failure) {
            _listener.getBatch(messageGroupId).submit(failure);
        }

    }

}

Creating a Listen manager

A Listen operation serves a single process, whereas a Listen manager lets Listen operations that use the same connection share state. Each connection gets its own manager. Before starting an operation, the Runtime checks whether the connection's Listen manager is running, starts it if it is not, and then passes it to the operation's start method.
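
The lifecycle described above can be pictured with a short, self-contained model. It is only a sketch of the bookkeeping, not the Runtime's actual code: SketchRuntime, SketchManager, SketchOperation, and the string connection ID are all hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a listen manager; counts how often it is started. */
class SketchManager {
    private boolean running;
    private int startCount;

    void start() {
        running = true;
        startCount++;
    }

    boolean isRunning() {
        return running;
    }

    int getStartCount() {
        return startCount;
    }
}

/** Hypothetical stand-in for a listen operation that remembers the manager it was given. */
class SketchOperation {
    private SketchManager manager;

    void start(SketchManager manager) {
        this.manager = manager;
    }

    SketchManager getManager() {
        return manager;
    }
}

/** Hypothetical model of the runtime-side bookkeeping: one manager per connection. */
class SketchRuntime {
    private final Map<String, SketchManager> managers = new HashMap<>();

    void startOperation(String connectionId, SketchOperation operation) {
        // each connection gets its own manager, created on first use
        SketchManager manager = managers.computeIfAbsent(connectionId, id -> new SketchManager());
        // start the manager only if it is not already running
        if (!manager.isRunning()) {
            manager.start();
        }
        // hand the running manager to the operation's start method
        operation.start(manager);
    }
}
```

Two operations started against the same connection receive the same manager instance, and that manager is started only once. An operation on a different connection receives its own manager.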

Code sample

Here is a simple implementation of the Listen Manager with comments included for context:

Example: Implementing Listen Manager
/**
 * Simple {@link ListenManager} implementation, which manages consumer connections for all managed
 * {@link ListenOperation} instances.
 */
public class SimpleListenManager extends BaseListenManager {

    /**
     * The consumer here is just created when the manager is instantiated. You could also defer the creation until the
     * manager is started (i.e., create the consumer in {@link #start()}).
     */
    private final SampleConsumer _consumer = new SampleConsumer();

    /**
     * Creates a new instance with the provided connector context
     */
    protected SimpleListenManager(ConnectorContext context) {
        super(context);
    }

    /**
     * Opens the consumer connection. Managed {@link ListenOperation} instances will have access to a fully connected
     * consumer when {@link ListenOperation#start(com.boomi.connector.api.listen.Listener, ListenManager)} is invoked.
     */
    @Override
    public void start() {
        _consumer.connect();
    }

    /**
     * Closes the consumer connection. {@link ListenOperation#stop()} will have already been invoked for all managed
     * operation instances.
     */
    @Override
    public void stop() {
        _consumer.disconnect();
    }

    /**
     * Returns the consumer to allow listen operations to register message handlers. Optionally, the manager could
     * expose registration methods if encapsulation of the consumer is desired.
     *
     * @return the consumer
     */
    public SampleConsumer getConsumer() {
        return _consumer;
    }

}
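
The samples on this page call connect, disconnect, register, and deregister on a SampleConsumer whose source is not shown here. The following is one guess at the shape such a consumer might take, written without the Connector SDK. SketchConsumer, SketchMessageHandler, and the deliver method are hypothetical names, not the SDK's sample implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical handler interface, modelled on the single-message handler used above. */
interface SketchMessageHandler {
    void onMessage(InputStream message);
}

/** Hypothetical consumer shared by all operations of one manager. */
class SketchConsumer {
    // copy-on-write so handlers can register or deregister while a delivery is in progress
    private final List<SketchMessageHandler> handlers = new CopyOnWriteArrayList<>();
    private volatile boolean connected;

    void connect() {
        connected = true;
    }

    void disconnect() {
        connected = false;
    }

    void register(SketchMessageHandler handler) {
        handlers.add(handler);
    }

    void deregister(SketchMessageHandler handler) {
        handlers.remove(handler);
    }

    /** Delivers the text to every registered handler and returns how many received it. */
    int deliver(String text) {
        if (!connected) {
            throw new IllegalStateException("consumer is not connected");
        }
        int delivered = 0;
        for (SketchMessageHandler handler : handlers) {
            // each handler gets its own stream so that one handler cannot drain another's input
            handler.onMessage(new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)));
            delivered++;
        }
        return delivered;
    }
}
```

In this model, a handler stops receiving messages once it is deregistered, which matches what the Listen operation's stop method relies on.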