Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add a new SplitRollupQuery
Adds a SplitRollupQuery class that suports splitting a rollup query into
two separate queries.
This is useful for when a rollup table is filled by e.g. a batch job
that processes the data from the previous day on a daily basis. Rollup
data for yesterday will then only be available some time today. This
delay SLA can be configured on a per-table basis. The delay would
specify by how much time the table can be behind real time.

If a query comes in that would query data from that blackout period
where data is only available in the raw table, but not yet guaranteed to
be in the rollup table, the incoming query can be split into two using
the SplitRollupQuery class. It wraps a query that queries the rollup
table until the last guaranteed to be available timestamp based on the
SLA; and one that gets the remaining data from the raw table.
  • Loading branch information
muffix committed Nov 13, 2019
commit 22b642a4edf982773ccec7a082a133dcb1de1c72
8 changes: 8 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ tsdb_SRC := \
src/core/DownsamplingSpecification.java \
src/core/FillingDownsampler.java \
src/core/FillPolicy.java \
src/core/GroupCallback.java \
src/core/Histogram.java \
src/core/HistogramAggregation.java \
src/core/HistogramAggregationIterator.java \
Expand Down Expand Up @@ -81,11 +82,14 @@ tsdb_SRC := \
src/core/iRowSeq.java \
src/core/SaltScanner.java \
src/core/SeekableView.java \
src/core/SeekableViewChain.java \
src/core/SimpleHistogram.java \
src/core/SimpleHistogramDataPointAdapter.java \
src/core/SimpleHistogramDecoder.java \
src/core/Span.java \
src/core/SpanGroup.java \
src/core/SplitRollupQuery.java \
src/core/SplitRollupSpanGroup.java \
src/core/TSDB.java \
src/core/Tags.java \
src/core/TsdbQuery.java \
Expand Down Expand Up @@ -317,8 +321,11 @@ test_SRC := \
test/core/TestRowKey.java \
test/core/TestRowSeq.java \
test/core/TestSaltScanner.java \
test/core/TestSeekableViewChain.java \
test/core/TestSpan.java \
test/core/TestSpanGroup.java \
test/core/TestSplitRollupQuery.java \
test/core/TestSplitRollupSpanGroup.java \
test/core/TestTags.java \
test/core/TestTSDB.java \
test/core/TestTSDBAddPoint.java \
Expand Down Expand Up @@ -376,6 +383,7 @@ test_SRC := \
test/query/pojo/TestTimeSpan.java \
test/rollup/TestRollupConfig.java \
test/rollup/TestRollupInterval.java \
test/rollup/TestRollupQuery.java \
test/rollup/TestRollupSeq.java \
test/rollup/TestRollupUtils.java \
test/search/TestSearchPlugin.java \
Expand Down
30 changes: 30 additions & 0 deletions src/core/GroupCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// This file is part of OpenTSDB.
// Copyright (C) 2010-2012 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import com.stumbleupon.async.Callback;

import java.util.ArrayList;

