In today's tutorial we are going to create a simple retrospective app. It will save information about the discussed topics and show them to users in real time so they can easily see the previous, current, and future topics during the current meeting. The code is written in .NET 5. It’s divided into several services, every one of them responsible for a single context.
We assume that every service should contain health checks, OpenAPI documentation, and is run viadocker-compose
.
As a data store we are going to use MongoDB.
Architecture of the retrospective app
The application will be divided into 3 services:
• Facade
is a front-end of an application that directs requests from clients to other services mentioned below.
• Topic
is responsible for saving retrospective topics to a database (MongoDB).
• Notification
collects all notifications from other services and pushes them to subscribed services/clients. For the purposes of this demonstration, we can assume that it works like a push notification.
Communication between each of them is done via publish/subscribe.
Here is the architecture diagram:
To simplify the above, we are going to use an out-of-the-box pub/sub service. Ably fits the bill for reasons that follow.
What is Ably?
Ably delivers several services including pub/sub, history of message persistence, push notifications for mobile apps, among others. It also has a .NET library ready to use.
As a short introduction, I want to describe some base terms used in Ably:
• Channel
is a source of communication for services/clients which are connected to it.
• Message Type
is used to distinguish different types of data sent via a single channel. Services can subscribe to a concrete type(s) or to all of them.
Dive to the Code
We can check right now what the code looks like.
Facade
The first service, Facade
, shows information to a client. Service is written in .NET 5 (C#). It’s a WebAPI with a frontend part written in Angular 8.
OpenAPI and Healthchecks
This service depends only on Ably, because the startup file contains a recipe to check Ably state and generate OpenAPI documentation via Swashbuckle.
public void ConfigureServices(IServiceCollection services)
{
var config = Configuration
.GetSection("Ably")
.Get<AblyConfig>();
var ably = newAblyRealtime(config.ApiKey);
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1",newOpenApiInfo{Title="WebAppAPI",Version="v1"});
});
services.AddHealthChecks()
.AddCheck(
"AblyChannel",
newAblyChannelHealthCheck(
ably,
"Topic"
)
)
.AddCheck(
"AblyTimer",
newAblyTimerHealthCheck(
ably,
"Topic",
FSharpOption<TimeSpan>.Some(TimeSpan.FromSeconds(1)),
FSharpOption<TimeSpan>.Some(TimeSpan.FromSeconds(1))
)
);
services
.AddHealthChecksUI(s =>
s
.SetEvaluationTimeInSeconds(60)
.AddHealthCheckEndpoint("Self",$"http://{Dns.GetHostName()}/health"))
.AddInMemoryStorage();
);
…
services.AddAutofac();
…
}
public void Configure(IApplicationBuilder app,IWebHostEnvironment env)
{
…
app.UseSwagger();
app.UseSwaggerUI(c =>
{
c.SwaggerEndpoint("/swagger/v1/swagger.json","DashboardAPIV1");
});
app.UseEndpoints(endpoints =>
{
endpoints.MapControllerRoute(
name:"default",
pattern:"{controller}/{action=Index}/{id?}");
endpoints.MapHealthChecksUI( setup =>
{
setup.UIPath="/ui-health";
setup.ApiPath="/api-ui-health";
}
);
endpoints.MapHealthChecks(
"/health",
newHealthCheckOptions {
Predicate=( _ => true),
ResponseWriter=UIResponseWriter.WriteHealthCheckUIResponse
}
);
});
...
}
Actions
Service contains a single controller with 2 actions: POST
and PUT
(the GET
action is here only for testing purposes). POST
creates a topic, PUT
updates it.
[HttpPost]
public async Task<StatusCodeResult> Save([FromBody]CreateDto create)
{
var data = new Topic
(
name: Name.NewName("name"),
id: Identifier.NewId(""),
creator: Creator.NewCreator(create.Creator),
description: Description.NewDescription(create.Description),
done: Done.NewDone(false)
);
var channel = _ably.Channels.Get(_ablyConfig.Topic.Name);
var result = await channel.PublishAsync(_ablyConfig.Topic.MessageType,data);
if (!result.IsFailure)
returnOk();
_logger.LogError(result.Error.Message);
return new StatusCodeResult(500);
}
Looking at the code, we can see that user requests come to a controller action with the DTO object. This object is then transformed into a message that is known by the Topic
service. After that, the message is sent via Ably via the topic
channel.
Ably registration
Here we use an Ably service which is injected into a controller, but it is worth showing what its registration looks like. We use Autofac as DI and the whole code is below:
public class WebAppModule : Module
{
private readonly AblyConfig _config;
public WebAppModule(AblyConfig config) => _config = config;
protected override void Load(ContainerBuilder builder)
{
base.Load(builder);
builder.Register(_ => _config).AsSelf().SingleInstance();
builder.Register(_ => new AblyRealtime(_config.ApiKey)).AsSelf().SingleInstance();
}
}
public void ConfigureContainer(ContainerBuilder builder) =>
builder.RegisterModule(new WebAppModule(Configuration
.GetSection("Ably")
.Get<AblyConfig>()));
public class Program
{
public static Task Main(string[] args) =>
CreateWebHostBuilder(args).Build().RunAsync();
private static IHostBuilder CreateWebHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.UseServiceProviderFactory(new AutofacServiceProviderFactory())
.ConfigureWebHostDefaults(wb =>
{
wb.UseStartup<Startup>();
wb.ConfigureLogging((ctx, lg) =>
{
lg.ClearProviders();
lg.AddDebug();
lg.AddConsole();
});
});
}
The app's user interface (in Angular 8)
The frontend part as mentioned earlier is written in Angular 8. Looking at the code, we have two main Angular components:
• Home-component.ts
shows the main page with a list of actual topics and a form to add a new topic.
• Activity-component.ts
shows a page with all user activities.
Home component
The first component which shows the main page is divided into two parts. The first part shows a form to add a new topic which is extracted as a separate component (create-component.ts
) and it looks as follows:
export class CreateComponent {
public model: Create = { Description: "", Creator: "" };
constructor(
private readonly http: HttpClient,
@Inject('BASE_URL') private readonly baseUrl: string) {
}
public onSubmit = () =>
this.http.post(
`${this.baseUrl}Facade`,
JSON.stringify(this.model),
{ headers: { 'content-type': 'application/json'}})
.toPromise()
.then(_ => this.reset());
public reset = () =>
this.model = { Description: "", Creator: "" }
}
In the component view, we see a form to create a new topic to discuss. What happens inside the form logic? We map data from a form and send it to the server upon clicking the Submit button.
The second part of the home component contains a table with all topics that were created and discussed. When we create a topic, it doesn't show up immediately on a list because it comes from a server via the Ably channel.
To receive such a message, we attach to an Ably channel (Notification
channel).
export class HomeComponent {
public topics: Topic [];
constructor(
private readonly http: HttpClient,
private readonly changeDetector: ChangeDetectorRef,
@Inject('BASE_URL') private readonly baseUrl: string) {
this.http.get(`${this.baseUrl}Facade`)
.toPromise()
.then((e: AblyConfig) => {
let client = new Ably.Realtime({ key: e.apiKey });
let channel = client.channels.get(e.push.name);
channel.subscribe(e.push.messageType, msg => {
let additionalData = JSON.parse(msg.data).AdditionalData.Item;
let newRecord = JSON.parse(additionalData);
this.topics = [newRecord, ...this.topics];
this.changeDetector.detectChanges();
})
});
this.topics = [];
}
public markAsDone (model) {
this.http.post(
`${this.baseUrl}Facade`,
JSON.stringify(model),
{ headers: { 'content-type': 'application/json'}})
.toPromise()
.then(_ => {
this.topics = this.topics.filter(x => x !== model);
this.changeDetector.detectChanges();
});
}
}
Note that the code needed to create an Ably object, attach to a channel, and listen for an incoming message is very similar to what we show on the server-side.
In short, we fetch the channel, subscribe to it, and when a message of interest comes in we update the model of the home component. Also note the markAsDone
method which handles marking a topic as "already discussed" which in turn fires an update on a certain topic.
Activity Feed
The second main component (activity-component.ts
) is responsible for showing a history of all events that occurred in the application. It looks similar to the home page, but the messages arrive in a slightly different way.
export class ActivityComponent {
public activities: Topic [];
constructor(
private readonly http: HttpClient,
private readonly changeDetector: ChangeDetectorRef,
@Inject('BASE_URL') private readonly baseUrl: string) {
this.http.get(`${this.baseUrl}Facade`)
.toPromise()
.then((e: AblyConfig) => {
let client = new Ably.Realtime({ key: e.apiKey });
let channel = client.channels.get(e.push.name);
channel.attach(err => {
channel.history({untilAttach: true}, (error, results) => {
let isMsg = (maybeMsg: PaginatedResult<Message> | undefined):
maybeMsg is PaginatedResult<Message> =>
(maybeMsg as PaginatedResult<Message>) !== undefined;
if (isMsg(results))
this.activities =
results
.items
.map(msg => {
if (msg.name === e.push.messageType) {
let additionalData = JSON.parse(msg.data).AdditionalData.Item;
return JSON.parse(additionalData);
}
else {
return { Creator: "Server request", Description: "Server request", Id: msg.id, Done: false };
}
});
else
this.activities = [];
this.changeDetector.detectChanges();
})})
});
}
}
We don't fetch the channel, but directly attach to it and listen to all the messages that have come via this particular channel in the last 24 hours. The persist information is set on the Ably platform per channel. We are not able to set it in a programmatic way, but via the channel page.
Message retention
To set message retention, we need to go to the Ably dashboard and navigate to the channel page:
Enable "Persist all messages" and that's all.
Right now we know how it works on a client and Facade
side. We can also forward to the Topic
and Notification
services.
Topic
Topic
is a WebAPI written in .NET 5 (F#). It uses MongoDB as a database. It has 3 main responsibilities:
- Receiving and handling messages about new topics.
- Saving topics to Database.
- Notifying about handled topics.
Handle Ably messages
Topic
subscribes to all messages of interest, specifically those that are meant to create or update topics. The main responsibility of service is to handle said messages which are sent via Ably Channel in accordance with the requirements.
member private this.ConfigureHandlers (di: DI) (logger: ILogger<Startup>) =
let channel = di.ably.Channels.Get di.ablyConfig.Topic.Name
channel.Subscribe(
di.ablyConfig.Topic.MessageType,
fun msg ->
async {
match! Topic.Application.Topics.Save.trySave di msg.Data CancellationToken.None with
| Choice1Of2 result -> logger.LogDebug $"Process message: {result}"
| Choice2Of2 err -> logger.LogError $"Error occurred while processing message: {err}"
()
} |> Async.RunSynchronously)
When the new message comes, we receive it and map it to a business model.
type InvalidTopicFormat = exn
[<RequireQualifiedAccess>]
module Save =
let save
(di: DI)
topic
token =
async {
let! result = Repository.create token di.config topic
match result with
| Choice1Of2 (DTOResult.Create data) ->
let serializedData = JsonSerializer.Serialize(data, Serialization.serializerOpt)
let (Topic.Contract.Id id) = data.Id
let msg = {
Id = Identifier (Guid.NewGuid())
RelatedId = Id id
AdditionalData = AdditionalData serializedData
}
let channel = di.ably.Channels.Get di.ablyConfig.Notification.Name
let! pubResult = channel.PublishAsync (di.ablyConfig.Notification.MessageType, msg) |> Async.AwaitTask
if pubResult.IsFailure then
return Choice2Of2 (pubResult.Error.AsException ())
else return result
| Choice1Of2 res ->
di.logger.Log(LogLevel.Warning, $"Expected to get a create result, but got something else: {res}")
return result
| Choice2Of2 ex ->
di.logger.Log(LogLevel.Error, $"Error occurred while processing data in repository: {ex.Message}")
return result
}
let trySave
(di: DI)
(topic: obj)
token =
async {
try
let deserializedTopic = (topic :?> JObject).ToObject<Topic>()
return! save di deserializedTopic token
with msg ->
return
msg.Message
|> InvalidTopicFormat
|> Choice2Of2
}
Saving data to MongoDB
Then we save it to MongoDB which we run as a docker image (the same way as .NET services).
version: '3'
volumes:
retrospectiveLocalDb:
retrospectiveLocalDbConfig:
services:
database:
image: db
build:
context: ./storage/mongo
dockerfile: ./Dockerfile
environment:
- MONGO_INITDB_ROOT_USERNAME=sample
- MONGO_INITDB_ROOT_PASSWORD=Sample123!
volumes:
- retrospectiveLocalDb:/data/db
- retrospectiveLocalDbConfig:/data/configdb
ports:
- '27017-27019:27017-27019'
FROM mongo:4.2.3
COPY init-mongo.js /docker-entrypoint-initdb.d/
db = db.getSiblingDB('admin')
db.createUser(
{
user: "admin",
pwd: "123",
roles: [
{ role: "readWrite", db: "admin" }
]
});
db = db.getSiblingDB('retrospective')
db.createUser(
{
user: "admin",
pwd: "123",
roles: [
{ role: "readWrite", db: "retrospective" }
]
});
let private queryDb token config ``type`` (model: Option<Topic>) =
match ``type`` with
| TopicOperation.Create ->
async {
let fOpt = InsertOneOptions ()
let! _ =
connection config
|> Query.insertOne token fOpt model.Value
return QueryResult.Create model.Value
}
| TopicOperation.Update ->
async {
let fOpt = FindOneAndReplaceOptions<Topic> ()
fOpt.ReturnDocument <- ReturnDocument.After
let filter =
Filter.eq (fun (x: Topic) -> x._id) model.Value._id
let! result =
connection config
|> Query.updateWhole token fOpt model.Value filter
return QueryResult.Update result
}
| ...
...
let private query token config data ``type`` =
async {
let! mapped = ToDomain.map data
return! mapped |> (queryDb token config ``type``)
}
...
let create token config data =
async {
let! result = query token config (Some data) TopicOperation.Create
return result |> ToDTO.map
} |> Async.Catch
The code doesn't look too complicated. Easy maps from object to object and saves to the database. When the save succeeds we want to inform the client application about this fact. Because our project can contain a lot of other services, we created a service that will aggregate all kinds of push messages in one place.
Forward message to Notification
We map all the aggregated messages that were successfully saved to an object acceptable by the Notification
service, and send it to it over the Ably channel. This is done the same way that Facade
sends data to Topic
service, except we are using different channels and message types.
...
match result with
| Choice1Of2 (DTOResult.Create data) ->
let serializedData = JsonSerializer.Serialize(data, Serialization.serializerOpt)
let (Topic.Contract.Id id) = data.Id
let msg = {
Id = Identifier (Guid.NewGuid())
RelatedId = Id id
AdditionalData = AdditionalData serializedData
}
let channel = di.ably.Channels.Get di.ablyConfig.Notification.Name
let! pubResult = channel.PublishAsync (di.ablyConfig.Notification.MessageType, msg) |> Async.AwaitTask
if pubResult.IsFailure then
return Choice2Of2 (pubResult.Error.AsException ())
else return result
| Choice1Of2 res ->
di.logger.Log(LogLevel.Warning, $"Expected to get a create result, but got something else: {res}")
return result
| Choice2Of2 ex ->
di.logger.Log(LogLevel.Error, $"Error occurred while processing data in repository: {ex.Message}")
return result
…
Open API and HealthChecks
In contrast to Facade
, the Topic
service depends on two things. The first one is Ably. The second one is MongoDB which we describe in health checks.
member this.ConfigureServices(services: IServiceCollection) =
let config = Config ()
this.Configuration.Bind config
let ably = new AblyRealtime (config.Ably.ApiKey)
let loggerFactory =
LoggerFactory.Create(fun builder ->
builder.AddConsole() |> ignore
);
let logger = loggerFactory.CreateLogger<Startup>()
let di = DI.create config ably logger
services.AddSingleton<DI> (fun _ -> di) |> ignore
this.ConfigureHandlers di logger
services.AddSwaggerGen(fun c ->
c.SwaggerDoc("v1", OpenApiInfo(Title = "Topic Api", Version = "v1"))
) |> ignore
services.AddHealthChecks()
.AddCheck(
"Ably Channel",
AblyChannelHealthCheck(
ably,
"Topic"
)
)
.AddCheck(
"Ably Timer",
AblyTimerHealthCheck(
ably,
"Topic",
TimeSpan.FromSeconds 1.,
TimeSpan.FromSeconds 1.
)
)
.AddMongoDb((MongoConfig.map config.MongoDb).GetConnectionString(), name = "MongoDB")
|> ignore
services
.AddHealthChecksUI(fun s ->
s
.SetEvaluationTimeInSeconds(60)
.AddHealthCheckEndpoint("Self", $"http://{Dns.GetHostName()}/health")
|> ignore)
.AddInMemoryStorage() |> ignore
...
member this.Configure(app: IApplicationBuilder, env: IWebHostEnvironment) =
...
app.UseSwagger(fun opt -> ()) |> ignore
app.UseSwaggerUI(fun opt ->
opt.SwaggerEndpoint("/swagger/v1/swagger.json", "Topic Api V1")
opt.RoutePrefix <- String.Empty
) |> ignore
...
app.UseEndpoints(fun endpoints ->
endpoints.MapControllers() |> ignore
endpoints.MapHealthChecksUI(fun setup ->
setup.UIPath <- "/ui-health"
setup.ApiPath <- "/api-ui-health"
) |> ignore
endpoints.MapHealthChecks(
"/health",
HealthCheckOptions(
Predicate = (fun _ -> true),
ResponseWriter = Func<HttpContext, HealthReport, Task>(fun (context) (c: HealthReport) -> UIResponseWriter.WriteHealthCheckUIResponse(context, c))
)
) |> ignore
) |> ignore
We also define some HTTP endpoints just for testing purposes. You can see them in the OpenAPI documentation as follows.
Notification Service
Now we can switch to the Notification
service. This is again a WebAPI written in .NET 5 (F#).
Handle Ably messages
Similar to the Topic
service, the Notification
Service attaches to the Ably channel for new messages on the concrete channel and then handles them as they come in.
member private this.ConfigureHandlers (ably: AblyRealtime) (config: AblyConfig) (logger: ILogger) =
let channel = ably.Channels.Get config.Channels.Notification.Name
channel.Subscribe(
config.Channels.Notification.MessageType,
fun msg -> WebApi.Notifications.Save.handle ably config msg.Data logger |> Async.RunSynchronously)
Handling messages in the Notification
service basically means mapping a message and then sending it over the Ably channel to all interested clients/services. In our scenario, it is a TypeScript part of Facade
.
Forward notification (push) message to all clients
let private serializerOpt =
let options = JsonSerializerOptions()
JsonFSharpConverter(
unionEncoding = (
// Base encoding:
JsonUnionEncoding.Untagged
// Additional options:
||| JsonUnionEncoding.UnwrapOption
||| JsonUnionEncoding.UnwrapRecordCases
)
)
|> options.Converters.Add
options
let internal handle (ably: AblyRealtime) (ablyConfig: AblyConfig) (msg: obj) (logger: ILogger) =
try
let deserialized = (msg :?> JObject).ToObject<Notification.Contract.Message>()
async {
let channel = ably.Channels.Get ablyConfig.Channels.Push.Name
let msg = JsonSerializer.Serialize(deserialized, serializerOpt)
let! _ = channel.PublishAsync (ablyConfig.Channels.Push.MessageType, msg) |> Async.AwaitTask
()
}
with er ->
logger.LogError $"Error occurred while processing notification message: {er.Message}"
async {
let channel = ably.Channels.Get ablyConfig.Channels.Push.Name
let! _ = channel.PublishAsync (ablyConfig.Channels.Push.MessageType, er.Message) |> Async.AwaitTask
()
}
By omitting backend code, Facade
listens to all notifications. We define a more robust type of message serialization (done by Fsharp.System.Text.Json
) because we want to serialize all F#-specific types like Unions
and Records
to make them easily approachable from via TypeScript.
OpenAPI and Healthchecks
In the end we configure a small part of the startup.fs
file where we define health checks and OpenAPI information. Similar to Facade
, the Notification
service from an external point of view depends only on Ably services.
member this.ConfigureServices(services: IServiceCollection) =
let config = Config ()
this.Configuration.Bind config
let ably = new AblyRealtime (config.Ably.ApiKey)
services.AddSingleton<AblyRealtime>(fun _ -> ably) |> ignore
services.AddSingleton<Config> (fun _ -> config) |> ignore
let loggerFactory =
LoggerFactory.Create(fun builder ->
builder.AddConsole() |> ignore
);
let logger = loggerFactory.CreateLogger();
this.ConfigureHandlers ably config.Ably logger
services.AddSwaggerGen(fun c ->
c.SwaggerDoc("v1", OpenApiInfo(Title = "Notification Api", Version = "v1"))
) |> ignore
services.AddHealthChecks()
.AddCheck(
"Ably Channel",
AblyChannelHealthCheck(
ably,
"Notification"
)
)
.AddCheck(
"Ably Timer",
AblyTimerHealthCheck(
ably,
"Topic",
TimeSpan.FromSeconds 1.,
TimeSpan.FromSeconds 1.
)
) |> ignore
services
.AddHealthChecksUI(fun s ->
s
.SetEvaluationTimeInSeconds(60)
.AddHealthCheckEndpoint(
"Self",
$"http://{Dns.GetHostName()}/health"
) |> ignore)
.AddInMemoryStorage() |> ignore
...
member this.Configure(app: IApplicationBuilder, env: IWebHostEnvironment) =
...
app.UseSwagger(fun opt -> ()) |> ignore
app.UseSwaggerUI(fun opt ->
opt.SwaggerEndpoint("/swagger/v1/swagger.json", "Notification Api V1")
opt.RoutePrefix <- String.Empty
) |> ignore
...
app.UseEndpoints(fun endpoints ->
endpoints.MapControllers() |> ignore
endpoints.MapHealthChecksUI(fun setup ->
setup.UIPath <- "/ui-health"
setup.ApiPath <- "/api-ui-health"
) |> ignore
endpoints.MapHealthChecks(
"/health",
HealthCheckOptions(
Predicate = (fun _ -> true),
ResponseWriter = Func<HttpContext, HealthReport, Task>(fun (context) (c: HealthReport) -> UIResponseWriter.WriteHealthCheckUIResponse(context, c))
)
) |> ignore
) |> ignore
Summary
This is the whole "enterprise" application that gives you some picture of how you can use Ably in your app. Source code is available here.
Run all at once
To run the whole app you can simply run docker compose up
since the docker-compose
file is in place.
version: '3'
volumes:
retrospectiveLocalDb:
retrospectiveLocalDbConfig:
services:
database:
image: db
build:
context: ./storage/mongo
dockerfile: ./Dockerfile
environment:
- MONGO_INITDB_ROOT_USERNAME=sample
- MONGO_INITDB_ROOT_PASSWORD=Sample123!
volumes:
- retrospectiveLocalDb:/data/db
- retrospectiveLocalDbConfig:/data/configdb
ports:
- '27017-27019:27017-27019'
topic:
image: topic
build:
context: ./src
dockerfile: ./Topic/Dockerfile
ports:
- '2137:80'
depends_on:
- database
environment:
MongoDB__Host: database
hostname: topic
notification:
image: notification
build:
context: ./src
dockerfile: ./Notification/Dockerfile
ports:
- '2138:80'
hostname: notification
facade:
image: facade
build:
context: ./src
dockerfile: ./Facade/Dockerfile
ports:
- '2111:5000'
depends_on:
- notification
- topic
hostname: facade
As we can see by this simple example of an "enterprise" application we have shown how you can build a realtime full stack app with .NET, Angular, MongoDB, and Ably.
.NET Resources
- What is pub/sub and how to apply it in C# .NET to build a chat app
- Implementing a simple WebRTC signaling mechanism with FSharp, Fable, and Ably
- Building dependable realtime apps with WebSockets and .NET
- Build a live multiplayer game in Unity with Ably
About Ably
Ably is an enterprise-grade pub/sub messaging platform that makes it easy to efficiently design, quickly ship, and seamlessly scale critical realtime functionality delivered directly to end-users. Everyday Ably delivers billions of realtime messages to millions of users for thousands of companies.
The Ably platform is mathematically modelled around Four Pillars of Dependability to ensure messages don’t get lost while still being delivered at low latency over a secure, reliable, and highly available global edge network. Take the Ably APIs for a spin to see why developers from startups to industrial giants choose to build on Ably to simplify engineering, minimize DevOps overhead, and increase development velocity.