Skip to content

[Feature][Connector-V2] Add MQTT source connector#10935

Open
JAEKWANG97 wants to merge 1 commit into
apache:devfrom
JAEKWANG97:feature/add-mqtt-source
Open

[Feature][Connector-V2] Add MQTT source connector#10935
JAEKWANG97 wants to merge 1 commit into
apache:devfrom
JAEKWANG97:feature/add-mqtt-source

Conversation

@JAEKWANG97
Copy link
Copy Markdown
Contributor

Purpose of this pull request

Add MQTT Source connector support for streaming jobs.

This PR adds:

  • MQTT source factory, options, config, and reader
  • JSON/text message deserialization
  • source plugin mapping
  • English and Chinese connector documents
  • unit tests for source config, factory, and reader behavior

This is part of #10753.

Does this PR introduce any user-facing change?

Yes.

Before this PR, the MQTT connector only supported the sink side in the current dev branch, so users could publish SeaTunnel rows to an MQTT broker but could not configure MQTT as a source connector.

After this PR, users can configure MQTT as a source connector in streaming jobs to subscribe to an MQTT topic and read messages from an MQTT broker.

Example:

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  MQTT {
    url = "tcp://broker.example.com:1883"
    topic = "iot/sensors/readings"
    qos = 1
    format = "json"
    schema = {
      fields {
        id = bigint
        temperature = double
      }
    }
    plugin_output = "mqtt_source"
  }
}

sink {
  Console {
    plugin_input = "mqtt_source"
  }
}

This is a user-facing change in the unreleased dev branch. It extends the existing MQTT connector by adding source-side support and documentation.

How was this patch tested?

Added unit tests for:

  • MQTT source config default values and validation
  • MQTT source factory option rules
  • MQTT source reader message deserialization and queue handling

Fork CI passed:

https://github.com/JAEKWANG97/seatunnel/actions/runs/26322480447

Check list

@DanielLeens
Copy link
Copy Markdown
Contributor

Thanks for the contribution. I reviewed the latest head f5231ced78760bc8e7f33a235e673c83df01e0c2 locally against upstream/dev, and I traced the actual source runtime path from MqttSource into MqttSourceReader. I did not run Maven locally in this pass; this is a source-level review only.

What problem this PR solves

  • User pain point
    connector-mqtt already has a sink, but there is no MQTT source connector for pulling messages from a broker into SeaTunnel.
  • Fix approach
    This PR adds a new source-side implementation with config, factory, single-reader source, callback-based reader, docs, and basic unit tests.
  • One sentence
    The overall feature shape is reasonable, but the latest head still has a risky reconnect behavior: on a long-lasting broker outage, the source can silently stop ingesting without actually failing the task.

Runtime chain I checked

SeaTunnel streaming job
  -> MqttSource.getBoundedness() => UNBOUNDED
  -> MqttSource.createReader() -> new MqttSourceReader(...)
      -> open()
          -> Paho MqttClient.connect()
          -> subscribe(topic, qos)

Message arrival
  -> MqttCallbackExtended.messageArrived() [MqttSourceReader.java:165-180]
      -> copy payload into LinkedBlockingQueue
  -> pollNext() [MqttSourceReader.java:93-116]
      -> poll queue
      -> deserialize payload
      -> collect SeaTunnelRow

Connection loss
  -> connectionLost() [MqttSourceReader.java:137-140]
      -> only LOG.warn(...)
      -> no receiveException, no task failure
  -> if reconnect never succeeds
      -> no more callbacks
      -> pollNext() keeps returning null forever
      -> task looks alive but ingestion is stalled

Core review

Key findings:

  1. The main callback -> queue -> pollNext() design is clear and the normal path is reachable.
  2. Docs and option wiring are mostly aligned.
  3. The failure state is incomplete: pollNext() only fails when receiveException is set, but connectionLost() never sets it.
  4. That means a long-lasting outage can degrade into a silent no-data state instead of an explicit source failure.

Findings

Issue 1: a long-lasting reconnect failure can silently stall the source instead of surfacing a task failure

  • Location: seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceReader.java:138
  • Problem description:
    pollNext() only throws when receiveException is set. But connectionLost() currently just logs a warning and relies on Paho auto-reconnect. If the broker stays unavailable or reconnect never succeeds for a long time, the reader receives no more callbacks, receiveException stays null, and pollNext() simply keeps polling an empty queue forever.
  • Potential risk:
    The task appears healthy from the engine side while ingestion has actually stopped. For a streaming source, that kind of silent stall is more dangerous than a direct fail-fast because it delays detection and recovery.
  • Best improvement suggestion:
    Option A: record the disconnected state and fail pollNext() after a configurable reconnect timeout if recovery never succeeds.
    Option B: if you want to keep relying on auto-reconnect, then you still need explicit health/error propagation for repeated reconnect failure instead of only logging a warning.
  • Severity: High
  • Already raised by others: No

Issue 2: the new tests do not cover connection-loss / reconnect-failure behavior

  • Location: seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceTest.java:91
  • Problem description:
    The current tests cover JSON happy path and queue-full failure, but they do not cover connectionLost(), reconnect callbacks, or how reconnect failures are surfaced back through pollNext().
  • Potential risk:
    The most failure-prone part of a streaming MQTT source is the recovery path, and that path currently has no regression protection.
  • Best improvement suggestion:
    Please add reader-level tests for connection loss and reconnect/resubscribe failure propagation.
  • Severity: Medium
  • Already raised by others: No

Compatibility

  • API/config/defaults: compatible addition of new source capability
  • Protocol: MQTT source support is new; no existing source behavior is broken
  • Historical behavior: connector-mqtt moves from sink-only to source+sink

Tests / coverage

  • The newly added tests are structurally stable: they are in-memory only and I did not see obvious flaky patterns such as Thread.sleep, external ports, or unordered assertions.
  • The main gap is coverage, not test stability.

CI status

  • GitHub currently reports:
    • mergeable=MERGEABLE
    • mergeStateStatus=BLOCKED
    • Notify test workflow=FAILURE
  • I checked that failure. The contributor fork already has a successful Build workflow run for this same head (JAEKWANG97/seatunnel run 26322480447), but the Apache-side helper workflow failed with 401 Bad credentials while querying the fork check-runs. So the visible CI failure looks workflow/integration-side rather than caused by the source code in this PR.

Conclusion: can merge after fixes

  1. Blocking items
  • Issue 1: the source must not silently stall forever when reconnect never succeeds.
  1. Suggested but non-blocking follow-up
  • Issue 2: add regression coverage for connection-loss / reconnect-failure behavior.

Overall, the feature direction makes sense and most of the source skeleton is in place. But for the current head, I would not merge yet because the reconnect/error state is still incomplete for a production streaming source.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants