Thursday, November 26, 2020

ASP.NET Core API service discovery using Consul on Docker

What is Consul?

Consul is a networking tool that provides a fully featured service mesh and service discovery.

Objective:

Create multiple instances of a Web API in Docker (created dynamically) and discover them at runtime through Consul to simulate load balancing.

Approach:

Pull the Docker image of Consul and run a Consul container.

Add two .NET Core Web API instances and make them discoverable through Consul.

Steps:

  1. Start Docker on Windows in Linux container mode (because Consul is only available as a Linux image).
  2. Pull the Docker image of Consul.
  3. Run a Consul container instance on Docker from the pulled image.

Browse to the URL below:

http://localhost:8500/ui/dc1/services

You should see one instance of Consul running.

  4. Configure the ASP.NET Core Web API as below:

    Add Consul config as below in appsettings.json 

        "Consul": {
          "Enabled": true,
          "Host": "http://172.17.0.2:8500",
          "service": "weather-service",
          "address": "localhost",
          "Port": 7000,
          "PingEnabled": false,
          "removeAfterInterval": 10,
          "requestRetries": 3,
          "services": []
        }

    Note: Host is the concrete IP address of the Consul container rather than localhost, because inside the Web API container localhost refers to that container itself. Use localhost only when the Web API runs directly on the machine that hosts Docker. The concrete address of a container can be found by running the command below:

    $ docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' docker_container_instance_id 
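    The Consul section above maps naturally onto a small options class. The sample code later in this post refers to a ConsulOptions type without showing it, so the following is only a minimal sketch of what it might look like: the property names are inferred from the configuration keys and from how the sample code reads them. System.Text.Json is used here just to make the sketch runnable on its own (an assumption for illustration); the post itself binds the section through IConfiguration, which also ignores key casing.

```csharp
using System;
using System.Text.Json;

// Hypothetical options class: property names mirror the "Consul" keys in appsettings.json
public class ConsulOptions
{
    public bool Enabled { get; set; }
    public string Host { get; set; }
    public string Service { get; set; }
    public string Address { get; set; }
    public int Port { get; set; }
    public bool PingEnabled { get; set; }
}

public static class Program
{
    public static void Main()
    {
        // Same values as the appsettings.json fragment, including the mixed-case keys
        const string json = @"{
            ""Enabled"": true,
            ""Host"": ""http://172.17.0.2:8500"",
            ""service"": ""weather-service"",
            ""address"": ""localhost"",
            ""Port"": 7000,
            ""PingEnabled"": false
        }";

        // Case-insensitive matching stands in for the configuration binder here
        var options = JsonSerializer.Deserialize<ConsulOptions>(
            json, new JsonSerializerOptions { PropertyNameCaseInsensitive = true });

        Console.WriteLine($"{options.Service} -> {options.Address}:{options.Port}");
    }
}
```

    Keys without a matching property, such as removeAfterInterval, are simply skipped in this sketch.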

  5. Build the Docker image. If the build fails, try keeping the Dockerfile outside the .csproj folder.

    docker build -t weatherservice .

  6. Run two container instances of the Web API image:

    docker run -it --rm -p 7000:80 --name weatherapp1 weatherservice

    docker run -it --rm -p 8000:80 --name weatherapp2 weatherservice

  7. Browse to the Consul UI again.

    You should now see multiple instances of the Web API and Consul running.

    Sample code to register and call a service instance at runtime:
     
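    Client code:

        // Resolve the Consul-aware HTTP client from the service provider
        var consulServices = services.GetRequiredService<IConsulHttpClient>();
        // Call the "weatherforecast" endpoint on one of the registered "weather-service" instances
        var data = await consulServices.GetAsync<dynamic>("weather-service", "weatherforecast");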

     
    Web API code: 
     

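            public void ConfigureServices(IServiceCollection services)
            {
                // Registers IConsulClient, ConsulOptions and the Consul-aware HttpClient (see Extensions.AddConsul)
                services.AddConsul();
                services.AddAuthorization();
                services.AddControllers();
            }

            public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime lifetime, IConsulClient consulClient)
            {
                if (env.IsDevelopment())
                {
                    app.UseDeveloperExceptionPage();
                }

                app.UseHttpsRedirection();

                app.UseRouting();

                app.UseAuthorization();

                app.UseEndpoints(endpoints =>
                {
                    endpoints.MapControllers();
                });

                // Register this instance with Consul; the returned ID is empty when Consul is disabled
                var serviceID = app.UseConsul();
                lifetime.ApplicationStopped.Register(() =>
                {
                    if (!string.IsNullOrEmpty(serviceID))
                    {
                        // Wait for the call to finish; otherwise the process may exit before Consul is notified
                        consulClient.Agent.ServiceDeregister(serviceID).GetAwaiter().GetResult();
                    }
                });
            }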
     
        // Namespaces needed by the client code below
        using System;
        using System.Collections.Generic;
        using System.Linq;
        using System.Net.Http;
        using System.Threading.Tasks;
        using Consul;
        using Newtonsoft.Json;

        public interface IConsulHttpClient
        {
            Task<T> GetAsync<T>(string serviceName, string uriPath);
        }

        public class ConsulHttpClient : IConsulHttpClient
        {
            private readonly HttpClient _client;
            private readonly IConsulClient _consulclient;

            public ConsulHttpClient(HttpClient client, IConsulClient consulclient)
            {
                _client = client;
                _consulclient = consulclient;
            }

            public async Task<T> GetAsync<T>(string serviceName, string uriPath)
            {
                // Resolve a concrete instance of the service through Consul
                var uri = await GetRequestUriAsync(serviceName, uriPath);

                var response = await _client.GetAsync(uri);

                // A non-success status code yields the default value instead of an exception
                if (!response.IsSuccessStatusCode)
                {
                    return default(T);
                }

                var content = await response.Content.ReadAsStringAsync();

                return JsonConvert.DeserializeObject<T>(content);
            }

            private async Task<Uri> GetRequestUriAsync(string serviceName, string uriPath)
            {
                // Get all services registered with the Consul agent
                var allRegisteredServices = await _consulclient.Agent.Services();

                // Get all instances of the service we want to send a request to
                var registeredServices = allRegisteredServices.Response
                    ?.Where(s => s.Value.Service.Equals(serviceName, StringComparison.OrdinalIgnoreCase))
                    .Select(x => x.Value)
                    .ToList();

                // Get a random instance of the service
                var service = GetRandomInstance(registeredServices, serviceName);

                if (service == null)
                {
                    throw new ConsulServiceNotFoundException($"Consul service: '{serviceName}' was not found.",
                        serviceName);
                }

                var uriBuilder = new UriBuilder()
                {
                    Host = service.Address,
                    Port = service.Port,
                    Path = uriPath
                };

                return uriBuilder.Uri;
            }

            // Shared generator; System.Random is not thread-safe, so access is serialized with a lock
            private static readonly Random _random = new Random();

            private AgentService GetRandomInstance(IList<AgentService> services, string serviceName)
            {
                // Return null when nothing is registered so that the caller can report "service not found"
                // (indexing into a null or empty list would throw before that check is ever reached)
                if (services == null || services.Count == 0)
                {
                    return null;
                }

                lock (_random)
                {
                    return services[_random.Next(0, services.Count)];
                }
            }
        } 
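    The client above spreads requests by picking a random instance. Round-robin selection is another common way to simulate load balancing, and it is not what the post implements. A minimal sketch, assuming a hypothetical ServiceInstance type that stands in for Consul's AgentService and a hypothetical RoundRobinPicker class:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for Consul's AgentService, reduced to the fields used here
public class ServiceInstance
{
    public string Address { get; set; }
    public int Port { get; set; }
}

// Hypothetical picker that hands out instances in turn
public class RoundRobinPicker
{
    private int _next = -1;

    public ServiceInstance Pick(IList<ServiceInstance> instances)
    {
        // Nothing registered: let the caller decide how to report it
        if (instances == null || instances.Count == 0)
        {
            return null;
        }

        // Interlocked keeps the counter consistent across concurrent callers;
        // the uint cast keeps the index non-negative after the counter wraps around
        uint ticket = (uint)Interlocked.Increment(ref _next);
        return instances[(int)(ticket % (uint)instances.Count)];
    }
}

public static class Program
{
    public static void Main()
    {
        var instances = new List<ServiceInstance>
        {
            new ServiceInstance { Address = "localhost", Port = 7000 },
            new ServiceInstance { Address = "localhost", Port = 8000 },
        };

        var picker = new RoundRobinPicker();
        for (int i = 0; i < 4; i++)
        {
            var chosen = picker.Pick(instances);
            // Alternates between port 7000 and port 8000
            Console.WriteLine($"{chosen.Address}:{chosen.Port}");
        }
    }
}
```

    Random selection needs no shared state beyond the generator, while round-robin gives an even spread over short runs. Either could sit behind GetRandomInstance.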
     
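        // Requires the namespaces Consul, Microsoft.AspNetCore.Builder, Microsoft.Extensions.Configuration,
        // Microsoft.Extensions.DependencyInjection and Microsoft.Extensions.Diagnostics.HealthChecks
        public static class Extensions
        {
            public static IServiceCollection AddConsul(this IServiceCollection serviceCollection)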
            {
                IConfiguration configuration;
                using (var serviceProvider = serviceCollection.BuildServiceProvider())
                {
                    configuration = serviceProvider.GetService<IConfiguration>();
                }
                ConsulOptions consulConfigOptions = configuration.GetOptions<ConsulOptions>("Consul");

                serviceCollection.Configure<ConsulOptions>(configuration.GetSection("Consul"));
                serviceCollection.AddTransient<IConsulServices, ConsulServices>();
                serviceCollection.AddTransient<ConsulServiceDiscoveryMessageHandler>();
                serviceCollection.AddHttpClient<IConsulHttpClient, ConsulHttpClient>()
                    .AddHttpMessageHandler<ConsulServiceDiscoveryMessageHandler>();

                return serviceCollection.AddSingleton<IConsulClient>(c => new ConsulClient(cfg =>
                {
                    if (!string.IsNullOrEmpty(consulConfigOptions.Host))
                    {
                        cfg.Address = new Uri(consulConfigOptions.Host);
                    }
                }));
            }

            public static TModel GetOptions<TModel>(this IConfiguration configuration, string section) where TModel : new()
            {
                var model = new TModel();
                configuration.GetSection(section).Bind(model);

                return model;
            }

            public static string UseConsul(this IApplicationBuilder app)
            {
                using (var scope = app.ApplicationServices.CreateScope())
                {
                    var Iconfig = scope.ServiceProvider.GetService<IConfiguration>();

                    var config = Iconfig.GetOptions<ConsulOptions>("Consul");

                    // Nothing to register when Consul is switched off in appsettings.json
                    if (!config.Enabled)
                        return String.Empty;

                    // A GUID suffix gives every running instance its own service ID
                    Guid serviceId = Guid.NewGuid();
                    string consulServiceID = $"{config.Service}:{serviceId}";

                    var client = scope.ServiceProvider.GetService<IConsulClient>();

                    var consulServiceRegistration = new AgentServiceRegistration
                    {
                        Name = config.Service,
                        ID = consulServiceID,
                        Address = config.Address,
                        Port = config.Port,
                        //TODO : Add Tags Tags = fabioOptions.Value.Enabled ? GetFabioTags(serviceName, fabioOptions.Value.Service) : null
                    };

                    if (config.PingEnabled)
                    {
                        var healthService = scope.ServiceProvider.GetService<HealthCheckService>();

                        if (healthService != null)
                        {
                            // Prepend a scheme only when the configured address does not already carry one
                            var scheme = config.Address.StartsWith("http", StringComparison.InvariantCultureIgnoreCase)
                                ? string.Empty
                                : "http://";
                            var check = new AgentServiceCheck
                            {
                                Interval = TimeSpan.FromSeconds(5),
                                DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(10),
                                HTTP = $"{scheme}{config.Address}{(config.Port > 0 ? $":{config.Port}" : string.Empty)}/health"
                            };

                            consulServiceRegistration.Checks = new[] { check };
                        }
                        else
                        {
                            throw new Exception("consul_check_initialization_exception", new Exception("Please ensure that Healthchecks has been added before adding checks to Consul."));
                        }
                    }

                    // Wait for the registration so that failures surface at startup instead of being lost
                    client.Agent.ServiceRegister(consulServiceRegistration).GetAwaiter().GetResult();

                    return consulServiceID;
                }
            }
        }

Sunday, August 2, 2020

C#: Async ParallelForEach

Here's an example of an async ParallelForEach extension method in C#:

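        // Requires System.Collections.Concurrent (Partitioner), System.Linq and System.Threading.Tasks;
        // as an extension method it must be declared inside a static class.

        /// <summary>
        /// Runs an asynchronous body for every item of the source, splitting the source into
        /// <paramref name="dop"/> partitions that are processed concurrently.
        /// </summary>
        /// <typeparam name="T">Type of the items in the source sequence.</typeparam>
        /// <param name="source">Items to process.</param>
        /// <param name="dop">Degree of parallelism, i.e. the number of partitions.</param>
        /// <param name="body">Asynchronous action invoked for each item.</param>
        /// <returns>A task that completes when all items have been processed.</returns>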
        public static Task ParallelForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
        {
            async Task AwaitPartition(IEnumerator<T> partition)
            {
                using (partition)
                {
                    while (partition.MoveNext())
                    { await body(partition.Current); }
                }
            }

            return Task.WhenAll(
                Partitioner
                    .Create(source)
                    .GetPartitions(dop)
                    .AsParallel()
                    .Select(p => AwaitPartition(p)));
        }

Usage:

#region Extension method for ParallelForEachAsync: usage
       
        //await GetDocumentsFromDatabase(session).ParallelForEachAsync(dop: 20, body: async entry =>
        //{
        //    _logger.Info($"Processing entry '{entry.Id}'");
        //});

#endregion
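
The extension method above limits concurrency by partitioning the source. Another way to get a similar bound is to throttle with a SemaphoreSlim. This is a different technique from the one in this post, shown only for comparison. A minimal sketch, assuming a hypothetical method name ForEachThrottledAsync; the demo measures how many bodies are in flight at once:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledExtensions
{
    // Hypothetical alternative: at most `dop` bodies are in flight at any time
    public static async Task ForEachThrottledAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        using (var gate = new SemaphoreSlim(dop))
        {
            var tasks = new List<Task>();
            foreach (var item in source)
            {
                // Wait for a free slot before starting the next body
                await gate.WaitAsync();
                tasks.Add(RunAsync(item));
            }
            await Task.WhenAll(tasks);

            async Task RunAsync(T item)
            {
                try { await body(item); }
                finally { gate.Release(); } // free the slot even if the body throws
            }
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        int running = 0, peak = 0, processed = 0;

        await Enumerable.Range(1, 20).ForEachThrottledAsync(dop: 4, body: async n =>
        {
            int now = Interlocked.Increment(ref running);

            // Record the highest number of concurrently running bodies
            int seen;
            while (now > (seen = Volatile.Read(ref peak)) &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen) { }

            await Task.Delay(20);
            Interlocked.Decrement(ref running);
            Interlocked.Increment(ref processed);
        });

        Console.WriteLine($"processed: {processed}, peak concurrency: {peak}");
    }
}
```

With the partition-based method, a slow item holds up only the rest of its own partition. The semaphore-based variant starts the next item as soon as any slot frees up.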

Thursday, March 26, 2020

Hosting Restful services on Windows service using OWIN



Code:

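public partial class Service : ServiceBase
{
    // Handle to the running OWIN host; disposed when the service stops
    private IDisposable _startup;

    protected override void OnStart(string[] args)
    {
        StartOptions options = new StartOptions();
        // Listen on port 9889 for any host name, under the /DPE path
        options.Urls.Add("http://+:9889/DPE");
        _startup = WebApp.Start<OwinStartup>(options);
    }

    protected override void OnStop()
    {
        // Shut the OWIN host down together with the Windows service
        _startup?.Dispose();
    }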
}
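
The service above starts the host with an OwinStartup class that the post does not show. A minimal sketch of such a class, assuming ASP.NET Web API 2 hosted on OWIN (for example through the Microsoft.AspNet.WebApi.OwinSelfHost NuGet package). The StatusController and its route are hypothetical and only illustrate where a RESTful endpoint would plug in:

```csharp
using System.Web.Http; // ASP.NET Web API 2
using Owin;            // IAppBuilder and the UseWebApi extension

public class OwinStartup
{
    // Called by convention when WebApp.Start<OwinStartup>(options) runs
    public void Configuration(IAppBuilder app)
    {
        var config = new HttpConfiguration();

        // Pick up routes declared with [RoutePrefix] / [Route] attributes
        config.MapHttpAttributeRoutes();

        app.UseWebApi(config);
    }
}

// Hypothetical controller, included only to give the host something to serve
[RoutePrefix("api/status")]
public class StatusController : ApiController
{
    [HttpGet, Route("")]
    public IHttpActionResult Get()
    {
        return Ok("running");
    }
}
```

Under these assumptions the endpoint would be expected below the registered base address, i.e. under http://localhost:9889/DPE/.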


Steps on server:

1. Open a command prompt in Administrator mode.
2. Run the netsh show command and redirect the output to a text file,
e.g., netsh http show urlacl > "netsh_registeredurls.txt"
=> Outputs the URLs registered in the HTTP.sys settings
3. If the port is already present, delete all of its entries using the netsh http delete command,
e.g., netsh http delete urlacl url=http://+:9889/
4. Uninstall the service.
5. Add the URL reservation to HTTP.sys,
e.g., netsh http add urlacl url=http://+:9889/ user=jeder (Note: "Jeder" is the German equivalent of "Everyone")
6. Install the service.
7. Launch the service.