I encountered the following error when feeding Pub/Sub IO output into a splittable DoFn:
RuntimeError: Transform node 
AppliedPTransform(ParDo(TestDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, 
_GroupByKeyOnly) was not replaced as expected.
Can someone help me review the code below for anything I might be doing incorrectly?
Code:
"""
python examples/test_restriction_unbounded.py --project mk2 --topic projects/mk2/topics/testing
"""
# pytype: skip-file
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import csv
import logging
import sys
import time
from datetime import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.restriction_trackers import OffsetRestrictionTracker, OffsetRange
from apache_beam.transforms.core import RestrictionProvider
class TestProvider(RestrictionProvider):
  """Restriction provider that hands every element the same fixed offset range."""

  def initial_restriction(self, element):
    """Return ``OffsetRange(0, 1)`` regardless of what ``element`` is."""
    fixed_range = OffsetRange(0, 1)
    return fixed_range

  def create_tracker(self, restriction):
    """Wrap ``restriction`` in an ``OffsetRestrictionTracker``."""
    tracker = OffsetRestrictionTracker(restriction)
    return tracker

  def restriction_size(self, element, restriction):
    """Report the size of ``restriction`` as given by its own ``size()``."""
    size = restriction.size()
    return size
class TestDoFn(beam.DoFn):
  """Splittable DoFn that emits its input element once per claimed offset."""

  def process(
      self,
      element,
      restriction_tracker=beam.DoFn.RestrictionParam(TestProvider())):
    """Yield ``element`` for every offset successfully claimed from the tracker.

    Args:
      element: The input element; it is emitted unchanged.
      restriction_tracker: Tracker injected via ``RestrictionParam`` and built
        by ``TestProvider``.

    Yields:
      ``element``, once for each offset that ``try_claim`` accepts.
    """
    # No interactive breakpoint here: pdb.set_trace() would block every call
    # to process() waiting for console input.
    cur = restriction_tracker.current_restriction().start
    while restriction_tracker.try_claim(cur):
      # Yield rather than return: returning the raw element would make the
      # element itself be treated as the collection of outputs.
      yield element
      # Advance to the next offset so the loop ends once a claim is refused.
      cur += 1
def run(argv=None, save_main_session=True):
  """Build and run a streaming pipeline: Pub/Sub topic -> ``TestDoFn``.

  Args:
    argv: Command-line arguments; ``--topic`` is consumed here and everything
      else is forwarded to ``PipelineOptions``. ``None`` lets argparse fall
      back to ``sys.argv``.
    save_main_session: Value assigned to ``SetupOptions.save_main_session``.
  """
  parser = argparse.ArgumentParser()
  # Required so a missing topic fails at argument parsing with a clear message
  # instead of later during pipeline construction.
  parser.add_argument(
      '--topic', type=str, required=True, help='Pub/Sub topic to read from')
  args, pipeline_args = parser.parse_known_args(argv)

  options = PipelineOptions(pipeline_args)
  # Apply the caller's choice; previously this argument was silently ignored.
  options.view_as(SetupOptions).save_main_session = save_main_session
  options.view_as(StandardOptions).streaming = True

  # NOTE(review): the reported "was not replaced as expected" RuntimeError is
  # raised when the pipeline runs; it may reflect the selected runner's
  # support for splittable DoFns in streaming mode -- confirm against the
  # runner's documentation.
  with beam.Pipeline(options=options) as p:
    _ = (
        p
        | beam.io.ReadFromPubSub(topic=args.topic)
        | beam.ParDo(TestDoFn()))
if __name__ == '__main__':
  # Raise the root logger to INFO before starting the pipeline.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  run()