Skip to content

Commit 599e93b

Browse files
committed
pubsub tests
Signed-off-by: Joe Bowbeer <joe.bowbeer@gmail.com>
1 parent 944ad32 commit 599e93b

File tree

9 files changed

+1197
-1164
lines changed

9 files changed

+1197
-1164
lines changed

examples/testcontainers/package-lock.json

Lines changed: 986 additions & 1071 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/testcontainers/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
"description": "",
1212
"devDependencies": {
1313
"@dapr/dapr": "^3.5.2",
14+
"body-parser": "^2.2.0",
15+
"express": "^5.1.0",
1416
"ts-jest": "^29.3.2",
1517
"typescript": "^5.8.3"
1618
},
1719
"dependencies": {
1820
"testcontainers": "^10.25.0",
19-
"yaml": "^2.7.1"
21+
"yaml": "^2.8.0"
2022
}
2123
}

examples/testcontainers/src/Component.ts

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,19 @@ export type MetadataEntry = {
1818
readonly value: string;
1919
};
2020

21+
type ComponentResource = {
22+
apiVersion: "dapr.io/v1alpha1";
23+
kind: "Component";
24+
metadata: {
25+
name: string;
26+
};
27+
spec: {
28+
type: string;
29+
version: string;
30+
metadata?: MetadataEntry[];
31+
};
32+
};
33+
2134
export class Component {
2235
private readonly metadata: MetadataEntry[];
2336

@@ -43,7 +56,7 @@ export class Component {
4356
}
4457

4558
toYaml(): string {
46-
const componentObj = {
59+
const resource: ComponentResource = {
4760
apiVersion: "dapr.io/v1alpha1",
4861
kind: "Component",
4962
metadata: {
@@ -55,22 +68,11 @@ export class Component {
5568
metadata: this.metadata,
5669
},
5770
};
58-
return YAML.stringify(componentObj, { indentSeq: false });
71+
return YAML.stringify(resource, { indentSeq: false });
5972
}
6073

6174
static fromYaml(src: string): Component {
62-
const resource = YAML.parse(src) as {
63-
apiVersion: string;
64-
kind: string;
65-
metadata: {
66-
name: string;
67-
};
68-
spec: {
69-
type: string;
70-
version: string;
71-
metadata?: MetadataEntry[];
72-
};
73-
};
75+
const resource: ComponentResource = YAML.parse(src);
7476
const metadata = resource.metadata;
7577
const spec = resource.spec;
7678
return new Component(metadata.name, spec.type, spec.version, (spec.metadata ?? []));

examples/testcontainers/src/Configuration.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,18 @@ export class TracingConfigurationSettings {
4747
) {}
4848
}
4949

50+
type ConfigurationResource = {
51+
apiVersion: "dapr.io/v1alpha1";
52+
kind: "Configuration";
53+
metadata: {
54+
name: string;
55+
};
56+
spec: {
57+
tracing?: TracingConfigurationSettings;
58+
appHttpPipeline?: AppHttpPipeline;
59+
};
60+
};
61+
5062
/**
5163
* Configuration class for Dapr.
5264
*
@@ -85,17 +97,7 @@ export class Configuration {
8597
) {}
8698

8799
toYaml(): string {
88-
const configurationObj: {
89-
apiVersion: string;
90-
kind: string;
91-
metadata: {
92-
name: string;
93-
};
94-
spec: {
95-
tracing?: TracingConfigurationSettings;
96-
appHttpPipeline?: AppHttpPipeline;
97-
};
98-
} = {
100+
const resource: ConfigurationResource = {
99101
apiVersion: "dapr.io/v1alpha1",
100102
kind: "Configuration",
101103
metadata: {
@@ -106,6 +108,6 @@ export class Configuration {
106108
...{ appHttpPipeline: this.appHttpPipeline },
107109
}
108110
};
109-
return YAML.stringify(configurationObj, { indentSeq: false });
111+
return YAML.stringify(resource, { indentSeq: false });
110112
}
111113
}

examples/testcontainers/src/DaprContainer.test.ts

Lines changed: 93 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@ limitations under the License.
1212
*/
1313

1414
import path from "node:path";
15+
import { promisify } from "node:util";
16+
import bodyParser from "body-parser";
17+
import express from "express";
1518
import { Network, TestContainers } from "testcontainers";
16-
import { DaprClient, DaprServer, LogLevel } from "@dapr/dapr";
19+
import { DaprClient, LogLevel } from "@dapr/dapr";
1720
import { DAPR_RUNTIME_IMAGE, DaprContainer } from "./DaprContainer";
1821

1922
describe("DaprContainer", () => {
@@ -109,59 +112,117 @@ describe("DaprContainer", () => {
109112
await network.stop();
110113
}, 60_000);
111114

112-
it("should provide pubsub in memory by default", async () => {
113-
TestContainers.exposeHostPorts(8081);
115+
it("should provide pubsub in memory by default", async () => {
116+
const app = express();
117+
app.use(bodyParser.json({ type: "application/*+json" }));
118+
119+
// Promise to resolve when the data is received
120+
let receiver: (data?: unknown) => void;
121+
const promise = new Promise((res) => {
122+
receiver = res;
123+
});
124+
125+
app.post("/events", (req, res) => {
126+
const data = req.body.data;
127+
console.log("Received data:", data);
128+
res.sendStatus(200);
129+
receiver(data);
130+
});
131+
132+
const appPort = 8081;
133+
const server = app.listen(appPort, () => {
134+
console.log(`Server is listening on port ${appPort}`);
135+
});
136+
await TestContainers.exposeHostPorts(appPort);
114137

115138
const network = await new Network().start();
116139
const dapr = new DaprContainer(DAPR_RUNTIME_IMAGE)
117140
.withNetwork(network)
118-
.withAppPort(8081)
119-
.withDaprLogLevel("debug")
141+
.withAppPort(appPort)
142+
.withDaprLogLevel("info")
143+
.withDaprApiLoggingEnabled(false)
120144
.withAppChannelAddress("host.testcontainers.internal")
121145
const startedContainer = await dapr.start();
122146

123-
const server = new DaprServer({
124-
serverHost: "127.0.0.1",
125-
serverPort: "8081",
126-
clientOptions: {
127-
daprHost: startedContainer.getHost(),
128-
daprPort: startedContainer.getHttpPort().toString()
129-
},
130-
logger: { level: LogLevel.Debug },
131-
});
132-
133147
const client = new DaprClient({
134148
daprHost: startedContainer.getHost(),
135149
daprPort: startedContainer.getHttpPort().toString(),
136150
logger: { level: LogLevel.Debug },
137151
});
138152

139-
// Promise to resolve when the message is received
140-
let processMessage: (data?: unknown) => void;
153+
console.log("Publishing message...");
154+
await client.pubsub.publish("pubsub", "topic", { key: "key", value: "value" });
155+
156+
console.log("Waiting for data...");
157+
const data = await promise;
158+
expect(data).toEqual({ key: "key", value: "value" });
159+
160+
await client.stop();
161+
await startedContainer.stop();
162+
await network.stop();
163+
await promisify(server.close.bind(server))();
164+
}, 60_000);
165+
166+
it("should route messages programmatically", async () => {
167+
const app = express();
168+
app.use(bodyParser.json({ type: "application/*+json" }));
169+
170+
// Promise to resolve when the data is received
171+
let receiver: (data?: unknown) => void;
141172
const promise = new Promise((res) => {
142-
processMessage = res;
173+
receiver = res;
174+
});
175+
176+
app.get("/dapr/subscribe", (req, res) => {
177+
res.json([
178+
{
179+
pubsubname: "pubsub",
180+
topic: "orders",
181+
routes: {
182+
default: "/orders",
183+
},
184+
}
185+
]);
186+
});
187+
188+
app.post("/orders", (req, res) => {
189+
const data = req.body.data;
190+
console.log("Received data:", data);
191+
res.sendStatus(200);
192+
receiver(data);
143193
});
144194

145-
await server.pubsub.subscribe("pubsub", "topic", async (message) => {
146-
console.log("Message received:", message);
147-
processMessage(message);
195+
const appPort = 8082;
196+
const server = app.listen(appPort, () => {
197+
console.log(`Server is listening on port ${appPort}`);
148198
});
199+
await TestContainers.exposeHostPorts(appPort);
200+
201+
const network = await new Network().start();
202+
const dapr = new DaprContainer(DAPR_RUNTIME_IMAGE)
203+
.withNetwork(network)
204+
.withAppPort(appPort)
205+
.withDaprLogLevel("info")
206+
.withDaprApiLoggingEnabled(false)
207+
.withAppChannelAddress("host.testcontainers.internal")
208+
const startedContainer = await dapr.start();
149209

150-
await server.start();
151-
// Wait for the server to start
152-
await new Promise((resolve) => setTimeout(resolve, 1000));
210+
const client = new DaprClient({
211+
daprHost: startedContainer.getHost(),
212+
daprPort: startedContainer.getHttpPort().toString(),
213+
logger: { level: LogLevel.Debug },
214+
});
153215

154216
console.log("Publishing message...");
155-
const response = await client.pubsub.publish("pubsub", "topic", { key: "key", value: "value" });
156-
console.log("Publish response:", response);
217+
await client.pubsub.publish("pubsub", "orders", { key: "key", value: "value" });
157218

158-
// Wait for the message to be processed
159-
// await new Promise((resolve) => setTimeout(resolve, 5000));
160-
const result = await promise; // FIXME
161-
expect(result).toEqual({ key: "key", value: "value" });
219+
console.log("Waiting for data...");
220+
const data = await promise;
221+
expect(data).toEqual({ key: "key", value: "value" });
162222

163-
await server.stop();
223+
await client.stop();
164224
await startedContainer.stop();
165225
await network.stop();
166-
}, 120_000);
226+
await promisify(server.close.bind(server))();
227+
}, 60_000);
167228
});

examples/testcontainers/src/DaprContainer.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export const DAPR_PROTOCOL = "http";
4141

4242
export class DaprContainer extends GenericContainer {
4343
private daprLogLevel = "info";
44+
private daprApiLogging = false;
4445
private appName = "dapr-app";
4546
private appChannelAddress?: string; // "host.testcontainers.internal"
4647
private appPort?: number;
@@ -100,8 +101,8 @@ export class DaprContainer extends GenericContainer {
100101
}
101102

102103
protected override async beforeContainerCreated(): Promise<void> {
103-
assert(this.placementContainer, "Placement container expected");
104-
assert(this.schedulerContainer, "Scheduler container expected");
104+
assert(this.placementContainer, "DaprPlacementContainer expected");
105+
assert(this.schedulerContainer, "DaprSchedulerContainer expected");
105106
const cmds = [
106107
"./daprd",
107108
"--app-id",
@@ -131,6 +132,10 @@ export class DaprContainer extends GenericContainer {
131132
cmds.push("--enable-app-health-check", "--app-health-check-path", this.appHealthCheckPath);
132133
}
133134

135+
if (this.daprApiLogging) {
136+
cmds.push("--enable-api-logging");
137+
}
138+
134139
if (this.configuration) {
135140
cmds.push("--config", `/dapr-resources/${this.configuration.name}.yaml`);
136141
}
@@ -155,7 +160,7 @@ export class DaprContainer extends GenericContainer {
155160
}
156161

157162
if (!this.subscriptions.length && this.components.length) {
158-
this.subscriptions.push(new Subscription("local", "pubsub", "topic", "/events"));
163+
this.subscriptions.push(new Subscription("local", "pubsub", "topic", undefined, "/events"));
159164
}
160165

161166
for (const component of this.components) {
@@ -258,6 +263,11 @@ export class DaprContainer extends GenericContainer {
258263
return this;
259264
}
260265

266+
withDaprApiLoggingEnabled(enabled: boolean): this {
267+
this.daprApiLogging = enabled;
268+
return this;
269+
}
270+
261271
withSubscription(subscription: Subscription): this {
262272
this.subscriptions.push(subscription);
263273
return this;

examples/testcontainers/src/HttpEndpoint.ts

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,25 @@ limitations under the License.
1313

1414
import YAML from "yaml";
1515

16+
type HttpEndpointResource = {
17+
apiVersion: "dapr.io/v1alpha1";
18+
kind: "HTTPEndpoint";
19+
metadata: {
20+
name: string;
21+
};
22+
spec: {
23+
baseUrl: string;
24+
};
25+
};
26+
1627
export class HttpEndpoint {
1728
constructor(
1829
public readonly name: string,
1930
public readonly baseUrl: string
2031
) {}
2132

2233
toYaml(): string {
23-
const endpointObj: {
24-
apiVersion: string;
25-
kind: string;
26-
metadata: {
27-
name: string;
28-
};
29-
spec: {
30-
baseUrl: string;
31-
};
32-
} = {
34+
const resource: HttpEndpointResource= {
3335
apiVersion: "dapr.io/v1alpha1",
3436
kind: "HTTPEndpoint",
3537
metadata: {
@@ -39,6 +41,6 @@ export class HttpEndpoint {
3941
baseUrl: this.baseUrl
4042
}
4143
};
42-
return YAML.stringify(endpointObj, { indentSeq: false });
44+
return YAML.stringify(resource, { indentSeq: false });
4345
}
4446
}

0 commit comments

Comments
 (0)