Skip to content

Commit 27f7542

Browse files
authored
Implement CDCT for all interactions between Storage and Pipeline (#376)
* Implement CDCT for interactions between UI and Storage * Implement CDCT for interactions between Storage and Pipeline * Fix amqp consumer initialization Previously the amqp consumer was configured with an incorrect topic. * Refactor provider test to handle different consumers in separate tests
1 parent 70c2eb6 commit 27f7542

File tree

12 files changed

+438
-114
lines changed

12 files changed

+438
-114
lines changed

.github/workflows/ods.yml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,19 @@ jobs:
139139
with:
140140
name: storagemq-artifact
141141
path: storage_mq.tar
142+
143+
# The CDC-Test builds a cut-down image that would override the previously built production image
144+
# Therefore it is executed after uploading the production image
145+
- name: Consumer-side CDC-Test
146+
run: |
147+
bash ./storage/storage-mq/cdct-consumer.sh
148+
149+
- name: Upload contract files as artifact
150+
uses: actions/upload-artifact@v2
151+
with:
152+
name: contracts
153+
path: ./pacts/*.json
154+
if-no-files-found: error
142155

143156

144157
# ----------------- PIPELINE SERVICE --------------------
@@ -267,7 +280,7 @@ jobs:
267280
pipeline-provider:
268281
name: Pipeline Provider-side CDC-Test
269282
runs-on: ubuntu-18.04
270-
needs: [notification-build, pipeline-build, ui-build]
283+
needs: [pipeline-build, ui-build, notification-build, storage-build]
271284
steps:
272285
- uses: actions/checkout@v2
273286

doc/consumer-driven-contract-testing/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ If you want to run the tests without `docker` and `docker-compose`, you may try
3838
The files containing the source code of the Consumer-Driven Contract Tests are found in the `src` directories of those services, that have implemented CDCT. Regarding services implemented in TypeScript, the test files all share a common naming convention:
3939

4040
- `*.consumer.pact.test.ts` for **Consumer testing**, where `*` is the name of the corresponding Provider
41-
- `*.provider.pact.test.ts` for **Provider verification**, where `*` is the name of the corresponding Consumer
41+
- `*.provider.pact.test.ts` for **Provider verification**, where `*` is the name of the corresponding Consumer or a more general term if multiple Consumers are involved
4242

4343
## Writing Consumer-Driven Contract Tests
4444

docker-compose.consumer.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ services:
88
volumes:
99
- ./pacts:/pacts
1010
notification:
11+
build:
12+
target: base
13+
command: npm run test:consumer
14+
volumes:
15+
- ./pacts:/pacts
16+
storage-mq:
1117
build:
1218
target: base
1319
command: npm run test:consumer

docker-compose.provider.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ services:
88
environment:
99
AMQP_PIPELINE_EXECUTION_EXCHANGE: 'ods_global'
1010
AMQP_PIPELINE_EXECUTION_QUEUE: 'notification.pipeline-execution'
11+
AMQP_PIPELINE_CONFIG_EXCHANGE: 'ods_global'
12+
AMQP_PIPELINE_CONFIG_QUEUE: 'storage-mq.pipeline-config'
1113
volumes:
1214
- ./pacts:/pacts
1315
pipeline-outboxer:
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
import path from 'path';
2+
3+
import { AmqpChannel, AmqpConnection } from '@jvalue/node-dry-amqp';
4+
import { readEnvOrDie } from '@jvalue/node-dry-basics';
5+
import { PostgresClient } from '@jvalue/node-dry-pg';
6+
import {
7+
MessageProviderPact,
8+
PactMessageProviderOptions,
9+
} from '@pact-foundation/pact';
10+
import * as AMQP from 'amqplib';
11+
12+
import {
13+
AMQP_PIPELINE_CONFIG_CREATED_TOPIC,
14+
AMQP_PIPELINE_CONFIG_DELETED_TOPIC,
15+
AMQP_PIPELINE_EXECUTION_SUCCESS_TOPIC,
16+
AMQP_URL,
17+
CONNECTION_BACKOFF,
18+
CONNECTION_RETRIES,
19+
} from './env';
20+
import * as EventPublisher from './pipeline-config/outboxEventPublisher';
21+
import { init } from './pipeline-config/pipelineDatabase';
22+
23+
const AMQP_PIPELINE_EXECUTION_EXCHANGE = readEnvOrDie(
24+
'AMQP_PIPELINE_EXECUTION_EXCHANGE',
25+
);
26+
const AMQP_PIPELINE_EXECUTION_QUEUE = readEnvOrDie(
27+
'AMQP_PIPELINE_EXECUTION_QUEUE',
28+
);
29+
30+
const AMQP_PIPELINE_CONFIG_EXCHANGE = readEnvOrDie(
31+
'AMQP_PIPELINE_CONFIG_EXCHANGE',
32+
);
33+
const AMQP_PIPELINE_CONFIG_QUEUE = readEnvOrDie('AMQP_PIPELINE_CONFIG_QUEUE');
34+
35+
describe('Pact Provider Verification', () => {
36+
let pgClient: PostgresClient;
37+
let amqpConnection: AmqpConnection;
38+
39+
const successMessages: unknown[] = [];
40+
41+
beforeAll(async () => {
42+
pgClient = await init(CONNECTION_RETRIES, CONNECTION_BACKOFF);
43+
44+
amqpConnection = new AmqpConnection(
45+
AMQP_URL,
46+
CONNECTION_RETRIES,
47+
CONNECTION_BACKOFF,
48+
() => {
49+
console.error('lost connection to AMQP');
50+
process.exit(1);
51+
},
52+
);
53+
54+
await createAmqpConsumer(
55+
amqpConnection,
56+
AMQP_PIPELINE_EXECUTION_EXCHANGE,
57+
AMQP_PIPELINE_EXECUTION_QUEUE,
58+
AMQP_PIPELINE_EXECUTION_SUCCESS_TOPIC,
59+
successMessages,
60+
);
61+
});
62+
63+
const commonProviderOptions: PactMessageProviderOptions = {
64+
provider: 'Pipeline',
65+
logDir: path.resolve(process.cwd(), '..', 'pacts', 'logs'),
66+
stateHandlers: {
67+
'any state': async (): Promise<void> => Promise.resolve(),
68+
},
69+
messageProviders: {
70+
'a success event': provideSuccessEvent,
71+
},
72+
};
73+
74+
async function provideSuccessEvent(): Promise<unknown> {
75+
await pgClient.transaction(async (client) => {
76+
await EventPublisher.publishSuccess(
77+
client,
78+
1,
79+
'some pipeline name',
80+
{},
81+
{ some: 'schema' },
82+
);
83+
});
84+
return await waitForMessage(successMessages);
85+
}
86+
87+
describe('with Notification as consumer', () => {
88+
it('validates the expectations of the notification service', async () => {
89+
const notificationPact = new MessageProviderPact({
90+
...commonProviderOptions,
91+
pactUrls: [
92+
path.resolve(
93+
process.cwd(),
94+
'..',
95+
'pacts',
96+
'notification-pipeline.json',
97+
),
98+
],
99+
});
100+
101+
await notificationPact.verify();
102+
});
103+
});
104+
105+
describe('with Storage as consumer', () => {
106+
let creationConsumer: AmqpChannel;
107+
const creationMessages: unknown[] = [];
108+
109+
let deletionConsumer: AmqpChannel;
110+
const deletionMessages: unknown[] = [];
111+
112+
beforeAll(async () => {
113+
creationConsumer = await createAmqpConsumer(
114+
amqpConnection,
115+
AMQP_PIPELINE_CONFIG_EXCHANGE,
116+
AMQP_PIPELINE_CONFIG_QUEUE,
117+
AMQP_PIPELINE_CONFIG_CREATED_TOPIC,
118+
creationMessages,
119+
);
120+
121+
deletionConsumer = await createAmqpConsumer(
122+
amqpConnection,
123+
AMQP_PIPELINE_CONFIG_EXCHANGE,
124+
AMQP_PIPELINE_CONFIG_QUEUE,
125+
AMQP_PIPELINE_CONFIG_DELETED_TOPIC,
126+
deletionMessages,
127+
);
128+
});
129+
130+
it('validates the expectations of the storage service', async () => {
131+
const storagePact = new MessageProviderPact({
132+
...commonProviderOptions,
133+
pactUrls: [
134+
path.resolve(process.cwd(), '..', 'pacts', 'storage-pipeline.json'),
135+
],
136+
messageProviders: {
137+
...commonProviderOptions.messageProviders,
138+
'a creation event': provideCreationEvent,
139+
'a deletion event': provideDeletionEvent,
140+
},
141+
});
142+
143+
await storagePact.verify();
144+
});
145+
146+
afterAll(async () => {
147+
await creationConsumer.close();
148+
await deletionConsumer.close();
149+
});
150+
151+
async function provideCreationEvent(): Promise<unknown> {
152+
await pgClient.transaction(async (client) => {
153+
await EventPublisher.publishCreation(client, 1, 'some pipeline name');
154+
});
155+
return await waitForMessage(creationMessages);
156+
}
157+
158+
async function provideDeletionEvent(): Promise<unknown> {
159+
await pgClient.transaction(async (client) => {
160+
await EventPublisher.publishDeletion(client, 1, 'some pipeline name');
161+
});
162+
return await waitForMessage(deletionMessages);
163+
}
164+
});
165+
166+
afterAll(async () => {
167+
await pgClient.close();
168+
await amqpConnection.close();
169+
});
170+
});
171+
172+
// Sets up an amqp consumer to collect the messages that are published to the message-broker by the pipeline outboxer
173+
async function createAmqpConsumer(
174+
amqpConnection: AmqpConnection,
175+
exchange: string,
176+
queue: string,
177+
topic: string,
178+
messageBuffer: unknown[],
179+
): Promise<AmqpChannel> {
180+
const amqpChannel = await amqpConnection.createChannel();
181+
182+
await amqpChannel.assertExchange(exchange, 'topic');
183+
await amqpChannel.assertQueue(queue, {
184+
exclusive: false,
185+
});
186+
await amqpChannel.bindQueue(queue, exchange, topic);
187+
188+
// Consumes the message by pushing it into the passed message buffer
189+
await amqpChannel.consume(
190+
queue,
191+
async (msg: AMQP.ConsumeMessage | null): Promise<void> => {
192+
if (msg == null) {
193+
console.error('received an AMQP message that was null');
194+
process.exit(1);
195+
}
196+
197+
if (msg.fields.routingKey === topic) {
198+
messageBuffer.push(JSON.parse(msg.content.toString()));
199+
}
200+
201+
await amqpChannel.ack(msg);
202+
},
203+
);
204+
205+
return amqpChannel;
206+
}
207+
208+
// Waits until the passed message buffer contains a message
209+
async function waitForMessage(messageBuffer: unknown[]): Promise<unknown> {
210+
while (messageBuffer.length === 0) {
211+
await new Promise((resolve) => setTimeout(resolve, 50));
212+
}
213+
return messageBuffer.pop();
214+
}

pipeline/src/notification.provider.pact.test.ts

Lines changed: 0 additions & 112 deletions
This file was deleted.

storage/storage-mq/cdct-consumer.sh

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#!/bin/bash
2+
3+
# abort if any command fails
4+
set -e
5+
6+
dir=$(dirname "$0")
7+
cd ${dir}/../..
8+
docker-compose -f docker-compose.yml -f docker-compose.consumer.yml build storage-mq
9+
docker-compose -f docker-compose.yml -f docker-compose.consumer.yml up --exit-code-from storage-mq --no-deps storage-mq

0 commit comments

Comments
 (0)