Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import java.sql.SQLException;
import java.text.Format;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -33,6 +36,9 @@
import org.apache.phoenix.log.QueryLogger;
import org.apache.phoenix.monitoring.OverAllQueryMetrics;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.monitoring.SlowestScanMetricsQueue;
import org.apache.phoenix.monitoring.TopNTreeMultiMap;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
Expand All @@ -52,6 +58,8 @@
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;

import org.apache.hbase.thirdparty.com.google.gson.JsonObject;

/**
* Class that keeps common state used across processing the various clauses in a top level JDBC
* statement such as SELECT, UPSERT, DELETE, etc.
Expand Down Expand Up @@ -91,6 +99,8 @@ public class StatementContext {
private Integer totalSegmentsValue;
private boolean hasRowSizeFunction = false;
private boolean hasRawRowSizeFunction = false;
private final SlowestScanMetricsQueue slowestScanMetricsQueue;
private final int slowestScanMetricsCount;

public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
Expand All @@ -116,6 +126,8 @@ public StatementContext(StatementContext context) {
this.subqueryResults = context.subqueryResults;
this.readMetricsQueue = context.readMetricsQueue;
this.overAllQueryMetrics = context.overAllQueryMetrics;
this.slowestScanMetricsQueue = context.slowestScanMetricsQueue;
this.slowestScanMetricsCount = context.slowestScanMetricsCount;
this.queryLogger = context.queryLogger;
this.isClientSideUpsertSelect = context.isClientSideUpsertSelect;
this.isUncoveredIndex = context.isUncoveredIndex;
Expand Down Expand Up @@ -180,6 +192,10 @@ public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Bin
this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled, connection.getLogLevel());
this.overAllQueryMetrics =
new OverAllQueryMetrics(isRequestMetricsEnabled, connection.getLogLevel());
this.slowestScanMetricsCount = connection.getSlowestScanMetricsCount();
this.slowestScanMetricsQueue = slowestScanMetricsCount > 0
? new SlowestScanMetricsQueue()
: SlowestScanMetricsQueue.NOOP_SLOWEST_SCAN_METRICS_QUEUE;
this.retryingPersistentCache = Maps.<Long, Boolean> newHashMap();
this.hasFirstValidResult = new AtomicBoolean(false);
this.subStatementContexts = Sets.newHashSet();
Expand Down Expand Up @@ -475,4 +491,71 @@ public Integer getTotalSegmentsValue() {
/**
 * Records the total-segments value on this statement context.
 * @param totalSegmentsValue the value to store; may be {@code null}
 */
public void setTotalSegmentsValue(Integer totalSegmentsValue) {
this.totalSegmentsValue = totalSegmentsValue;
}

/**
 * FOR INTERNAL USE ONLY.
 * @return the queue that collects per-scan metrics for slowest-scan reporting;
 *         this is the shared no-op instance when slowest-scan collection is
 *         disabled (i.e. the configured count is not positive)
 */
public SlowestScanMetricsQueue getSlowestScanMetricsQueue() {
return slowestScanMetricsQueue;
}

/**
 * Builds the JSON representation of the top-N slowest scan chains recorded for
 * this statement, ordered slowest first.
 * @return one inner list of JSON objects per ranked chain (each JSON object is
 *         one scan's metrics), or an empty list when collection is disabled
 */
public List<List<JsonObject>> getTopNSlowestScanMetrics() {
  if (slowestScanMetricsCount <= 0) {
    // Collection is disabled; nothing was ever recorded.
    return Collections.emptyList();
  }
  // Rank by descending accumulated cost so the slowest chains come first.
  TopNTreeMultiMap<Long, List<ScanMetricsHolder>> rankedChains =
    TopNTreeMultiMap.getInstance(slowestScanMetricsCount, (a, b) -> Long.compare(b, a));
  getSlowestScanMetricsUtil(this, new ArrayDeque<>(), 0, rankedChains);
  List<List<JsonObject>> serialized = new ArrayList<>(rankedChains.size());
  for (Map.Entry<Long, List<ScanMetricsHolder>> rankedChain : rankedChains.entries()) {
    List<ScanMetricsHolder> holders = rankedChain.getValue();
    List<JsonObject> chainJson = new ArrayList<>(holders.size());
    for (ScanMetricsHolder holder : holders) {
      chainJson.add(holder.toJson());
    }
    serialized.add(chainJson);
  }
  return serialized;
}

