From a4510be5dacc946f09c5fd07bd4b5f407eab7a93 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Sat, 31 Jan 2026 13:11:43 +0300 Subject: [PATCH 1/5] raw --- .../AbstractRightMaterializedJoinNode.java | 1 + .../rel/CorrelatedNestedLoopJoinNode.java | 46 +++++++++----- .../query/calcite/exec/rel/HashJoinNode.java | 5 +- .../query/calcite/exec/rel/MergeJoinNode.java | 61 +++++++++++-------- .../calcite/exec/rel/NestedLoopJoinNode.java | 38 +++++++----- 5 files changed, 97 insertions(+), 54 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java index 7dce770bcaeed..27c25f24e5858 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java @@ -81,6 +81,7 @@ protected AbstractRightMaterializedJoinNode(ExecutionContext ctx, RelDataTy waitingLeft = 0; waitingRight = 0; left = null; + processed = 0; leftInBuf.clear(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java index b1d78912d49e9..dbb6ca6adffbf 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java @@ -77,6 +77,9 @@ public class CorrelatedNestedLoopJoinNode extends AbstractNode { /** */ private int rightIdx; + /** */ + private int processed; + /** */ private Row rightEmptyRow; @@ -129,8 +132,6 @@ public CorrelatedNestedLoopJoinNode(ExecutionContext ctx, RelDataType rowTy assert !F.isEmpty(sources()) && sources().size() == 2; assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested; - checkState(); - requested = rowsCnt; onRequest(); @@ -196,8 +197,6 @@ private void pushLeft(Row row) throws Exception { assert downstream() != null; assert waitingLeft > 0; - checkState(); - waitingLeft--; if (leftInBuf == null) @@ -213,8 +212,6 @@ private void pushRight(Row row) throws Exception { assert downstream() != null; assert waitingRight > 0; - checkState(); - waitingRight--; if (rightInBuf == null) @@ -269,9 +266,8 @@ private void onRequest() throws Exception { assert F.isEmpty(rightInBuf); context().execute(() -> { - checkState(); - state = State.FILLING_LEFT; + leftSource().request(waitingLeft = leftInBufferSize); }, this::onError); @@ -282,11 +278,7 @@ private void onRequest() throws Exception { assert waitingRight == -1 || waitingRight == 0 && rightInBuf.size() == rightInBufferSize; assert waitingLeft == -1 || waitingLeft == 0 && leftInBuf.size() == leftInBufferSize; - context().execute(() -> { - checkState(); - - join(); - }, this::onError); + context().execute(this::join0, this::onError); break; @@ -308,6 +300,8 @@ private void onPushLeft() throws Exception { if (leftInBuf.size() == leftInBufferSize) { assert waitingLeft == 0; + checkState(); + prepareCorrelations(); if (waitingRight == -1) @@ -384,7 +378,8 @@ private void join() throws Exception { leftIdx = 0; while (requested > 0 && leftIdx < leftInBuf.size()) { - checkState(); + if (rescheduleJoin()) + return; Row left = leftInBuf.get(leftIdx); Row right = rightInBuf.get(rightIdx); @@ -434,6 +429,9 @@ private void join() throws Exception { try { while (requested > 0 && notMatchedIdx < leftInBuf.size()) { + if (rescheduleJoin()) + return; + requested--; downstream().push(handler.concat(leftInBuf.get(notMatchedIdx), rightEmptyRow)); @@ -500,4 +498,24 @@ private void prepareCorrelations() { context().setCorrelated(row, correlationIds.get(i).getId()); } } + + /** */ + private void join0() throws Exception { + checkState(); + + processed = 0; + + join(); + } + + /** */ + protected boolean rescheduleJoin() { + if (processed++ > IN_BUFFER_SIZE) { + context().execute(this::join0, this::onError); + + return true; + } + + return false; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java index 45aa2d778dd49..b23c66304e815 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java @@ -180,8 +180,11 @@ protected Iterator untouched() { if (key != null) hashStore.computeIfAbsent(key, k -> new TouchedArrayList<>()).add(row); - if (waitingRight == 0) + if (waitingRight == 0) { + checkState(); + rightSource().request(waitingRight = IN_BUFFER_SIZE); + } } /** */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java index 1f3f97e40d7c8..1c4d902c596f4 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java @@ -77,6 +77,9 @@ public abstract class MergeJoinNode extends AbstractNode { /** */ protected boolean inLoop; + /** */ + private int processed; + /** * Flag indicating that at least one of the inputs has exchange underneath. In this case we can't prematurely end * downstream if one of the inputs is drained, we need to wait for both inputs, since async message from remote @@ -107,24 +110,16 @@ private MergeJoinNode(ExecutionContext ctx, RelDataType rowType, Comparator assert !F.isEmpty(sources()) && sources().size() == 2; assert rowsCnt > 0 && requested == 0; - checkState(); - requested = rowsCnt; if (!inLoop) - context().execute(this::doJoin, this::onError); - } - - /** */ - private void doJoin() throws Exception { - checkState(); - - join(); + context().execute(this::join0, this::onError); } /** {@inheritDoc} */ @Override protected void rewindInternal() { requested = 0; + processed = 0; waitingLeft = 0; waitingRight = 0; @@ -184,8 +179,6 @@ private void pushLeft(Row row) throws Exception { assert downstream() != null; assert waitingLeft > 0; - checkState(); - waitingLeft--; if (!finishing) @@ -199,8 +192,6 @@ private void pushRight(Row row) throws Exception { assert downstream() != null; assert waitingRight > 0; - checkState(); - waitingRight--; if (!finishing) @@ -214,8 +205,6 @@ private void endLeft() throws Exception { assert downstream() != null; assert waitingLeft > 0; - checkState(); - waitingLeft = NOT_WAITING; join(); @@ -226,8 +215,6 @@ private void endRight() throws Exception { assert downstream() != null; assert waitingRight > 0; - checkState(); - waitingRight = NOT_WAITING; join(); @@ -339,7 +326,8 @@ public InnerJoin(ExecutionContext ctx, RelDataType rowType, Comparator try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) { - checkState(); + if (rescheduleJoin()) + return; if (left == null) left = leftInBuf.remove(); @@ -469,7 +457,8 @@ public LeftJoin( try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null || waitingRight == NOT_WAITING)) { - checkState(); + if (rescheduleJoin()) + return; if (left == null) { left = leftInBuf.remove(); @@ -622,7 +611,8 @@ public RightJoin( try { while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING) && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) { - checkState(); + if (rescheduleJoin()) + return; if (left == null && !leftInBuf.isEmpty()) left = leftInBuf.remove(); @@ -796,7 +786,8 @@ public FullOuterJoin( try { while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING) && !(right == null && rightInBuf.isEmpty() && rightMaterialization == null && waitingRight != NOT_WAITING)) { - checkState(); + if (rescheduleJoin()) + return; if (left == null && !leftInBuf.isEmpty()) { left = leftInBuf.remove(); @@ -975,7 +966,8 @@ public SemiJoin(ExecutionContext ctx, RelDataType rowType, Comparator inLoop = true; try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty())) { - checkState(); + if (rescheduleJoin()) + return; if (left == null) left = leftInBuf.remove(); @@ -1031,7 +1023,8 @@ public AntiJoin(ExecutionContext ctx, RelDataType rowType, Comparator try { while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && !(right == null && rightInBuf.isEmpty() && waitingRight != NOT_WAITING)) { - checkState(); + if (rescheduleJoin()) + return; if (left == null) left = leftInBuf.remove(); @@ -1070,4 +1063,24 @@ else if (cmp > 0) { tryToRequestInputs(); } } + + /** */ + private void join0() throws Exception { + checkState(); + + processed = 0; + + join(); + } + + /** */ + protected boolean rescheduleJoin() { + if (processed++ > IN_BUFFER_SIZE) { + context().execute(this::join0, this::onError); + + return true; + } + + return false; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java index f2f42e3b68dab..ea76074bbfcca 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java @@ -66,16 +66,17 @@ private NestedLoopJoinNode(ExecutionContext ctx, RelDataType rowType, BiPre assert downstream() != null; assert waitingRight > 0; - checkState(); - nodeMemoryTracker.onRowAdded(row); waitingRight--; rightMaterialized.add(row); - if (waitingRight == 0) + if (waitingRight == 0) { + checkState(); + rightSource().request(waitingRight = IN_BUFFER_SIZE); + } } /** */ @@ -138,7 +139,8 @@ public InnerJoin(ExecutionContext ctx, RelDataType rowType, BiPredicate 0 && rightIdx < rightMaterialized.size()) { - checkState(); + if (rescheduleJoin()) + return; if (!cond.test(left, rightMaterialized.get(rightIdx++))) continue; @@ -206,7 +208,8 @@ public LeftJoin( } while (requested > 0 && rightIdx < rightMaterialized.size()) { - checkState(); + if (rescheduleJoin()) + return; if (!cond.test(left, rightMaterialized.get(rightIdx++))) continue; @@ -294,7 +297,8 @@ public RightJoin( left = leftInBuf.remove(); while (requested > 0 && rightIdx < rightMaterialized.size()) { - checkState(); + if (rescheduleJoin()) + return; Row right = rightMaterialized.get(rightIdx++); @@ -323,16 +327,18 @@ public RightJoin( assert lastPushedInd >= 0; inLoop = true; + Row emptyLeft = leftRowFactory.create(); try { for (lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd);; lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1) ) { - checkState(); - if (lastPushedInd < 0) break; - Row row = rowHnd.concat(leftRowFactory.create(), rightMaterialized.get(lastPushedInd)); + if (rescheduleJoin()) + return; + + Row row = rowHnd.concat(emptyLeft, rightMaterialized.get(lastPushedInd)); rightNotMatchedIndexes.clear(lastPushedInd); @@ -415,7 +421,8 @@ public FullOuterJoin( } while (requested > 0 && rightIdx < rightMaterialized.size()) { - checkState(); + if (rescheduleJoin()) + return; Row right = rightMaterialized.get(rightIdx++); @@ -456,16 +463,15 @@ public FullOuterJoin( assert lastPushedInd >= 0; inLoop = true; + Row emptyLeft = leftRowFactory.create(); try { for (lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd);; lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1) ) { - checkState(); - if (lastPushedInd < 0) break; - Row row = rowHnd.concat(leftRowFactory.create(), rightMaterialized.get(lastPushedInd)); + Row row = rowHnd.concat(emptyLeft, rightMaterialized.get(lastPushedInd)); rightNotMatchedIndexes.clear(lastPushedInd); @@ -505,7 +511,8 @@ public SemiJoin(ExecutionContext ctx, RelDataType rowType, BiPredicate 0 && rightIdx < rightMaterialized.size()) { - checkState(); + if (rescheduleJoin()) + return; if (!cond.test(left, rightMaterialized.get(rightIdx++))) continue; @@ -549,7 +556,8 @@ public AntiJoin(ExecutionContext ctx, RelDataType rowType, BiPredicate Date: Sun, 1 Feb 2026 22:08:15 +0300 Subject: [PATCH 2/5] fix --- .../calcite/exec/rel/CorrelatedNestedLoopJoinNode.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java index dbb6ca6adffbf..02c2decd58258 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java @@ -406,6 +406,8 @@ private void join() throws Exception { } if (rightIdx == rightInBuf.size()) { + int preservedRightIdx = rightIdx; + leftIdx = 0; rightIdx = 0; @@ -429,8 +431,11 @@ private void join() throws Exception { try { while (requested > 0 && notMatchedIdx < leftInBuf.size()) { - if (rescheduleJoin()) + if (rescheduleJoin()) { + rightIdx = preservedRightIdx; + return; + } requested--; From 67dc8432d6e9a488d60903b76b2267fff5608e3c Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 2 Feb 2026 10:50:26 +0300 Subject: [PATCH 3/5] fix --- .../rel/CorrelatedNestedLoopJoinNode.java | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java index 02c2decd58258..24d9b6efbc678 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Set; import java.util.function.BiPredicate; - import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataType; @@ -378,8 +377,11 @@ private void join() throws Exception { leftIdx = 0; while (requested > 0 && leftIdx < leftInBuf.size()) { - if (rescheduleJoin()) + if (processed++ > IN_BUFFER_SIZE) { + context().execute(this::join0, this::onError); + return; + } Row left = leftInBuf.get(leftIdx); Row right = rightInBuf.get(rightIdx); @@ -431,9 +433,11 @@ private void join() throws Exception { try { while (requested > 0 && notMatchedIdx < leftInBuf.size()) { - if (rescheduleJoin()) { + if (processed++ > IN_BUFFER_SIZE) { rightIdx = preservedRightIdx; + context().execute(this::join0, this::onError); + return; } @@ -512,15 +516,4 @@ private void join0() throws Exception { join(); } - - /** */ - protected boolean rescheduleJoin() { - if (processed++ > IN_BUFFER_SIZE) { - context().execute(this::join0, this::onError); - - return true; - } - - return false; - } } From abc79fab62688bd34555cfa32292699a69ee7d0f Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 3 Feb 2026 19:31:54 +0300 Subject: [PATCH 4/5] review fixes --- .../rel/CorrelatedNestedLoopJoinNode.java | 11 +------- .../query/calcite/exec/rel/HashJoinNode.java | 5 +++- .../calcite/exec/rel/NestedLoopJoinNode.java | 27 ++++++++++--------- 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java index 24d9b6efbc678..8860cc10570ce 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java @@ -408,8 +408,6 @@ private void join() throws Exception { } if (rightIdx == rightInBuf.size()) { - int preservedRightIdx = rightIdx; - leftIdx = 0; rightIdx = 0; @@ -433,14 +431,7 @@ private void join() throws Exception { try { while (requested > 0 && notMatchedIdx < leftInBuf.size()) { - if (processed++ > IN_BUFFER_SIZE) { - rightIdx = preservedRightIdx; - - context().execute(this::join0, this::onError); - - return; - } - + processed++; requested--; downstream().push(handler.concat(leftInBuf.get(notMatchedIdx), rightEmptyRow)); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java index f845fb21d9585..c3731b260377d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java @@ -198,8 +198,11 @@ protected AbstractStoringHashJoin( hashStore.computeIfAbsent(key, k -> createRowList()).add(row); } - if (waitingRight == 0) + if (waitingRight == 0) { + checkState(); + rightSource().request(waitingRight = IN_BUFFER_SIZE); + } } /** */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java index ea76074bbfcca..808b853fc4a60 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java @@ -327,14 +327,12 @@ public RightJoin( assert lastPushedInd >= 0; inLoop = true; - Row emptyLeft = leftRowFactory.create(); + lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd); + try { - for (lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd);; - lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1) - ) { - if (lastPushedInd < 0) - break; + Row emptyLeft = lastPushedInd < 0 ? null : leftRowFactory.create(); + while (lastPushedInd >= 0) { if (rescheduleJoin()) return; @@ -347,6 +345,8 @@ public RightJoin( if (lastPushedInd == Integer.MAX_VALUE || requested <= 0) break; + + lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1); } } finally { @@ -463,13 +463,14 @@ public FullOuterJoin( assert lastPushedInd >= 0; inLoop = true; - Row emptyLeft = leftRowFactory.create(); + lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd); + try { - for (lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd);; - lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1) - ) { - if (lastPushedInd < 0) - break; + Row emptyLeft = lastPushedInd < 0 ? null : leftRowFactory.create(); + + while (lastPushedInd > 0) { + if (rescheduleJoin()) + return; Row row = rowHnd.concat(emptyLeft, rightMaterialized.get(lastPushedInd)); @@ -480,6 +481,8 @@ public FullOuterJoin( if (lastPushedInd == Integer.MAX_VALUE || requested <= 0) break; + + lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1); } } finally { From 424af4d0b2b31653aa943f0668f34f2107b19132 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 4 Feb 2026 21:20:27 +0300 Subject: [PATCH 5/5] fix --- .../processors/query/calcite/exec/rel/NestedLoopJoinNode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java index 808b853fc4a60..51b6d2b851845 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java @@ -468,7 +468,7 @@ public FullOuterJoin( try { Row emptyLeft = lastPushedInd < 0 ? null : leftRowFactory.create(); - while (lastPushedInd > 0) { + while (lastPushedInd >= 0) { if (rescheduleJoin()) return;