
Take 2: Error when reading from External Unbounded Source and Write to PubSubIO #589

Open
cbazza opened this issue Aug 4, 2017 · 5 comments


@cbazza

@cbazza cbazza commented Aug 4, 2017

I took the title from a previous issue, #105, which has been resolved; it sounds just like my issue.

I have a pipeline that reads from an unbounded PubsubIO source and writes to PubsubIO.
Testing it locally, reading from a TextIO file and writing to PubsubIO, works great. But when I switch the source to PubsubIO (with a large amount of data) and deploy it to Google's Dataflow, it runs fine until data needs to be written out to PubsubIO, at which point I get the following NullPointerException (it seems related to creating the protobuf that wraps the message sent to Pub/Sub):

I'm using Apache Beam 2.0.0, and the one unusual thing about my linear pipeline is that it contains two different windows: first a sliding window, then a fixed window.

                .apply(Window.<GaugeDebug>into(
                        SlidingWindows.of(windowSizeL1).every(windowOffsetL1)
                        )
                        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(windowDelayL1)))
                        .accumulatingFiredPanes()
                        .withAllowedLateness(windowLatenessL1)
                );
...
                .apply(Combine.<List<String>, GaugeDebug>perKey(new SumGauges()))
...
                .apply(Count.<List<String>, GaugeDebug>perKey())
...
                .apply(Window.<KV<List<String>,Long>>into(
                        FixedWindows.of(windowSizeL2)
                        ))
...
                .apply(Max.perKey())
...
                .apply("WriteMessages", PubsubIO.writeMessages().to("projects/preseem/topics/carlos-test"));

Notice that I write to PubsubIO with 'writeMessages' and my messages carry no attributes, yet the code seems to die when trying to write out attributes.

Carlos.


Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
org.apache.beam.sdk.transforms.MapElements$1$auxiliary$u4qfc9DK.invokeProcessElement(Unknown Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:197)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:158)
com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:246)
com.google.cloud.dataflow.worker.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:194)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:355)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:629)
org.apache.beam.examples.GaugeKVToMessage.processElement(GaugeDebug.java:396)
org.apache.beam.examples.GaugeKVToMessage$auxiliary$xI4IkwZo.invokeProcessElement(Unknown Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:197)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:158)
com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:246)
com.google.cloud.dataflow.worker.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:149)
com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:266)
com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner$DoFnProcessContext$1.outputWindowedValue(SimpleOldDoFnRunner.java:463)
com.google.cloud.dataflow.worker.runners.worker.WindowingInternalsAdapters$2.outputWindowedValue(WindowingInternalsAdapters.java:63)
org.apache.beam.runners.core.ReduceFnRunner$2.output(ReduceFnRunner.java:1014)
org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:428)
org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:118)
org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1019)
org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:893)
org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:758)
com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowViaWindowSetDoFn.processElement(GroupAlsoByWindowViaWindowSetDoFn.java:90)
com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:122)
com.google.cloud.dataflow.worker.runners.worker.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:101)
org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:73)
com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:106)
com.google.cloud.dataflow.worker.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:198)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:72)
com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:791)
com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker.access$600(StreamingDataflowWorker.java:104)
com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker$9.run(StreamingDataflowWorker.java:873)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
java.util.HashMap.putMapEntries(HashMap.java:500)
java.util.HashMap.putAll(HashMap.java:784)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.MapField$MutatabilityAwareMap.putAll(MapField.java:343)
com.google.cloud.dataflow.worker.repackaged.com.google.pubsub.v1.PubsubMessage$Builder.putAllAttributes(PubsubMessage.java:880)
com.google.cloud.dataflow.worker.runners.worker.PubsubSink$PubsubWriter.add(PubsubSink.java:131)
com.google.cloud.dataflow.worker.runners.worker.PubsubSink$PubsubWriter.add(PubsubSink.java:111)
com.google.cloud.dataflow.worker.util.common.worker.WriteOperation.process(WriteOperation.java:82)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:194)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:355)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:629)
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
java.lang.RuntimeException: org.apache.beam.sdk.util.
Aug 04, 2017 1:50:32 PM org.apache.beam.runners.dataflow.DataflowPipelineJob$1 run
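The innermost "Caused by" frames point at the real failure: PubsubMessage$Builder.putAllAttributes hands its argument to HashMap.putAll, and HashMap.putAll(null) throws a NullPointerException. The minimal plain-Java sketch below reproduces just that step without Beam or protobuf. The class and method names are hypothetical, and it only demonstrates the HashMap behavior, not the Dataflow worker's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java reproduction of the innermost frames of the trace:
// PubsubMessage$Builder.putAllAttributes(...) -> HashMap.putAll(null).
// NullAttributesRepro is a hypothetical name; no Beam or protobuf classes are used.
public class NullAttributesRepro {

    // Mimics copying a message's attributes into a builder-side map.
    // Returns true if the copy threw a NullPointerException.
    static boolean copyThrowsNpe(Map<String, String> attributes) {
        Map<String, String> builderAttributes = new HashMap<>();
        try {
            builderAttributes.putAll(attributes); // throws NPE when attributes == null
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("null map -> NPE? " + copyThrowsNpe(null));
        System.out.println("empty map -> NPE? " + copyThrowsNpe(new HashMap<>()));
    }
}
```

Note that the sketch only shows that a null map is what breaks putAll; whether the Dataflow worker would accept an empty (non-null) map is not confirmed in this thread.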

@cbazza
Author

@cbazza cbazza commented Aug 4, 2017

OK, I found the problem: the Google Dataflow worker cannot handle a PubsubMessage with no attributes (a null attribute map). The local runner handles it fine, but when deployed to Dataflow it crashes, so I had to pass an attributes HashMap instead of just passing null:

         // data is the byte[] payload; the attribute map must be non-null (Dataflow NPEs on null)
         Map<String, String> attributes = new HashMap<>();
         attributes.put("carlos", "carlos");  // dummy attribute
         PubsubMessage m = new PubsubMessage(data, attributes);

Carlos.
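The same workaround can be wrapped in a small helper so that no code path ever builds a message with a null attribute map. This is a sketch under assumptions: AttributesWorkaround, nonEmptyAttributes, and the placeholder key and value are hypothetical names, and, like the fix above, it inserts a dummy attribute rather than relying on an empty map, since the thread does not confirm whether an empty map is accepted by the Dataflow worker.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the workaround in this thread: never hand the
// sink a null attribute map. The placeholder key/value are arbitrary assumptions.
public class AttributesWorkaround {

    static final String PLACEHOLDER_KEY = "placeholder"; // hypothetical
    static final String PLACEHOLDER_VALUE = "1";         // hypothetical

    // Returns the given attributes if non-null and non-empty; otherwise a new
    // map holding a single dummy attribute (as in the "carlos" -> "carlos" fix).
    static Map<String, String> nonEmptyAttributes(Map<String, String> attributes) {
        if (attributes != null && !attributes.isEmpty()) {
            return attributes;
        }
        Map<String, String> dummy = new HashMap<>();
        dummy.put(PLACEHOLDER_KEY, PLACEHOLDER_VALUE);
        return dummy;
    }

    public static void main(String[] args) {
        Map<String, String> copy = new HashMap<>();
        copy.putAll(nonEmptyAttributes(null)); // no NPE: the argument is never null
        System.out.println(copy);

        Map<String, String> real = Collections.singletonMap("source", "gauge");
        System.out.println(nonEmptyAttributes(real)); // real attributes pass through
    }
}
```

Inside a DoFn such as the GaugeKVToMessage step from the trace, the message would then be built as new PubsubMessage(data, AttributesWorkaround.nonEmptyAttributes(attributes)), where data and attributes stand for whatever payload and (possibly null) map that step produces.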

@tekjar

@tekjar tekjar commented Aug 25, 2017

Yep. I'm facing this too. Adding dummy attributes fixed it.

@aaltay
Contributor

@aaltay aaltay commented Aug 25, 2017

@asiFarran

@asiFarran asiFarran commented Oct 11, 2019

This has cost me about 10 hours and a fair bit of hair loss. Simply incredible.
All hail @cbazza!

@cbazza
Author

@cbazza cbazza commented Oct 11, 2019

Thank you... I lost all my hair ;) You will come across other weirdness too when you try to debug large pipelines. I had enormous ones.

4 participants