Skip to content

Testcontainer example #701

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
pubsub tests
Signed-off-by: Joe Bowbeer <joe.bowbeer@gmail.com>
  • Loading branch information
joebowbeer committed May 27, 2025
commit 599e93bb21b17b1a4e6097fb3a58de1977ba2cf7
2,057 changes: 986 additions & 1,071 deletions examples/testcontainers/package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion examples/testcontainers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
"description": "",
"devDependencies": {
"@dapr/dapr": "^3.5.2",
"body-parser": "^2.2.0",
"express": "^5.1.0",
"ts-jest": "^29.3.2",
"typescript": "^5.8.3"
},
"dependencies": {
"testcontainers": "^10.25.0",
"yaml": "^2.7.1"
"yaml": "^2.8.0"
}
}
30 changes: 16 additions & 14 deletions examples/testcontainers/src/Component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ export type MetadataEntry = {
readonly value: string;
};

type ComponentResource = {
apiVersion: "dapr.io/v1alpha1";
kind: "Component";
metadata: {
name: string;
};
spec: {
type: string;
version: string;
metadata?: MetadataEntry[];
};
};

export class Component {
private readonly metadata: MetadataEntry[];

Expand All @@ -43,7 +56,7 @@ export class Component {
}

toYaml(): string {
const componentObj = {
const resource: ComponentResource = {
apiVersion: "dapr.io/v1alpha1",
kind: "Component",
metadata: {
Expand All @@ -55,22 +68,11 @@ export class Component {
metadata: this.metadata,
},
};
return YAML.stringify(componentObj, { indentSeq: false });
return YAML.stringify(resource, { indentSeq: false });
}

static fromYaml(src: string): Component {
const resource = YAML.parse(src) as {
apiVersion: string;
kind: string;
metadata: {
name: string;
};
spec: {
type: string;
version: string;
metadata?: MetadataEntry[];
};
};
const resource: ComponentResource = YAML.parse(src);
const metadata = resource.metadata;
const spec = resource.spec;
return new Component(metadata.name, spec.type, spec.version, (spec.metadata ?? []));
Expand Down
26 changes: 14 additions & 12 deletions examples/testcontainers/src/Configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ export class TracingConfigurationSettings {
) {}
}

type ConfigurationResource = {
apiVersion: "dapr.io/v1alpha1";
kind: "Configuration";
metadata: {
name: string;
};
spec: {
tracing?: TracingConfigurationSettings;
appHttpPipeline?: AppHttpPipeline;
};
};

/**
* Configuration class for Dapr.
*
Expand Down Expand Up @@ -85,17 +97,7 @@ export class Configuration {
) {}

toYaml(): string {
const configurationObj: {
apiVersion: string;
kind: string;
metadata: {
name: string;
};
spec: {
tracing?: TracingConfigurationSettings;
appHttpPipeline?: AppHttpPipeline;
};
} = {
const resource: ConfigurationResource = {
apiVersion: "dapr.io/v1alpha1",
kind: "Configuration",
metadata: {
Expand All @@ -106,6 +108,6 @@ export class Configuration {
...{ appHttpPipeline: this.appHttpPipeline },
}
};
return YAML.stringify(configurationObj, { indentSeq: false });
return YAML.stringify(resource, { indentSeq: false });
}
}
125 changes: 93 additions & 32 deletions examples/testcontainers/src/DaprContainer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ limitations under the License.
*/

import path from "node:path";
import { promisify } from "node:util";
import bodyParser from "body-parser";
import express from "express";
import { Network, TestContainers } from "testcontainers";
import { DaprClient, DaprServer, LogLevel } from "@dapr/dapr";
import { DaprClient, LogLevel } from "@dapr/dapr";
import { DAPR_RUNTIME_IMAGE, DaprContainer } from "./DaprContainer";

describe("DaprContainer", () => {
Expand Down Expand Up @@ -109,59 +112,117 @@ describe("DaprContainer", () => {
await network.stop();
}, 60_000);

it("should provide pubsub in memory by default", async () => {
TestContainers.exposeHostPorts(8081);
it("should provide pubsub in memory by default", async () => {
const app = express();
app.use(bodyParser.json({ type: "application/*+json" }));

// Promise to resolve when the data is received
let receiver: (data?: unknown) => void;
const promise = new Promise((res) => {
receiver = res;
});

app.post("/events", (req, res) => {
const data = req.body.data;
console.log("Received data:", data);
res.sendStatus(200);
receiver(data);
});

const appPort = 8081;
const server = app.listen(appPort, () => {
console.log(`Server is listening on port ${appPort}`);
});
await TestContainers.exposeHostPorts(appPort);

const network = await new Network().start();
const dapr = new DaprContainer(DAPR_RUNTIME_IMAGE)
.withNetwork(network)
.withAppPort(8081)
.withDaprLogLevel("debug")
.withAppPort(appPort)
.withDaprLogLevel("info")
.withDaprApiLoggingEnabled(false)
.withAppChannelAddress("host.testcontainers.internal")
const startedContainer = await dapr.start();

const server = new DaprServer({
serverHost: "127.0.0.1",
serverPort: "8081",
clientOptions: {
daprHost: startedContainer.getHost(),
daprPort: startedContainer.getHttpPort().toString()
},
logger: { level: LogLevel.Debug },
});

const client = new DaprClient({
daprHost: startedContainer.getHost(),
daprPort: startedContainer.getHttpPort().toString(),
logger: { level: LogLevel.Debug },
});

// Promise to resolve when the message is received
let processMessage: (data?: unknown) => void;
console.log("Publishing message...");
await client.pubsub.publish("pubsub", "topic", { key: "key", value: "value" });

console.log("Waiting for data...");
const data = await promise;
expect(data).toEqual({ key: "key", value: "value" });

await client.stop();
await startedContainer.stop();
await network.stop();
await promisify(server.close.bind(server))();
}, 60_000);

it("should route messages programmatically", async () => {
const app = express();
app.use(bodyParser.json({ type: "application/*+json" }));

// Promise to resolve when the data is received
let receiver: (data?: unknown) => void;
const promise = new Promise((res) => {
processMessage = res;
receiver = res;
});

app.get("/dapr/subscribe", (req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "orders",
routes: {
default: "/orders",
},
}
]);
});

app.post("/orders", (req, res) => {
const data = req.body.data;
console.log("Received data:", data);
res.sendStatus(200);
receiver(data);
});

await server.pubsub.subscribe("pubsub", "topic", async (message) => {
console.log("Message received:", message);
processMessage(message);
const appPort = 8082;
const server = app.listen(appPort, () => {
console.log(`Server is listening on port ${appPort}`);
});
await TestContainers.exposeHostPorts(appPort);

const network = await new Network().start();
const dapr = new DaprContainer(DAPR_RUNTIME_IMAGE)
.withNetwork(network)
.withAppPort(appPort)
.withDaprLogLevel("info")
.withDaprApiLoggingEnabled(false)
.withAppChannelAddress("host.testcontainers.internal")
const startedContainer = await dapr.start();

await server.start();
// Wait for the server to start
await new Promise((resolve) => setTimeout(resolve, 1000));
const client = new DaprClient({
daprHost: startedContainer.getHost(),
daprPort: startedContainer.getHttpPort().toString(),
logger: { level: LogLevel.Debug },
});

console.log("Publishing message...");
const response = await client.pubsub.publish("pubsub", "topic", { key: "key", value: "value" });
console.log("Publish response:", response);
await client.pubsub.publish("pubsub", "orders", { key: "key", value: "value" });

// Wait for the message to be processed
// await new Promise((resolve) => setTimeout(resolve, 5000));
const result = await promise; // FIXME
expect(result).toEqual({ key: "key", value: "value" });
console.log("Waiting for data...");
const data = await promise;
expect(data).toEqual({ key: "key", value: "value" });

await server.stop();
await client.stop();
await startedContainer.stop();
await network.stop();
}, 120_000);
await promisify(server.close.bind(server))();
}, 60_000);
});
16 changes: 13 additions & 3 deletions examples/testcontainers/src/DaprContainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export const DAPR_PROTOCOL = "http";

export class DaprContainer extends GenericContainer {
private daprLogLevel = "info";
private daprApiLogging = false;
private appName = "dapr-app";
private appChannelAddress?: string; // "host.testcontainers.internal"
private appPort?: number;
Expand Down Expand Up @@ -100,8 +101,8 @@ export class DaprContainer extends GenericContainer {
}

protected override async beforeContainerCreated(): Promise<void> {
assert(this.placementContainer, "Placement container expected");
assert(this.schedulerContainer, "Scheduler container expected");
assert(this.placementContainer, "DaprPlacementContainer expected");
assert(this.schedulerContainer, "DaprSchedulerContainer expected");
const cmds = [
"./daprd",
"--app-id",
Expand Down Expand Up @@ -131,6 +132,10 @@ export class DaprContainer extends GenericContainer {
cmds.push("--enable-app-health-check", "--app-health-check-path", this.appHealthCheckPath);
}

if (this.daprApiLogging) {
cmds.push("--enable-api-logging");
}

if (this.configuration) {
cmds.push("--config", `/dapr-resources/${this.configuration.name}.yaml`);
}
Expand All @@ -155,7 +160,7 @@ export class DaprContainer extends GenericContainer {
}

if (!this.subscriptions.length && this.components.length) {
this.subscriptions.push(new Subscription("local", "pubsub", "topic", "/events"));
this.subscriptions.push(new Subscription("local", "pubsub", "topic", undefined, "/events"));
}

for (const component of this.components) {
Expand Down Expand Up @@ -258,6 +263,11 @@ export class DaprContainer extends GenericContainer {
return this;
}

withDaprApiLoggingEnabled(enabled: boolean): this {
this.daprApiLogging = enabled;
return this;
}

withSubscription(subscription: Subscription): this {
this.subscriptions.push(subscription);
return this;
Expand Down
24 changes: 13 additions & 11 deletions examples/testcontainers/src/HttpEndpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,25 @@ limitations under the License.

import YAML from "yaml";

type HttpEndpointResource = {
apiVersion: "dapr.io/v1alpha1";
kind: "HTTPEndpoint";
metadata: {
name: string;
};
spec: {
baseUrl: string;
};
};

export class HttpEndpoint {
constructor(
public readonly name: string,
public readonly baseUrl: string
) {}

toYaml(): string {
const endpointObj: {
apiVersion: string;
kind: string;
metadata: {
name: string;
};
spec: {
baseUrl: string;
};
} = {
const resource: HttpEndpointResource= {
apiVersion: "dapr.io/v1alpha1",
kind: "HTTPEndpoint",
metadata: {
Expand All @@ -39,6 +41,6 @@ export class HttpEndpoint {
baseUrl: this.baseUrl
}
};
return YAML.stringify(endpointObj, { indentSeq: false });
return YAML.stringify(resource, { indentSeq: false });
}
}
Loading