class GroupCallback implements Callback<Object, ArrayList<Object>> {
/**
* We're only waiting for all callbacks to complete, ignoring their return values.
*
* @param ignored The return values of the individual callbacks - ignored
* @return null
*/
@Override
public Object call(ArrayList<Object> ignored) {
return null;
}
}
32 changes: 30 additions & 2 deletions src/core/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,23 @@ public void setTimeSeries(final List<String> tsuids,
*/
public Deferred<Object> configureFromQuery(final TSQuery query,
final int index);


/**
* Prepares a query against HBase by setting up group bys and resolving
* strings to UIDs asynchronously. This replaces calls to all of the setters
* like the {@link setTimeSeries}, {@link setStartTime}, etc.
* Make sure to wait on the deferred return before calling {@link runAsync}.
* @param query The main query to fetch the start and end time from
* @param index The index of which sub query we're executing
* @param force_raw If true, always get the data from the raw table; disables rollups
* @throws IllegalArgumentException if the query was missing sub queries or
* the index was out of bounds.
* @throws NoSuchUniqueName if the name of a metric, or a tag name/value
* does not exist. (Bubbles up through the deferred)
* @since 2.4
*/
Deferred<Object> configureFromQuery(final TSQuery query, final int index, boolean force_raw);

/**
* Downsamples the results by specifying a fixed interval between points.
* <p>
Expand Down Expand Up @@ -262,7 +278,19 @@ public Deferred<Object> configureFromQuery(final TSQuery query,
* @return
*/
public boolean isHistogramQuery();


/**
* @return Whether or not this is a rollup query
* @since 2.4
*/
public boolean isRollupQuery();

/**
* @return whether this query needs to be split.
* @since 2.4
*/
public boolean needsSplitting();

/**
* Set the percentile calculation parameters for this query if this is
* a histogram query
Expand Down
97 changes: 97 additions & 0 deletions src/core/SeekableViewChain.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// This file is part of OpenTSDB.
// Copyright (C) 2014 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import java.util.List;
import java.util.NoSuchElementException;

public class SeekableViewChain implements SeekableView {

private final List<SeekableView> iterators;
private int currentIterator;

SeekableViewChain(List<SeekableView> iterators) {
this.iterators = iterators;
}

/**
* Returns {@code true} if this view has more elements.
*/
@Override
public boolean hasNext() {
SeekableView iterator = getCurrentIterator();
return iterator != null && iterator.hasNext();
}

/**
* Returns a <em>view</em> on the next data point.
* No new object gets created, the referenced returned is always the same
* and must not be stored since its internal data structure will change the
* next time {@code next()} is called.
*
* @throws NoSuchElementException if there were no more elements to iterate
* on (in which case {@link #hasNext} would have returned {@code false}.
*/
@Override
public DataPoint next() {
SeekableView iterator = getCurrentIterator();
if (iterator == null || !iterator.hasNext()) {
throw new NoSuchElementException("No elements left in iterator");
}

DataPoint next = iterator.next();

if (!iterator.hasNext()) {
currentIterator++;
}

return next;
}

/**
* Unsupported operation.
*
* @throws UnsupportedOperationException always.
*/
@Override
public void remove() {
throw new UnsupportedOperationException("Removing items is not supported");
}

/**
* Advances the iterator to the given point in time.
* <p>
* This allows the iterator to skip all the data points that are strictly
* before the given timestamp.
*
* @param timestamp A strictly positive 32 bit UNIX timestamp (in seconds).
* @throws IllegalArgumentException if the timestamp is zero, or negative,
* or doesn't fit on 32 bits (think "unsigned int" -- yay Java!).
*/
@Override
public void seek(long timestamp) {
for (final SeekableView it : iterators) {
it.seek(timestamp);
}
}

private SeekableView getCurrentIterator() {
while (currentIterator < iterators.size()) {
if (iterators.get(currentIterator).hasNext()) {
return iterators.get(currentIterator);
}
currentIterator++;
}
return null;
}
}
39 changes: 27 additions & 12 deletions src/core/SpanGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,7 @@
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;

import org.hbase.async.Bytes;
import org.hbase.async.Bytes.ByteMap;
Expand Down Expand Up @@ -109,7 +102,10 @@ final class SpanGroup implements DataPoints {

/** The TSDB to which we belong, used for resolution */
private final TSDB tsdb;


/** The group we belong to */
private byte[] group;

/**
* Ctor.
* @param tsdb The TSDB we belong to.
Expand Down Expand Up @@ -231,7 +227,7 @@ final class SpanGroup implements DataPoints {
final long query_end,
final int query_index) {
this(tsdb, start_time, end_time, spans, rate, rate_options, aggregator,
downsampler, query_start, query_end, query_index, null);
downsampler, query_start, query_end, query_index, null, new byte[0]);
}

/**
Expand Down Expand Up @@ -265,7 +261,8 @@ final class SpanGroup implements DataPoints {
final long query_start,
final long query_end,
final int query_index,
final RollupQuery rollup_query) {
final RollupQuery rollup_query,
byte[] group) {
annotations = new ArrayList<Annotation>();
this.start_time = (start_time & Const.SECOND_MASK) == 0 ?
start_time * 1000 : start_time;
Expand All @@ -285,6 +282,7 @@ final class SpanGroup implements DataPoints {
this.query_index = query_index;
this.rollup_query = rollup_query;
this.tsdb = tsdb;
this.group = group;
}

/**
Expand Down Expand Up @@ -557,6 +555,18 @@ public long timestamp(final int i) {
return getDataPoint(i).timestamp();
}

/**
* Returns the group the spans in here belong to.
*
* Returns null if the NONE aggregator was requested in the query
* Returns an empty array if there were no group bys and they're all in the same group
* Returns the group otherwise
* @return The group
*/
public byte[] group() {
return group;
}

public boolean isInteger(final int i) {
return getDataPoint(i).isInteger();
}
Expand Down Expand Up @@ -585,7 +595,8 @@ private String toStringSharedAttributes() {
+ ", aggregator=" + aggregator
+ ", downsampler=" + downsampler
+ ", query_start=" + query_start
+ ", query_end" + query_end
+ ", query_end=" + query_end
+ ", group=" + Arrays.toString(group)
+ ')';
}

Expand All @@ -602,6 +613,10 @@ public boolean isPercentile() {
public float getPercentile() {
throw new UnsupportedOperationException("getPercentile not supported");
}

public List<Span> getSpans() {
return spans;
}

/**
* Resolves the set of tag keys to their string names.
Expand Down
Loading