Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ protected AbstractRightMaterializedJoinNode(ExecutionContext<Row> ctx, RelDataTy
waitingLeft = 0;
waitingRight = 0;
left = null;
processed = 0;

leftInBuf.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.List;
import java.util.Set;
import java.util.function.BiPredicate;

import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
Expand Down Expand Up @@ -77,6 +76,9 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> {
/** */
private int rightIdx;

/** */
private int processed;

/** */
private Row rightEmptyRow;

Expand Down Expand Up @@ -129,8 +131,6 @@ public CorrelatedNestedLoopJoinNode(ExecutionContext<Row> ctx, RelDataType rowTy
assert !F.isEmpty(sources()) && sources().size() == 2;
assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested;

checkState();

requested = rowsCnt;

onRequest();
Expand Down Expand Up @@ -196,8 +196,6 @@ private void pushLeft(Row row) throws Exception {
assert downstream() != null;
assert waitingLeft > 0;

checkState();

waitingLeft--;

if (leftInBuf == null)
Expand All @@ -213,8 +211,6 @@ private void pushRight(Row row) throws Exception {
assert downstream() != null;
assert waitingRight > 0;

checkState();

waitingRight--;

if (rightInBuf == null)
Expand Down Expand Up @@ -269,9 +265,8 @@ private void onRequest() throws Exception {
assert F.isEmpty(rightInBuf);

context().execute(() -> {
checkState();

state = State.FILLING_LEFT;

leftSource().request(waitingLeft = leftInBufferSize);
}, this::onError);

Expand All @@ -282,11 +277,7 @@ private void onRequest() throws Exception {
assert waitingRight == -1 || waitingRight == 0 && rightInBuf.size() == rightInBufferSize;
assert waitingLeft == -1 || waitingLeft == 0 && leftInBuf.size() == leftInBufferSize;

context().execute(() -> {
checkState();

join();
}, this::onError);
context().execute(this::join0, this::onError);

break;

Expand All @@ -308,6 +299,8 @@ private void onPushLeft() throws Exception {
if (leftInBuf.size() == leftInBufferSize) {
assert waitingLeft == 0;

checkState();

prepareCorrelations();

if (waitingRight == -1)
Expand Down Expand Up @@ -384,7 +377,11 @@ private void join() throws Exception {
leftIdx = 0;

while (requested > 0 && leftIdx < leftInBuf.size()) {
checkState();
if (processed++ > IN_BUFFER_SIZE) {
context().execute(this::join0, this::onError);

return;
}

Row left = leftInBuf.get(leftIdx);
Row right = rightInBuf.get(rightIdx);
Expand Down Expand Up @@ -434,6 +431,7 @@ private void join() throws Exception {

try {
while (requested > 0 && notMatchedIdx < leftInBuf.size()) {
processed++;
requested--;

downstream().push(handler.concat(leftInBuf.get(notMatchedIdx), rightEmptyRow));
Expand Down Expand Up @@ -500,4 +498,13 @@ private void prepareCorrelations() {
context().setCorrelated(row, correlationIds.get(i).getId());
}
}

/** */
private void join0() throws Exception {
checkState();

processed = 0;

join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,11 @@ protected AbstractStoringHashJoin(
hashStore.computeIfAbsent(key, k -> createRowList()).add(row);
}

if (waitingRight == 0)
if (waitingRight == 0) {
checkState();

rightSource().request(waitingRight = IN_BUFFER_SIZE);
}
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
/** */
protected boolean inLoop;

/** */
private int processed;

/**
* Flag indicating that at least one of the inputs has exchange underneath. In this case we can't prematurely end
* downstream if one of the inputs is drained, we need to wait for both inputs, since async message from remote
Expand Down Expand Up @@ -107,24 +110,16 @@ private MergeJoinNode(ExecutionContext<Row> ctx, RelDataType rowType, Comparator
assert !F.isEmpty(sources()) && sources().size() == 2;
assert rowsCnt > 0 && requested == 0;

checkState();

requested = rowsCnt;

if (!inLoop)
context().execute(this::doJoin, this::onError);
}

/** */
private void doJoin() throws Exception {
checkState();

join();
context().execute(this::join0, this::onError);
}

/** {@inheritDoc} */
@Override protected void rewindInternal() {
requested = 0;
processed = 0;
waitingLeft = 0;
waitingRight = 0;

Expand Down Expand Up @@ -184,8 +179,6 @@ private void pushLeft(Row row) throws Exception {
assert downstream() != null;
assert waitingLeft > 0;

checkState();

waitingLeft--;

if (!finishing)
Expand All @@ -199,8 +192,6 @@ private void pushRight(Row row) throws Exception {
assert downstream() != null;
assert waitingRight > 0;

checkState();

waitingRight--;

if (!finishing)
Expand All @@ -214,8 +205,6 @@ private void endLeft() throws Exception {
assert downstream() != null;
assert waitingLeft > 0;

checkState();

waitingLeft = NOT_WAITING;

join();
Expand All @@ -226,8 +215,6 @@ private void endRight() throws Exception {
assert downstream() != null;
assert waitingRight > 0;

checkState();

waitingRight = NOT_WAITING;

join();
Expand Down Expand Up @@ -339,7 +326,8 @@ public InnerJoin(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row>
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty()
|| rightMaterialization != null)) {
checkState();
if (rescheduleJoin())
return;

if (left == null)
left = leftInBuf.remove();
Expand Down Expand Up @@ -469,7 +457,8 @@ public LeftJoin(
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty()
|| rightMaterialization != null || waitingRight == NOT_WAITING)) {
checkState();
if (rescheduleJoin())
return;

if (left == null) {
left = leftInBuf.remove();
Expand Down Expand Up @@ -622,7 +611,8 @@ public RightJoin(
try {
while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING)
&& (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) {
checkState();
if (rescheduleJoin())
return;

if (left == null && !leftInBuf.isEmpty())
left = leftInBuf.remove();
Expand Down Expand Up @@ -796,7 +786,8 @@ public FullOuterJoin(
try {
while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING)
&& !(right == null && rightInBuf.isEmpty() && rightMaterialization == null && waitingRight != NOT_WAITING)) {
checkState();
if (rescheduleJoin())
return;

if (left == null && !leftInBuf.isEmpty()) {
left = leftInBuf.remove();
Expand Down Expand Up @@ -975,7 +966,8 @@ public SemiJoin(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row>
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty())) {
checkState();
if (rescheduleJoin())
return;

if (left == null)
left = leftInBuf.remove();
Expand Down Expand Up @@ -1031,7 +1023,8 @@ public AntiJoin(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row>
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) &&
!(right == null && rightInBuf.isEmpty() && waitingRight != NOT_WAITING)) {
checkState();
if (rescheduleJoin())
return;

if (left == null)
left = leftInBuf.remove();
Expand Down Expand Up @@ -1070,4 +1063,24 @@ else if (cmp > 0) {
tryToRequestInputs();
}
}

/** */
private void join0() throws Exception {
checkState();

processed = 0;

join();
}

/** */
protected boolean rescheduleJoin() {
if (processed++ > IN_BUFFER_SIZE) {
context().execute(this::join0, this::onError);

return true;
}

return false;
}
}
Loading