/**
 * Depth-first step of the slowest-scan walk: extends the in-progress chain with
 * each scan-metrics holder recorded on {@code node}, accumulating its cost,
 * before descending into sub-contexts. When the node recorded no holders the
 * walk descends once with the chain and running sum unchanged.
 */
private static void getSlowestScanMetricsUtil(StatementContext node,
  Deque<ScanMetricsHolder> currentScanMetricsHolders, long sumOfMillisBetweenNexts,
  TopNTreeMultiMap<Long, List<ScanMetricsHolder>> topNSlowestScanMetricsHolders) {
  Iterator<ScanMetricsHolder> holderIterator = node.getSlowestScanMetricsQueue().getIterator();
  boolean sawAnyHolder = false;
  while (holderIterator.hasNext()) {
    sawAnyHolder = true;
    ScanMetricsHolder holder = holderIterator.next();
    // Push onto the tail of the deque: the chain grows root-to-leaf.
    currentScanMetricsHolders.addLast(holder);
    traverseSlowestScanMetricsTree(node, currentScanMetricsHolders,
      sumOfMillisBetweenNexts + holder.getCostOfScan(), topNSlowestScanMetricsHolders);
    // Backtrack so the next sibling holder starts from the same prefix.
    currentScanMetricsHolders.removeLast();
  }
  if (!sawAnyHolder) {
    // No scans recorded at this level; keep walking down unchanged.
    traverseSlowestScanMetricsTree(node, currentScanMetricsHolders, sumOfMillisBetweenNexts,
      topNSlowestScanMetricsHolders);
  }
}

/**
 * Branch/leaf dispatch of the slowest-scan walk. An internal node (one with
 * sub-statement contexts) recurses into each child; a leaf offers the completed
 * chain to the top-N map keyed by its accumulated cost.
 */
