Skip to content

Commit 298fd4e

Browse files
conico974Nicolas Dorseuilvicb
authored
Feat cache for the queue (#496)
* queue cache * moved cache out of the durable queue * local queue cache * add local in memory cache * fix rebase * changeset * add cache-tag * review fix * review * Update .changeset/large-zoos-approve.md Co-authored-by: Victor Berchet <[email protected]> --------- Co-authored-by: Nicolas Dorseuil <[email protected]> Co-authored-by: Victor Berchet <[email protected]>
1 parent 6821828 commit 298fd4e

File tree

5 files changed

+242
-2
lines changed

5 files changed

+242
-2
lines changed

.changeset/large-zoos-approve.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@opennextjs/cloudflare": minor
3+
---
4+
5+
add an optional cache for the durable queue

examples/e2e/app-router/open-next.config.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { defineCloudflareConfig } from "@opennextjs/cloudflare";
22
import r2IncrementalCache from "@opennextjs/cloudflare/overrides/incremental-cache/r2-incremental-cache";
33
import shardedTagCache from "@opennextjs/cloudflare/overrides/tag-cache/do-sharded-tag-cache";
44
import doQueue from "@opennextjs/cloudflare/overrides/queue/do-queue";
5+
import queueCache from "@opennextjs/cloudflare/overrides/queue/queue-cache";
56

67
export default defineCloudflareConfig({
78
incrementalCache: r2IncrementalCache,
@@ -13,6 +14,6 @@ export default defineCloudflareConfig({
1314
numberOfHardReplicas: 2,
1415
},
1516
}),
16-
queue: doQueue,
1717
enableCacheInterception: true,
18+
queue: queueCache(doQueue),
1819
});

packages/cloudflare/src/api/overrides/queue/do-queue.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js";
44
import { getCloudflareContext } from "../../cloudflare-context";
55

