Fix WriteToPubSub to pass ordering_key to publish() method #37345

Open
nikitagrover19 wants to merge 5 commits into apache:master from nikitagrover19:fix-pubsub-ordering-key

Conversation

@nikitagrover19

Fixes #36201

This PR fixes the WriteToPubSub transform in the Python SDK to properly pass the ordering_key parameter to the Google Cloud Pub/Sub publish() method.

The Problem
While the ordering_key field was correctly serialized into the PubsubMessage protobuf, it was not extracted and passed to PublisherClient.publish() in the _PubSubWriteDoFn._flush() method. As a result, messages were published without an ordering key, so message ordering did not work even when users explicitly specified an ordering_key.

The Solution
Modified _PubSubWriteDoFn._flush() to:

  • Extract ordering_key from the deserialized PubsubMessage
  • Pass it to publish() in both code paths (with attributes and data-only)
  • Handle empty/missing ordering_key for backward compatibility
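
The three steps above can be sketched without Beam or Pub/Sub installed. `FakeMessage`, `FakePublisher`, and `flush` below are hypothetical stand-ins for the deserialized PubsubMessage, the PublisherClient, and `_flush()`; only the shape of the publish() call is meant to mirror the description, not the actual Beam implementation.

```python
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a deserialized PubsubMessage."""
    data: bytes
    attributes: dict = field(default_factory=dict)
    ordering_key: str = ""


class FakePublisher:
    """Hypothetical stand-in for PublisherClient; records each publish() call."""

    def __init__(self):
        self.calls = []

    def publish(self, topic, data, **kwargs):
        self.calls.append((topic, data, kwargs))


def flush(publisher, topic, messages, with_attributes):
    """Publish each message, forwarding ordering_key only when it is set."""
    for msg in messages:
        kwargs = {}
        if with_attributes and msg.attributes:
            kwargs.update(msg.attributes)
        if msg.ordering_key:  # empty or missing key: keep the old call shape
            kwargs["ordering_key"] = msg.ordering_key
        publisher.publish(topic, msg.data, **kwargs)


publisher = FakePublisher()
flush(
    publisher,
    "projects/p/topics/t",  # placeholder topic path
    [FakeMessage(b"a", {"k": "v"}, "key1"), FakeMessage(b"b")],
    with_attributes=True,
)
```

In this sketch the first message is published with both its attribute and its ordering key, while the second, which has neither, is published exactly as before the change.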

@gemini-code-assist
Contributor

Summary of Changes

Hello @nikitagrover19, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical bug in the Apache Beam Python SDK's WriteToPubSub transform, which previously failed to correctly utilize the ordering_key for Google Cloud Pub/Sub messages. The fix ensures that messages published through Beam pipelines respect the specified ordering, which is crucial for applications requiring strict message sequence. The changes involve targeted modifications to the publishing logic and the addition of robust test cases to confirm the functionality and prevent regressions.

Highlights

  • Fix for Pub/Sub Ordering Key: The WriteToPubSub transform in the Python SDK now correctly passes the ordering_key to the Google Cloud Pub/Sub publish() method, resolving an issue where message ordering was not working as expected.
  • _PubSubWriteDoFn._flush() Modification: The _flush() method has been updated to extract the ordering_key from the deserialized PubsubMessage and include it in the publish() call, ensuring proper handling for both attribute-rich and data-only messages.
  • Backward Compatibility: The implementation includes logic to gracefully handle cases where ordering_key is empty or missing, maintaining backward compatibility for existing pipelines.
  • Comprehensive Testing: New integration and unit tests have been added to validate the correct propagation and usage of the ordering_key in various scenarios, including with and without attributes, and to ensure backward compatibility.

@nikitagrover19
Author

/gemini review

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request correctly adds support for ordering_key in WriteToPubSub. The changes to the _flush method are functional, but I've suggested a refactoring to improve code clarity and maintainability. I've also added suggestions to strengthen the new unit and integration tests by making the assertions more specific and robust. Overall, this is a good fix.

Comment on lines 662 to +676
       if self.with_attributes and pubsub_msg.attributes:
         future = self._pub_client.publish(
-            self._topic, pubsub_msg.data, **pubsub_msg.attributes)
+            self._topic,
+            pubsub_msg.data,
+            ordering_key=pubsub_msg.ordering_key
+            if pubsub_msg.ordering_key else '',
+            **pubsub_msg.attributes)
       else:
-        future = self._pub_client.publish(self._topic, pubsub_msg.data)
+        if pubsub_msg.ordering_key:
+          future = self._pub_client.publish(
+              self._topic,
+              pubsub_msg.data,
+              ordering_key=pubsub_msg.ordering_key)
+        else:
+          future = self._pub_client.publish(self._topic, pubsub_msg.data)

medium

The logic for publishing messages can be simplified to improve readability and reduce code duplication. You can determine the attributes and ordering_key to use and then have a single call to self._pub_client.publish. This makes the code easier to maintain.

      attributes = {}
      if self.with_attributes and pubsub_msg.attributes:
        attributes = pubsub_msg.attributes

      if pubsub_msg.ordering_key:
        future = self._pub_client.publish(
            self._topic,
            pubsub_msg.data,
            ordering_key=pubsub_msg.ordering_key,
            **attributes)
      else:
        future = self._pub_client.publish(
            self._topic, pubsub_msg.data, **attributes)

Comment on lines +345 to +351
+    for received_message in response.received_messages:
+      self.assertIn('ordering_key', dir(received_message.message))
+      self.sub_client.acknowledge(
+          request={
+              "subscription": self.output_sub.name,
+              "ack_ids": [received_message.ack_id],
+          })

medium

The current verification for ordering_key only checks for its presence. To make the test more robust, you should assert that the ordering_key has the expected value for each message.

Additionally, you can improve efficiency by acknowledging all messages in a single batch request after pulling them, rather than one by one inside the loop.

Suggested change
-    for received_message in response.received_messages:
-      self.assertIn('ordering_key', dir(received_message.message))
-      self.sub_client.acknowledge(
-          request={
-              "subscription": self.output_sub.name,
-              "ack_ids": [received_message.ack_id],
-          })
+    received_map = {msg.message.data: msg for msg in response.received_messages}
+    self.assertEqual(received_map[b'order_data001'].message.ordering_key, 'key1')
+    self.assertEqual(received_map[b'order_data002'].message.ordering_key, 'key1')
+    self.assertEqual(received_map[b'order_data003'].message.ordering_key, 'key2')
+    ack_ids = [msg.ack_id for msg in response.received_messages]
+    if ack_ids:
+      self.sub_client.acknowledge(
+          request={
+              "subscription": self.output_sub.name,
+              "ack_ids": ack_ids,
+          })
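
The batching part of this suggestion can be tried in isolation. The sketch below is illustrative only: `received_messages`, `sub_client`, and `subscription` are hypothetical stand-ins built from `SimpleNamespace` and `unittest.mock`, not the real Pub/Sub response or subscriber client.

```python
from types import SimpleNamespace
from unittest import mock

# Hypothetical stand-ins for response.received_messages and the subscriber client.
received_messages = [
    SimpleNamespace(ack_id=f"ack-{i}", message=SimpleNamespace(data=b"d%d" % i))
    for i in range(3)
]
sub_client = mock.Mock()
subscription = "projects/p/subscriptions/s"  # placeholder subscription path

# Collect every ack_id first, then acknowledge once instead of once per message.
ack_ids = [msg.ack_id for msg in received_messages]
if ack_ids:  # skip the call entirely when nothing was pulled
    sub_client.acknowledge(
        request={"subscription": subscription, "ack_ids": ack_ids})
```

With three pulled messages the mock records a single acknowledge() call carrying all three ack IDs, where the loop version would have made three calls.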

Comment on lines +1162 to +1163
+    # Verify that publish was called
+    mock_pubsub.return_value.publish.assert_called()

medium

To make this test more robust, you should also verify that ordering_key is not passed to the publish method when no ordering key is provided in the PubsubMessage. You can do this by checking the call_args of the mock.

Suggested change
     # Verify that publish was called
     mock_pubsub.return_value.publish.assert_called()
+    call_args = mock_pubsub.return_value.publish.call_args
+    self.assertNotIn('ordering_key', call_args.kwargs)
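
As a minimal, standalone illustration of the `call_args` check, the sketch below uses a plain `unittest.mock.Mock` named `publisher` (a hypothetical stand-in for the patched client, with a placeholder topic path). Note that `call_args` reflects only the most recent call, and the `.kwargs` property is available from Python 3.8 onward.

```python
from unittest import mock

publisher = mock.Mock()  # hypothetical stand-in for the mocked publisher client
topic = "projects/p/topics/t"  # placeholder topic path

# Call without an ordering key: the recorded kwargs should not mention it.
publisher.publish(topic, b"data")
publisher.publish.assert_called()
no_key_kwargs = publisher.publish.call_args.kwargs

# Call with an ordering key: call_args now describes this latest call.
publisher.publish(topic, b"data", ordering_key="key1")
with_key_kwargs = publisher.publish.call_args.kwargs
```

A test that publishes several messages would need `call_args_list` rather than `call_args` to inspect every call.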

@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@nikitagrover19 nikitagrover19 marked this pull request as draft January 19, 2026 16:50
@nikitagrover19 nikitagrover19 marked this pull request as ready for review January 20, 2026 11:32
@nikitagrover19
Author

FYI: CI failures look flaky (PortableRunner gRPC DEADLINE_EXCEEDED / Socket closed).
I ran these locally and they pass:

  • apache_beam/yaml/yaml_transform_unit_test.py::ExpandPipelineTest::test_expand_pipeline_with_pipeline_key_only
  • apache_beam/dataframe/convert_test.py
    • ConvertTest::test_convert
    • ConvertTest::test_convert_memoization_clears_cache
    • (full file run also passes, including with --runner=PortableRunner)

@damccorm
Contributor

assign set of reviewers

@github-actions
Contributor

Assigning reviewers:

R: @tvalentyn for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions
Contributor

github-actions bot commented Feb 3, 2026

Reminder, please take a look at this pr: @tvalentyn

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request correctly adds support for ordering_key when writing to Pub/Sub, which is a valuable fix. The changes are logical and include new unit and integration tests to cover the new functionality. I have two suggestions for improvement: one to refactor the publishing logic in pubsub.py for better readability and maintainability, and another to strengthen the assertions in the new integration test to make it more robust.

Comment on lines 662 to +676
       if self.with_attributes and pubsub_msg.attributes:
         future = self._pub_client.publish(
-            self._topic, pubsub_msg.data, **pubsub_msg.attributes)
+            self._topic,
+            pubsub_msg.data,
+            ordering_key=pubsub_msg.ordering_key
+            if pubsub_msg.ordering_key else '',
+            **pubsub_msg.attributes)
       else:
-        future = self._pub_client.publish(self._topic, pubsub_msg.data)
+        if pubsub_msg.ordering_key:
+          future = self._pub_client.publish(
+              self._topic,
+              pubsub_msg.data,
+              ordering_key=pubsub_msg.ordering_key)
+        else:
+          future = self._pub_client.publish(self._topic, pubsub_msg.data)

medium

The logic for publishing messages can be simplified to reduce code duplication and improve readability. Instead of separate code paths for messages with and without attributes, you can build a dictionary of keyword arguments for the publish call. This makes the code cleaner and easier to maintain.

      publish_kwargs = {}
      if self.with_attributes and pubsub_msg.attributes:
        publish_kwargs.update(pubsub_msg.attributes)

      if pubsub_msg.ordering_key:
        publish_kwargs['ordering_key'] = pubsub_msg.ordering_key

      future = self._pub_client.publish(
          self._topic, pubsub_msg.data, **publish_kwargs)
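
To see how the kwargs-based version compares with the branching code quoted in this review, both can be run against a mock. The sketch below is a hedged illustration: the two helper functions restate the quoted snippets as free functions, and `msg` and the mock client are hypothetical stand-ins rather than real Beam or Pub/Sub objects.

```python
from types import SimpleNamespace
from unittest import mock


def publish_as_in_pr(client, topic, msg, with_attributes):
    """Branching version, restated from the diff quoted in the review."""
    if with_attributes and msg.attributes:
        return client.publish(
            topic, msg.data,
            ordering_key=msg.ordering_key if msg.ordering_key else '',
            **msg.attributes)
    if msg.ordering_key:
        return client.publish(topic, msg.data, ordering_key=msg.ordering_key)
    return client.publish(topic, msg.data)


def publish_with_kwargs(client, topic, msg, with_attributes):
    """Single-call version, restated from the reviewer's suggestion."""
    publish_kwargs = {}
    if with_attributes and msg.attributes:
        publish_kwargs.update(msg.attributes)
    if msg.ordering_key:
        publish_kwargs['ordering_key'] = msg.ordering_key
    return client.publish(topic, msg.data, **publish_kwargs)


def recorded_kwargs(fn, msg):
    """Run fn against a fresh mock client and return the kwargs it passed."""
    client = mock.Mock()
    fn(client, 'topic', msg, True)
    return client.publish.call_args.kwargs


# A message with attributes but no ordering key.
msg = SimpleNamespace(data=b'x', attributes={'a': '1'}, ordering_key='')
pr_kwargs = recorded_kwargs(publish_as_in_pr, msg)
kw_kwargs = recorded_kwargs(publish_with_kwargs, msg)
```

Under these stand-ins the two versions differ in one case: for a message with attributes and an empty ordering key, the branching code passes `ordering_key=''` explicitly, while the kwargs version omits the argument. When the ordering key is set, both produce the same call.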

Comment on lines +345 to +351
+    for received_message in response.received_messages:
+      self.assertIn('ordering_key', dir(received_message.message))
+      self.sub_client.acknowledge(
+          request={
+              "subscription": self.output_sub.name,
+              "ack_ids": [received_message.ack_id],
+          })

medium

The assertion in this test is quite weak. It only checks for the presence of the ordering_key attribute on the message object using dir(), but doesn't verify its value. A more robust test would be to check that the received messages have the correct ordering keys and attributes corresponding to the messages that were sent. Also, acknowledging messages one by one in a loop is inefficient; it's better to collect all ack_ids and acknowledge them in a single call.

Suggested change
-    for received_message in response.received_messages:
-      self.assertIn('ordering_key', dir(received_message.message))
-      self.sub_client.acknowledge(
-          request={
-              "subscription": self.output_sub.name,
-              "ack_ids": [received_message.ack_id],
-          })
+    # Verify ordering keys and attributes were preserved
+    received_msgs_map = {
+        msg.message.data: msg.message
+        for msg in response.received_messages
+    }
+    for sent_msg in test_messages:
+      self.assertIn(sent_msg.data, received_msgs_map)
+      received_msg = received_msgs_map[sent_msg.data]
+      self.assertEqual(received_msg.ordering_key, sent_msg.ordering_key)
+      self.assertEqual(dict(received_msg.attributes), sent_msg.attributes)
+    # Acknowledge all messages at once for efficiency
+    ack_ids = [msg.ack_id for msg in response.received_messages]
+    if ack_ids:
+      self.sub_client.acknowledge(
+          request={
+              "subscription": self.output_sub.name,
+              "ack_ids": ack_ids,
+          })
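
The sent-versus-received comparison in this suggestion can be sketched as a small helper. Everything below is hypothetical scaffolding (`find_mismatches` and the `SimpleNamespace` messages are not part of the PR), and it assumes each payload is unique, since keying the map by `data` would silently collapse messages that share a payload.

```python
from types import SimpleNamespace


def find_mismatches(sent, received):
    """Return (data, reason) pairs for sent messages that did not round-trip."""
    by_data = {m.data: m for m in received}  # assumes unique payloads
    problems = []
    for s in sent:
        got = by_data.get(s.data)
        if got is None:
            problems.append((s.data, "missing"))
        elif got.ordering_key != s.ordering_key:
            problems.append((s.data, "ordering_key"))
        elif dict(got.attributes) != s.attributes:
            problems.append((s.data, "attributes"))
    return problems


sent = [
    SimpleNamespace(data=b"m1", attributes={"k": "v"}, ordering_key="key1"),
    SimpleNamespace(data=b"m2", attributes={}, ordering_key="key2"),
]
# Simulate the pre-fix behaviour: the second message arrives without its key.
received = [
    SimpleNamespace(data=b"m1", attributes={"k": "v"}, ordering_key="key1"),
    SimpleNamespace(data=b"m2", attributes={}, ordering_key=""),
]
problems = find_mismatches(sent, received)
```

A presence-only check such as `assertIn('ordering_key', dir(...))` would pass on this simulated data, whereas the value comparison flags the second message.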

@github-actions
Contributor

Reminder, please take a look at this pr: @tvalentyn

@tvalentyn
Contributor

responded on the issue

Development

Successfully merging this pull request may close these issues.

Beam Python PubSubIO Does Not Support Ordered Key Publishing
