Introducing SimpleListener: Listener tests without a runtime
If you have ever written a listener connector, you know the gap: you can unit-test your parsing, config, and models, but the moment your code calls listener.submit(...), you are talking to the runtime, and the runtime is not in your test. That path either stayed untested, or you deployed to a Boomi Runtime and watched the logs.
Connector SDK 2.30.1 adds a small package, com.boomi.connector.testutil.listen, that puts a working stand-in for the Boomi Runtime inside your test. The centerpiece is SimpleListener: a real, in-memory Listener you hand to your operation. It records what you submit, closes payloads the way a Runtime does, tracks status updates, runs batches through their real single-use lifecycle, and lets you script what the process "returns."
The pattern
Every test has the same three beats: stage the runtime, run your code, and inspect what it did. SimpleListener controls the cause (what the process returns); a mock of your connector's own client holds the effect (what your connector did to the outside world).

For listeners that wait on a result, the result provider is a Function<SimpleSubmission, ListenerExecutionResult>. When your operation calls future.get(), the provider runs with the submission your operation just made and returns the outcome you want to simulate: a healthy process, a process that ran but did not succeed, a process that hands back documents and dynamic properties, or an execution that throws an exception.

Setting up a test
The classes ship in the com.boomi.connsdk:connector-sdk-test-util artifact, the same module as ConnectorTester and SimpleOperationContext. If you already write connector tests, they are on the classpath; otherwise, add the artifact at version 2.30.1 or later with test scope. The examples use JUnit 4 and Mockito, along with helpers of your own such as read(InputStream) and bytes(String), which are not part of the SDK.
Most test classes share the same scaffolding: create the listener, set a default result provider if your listener waits on a result, and close the listener in teardown.
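Since read and bytes are your own helpers, here is one minimal way they might look. This is a sketch that assumes UTF-8 text payloads; the class name ListenerTestHelpers and both method bodies are assumptions, not SDK code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical test helpers: only the names read and bytes come from the examples.
final class ListenerTestHelpers {
    private ListenerTestHelpers() {}

    /** Encodes text as UTF-8 bytes, for use as a message body. */
    static byte[] bytes(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Drains the stream into a UTF-8 string and closes it. */
    static String read(InputStream in) {
        try (InputStream stream = in) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = stream.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            // Unchecked, so read(...) can be called inside a result-provider lambda.
            throw new UncheckedIOException(e);
        }
    }
}
```

Wrapping IOException in an unchecked exception is a convenience choice: it keeps assertions and provider lambdas free of try/catch noise.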
Test class scaffolding
public class MyListenOperationTest {
    private SimpleListener listener;
    @Before
    public void setUp() {
        listener = new SimpleListener();
        // Only needed if your operation waits on a result. Fire-and-forget listeners can skip this.
        // It can also be set or overridden in an individual test.
        listener.setResultProvider(submission ->
                SimpleListenerExecutionResult.builder()
                        .withSuccess(true)
                        .build());
    }
    @After
    public void tearDown() {
        IOUtil.closeQuietly(listener);
        // SimpleListener cleans up its own streams. It cannot reset your operation's
        // static fields or spawned threads, so stop those here too.
    }
}
Listeners come in a few shapes
The way you test a listener follows its shape. There are four common shapes.
| Shape | How it submits | How to test it |
|---|---|---|
| Fire-and-forget | submit(Payload), never waits | Assert on recorded submissions and tracked properties. |
| Wait, then acknowledge the source | submit(payload, options).get(), then acks or nacks | Stage the result, assert your ack/nack/status reaction. |
| Batch and deferred settlement | getBatch(), submit, settle later | Stage a batch result, assert the batch and the settlement. |
| Request-reply (return to caller) | submit(...).get(), then returns the result | Stage the result, assert the response your callback returns. |
Shape 1: fire-and-forget event listeners
A push callback fires (a broker message arrives, a file appears), your connector builds a payload and calls submit(Payload). It never waits for a result, so there is no provider to stage. Run the callback and assert on what was submitted.
Assert payload and topic on message arrival
@Test
public void messageArrivedSubmitsPayloadWithTopic() {
    MqttSubscriber subscriber = newSubscriberBoundTo(listener);
    subscriber.messageArrived("sensors/temp", bytes("21.5"));
    SimpleSubmission submission = listener.getSubmissions().get(0);
    assertEquals("21.5", read(submission.getRecord().getData()));
    assertEquals("sensors/temp", submission.getRecord().getTrackedProperties().get("topic"));
    // Applies only if the subscriber calls submit(payload, options). The class reference notes that
    // getOptions() is null for a plain submit(Payload), so drop this assertion in that case.
    assertEquals(WaitMode.PROCESS_SUBMISSION, submission.getOptions().getWaitMode());
}
Tracked groups, event filtering, and error submissions
The following tests cover three additional scenarios your fire-and-forget listener may need to handle.
Three additional fire-and-forget scenarios
// Tracked groups, not just flat properties. Many connectors attach a group of message attributes.
@Test
public void submitsMessageAttributesAsATrackedGroup() {
    newSubscriberBoundTo(listener).messageArrived("orders", bytesWithHeaders());
    SimplePayloadRecord record = listener.getSubmissions().get(0).getRecord();
    assertEquals("application/json",
            record.getTrackedGroups().get("messageAttributes").get("contentType"));
}
// Event filtering: a delete event with deletes disabled should produce no submission at all.
@Test
public void ignoresDeleteEventsWhenDeletesAreDisabled() {
    FileListenerAdaptor adaptor = newAdaptor(listener, /*create*/ true, /*delete*/ false);
    adaptor.fileRemoved(event("orders.csv"));
    assertTrue("disabled event must not submit", listener.getSubmissions().isEmpty());
}
// Error submissions: a connection drop should be reported via submit(Throwable), not a payload.
@Test
public void connectionLostSubmitsTheCauseAsAnError() {
    MqttSubscriber subscriber = newSubscriberBoundTo(listener);
    Exception cause = new MqttException("connection lost");
    subscriber.connectionLost(cause);
    SimpleSubmission submission = listener.getSubmissions().get(0);
    assertNull("error submissions carry no record", submission.getRecord());
    assertSame(cause, submission.getError());
}
Shape 2: wait for the result, then acknowledge the source
Your connector submits, waits on the process, then tells the source what happened: ack (acknowledge) on success, nack (negative acknowledge) on failure, so the message is redelivered. SimpleListener stages the result; a mock of your own client captures the ack or nack.
Connector logic: submit, then ack or nack
// Connector logic under test
public class QueueConsumer {
    private final QueueClient queue; // your client for the external server
    public QueueConsumer(QueueClient queue) {
        this.queue = queue;
    }
    public void handle(Listener listener, Message message) {
        SubmitOptions options = new SubmitOptions().withWaitMode(WaitMode.PROCESS_COMPLETION);
        try {
            ListenerExecutionResult result =
                    listener.submit(PayloadUtil.toPayload(message.body()), options).get();
            if (result.isSuccess()) {
                queue.ack(message.id()); // processed cleanly, drop it
            } else {
                queue.nack(message.id()); // process reported failure, redeliver it
            }
        } catch (ExecutionException e) {
            queue.nack(message.id()); // execution itself failed, redeliver it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Stage the process to return a failure, then verify the connector called nack and never ack.
nack when the process reports failure
@Test
public void nacksTheMessageWhenTheProcessFails() {
    QueueClient queue = mock(QueueClient.class);
    // Cause: the process runs but reports failure
    listener.setResultProvider(submission ->
            SimpleListenerExecutionResult.builder().withSuccess(false).build());
    new QueueConsumer(queue).handle(listener, new Message("m-1", "payload"));
    // Effect: the source was told to redeliver, not to drop the message
    verify(queue).nack("m-1");
    verify(queue, never()).ack("m-1");
}
Throw from the provider to simulate the execution failing outright, then verify the connector still calls nack.
nack when execution throws an exception
@Test
public void nacksWhenTheExecutionThrows() {
    QueueClient queue = mock(QueueClient.class);
    listener.setResultProvider(submission -> { throw new RuntimeException("epic failure"); });
    new QueueConsumer(queue).handle(listener, new Message("m-1", "payload"));
    verify(queue).nack("m-1");
}
Where you commit determines your delivery guarantee: committing after a successful result gives you at-least-once delivery; committing before the result is known gives you at-most-once delivery. Use a Mockito InOrder against your client to verify which guarantee your connector implements.
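The ordering check can also be sketched without any mocking library. The example below is a dependency-free stand-in for a Mockito InOrder verification: it records events in a plain list and asserts their order. OffsetClient, CommitAfterResultConsumer, and the awaitResult supplier are hypothetical names; awaitResult stands in for listener.submit(payload, options).get().isSuccess().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical client: commit(id) stands in for your client's ack or offset commit.
interface OffsetClient {
    void commit(String messageId);
}

// Commits only after the outcome is known, which is the at-least-once ordering.
final class CommitAfterResultConsumer {
    private final OffsetClient client;

    CommitAfterResultConsumer(OffsetClient client) {
        this.client = client;
    }

    void handle(String messageId, BooleanSupplier awaitResult) {
        if (awaitResult.getAsBoolean()) {
            client.commit(messageId);
        }
    }
}

final class CommitOrderDemo {
    /** Runs one message through the consumer and returns the events in the order they occurred. */
    static List<String> run(boolean processSucceeds) {
        List<String> events = new ArrayList<>();
        OffsetClient recordingClient = id -> events.add("commit:" + id);
        new CommitAfterResultConsumer(recordingClient).handle("m-1", () -> {
            events.add("result"); // the point at which the process outcome becomes known
            return processSucceeds;
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [result, commit:m-1]
        System.out.println(run(false)); // [result]
    }
}
```

An at-most-once connector would produce the commit event before the result event, so the same list comparison distinguishes the two guarantees.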
A connector can also opt into document retrieval, in which the process returns dynamic properties that drive behavior: a routing key, a destination, or an acknowledgment mode. Use document retrieval only if your connector reads returned properties; if it does, stage the documents and assert the effect.
Use returned document to choose ack or nack
// This connector lets the process choose how to acknowledge,
// via a dynamic property on the returned document.
@Test
public void honorsTheAcknowledgmentModeFromTheReturnedDocument() {
    QueueClient queue = mock(QueueClient.class);
    listener.setResultProvider(submission ->
            SimpleListenerExecutionResult.builder()
                    .withSuccess(true)
                    .withDocuments(SimpleListenerExecutionDocuments.of(
                            SimpleListenerExecutionData.builder()
                                    .withDynamicProperty("acknowledgment_mode", "negative")
                                    .build()))
                    .build());
    new AckModeConsumer(queue).handle(listener, new Message("m-1", "payload"));
    verify(queue).nack("m-1"); // the process selected a negative ack
    verify(queue, never()).ack("m-1");
}
Because the provider receives the submission, you can inspect a tracked property or the submit options and return a different result per call.
Shape 3: batches and deferred settlement
Listeners that buffer messages submit them as a PayloadBatch. Batches are single-use, as they are on the runtime: after submit(), the batch is closed, and the records move into a SimpleBatchSubmission you read from the listener. For batches that wait on a result, use setBatchResultProvider(...).
Drain messages into a batch and reject on failure
@Test
public void drainGroupsAllMessagesIntoOneBatch() {
    QueueClient queue = mock(QueueClient.class);
    new QueueConsumer(queue).drain(listener, Arrays.asList("a", "b", "c"));
    assertEquals(1, listener.getBatchSubmissions().size());
    SimpleBatchSubmission batch = listener.getBatchSubmissions().get(0);
    assertEquals(3, batch.getCount());
    assertEquals("a", read(batch.getRecords().get(0).getData()));
    assertEquals("b", read(batch.getRecords().get(1).getData()));
    assertEquals("c", read(batch.getRecords().get(2).getData())); // order preserved, nothing dropped
}
@Test
public void rejectsTheWholeBatchWhenTheProcessFails() {
    QueueClient queue = mock(QueueClient.class);
    listener.setBatchResultProvider(batch ->
            SimpleListenerExecutionResult.builder().withSuccess(false).build());
    new QueueConsumer(queue).drainAndSettle(listener, Arrays.asList("a", "b"));
    verify(queue).nackBatch(anyList()); // your client redelivers the whole batch
}
Many streaming connectors settle later: submit now, then on a follow-up pass read each tracked future's result and ack or settle(FAILED) (the batch equivalent of nack). SimpleListener supports this as long as your follow-up pass calls future.get(). If your connector instead decides by polling future.isDone() without ever calling get(), the stand-in cannot drive it; test that settlement decision by passing your connector's decision method an already-completed future, for example, CompletableFuture.completedFuture(result).
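For the isDone()-polling case, here is a sketch of a decision method that can be tested with already-completed futures. The Settlement enum and SettlementDecider are hypothetical names, and the method is generic so the sketch runs without the SDK; in a connector, R would be ListenerExecutionResult and the predicate would be ListenerExecutionResult::isSuccess.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;

// Hypothetical outcome for one tracked future.
enum Settlement { ACK, FAILED, PENDING }

final class SettlementDecider {
    private SettlementDecider() {}

    /** Decides how to settle one submission without ever blocking on an unfinished future. */
    static <R> Settlement decide(Future<R> future, Predicate<R> isSuccess) {
        if (!future.isDone()) {
            return Settlement.PENDING; // look again on the next follow-up pass
        }
        try {
            // get() returns immediately here because the future is already done
            return isSuccess.test(future.get()) ? Settlement.ACK : Settlement.FAILED;
        } catch (ExecutionException | CancellationException e) {
            return Settlement.FAILED; // the execution itself failed or was cancelled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Settlement.PENDING;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(CompletableFuture.completedFuture(Boolean.TRUE), ok -> ok));  // ACK
        System.out.println(decide(CompletableFuture.completedFuture(Boolean.FALSE), ok -> ok)); // FAILED
        System.out.println(decide(new CompletableFuture<Boolean>(), ok -> ok));                 // PENDING
    }
}
```

Keeping the decision in a static method that takes a Future means the test needs neither a listener nor a scheduler, only futures in the three states.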
You can also group by a business key with an indexed batch, useful when one poll distributes messages into per-tenant or per-topic batches. The index is available in the submission.
Route messages into per-tenant batches
@Test
public void routesEachOrderToItsTenantBatch() {
    // the router calls listener.getBatch(tenantId) internally and submits per tenant
    new TenantRouter().route(listener, Arrays.asList(order("tenant-a", "order-1")));
    SimpleBatchSubmission submission = listener.getBatchSubmissions().get(0);
    assertEquals("tenant-a", submission.getIndex()); // getIndex() returns Object
    assertEquals(1, submission.getCount());
}
Shape 4: request-reply listeners that return the result to a caller
A web-service or RPC-style listener (an inbound HTTP request, for example) submits the request, waits for the process to complete, and returns the output to the caller as the response. The effect you assert is not a mock's ack; it is the value your callback returns. Stage the returned documents, invoke the callback, and assert the response it builds.
Return process output to the caller
@Test
public void returnsTheProcessOutputToTheCaller() throws Exception {
    listener.setResultProvider(submission ->
            SimpleListenerExecutionResult.builder()
                    .withSuccess(true)
                    .withDocuments(SimpleListenerExecutionDocuments.of(
                            SimpleListenerExecutionData.builder()
                                    .withData("{\"ok\":true}")
                                    .withDynamicProperty("http_status", "200")
                                    .build()))
                    .build());
    Response response = new RequestHandler(listener).handle(inboundRequest("ping"));
    assertEquals(200, response.status());
    assertEquals("{\"ok\":true}", response.body());
}
One caveat for this shape: if your handler limits the wait with get(timeout, unit) and treats a TimeoutException specially, the stand-in cannot reproduce that branch. Test the timeout handling by calling that path directly.
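One way to call the timeout path directly is to isolate the bounded wait in a small method and hand it a future that never completes. The sketch below assumes a handler that maps outcomes to HTTP-style status codes; BoundedWait, statusFor, and the specific codes are illustrative assumptions, not SDK behavior.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWait {
    private BoundedWait() {}

    /**
     * Waits up to the given timeout for the process result.
     * Returns 200 if a result arrives in time, 504 on timeout, and 500 for any other failure.
     */
    static int statusFor(Future<?> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);
            return 200;
        } catch (TimeoutException e) {
            return 504; // the branch the stand-in listener cannot trigger
        } catch (ExecutionException e) {
            return 500;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 500;
        }
    }
}
```

In a test, statusFor(new CompletableFuture<String>(), 10, TimeUnit.MILLISECONDS) exercises the timeout branch, because a CompletableFuture that is never completed makes get(timeout, unit) throw TimeoutException once the wait elapses.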
Reporting listener status (ONLINE and OFFLINE)
Status reflects whether your connector is connected to the external system. Wire updateStatus(...) into your connect and disconnect paths: a successful connect reports ONLINE; a dropped connection, failed reconnect, or unresponsive server reports OFFLINE. Use updateStatus(OFFLINE, "reason") for a plain message, or updateStatus(Throwable) when you have an exception; the Throwable form forces OFFLINE and records the cause. SimpleListener records every call, so you can assert on getLastStatusUpdate(), or on getStatusUpdates() for the full history.
A successful connect reports ONLINE:
Report ONLINE on successful connect
@Test
public void connectReportsOnline() {
    Subscriber subscriber = newSubscriberBoundTo(listener);
    subscriber.onConnected("tcp://broker:1883"); // your client's connect callback calls updateStatus(ONLINE)
    assertEquals(ListenerStatus.ONLINE, listener.getLastStatusUpdate().getStatus());
}
A lost connection reports OFFLINE and carries the cause (the updateStatus(Throwable) form):
Report OFFLINE with cause on lost connection
@Test
public void connectionLostReportsOfflineWithTheCause() {
    Subscriber subscriber = newSubscriberBoundTo(listener);
    Exception cause = new IOException("connection lost");
    subscriber.onConnectionLost(cause); // your client's disconnect callback calls updateStatus(cause)
    SimpleStatusUpdate update = listener.getLastStatusUpdate();
    assertEquals(ListenerStatus.OFFLINE, update.getStatus()); // updateStatus(Throwable) forces OFFLINE
    assertSame(cause, update.getError());
}
A poller that cannot reach the server, or reaches it but gets nothing back, reports OFFLINE with a reason (the updateStatus(OFFLINE, message) form):
Report OFFLINE when the server is unreachable
@Test
public void pollReportsOfflineWhenTheServerIsUnreachable() {
    ServerClient client = mock(ServerClient.class);
    when(client.fetch()).thenThrow(new IOException("connection refused"));
    new PollingSubscriber(client).pollOnce(listener);
    SimpleStatusUpdate update = listener.getLastStatusUpdate();
    assertEquals(ListenerStatus.OFFLINE, update.getStatus());
    assertEquals("Server unreachable: connection refused", update.getMessage());
}
Status and error submission are independent. A connection drop can surface as an error submission via submit(Throwable) (the Shape 1 connectionLostSubmitsTheCauseAsAnError test) and as a status update. If your connector does both, assert each where it belongs: status on getLastStatusUpdate(), the error on getSubmissions().
Verifying submit options
Your operation chooses SubmitOptions, and getting them wrong changes runtime behavior. For example, your listener might set DistributionMode.PREFER_REMOTE only when the listener is a singleton, or flip WaitMode based on a config flag (PROCESS_COMPLETION when transacted, PROCESS_SUBMISSION otherwise). Options are recorded on the submission, so assert after you act, not inside the provider: an assertion that fails inside the provider surfaces as an ExecutionException, not a clean test failure.
Assert only the options your operation sets; the rest keep their defaults. isReturnDocuments(), for instance, is false unless you opt in.
Verify remote distribution for a singleton listener
@Test
public void singletonListenerPrefersRemoteDistribution() {
    newSingletonOperation().start(listener);
    SubmitOptions options = listener.getSubmissions().get(0).getOptions();
    assertEquals(WaitMode.PROCESS_COMPLETION, options.getWaitMode());
    assertEquals(DistributionMode.PREFER_REMOTE, options.getDistributionMode());
}
Verifying cursor positions and offset resumption
Listeners that track an offset or cursor use the PersistedConnectorCache. The cursor advances only after the process succeeds, so a failure replays the same records instead of skipping them. Test both branches: a failure that holds the cursor, and a success that advances it.
Hold the cursor on failure, advance on success
// Poller under test (a method of MyPoller): advances the cursor ONLY after a successful execution
public void poll(Listener listener) throws Exception {
    PersistedConnectorCache cache = listener.getPersistedConnectorCache();
    String cursor = cache.getPersistedProperty("cursor", "0");
    for (Record r : source.fetchSince(cursor)) { // your backend call
        SubmitOptions options = new SubmitOptions().withWaitMode(WaitMode.PROCESS_COMPLETION);
        ListenerExecutionResult result = listener.submit(PayloadUtil.toPayload(r.body()), options).get();
        if (!result.isSuccess()) {
            return; // hold the cursor; this record is retried next poll
        }
        cache.setPersistedProperty("cursor", r.id()); // advance only after success
    }
}
@Test
public void cursorHoldsWhenTheExecutionFailsSoTheRecordIsRetried() throws Exception {
    listener.getPersistedConnectorCache().setPersistedProperty("cursor", "100");
    // source is a stubbed test field: fetchSince("100") returns records 101 and 102,
    // and each record's body is its id, so the provider can match on the payload
    listener.setResultProvider(submission ->
            SimpleListenerExecutionResult.builder()
                    .withSuccess(!"101".equals(read(submission.getRecord().getData()))) // 101 fails
                    .build());
    new MyPoller(source).poll(listener);
    assertEquals("100", listener.getPersistedConnectorCache().getPersistedProperty("cursor"));
    assertEquals(1, listener.getSubmissions().size()); // stopped after the failure
}
@Test
public void cursorAdvancesToTheLastRecordWhenAllSucceed() throws Exception {
    listener.getPersistedConnectorCache().setPersistedProperty("cursor", "100");
    listener.setResultProvider(s -> SimpleListenerExecutionResult.builder().withSuccess(true).build());
    new MyPoller(source).poll(listener);
    assertEquals("102", listener.getPersistedConnectorCache().getPersistedProperty("cursor"));
    assertEquals(2, listener.getSubmissions().size());
}
Verifying clean shutdown behavior
When the container is draining, a submit made with withFailIfStopping(true) is rejected with a ContainerStoppingException. What matters is how your operation reacts: the in-flight message should be returned to the source for redelivery, not acknowledged or dropped. Stage the listener as stopping with setStopping(true), run your handler, and assert the reaction on your client.
Nack the in-flight message on shutdown
@Test
public void nacksTheInFlightMessageWhenTheContainerIsStopping() {
    QueueClient queue = mock(QueueClient.class);
    listener.setStopping(true);
    // handle() submits with withFailIfStopping(true) and nacks if the submit is rejected
    new QueueConsumer(queue).handle(listener, new Message("m-1", "payload"));
    verify(queue).nack("m-1"); // redelivered on the next run, not lost
    verify(queue, never()).ack("m-1");
}
(A connector that instead polls isStopping() to break its own loop is testing the same thing: how your operation reacts to a stopping container.)
Class reference
Everything lives in com.boomi.connector.testutil.listen.
| Class | What it is for |
|---|---|
| SimpleListener | The stand-in Listener. Hand it to your operation. Records submissions, batches, and status updates, and stages results. |
| SimpleSubmission | A recorded single submission. Carries the payload record or the error, along with the submit options (null for submit(Payload)). |
| SimplePayloadRecord | A snapshot of a submitted payload: data, tracked properties, user-defined properties, and tracked groups. |
| SimpleBatchSubmission | A recorded batch submission with its transferred records, count, size, and index. |
| SimplePayloadBatch | The stand-in PayloadBatch. Single-use, closes on submit. |
| SimpleIndexedPayloadBatch | A batch with an index, for grouping by a business key. |
| SimpleStatusUpdate | A recorded updateStatus call: status, message, optional error, timestamp. |
| SimpleListenerExecutionResult | The result you stage with a builder: success flag, execution id, and returned documents. |
| SimpleListenerExecutionDocuments | A collection of returned documents. Built with of(...). |
| SimpleListenerExecutionData | A single returned document: content along with dynamic properties. |
| SimplePersistedConnectorCache | An in-memory persisted cache for offsets and cursors. |
Tips for developing and testing listeners
The hardest part of testing a listener is often just getting your operation to run in a test at all, which comes down to how the operation is structured. If yours is already easy to drive, skip this.
Know which start your base class exposes. If you extend BaseListenOperation, the only start is the public start(Listener, ListenManager) from the ListenOperation interface, which you implement directly, so a test just calls it (pass a real or mocked manager). If you extend UnmanagedListenOperation, the public entry point is a final start(Listener, ListenManager), but the method you override is the protected start(Listener); put the test in the operation's package and call operation.start(listener) directly.
Often, the real work lives in a callback, not at the start. Many listeners (broker callbacks, file adaptors, web handlers) do their submitting in a separate object that the start() method wires up. Skip start: construct that callback object with a SimpleListener and invoke its onMessage / fileAdded / handle method directly. That is the cleanest seam.
Do not test the scheduler. If start() schedules a poll on a timer or spawns a worker, pull the per-cycle logic into its own method (or inject the scheduler) and call that directly. Make collaborators injectable through a package-private constructor. That is the difference between a deterministic test and a sleep-and-hope test.
Make collaborators injectable for testing
// Collaborators are injectable, so a test can drive one poll by hand.
public class MyListenOperation extends UnmanagedListenOperation {
    private final PollScheduler scheduler;
    private final PollHandler handler;
    protected MyListenOperation(OperationContext context) {
        this(context, PollScheduler.getInstance(), new PollHandler());
    }
    // package-private: for tests
    MyListenOperation(OperationContext context, PollScheduler scheduler, PollHandler handler) {
        super(context);
        this.scheduler = scheduler;
        this.handler = handler;
    }
    @Override
    protected void start(Listener listener) {
        scheduler.register(listener, () -> handler.poll(listener)); // fired each interval
    }
}
A fake scheduler lets the test trigger exactly one poll, with no timing involved.
Trigger one poll with a fake scheduler
@Test
public void pollSubmitsADocument() {
    TestPollScheduler scheduler = new TestPollScheduler(); // records registrations, triggers on demand
    MyListenOperation operation = new MyListenOperation(context(), scheduler, new PollHandler());
    operation.start(listener);
    scheduler.triggerPoll(listener); // one deterministic poll, no scheduler thread
    assertEquals("1", listener.getSubmissions().get(0).getRecord().getTrackedProperties().get("documentNumber"));
}
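A fake scheduler of this kind can be very small. The sketch below is one possible shape, not the article's actual TestPollScheduler: it models PollScheduler as a hypothetical one-method interface and keys registrations by Object so that it runs without the SDK (in the test above the key would be the Listener).

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical scheduler contract, reduced to the one call the operation makes.
interface PollScheduler {
    void register(Object key, Runnable poll);
}

// Fake scheduler: remembers each registered poll and runs it only when the test asks.
final class TestPollScheduler implements PollScheduler {
    // Identity-based, so it does not depend on how the key implements equals/hashCode.
    private final Map<Object, Runnable> polls = new IdentityHashMap<>();

    @Override
    public void register(Object key, Runnable poll) {
        polls.put(key, poll);
    }

    /** Runs the poll registered for the key exactly once, on the calling thread. */
    void triggerPoll(Object key) {
        Runnable poll = polls.get(key);
        if (poll == null) {
            throw new IllegalStateException("no poll registered for " + key);
        }
        poll.run();
    }

    int registrationCount() {
        return polls.size();
    }
}
```

Because triggerPoll runs on the test thread, any exception thrown by the poll surfaces directly in the test, and no sleep or latch is needed.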
Next steps
You have everything you need to start testing your listener end-to-end. Here is how to use SimpleListener in your project.
- Upgrade your project to Connector SDK 2.30.1 or later.
- Assert on getSubmissions() and getBatchSubmissions() to check what your operation submitted, whether a payload or an error. If it waits on the result, stage that result with setResultProvider(...) and assert how it handles each outcome: success, success with returned documents, failure, or an exception.
- Then go after the paths you could never reach before: a failed process, a returned document that changes behavior, a full batch, a persisted cursor, and a container shutting down mid-poll.
The code that talks to Boomi should be as testable as the rest of your connector. Give SimpleListener the awkward paths the runtime used to surprise you with, and let your build catch them before a customer does.