66
export default {
7-
name: "do-queue",
7+
name: "durable-queue",
88
send: async (msg: QueueMessage) => {
99
const durableObject = getCloudflareContext().env.NEXT_CACHE_DO_QUEUE;
1010
if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation");
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import type { Queue } from "@opennextjs/aws/types/overrides";
2+
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
3+
4+
import queueCache from "./queue-cache";
5+
6+
const mockedQueue = {
7+
name: "mocked-queue",
8+
send: vi.fn(),
9+
} satisfies Queue;
10+
11+
const generateMessage = () => ({
12+
MessageGroupId: "test",
13+
MessageBody: {
14+
eTag: "test",
15+
url: "test",
16+
host: "test",
17+
lastModified: Date.now(),
18+
},
19+
MessageDeduplicationId: "test",
20+
});
21+
22+
const mockedPut = vi.fn();
23+
const mockedMatch = vi.fn().mockReturnValue(null);
24+
25+
describe("queue-cache", () => {
26+
beforeEach(() => {
27+
// @ts-ignore
28+
globalThis.caches = {
29+
open: vi.fn().mockReturnValue({
30+
put: mockedPut,
31+
match: mockedMatch,
32+
}),
33+
};
34+
});
35+
36+
afterEach(() => {
37+
vi.resetAllMocks();
38+
});
39+
test("should send the message to the original queue", async () => {
40+
const msg = generateMessage();
41+
const queue = queueCache(mockedQueue, {});
42+
expect(queue.name).toBe("cached-mocked-queue");
43+
await queue.send(msg);
44+
expect(mockedQueue.send).toHaveBeenCalledWith(msg);
45+
});
46+
47+
test("should use the local cache", async () => {
48+
const msg = generateMessage();
49+
const queue = queueCache(mockedQueue, {});
50+
await queue.send(msg);
51+
52+
expect(queue.localCache.size).toBe(1);
53+
expect(queue.localCache.has(`queue/test/test`)).toBe(true);
54+
expect(mockedPut).toHaveBeenCalled();
55+
56+
const spiedHas = vi.spyOn(queue.localCache, "has");
57+
await queue.send(msg);
58+
expect(spiedHas).toHaveBeenCalled();
59+
60+
expect(mockedQueue.send).toHaveBeenCalledTimes(1);
61+
62+
expect(mockedMatch).toHaveBeenCalledTimes(1);
63+
});
64+
65+
test("should clear the local cache after 5s", async () => {
66+
vi.useFakeTimers();
67+
const msg = generateMessage();
68+
const queue = queueCache(mockedQueue, {});
69+
await queue.send(msg);
70+
expect(queue.localCache.size).toBe(1);
71+
expect(queue.localCache.has(`queue/test/test`)).toBe(true);
72+
73+
vi.advanceTimersByTime(5001);
74+
const alteredMsg = generateMessage();
75+
alteredMsg.MessageGroupId = "test2";
76+
await queue.send(alteredMsg);
77+
expect(queue.localCache.size).toBe(1);
78+
console.log(queue.localCache);
79+
expect(queue.localCache.has(`queue/test2/test`)).toBe(true);
80+
expect(queue.localCache.has(`queue/test/test`)).toBe(false);
81+
vi.useRealTimers();
82+
});
83+
84+
test("should use the regional cache if not in local cache", async () => {
85+
const msg = generateMessage();
86+
const queue = queueCache(mockedQueue, {});
87+
await queue.send(msg);
88+
89+
expect(mockedMatch).toHaveBeenCalledTimes(1);
90+
expect(mockedPut).toHaveBeenCalledTimes(1);
91+
expect(queue.localCache.size).toBe(1);
92+
expect(queue.localCache.has(`queue/test/test`)).toBe(true);
93+
// We need to delete the local cache to test the regional cache
94+
queue.localCache.delete(`queue/test/test`);
95+
96+
const spiedHas = vi.spyOn(queue.localCache, "has");
97+
await queue.send(msg);
98+
expect(spiedHas).toHaveBeenCalled();
99+
expect(mockedMatch).toHaveBeenCalledTimes(2);
100+
});
101+
102+
test("should return early if the message is in the regional cache", async () => {
103+
const msg = generateMessage();
104+
const queue = queueCache(mockedQueue, {});
105+
106+
mockedMatch.mockReturnValueOnce(new Response(null, { status: 200 }));
107+
108+
const spiedSend = mockedQueue.send;
109+
await queue.send(msg);
110+
expect(spiedSend).not.toHaveBeenCalled();
111+
});
112+
});
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { error } from "@opennextjs/aws/adapters/logger.js";
2+
import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides";
3+
4+
interface QueueCachingOptions {
5+
/**
6+
* The TTL for the regional cache in seconds.
7+
* @default 5
8+
*/
9+
regionalCacheTtlSec?: number;
10+
11+
/**
12+
* Whether to wait for the queue ack before returning.
13+
* When set to false, the cache will be populated asap and the queue will be called after.
14+
* When set to true, the cache will be populated only after the queue ack is received.
15+
* @default false
16+
*/
17+
waitForQueueAck?: boolean;
18+
}
19+
20+
const DEFAULT_QUEUE_CACHE_TTL_SEC = 5;
21+
22+
class QueueCache implements Queue {
23+
readonly name;
24+
readonly regionalCacheTtlSec: number;
25+
readonly waitForQueueAck: boolean;
26+
cache: Cache | undefined;
27+
// Local mapping from key to insertedAtSec
28+
localCache: Map<string, number> = new Map();
29+
30+
constructor(
31+
private originalQueue: Queue,
32+
options: QueueCachingOptions
33+
) {
34+
this.name = `cached-${originalQueue.name}`;
35+
this.regionalCacheTtlSec = options.regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC;
36+
this.waitForQueueAck = options.waitForQueueAck ?? false;
37+
}
38+
39+
async send(msg: QueueMessage) {
40+
try {
41+
const isCached = await this.isInCache(msg);
42+
if (isCached) {
43+
return;
44+
}
45+
if (!this.waitForQueueAck) {
46+
await this.putToCache(msg);
47+
await this.originalQueue.send(msg);
48+
} else {
49+
await this.originalQueue.send(msg);
50+
await this.putToCache(msg);
51+
}
52+
} catch (e) {
53+
error("Error sending message to queue", e);
54+
} finally {
55+
this.clearLocalCache();
56+
}
57+
}
58+
59+
private async getCache() {
60+
if (!this.cache) {
61+
this.cache = await caches.open("durable-queue");
62+
}
63+
return this.cache;
64+
}
65+
66+
private getCacheUrlString(msg: QueueMessage) {
67+
return `queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`;
68+
}
69+
70+
private getCacheKey(msg: QueueMessage) {
71+
return "http://local.cache" + this.getCacheUrlString(msg);
72+
}
73+
74+
private async putToCache(msg: QueueMessage) {
75+
this.localCache.set(this.getCacheUrlString(msg), Date.now());
76+
const cacheKey = this.getCacheKey(msg);
77+
const cache = await this.getCache();
78+
await cache.put(
79+
cacheKey,
80+
new Response(null, {
81+
status: 200,
82+
headers: {
83+
"Cache-Control": `max-age=${this.regionalCacheTtlSec}`,
84+
// Tag cache is set to the value of the soft tag assigned by Next.js
85+
// This way you can invalidate this cache as well as any other regional cache
86+
"Cache-Tag": `_N_T_/${msg.MessageBody.url}`,
87+
},
88+
})
89+
);
90+
}
91+
92+
private async isInCache(msg: QueueMessage) {
93+
if (this.localCache.has(this.getCacheUrlString(msg))) {
94+
const insertedAt = this.localCache.get(this.getCacheUrlString(msg))!;
95+
if (Date.now() - insertedAt < this.regionalCacheTtlSec * 1000) {
96+
return true;
97+
}
98+
this.localCache.delete(this.getCacheUrlString(msg));
99+
return false;
100+
}
101+
const cacheKey = this.getCacheKey(msg);
102+
const cache = await this.getCache();
103+
const cachedResponse = await cache.match(cacheKey);
104+
if (cachedResponse) {
105+
return true;
106+
}
107+
}
108+
109+
/**
110+
* Remove any value older than the TTL from the local cache
111+
*/
112+
private clearLocalCache() {
113+
const insertAtSecMax = Date.now() - this.regionalCacheTtlSec * 1000;
114+
for (const [key, insertAtSec] of this.localCache.entries()) {
115+
if (insertAtSec < insertAtSecMax) {
116+
this.localCache.delete(key);
117+
}
118+
}
119+
}
120+
}
121+
122+
export default (originalQueue: Queue, opts: QueueCachingOptions = {}) => new QueueCache(originalQueue, opts);

0 commit comments

Comments
 (0)