private static void traverseSlowestScanMetricsTree(StatementContext node,
  Deque<ScanMetricsHolder> currentScanMetricsHolders, long sumOfMillisBetweenNexts,
  TopNTreeMultiMap<Long, List<ScanMetricsHolder>> topNSlowestScanMetricsHolders) {
  Set<StatementContext> children = node.getSubStatementContexts();
  if (children != null && !children.isEmpty()) {
    // Internal node: continue the walk into every sub-context with the same chain.
    for (StatementContext child : children) {
      getSlowestScanMetricsUtil(child, currentScanMetricsHolders, sumOfMillisBetweenNexts,
        topNSlowestScanMetricsHolders);
    }
    return;
  }
  // Leaf: hand the map a supplier that snapshots the chain — NOTE(review):
  // presumably so the copy is only made when the entry actually ranks; confirm
  // against TopNTreeMultiMap.put.
  topNSlowestScanMetricsHolders.put(sumOfMillisBetweenNexts,
    () -> new ArrayList<>(currentScanMetricsHolders));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
Expand Down Expand Up @@ -115,8 +116,10 @@ protected void submitWork(final List<List<Scan>> nestedScans,
context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
for (final ScanLocator scanLocation : scanLocations) {
final Scan scan = scanLocation.getScan();
final ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics,
physicalTableName, scan, context.getConnection().getLogLevel());
PhoenixConnection connection = context.getConnection();
final ScanMetricsHolder scanMetricsHolder =
ScanMetricsHolder.getInstance(readMetrics, physicalTableName, scan,
connection.getLogLevel(), connection.isScanMetricsByRegionEnabled());
final TaskExecutionMetricsHolder taskMetrics =
new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
final TableResultIterator tableResultItr =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,7 @@
*/
package org.apache.phoenix.iterate;

import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.NOT_SERVING_REGION_EXCEPTION_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REMOTE_RPC_RETRIES_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_NOT_SERVING_REGION_EXCEPTION;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_REMOTE_RPC_RETRIES;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_FILTERED;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_ROWS_SCANNED;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
import static org.apache.phoenix.util.ScanUtil.isDummy;

import java.io.IOException;
Expand All @@ -53,12 +29,11 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.phoenix.compat.hbase.CompatScanMetrics;
import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.monitoring.CombinableMetric;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.query.QueryServices;
Expand Down Expand Up @@ -120,88 +95,20 @@ public void close() throws SQLException {
updateMetrics();
}

/** Applies {@code value} to the request-level metric; a {@code null} value is a no-op. */
private void changeMetric(CombinableMetric metric, Long value) {
  if (value == null) {
    return;
  }
  metric.change(value);
}

/** Applies {@code value} to the global client metric; a {@code null} value is a no-op. */
private void changeMetric(GlobalClientMetrics metric, Long value) {
  if (value == null) {
    return;
  }
  metric.update(value);
}

private void updateMetrics() {

if (scanMetricsEnabled && !scanMetricsUpdated) {
ScanMetrics scanMetrics = scanner.getScanMetrics();
Map<String, Long> scanMetricsMap = scanMetrics.getMetricsMap();
scanMetricsHolder.setScanMetricMap(scanMetricsMap);

changeMetric(scanMetricsHolder.getCountOfRPCcalls(),
scanMetricsMap.get(RPC_CALLS_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRemoteRPCcalls(),
scanMetricsMap.get(REMOTE_RPC_CALLS_METRIC_NAME));
changeMetric(scanMetricsHolder.getSumOfMillisSecBetweenNexts(),
scanMetricsMap.get(MILLIS_BETWEEN_NEXTS_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfNSRE(),
scanMetricsMap.get(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfBytesInResults(),
scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfBytesInRemoteResults(),
scanMetricsMap.get(BYTES_IN_REMOTE_RESULTS_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRegions(),
scanMetricsMap.get(REGIONS_SCANNED_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRPCRetries(),
scanMetricsMap.get(RPC_RETRIES_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRemoteRPCRetries(),
scanMetricsMap.get(REMOTE_RPC_RETRIES_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRowsScanned(),
scanMetricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRowsFiltered(),
scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfBytesScanned(),
scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRowsPaged(), dummyRowCounter);
changeMetric(scanMetricsHolder.getFsReadTime(),
CompatScanMetrics.getFsReadTime(scanMetricsMap));
changeMetric(scanMetricsHolder.getCountOfBytesReadFromFS(),
CompatScanMetrics.getBytesReadFromFs(scanMetricsMap));
changeMetric(scanMetricsHolder.getCountOfBytesReadFromMemstore(),
CompatScanMetrics.getBytesReadFromMemstore(scanMetricsMap));
changeMetric(scanMetricsHolder.getCountOfBytesReadFromBlockcache(),
CompatScanMetrics.getBytesReadFromBlockCache(scanMetricsMap));
changeMetric(scanMetricsHolder.getCountOfBlockReadOps(),
CompatScanMetrics.getBlockReadOpsCount(scanMetricsMap));
changeMetric(scanMetricsHolder.getRpcScanProcessingTime(),
CompatScanMetrics.getRpcScanProcessingTime(scanMetricsMap));
changeMetric(scanMetricsHolder.getRpcScanQueueWaitTime(),
CompatScanMetrics.getRpcScanQueueWaitTime(scanMetricsMap));

changeMetric(GLOBAL_SCAN_BYTES, scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_RPC_CALLS, scanMetricsMap.get(RPC_CALLS_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS,
scanMetricsMap.get(REMOTE_RPC_CALLS_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS,
scanMetricsMap.get(MILLIS_BETWEEN_NEXTS_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_NOT_SERVING_REGION_EXCEPTION,
scanMetricsMap.get(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS,
scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS,
scanMetricsMap.get(BYTES_IN_REMOTE_RESULTS_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_SCANNED_REGIONS,
scanMetricsMap.get(REGIONS_SCANNED_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_RPC_RETRIES, scanMetricsMap.get(RPC_RETRIES_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_REMOTE_RPC_RETRIES,
scanMetricsMap.get(REMOTE_RPC_RETRIES_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_ROWS_SCANNED,
scanMetricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_ROWS_FILTERED,
scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME));

changeMetric(GLOBAL_PAGED_ROWS_COUNTER, dummyRowCounter);
scanMetricsHolder.populateMetrics(dummyRowCounter);
GlobalClientMetrics.populateMetrics(scanMetricsMap, dummyRowCounter);
PhoenixConnection connection = context.getConnection();
int slowestScanMetricsCount = connection.getSlowestScanMetricsCount();
if (slowestScanMetricsCount > 0) {
scanMetricsHolder.setScanMetricsByRegion(scanMetrics.collectMetricsByRegion());
context.getSlowestScanMetricsQueue().add(scanMetricsHolder);
}

scanMetricsUpdated = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,9 @@ private PeekingResultIterator nextIterator() throws SQLException {
currentScan.setAttribute(BaseScannerRegionObserverConstants.SCAN_OFFSET,
PInteger.INSTANCE.toBytes(remainingOffset));
}
PhoenixConnection connection = context.getConnection();
ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName,
currentScan, context.getConnection().getLogLevel());
currentScan, connection.getLogLevel(), connection.isScanMetricsByRegionEnabled());
TableResultIterator itr = new TableResultIterator(mutationState, currentScan,
scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, maxQueryEndTime);
PeekingResultIterator peekingItr =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public UnionResultIterators(List<QueryPlan> plans, StatementContext parentStmtCt
readMetricsList = Lists.newArrayListWithCapacity(nPlans);
overAllQueryMetricsList = Lists.newArrayListWithCapacity(nPlans);
for (QueryPlan plan : plans) {
parentStmtCtx.addSubStatementContext(plan.getContext());
readMetricsList.add(plan.getContext().getReadMetricsQueue());
overAllQueryMetricsList.add(plan.getContext().getOverallQueryMetrics());
iterators.add(LookAheadResultIterator.wrap(plan.iterator()));
Expand Down Expand Up @@ -102,11 +103,12 @@ public void close() throws SQLException {
/**
 * Folds each child plan's read metrics and overall query metrics into the
 * parent statement context. Each child's overall metrics are reset after being
 * combined so a later invocation does not count them twice.
 */
private void setMetricsInParentContext() {
  ReadMetricQueue targetReadMetrics = parentStmtCtx.getReadMetricsQueue();
  for (ReadMetricQueue childReadMetrics : readMetricsList) {
    // NOTE(review): the boolean argument is passed as true — presumably a
    // combine-and-reset (or similar) flag; confirm against combineReadMetrics.
    targetReadMetrics.combineReadMetrics(childReadMetrics, true);
  }
  OverAllQueryMetrics targetOverallMetrics = parentStmtCtx.getOverallQueryMetrics();
  for (OverAllQueryMetrics childOverallMetrics : overAllQueryMetricsList) {
    targetOverallMetrics.combine(childOverallMetrics);
    childOverallMetrics.reset();
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.util.JDBCUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hbase.thirdparty.com.google.gson.JsonObject;

/**
* ResultSet suitable for truly immutable use cases that do not delete data and do not query data
* that does not exist. Returns a non-null value when possible. Checks result from both the underlying
Expand Down Expand Up @@ -240,6 +243,11 @@ public Map<String, Map<MetricType, Long>> getReadMetrics() {
return metrics;
}

/**
 * Returns the top-N slowest scan metrics as JSON by delegating to
 * {@code JDBCUtil.getTopNSlowestScanMetrics} with the underlying result set.
 */
@Override
public List<List<JsonObject>> getTopNSlowestScanMetrics() {
return JDBCUtil.getTopNSlowestScanMetrics(rs);
}

@Override
public Map<MetricType, Long> getOverAllRequestReadMetrics() {
Map<MetricType, Long> metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
import java.util.function.Function;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.util.JDBCUtil;

import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;

import org.apache.hbase.thirdparty.com.google.gson.JsonObject;

/**
* ParallelPhoenixResultSet that provides the standard wait until at least one cluster completes
* approach
Expand Down Expand Up @@ -127,6 +130,11 @@ public Map<String, Map<MetricType, Long>> getReadMetrics() {
return metrics;
}

/**
 * Returns the top-N slowest scan metrics as JSON by delegating to
 * {@code JDBCUtil.getTopNSlowestScanMetrics} with the underlying result set.
 */
@Override
public List<List<JsonObject>> getTopNSlowestScanMetrics() {
return JDBCUtil.getTopNSlowestScanMetrics(rs);
}

@Override
public Map<MetricType, Long> getOverAllRequestReadMetrics() {
Map<MetricType, Long> metrics;
Expand Down
Loading