Beam / BEAM-12233

Python InteractiveRunner works when evaluated as a global but not in a function

Details

    • Type: Bug
    • Status: Triage Needed
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: runner-py-interactive
    • Labels: None
    • Environment: Python 3.7.10, Beam 2.28.0

    Description

      The following snippet works when I run it in a Jupyter notebook (e.g., Google Colab):

      import apache_beam as beam
      
      transform = beam.Create([1, 2, 3])
      
      p = beam.Pipeline('InteractiveRunner')
      pcoll = p | transform
      result = p.run()
      result.wait_until_finish()
      print(result.get(pcoll))  # [1, 2, 3]

      However, if I move the pipeline creation and evaluation into a helper function, it fails with a cryptic error:

      import apache_beam as beam
      
      def evaluate(transform):
        p = beam.Pipeline('InteractiveRunner')
        pcoll = p | transform
        result = p.run()
        result.wait_until_finish()
        return result.get(pcoll)
      
      transform = beam.Create([1, 2, 3])
      evaluate(transform)

      This results in:

      ---------------------------------------------------------------------------
      AttributeError                            Traceback (most recent call last)
      <ipython-input-33-271cfe7be46f> in <module>()
            9 
           10 transform = beam.Create([1, 2, 3])
      ---> 11 evaluate(transform)
      
      <ipython-input-33-271cfe7be46f> in evaluate(transform)
            4   p = beam.Pipeline('InteractiveRunner')
            5   pcoll = p | transform
      ----> 6   result = p.run()
            7   result.wait_until_finish()
            8   return result.get(pcoll)
      
      /usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py in run(self, test_runner_api)
          557         finally:
          558           shutil.rmtree(tmpdir)
      --> 559       return self.runner.run_pipeline(self, self._options)
          560     finally:
          561       shutil.rmtree(self.local_tempdir, ignore_errors=True)
      
      /usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/interactive_runner.py in run_pipeline(self, pipeline, options)
          134 
          135     # Make sure that sources without a user reference are still cached.
      --> 136     inst.watch_sources(pipeline)
          137 
          138     user_pipeline = ie.current_env().user_pipeline(pipeline)
      
      /usr/local/lib/python3.7/dist-packages/apache_beam/runners/interactive/pipeline_instrument.py in watch_sources(pipeline)
         1006           ie.current_env().watch({'synthetic_var_' + str(id(pcoll)): pcoll})
         1007 
      -> 1008   retrieved_user_pipeline.visit(CacheableUnboundedPCollectionVisitor())
      
      AttributeError: 'NoneType' object has no attribute 'visit'

      It would be nice if this worked, or, failing that, if at least an instructive error message were printed.
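The traceback shows that `watch_sources` ended up with `retrieved_user_pipeline` set to `None`. One plausible explanation, which is an assumption and is not confirmed in this report, is that Interactive Beam finds the user's pipeline by scanning the namespaces it watches (by default `__main__`), so a `Pipeline` referenced only by a function's local variable is never found. Interactive Beam's `interactive_beam.watch()` is documented to accept a `locals()` dictionary, so calling `ib.watch(locals())` inside `evaluate` after building the pipeline may be a workaround worth trying. The pure-Python sketch below is a hypothetical, simplified model of that lookup; the `Environment` and `Pipeline` classes are stand-ins invented for illustration, not Beam APIs.

```python
# Hypothetical, simplified model of namespace "watching". This is NOT
# Beam's implementation; it only illustrates the suspected failure mode.
from typing import Any, Dict, List, Optional


class Pipeline:
    """Stand-in for beam.Pipeline (hypothetical)."""


class Environment:
    """Hypothetical registry that finds pipelines by scanning watched namespaces."""

    def __init__(self) -> None:
        self._watched: List[Dict[str, Any]] = []

    def watch(self, namespace: Dict[str, Any]) -> None:
        # Remember a namespace (e.g. globals() or locals()) to scan later.
        self._watched.append(namespace)

    def user_pipeline(self, pipeline: Pipeline) -> Optional[Pipeline]:
        # Return the pipeline only if some watched variable refers to it.
        for namespace in self._watched:
            for value in namespace.values():
                if value is pipeline:
                    return pipeline
        return None


env = Environment()
env.watch(globals())  # analogous to watching __main__ by default


def evaluate(watch_locals: bool) -> Optional[Pipeline]:
    p = Pipeline()  # referenced only by a local variable
    if watch_locals:
        env.watch(locals())  # analogous to calling ib.watch(locals())
    return env.user_pipeline(p)


global_p = Pipeline()
print(env.user_pipeline(global_p) is global_p)  # True: found via globals()
print(evaluate(watch_locals=False))  # None: the local is invisible
print(evaluate(watch_locals=True) is not None)  # True: locals were watched
```

In this model the global pipeline is found because `globals()` is the live module dictionary, while the function-local pipeline is only found once its `locals()` snapshot has been registered.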

       

          People

            Assignee: Unassigned
            Reporter: Stephan Hoyer (shoyer)
            Votes: 0
            Watchers: 3
