Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add an SLA config flag for rollup intervals
Adds a configuration option for rollup intervals to specify their
maximum acceptable delay. Queries that cover a time between now and that
maximum delay will need to query other tables for that time interval.
  • Loading branch information
muffix committed Nov 13, 2019
commit caab084fb5791273ffc5e79af24f4b19073ba407
45 changes: 40 additions & 5 deletions src/rollup/RollupInterval.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ public class RollupInterval {
* also it might be compacted.
*/
private final boolean is_default_interval;

/**
* The delay SLA for this rollup interval. If a query is asking for data from a
* recent enough time interval that might not be available (or partially unavailable)
* in the table, the data points will be read from the raw table.
*/
private final String delay_sla;

/** The delay SLA in seconds */
private int max_delay_seconds;

/**
* Protected ctor used by the builder.
Expand All @@ -93,6 +103,7 @@ protected RollupInterval(final Builder builder) {
string_interval = builder.interval;
row_span = builder.rowSpan;
is_default_interval = builder.defaultInterval;
delay_sla = builder.delaySla != null ? builder.delaySla : "";

final String parsed_units = DateTime.getDurationUnits(row_span);
if (parsed_units.length() > 1) {
Expand All @@ -116,7 +127,8 @@ public String toString() {
.append(", unit_multipier=").append(unit_multiplier)
.append(", intervals=").append(intervals)
.append(", interval=").append(interval)
.append(", interval_units=").append(interval_units);
.append(", interval_units=").append(interval_units)
.append(", delay_sla=").append(delay_sla);
return buf.toString();
}

Expand All @@ -133,6 +145,7 @@ public HashCode buildHashCode() {
.putString(string_interval, Const.UTF8_CHARSET)
.putString(row_span, Const.UTF8_CHARSET)
.putBoolean(is_default_interval)
.putString(delay_sla, Const.UTF8_CHARSET)
.hash();
}

Expand All @@ -152,7 +165,8 @@ public boolean equals(final Object obj) {
&& Objects.equal(groupby_table_name, interval.groupby_table_name)
&& Objects.equal(row_span, interval.row_span)
&& Objects.equal(string_interval, interval.string_interval)
&& Objects.equal(is_default_interval, interval.is_default_interval);
&& Objects.equal(is_default_interval, interval.is_default_interval)
&& Objects.equal(delay_sla, interval.delay_sla);
}

/**
Expand Down Expand Up @@ -185,15 +199,20 @@ void validateAndCompile() {
}

interval = (int) (DateTime.parseDuration(string_interval) / 1000);
if (interval < 1) {
throw new IllegalArgumentException("Millisecond intervals are not supported");
}

if (interval >= Integer.MAX_VALUE) {
throw new IllegalArgumentException("Interval is too big: " + interval);
}
// The line above will validate for us
interval_units = string_interval.charAt(string_interval.length() - 1);

if (delay_sla != null && !delay_sla.isEmpty()) {
max_delay_seconds = (int) (DateTime.parseDuration(delay_sla) / 1000);
if (max_delay_seconds < 1) {
throw new IllegalArgumentException("Milliseconds are not supported as the maximum delay");
}
}

int num_span = 0;
switch (units) {
case 'h':
Expand Down Expand Up @@ -303,6 +322,15 @@ public boolean isDefaultInterval() {
public String getRowSpan() {
return row_span;
}

/**
* Rollup tables can have an SLA configured specifying by how much time the
* data in the table can be delayed.
* @return the maximum delay in seconds for a table as configured.
*/
public int getMaximumLag() {
return max_delay_seconds;
}

public static Builder builder() {
return new Builder();
Expand All @@ -321,6 +349,8 @@ public static class Builder {
private String rowSpan;
@JsonProperty
private boolean defaultInterval;
@JsonProperty
private String delaySla;

public Builder setTable(final String table) {
this.table = table;
Expand All @@ -346,6 +376,11 @@ public Builder setDefaultInterval(final boolean defaultInterval) {
this.defaultInterval = defaultInterval;
return this;
}

public Builder setDelaySla(final String delaySla) {
this.delaySla = delaySla;
return this;
}

public RollupInterval build() {
return new RollupInterval(this);
Expand Down
7 changes: 6 additions & 1 deletion src/utils/DateTime.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.HashMap;
import java.util.TimeZone;

import com.google.common.base.Strings;
import net.opentsdb.core.Tags;

/**
Expand Down Expand Up @@ -184,6 +185,10 @@ public static final long parseDateTimeString(final String datetime,
* @throws IllegalArgumentException if the interval was malformed.
*/
public static final long parseDuration(final String duration) {
if (duration == null || duration.isEmpty()) {
throw new IllegalArgumentException("Cannot parse null or empty duration");
}

long interval;
long multiplier;
double temp;
Expand Down Expand Up @@ -614,7 +619,7 @@ public static Calendar previousInterval(final long ts, final int interval,
* @since 2.3
*/
public static int unitsToCalendarType(final String units) {
if (units == null || units.isEmpty()) {
if (Strings.isNullOrEmpty(units)) {
throw new IllegalArgumentException("Units cannot be null or empty");
}

Expand Down
45 changes: 34 additions & 11 deletions test/rollup/TestRollupConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ public class TestRollupConfig {
private final static String tsdb_table = "tsdb";
private final static String rollup_table = "tsdb-rollup-10m";
private final static String preagg_table = "tsdb-rollup-agg-10m";
private final static String rollup_table_1h = "tsdb-rollup-1h";
private final static String preagg_table_1h = "tsdb-rollup-agg-1h";

private TSDB tsdb;
private HBaseClient client;
private RollupConfig.Builder builder;
private RollupInterval raw;
private RollupInterval tenmin;

private RollupInterval oneHourWithDelay;

@Before
public void before() throws Exception {
tsdb = PowerMockito.mock(TSDB.class);
Expand All @@ -70,26 +73,38 @@ public void before() throws Exception {
.setInterval("10m")
.setRowSpan("1d")
.build();


oneHourWithDelay = RollupInterval.builder()
.setTable(rollup_table_1h)
.setPreAggregationTable(preagg_table_1h)
.setInterval("1h")
.setRowSpan("1d")
.setDelaySla("2d")
.build();

builder = RollupConfig.builder()
.addAggregationId("Sum", 0)
.addAggregationId("Max", 1)
.addInterval(raw)
.addInterval(tenmin);
.addInterval(tenmin)
.addInterval(oneHourWithDelay);
}

@Test
public void ctor() throws Exception {
RollupConfig config = builder.build();
assertEquals(2, config.forward_intervals.size());
assertEquals(3, config.forward_intervals.size());
assertSame(raw, config.forward_intervals.get("1m"));
assertSame(tenmin, config.forward_intervals.get("10m"));

assertEquals(3, config.reverse_intervals.size());
assertSame(oneHourWithDelay, config.forward_intervals.get("1h"));

assertEquals(5, config.reverse_intervals.size());
assertSame(raw, config.reverse_intervals.get(tsdb_table));
assertSame(tenmin, config.reverse_intervals.get(rollup_table));
assertSame(tenmin, config.reverse_intervals.get(preagg_table));

assertSame(oneHourWithDelay, config.reverse_intervals.get(rollup_table_1h));
assertSame(oneHourWithDelay, config.reverse_intervals.get(preagg_table_1h));

assertEquals(2, config.aggregations_to_ids.size());
assertEquals(2, config.ids_to_aggregations.size());

Expand All @@ -102,7 +117,8 @@ public void ctor() throws Exception {
// missing aggregations
builder = RollupConfig.builder()
.addInterval(raw)
.addInterval(tenmin);
.addInterval(tenmin)
.addInterval(oneHourWithDelay);
try {
builder.build();
fail("Expected IllegalArgumentException");
Expand All @@ -113,7 +129,8 @@ public void ctor() throws Exception {
.addAggregationId("Sum", 1)
.addAggregationId("Max", 1)
.addInterval(raw)
.addInterval(tenmin);
.addInterval(tenmin)
.addInterval(oneHourWithDelay);
try {
builder.build();
fail("Expected IllegalArgumentException");
Expand All @@ -124,7 +141,8 @@ public void ctor() throws Exception {
.addAggregationId("Sum", 0)
.addAggregationId("Max", 128)
.addInterval(raw)
.addInterval(tenmin);
.addInterval(tenmin)
.addInterval(oneHourWithDelay);
try {
builder.build();
fail("Expected IllegalArgumentException");
Expand Down Expand Up @@ -175,7 +193,8 @@ public void getRollupIntervalString() throws Exception {

assertSame(raw, config.getRollupInterval("1m"));
assertSame(tenmin, config.getRollupInterval("10m"));

assertSame(oneHourWithDelay, config.getRollupInterval("1h"));

try {
config.getRollupInterval("5m");
fail("Expected NoSuchRollupForIntervalException");
Expand All @@ -199,6 +218,8 @@ public void getRollupIntervalForTable() throws Exception {
assertSame(raw, config.getRollupIntervalForTable(tsdb_table));
assertSame(tenmin, config.getRollupIntervalForTable(rollup_table));
assertSame(tenmin, config.getRollupIntervalForTable(preagg_table));
assertSame(oneHourWithDelay, config.getRollupIntervalForTable(rollup_table_1h));
assertSame(oneHourWithDelay, config.getRollupIntervalForTable(preagg_table_1h));

try {
config.getRollupIntervalForTable("nosuchtable");
Expand Down Expand Up @@ -270,5 +291,7 @@ public Deferred<Object> answer(InvocationOnMock invocation)
verify(client, times(2)).ensureTableExists(tsdb_table.getBytes());
verify(client, times(1)).ensureTableExists(rollup_table.getBytes());
verify(client, times(1)).ensureTableExists(preagg_table.getBytes());
verify(client, times(1)).ensureTableExists(rollup_table_1h.getBytes());
verify(client, times(1)).ensureTableExists(preagg_table_1h.getBytes());
}
}
11 changes: 7 additions & 4 deletions test/rollup/TestRollupInterval.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class TestRollupInterval {
private final static byte[] agg_table = preagg_table.getBytes(CHARSET);

@Test
public void ctor1SecondHour() throws Exception {
public void ctor1SecondHourNoSla() throws Exception {
final RollupInterval interval = RollupInterval.builder()
.setTable(rollup_table)
.setPreAggregationTable(preagg_table)
Expand All @@ -47,16 +47,18 @@ public void ctor1SecondHour() throws Exception {
assertEquals(preagg_table, interval.getPreAggregationTable());
assertEquals(0, Bytes.memcmp(table, interval.getTemporalTable()));
assertEquals(0, Bytes.memcmp(agg_table, interval.getGroupbyTable()));
assertEquals(0, interval.getMaximumLag());
}

// test odd boundaries
@Test
public void ctor7SecondHour() throws Exception {
public void ctor7SecondHourTwoHoursDelay() throws Exception {
final RollupInterval interval = RollupInterval.builder()
.setTable(rollup_table)
.setPreAggregationTable(preagg_table)
.setInterval("7s")
.setRowSpan("1h")
.setDelaySla("2h")
.build();
assertEquals('h', interval.getUnits());
assertEquals("7s", interval.getInterval());
Expand All @@ -67,6 +69,7 @@ public void ctor7SecondHour() throws Exception {
assertEquals(preagg_table, interval.getPreAggregationTable());
assertEquals(0, Bytes.memcmp(table, interval.getTemporalTable()));
assertEquals(0, Bytes.memcmp(agg_table, interval.getGroupbyTable()));
assertEquals(7200, interval.getMaximumLag());
}

@Test
Expand Down Expand Up @@ -404,7 +407,7 @@ public void ctorUnknownSpan() throws Exception {
.build();
}

@Test (expected = NullPointerException.class)
@Test (expected = IllegalArgumentException.class)
public void ctorNullInterval() throws Exception {
RollupInterval.builder()
.setTable(rollup_table)
Expand All @@ -414,7 +417,7 @@ public void ctorNullInterval() throws Exception {
.build();
}

@Test (expected = StringIndexOutOfBoundsException.class)
@Test (expected = IllegalArgumentException.class)
public void ctorEmptyInterval() throws Exception {
RollupInterval.builder()
.setTable(rollup_table)
Expand Down
5 changes: 5 additions & 0 deletions test/utils/TestDateTime.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,11 @@ public void getDurationUnitsNull() {
public void getDurationUnitsEmpty() {
DateTime.getDurationUnits("");
}

@Test (expected = IllegalArgumentException.class)
public void getDurationIsNull() {
DateTime.getDurationUnits(null);
}

@Test
public void getDurationInterval() {
Expand Down